""" Database connection module for FleetMind Handles PostgreSQL database connections and initialization """ import psycopg2 import psycopg2.extras import os from typing import Optional, List, Dict, Any from pathlib import Path import logging from dotenv import load_dotenv # Load environment variables load_dotenv() # Set up logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Database configuration from environment variables DB_CONFIG = { 'host': os.getenv('DB_HOST', 'localhost'), 'port': os.getenv('DB_PORT', '5432'), 'database': os.getenv('DB_NAME', 'fleetmind'), 'user': os.getenv('DB_USER', 'postgres'), 'password': os.getenv('DB_PASSWORD', ''), } def get_db_connection() -> psycopg2.extensions.connection: """ Create and return a PostgreSQL database connection. Returns: psycopg2.connection: Database connection object Raises: psycopg2.Error: If connection fails """ try: conn = psycopg2.connect( host=DB_CONFIG['host'], port=DB_CONFIG['port'], database=DB_CONFIG['database'], user=DB_CONFIG['user'], password=DB_CONFIG['password'], cursor_factory=psycopg2.extras.RealDictCursor, sslmode='require' ) logger.info(f"Database connection established: {DB_CONFIG['database']}@{DB_CONFIG['host']}") return conn except psycopg2.Error as e: logger.error(f"Database connection error: {e}") raise def init_database(schema_file: Optional[str] = None) -> None: """ Initialize the database with schema. Args: schema_file: Path to SQL schema file. If None, uses default schema. Raises: psycopg2.Error: If initialization fails """ try: conn = get_db_connection() cursor = conn.cursor() if schema_file and os.path.exists(schema_file): # Read and execute schema from file with open(schema_file, 'r') as f: schema_sql = f.read() cursor.execute(schema_sql) logger.info(f"Database initialized from schema file: {schema_file}") else: # Use default schema from .schema import SCHEMA_SQL cursor.execute(SCHEMA_SQL) logger.info("Database initialized with default schema") conn.commit() cursor.close() conn.close() logger.info("Database initialization completed successfully") except psycopg2.Error as e: logger.error(f"Database initialization error: {e}") if conn: conn.rollback() raise def close_connection(conn: psycopg2.extensions.connection) -> None: """ Safely close database connection. Args: conn: Database connection to close """ try: if conn and not conn.closed: conn.close() logger.info("Database connection closed") except psycopg2.Error as e: logger.error(f"Error closing connection: {e}") def execute_query(query: str, params: tuple = ()) -> List[Dict[str, Any]]: """ Execute a SELECT query and return results. Args: query: SQL query string params: Query parameters tuple Returns: list: Query results as list of dictionaries Raises: psycopg2.Error: If query fails """ conn = get_db_connection() try: cursor = conn.cursor() cursor.execute(query, params) results = cursor.fetchall() cursor.close() return results except psycopg2.Error as e: logger.error(f"Query execution error: {e}") raise finally: close_connection(conn) def execute_write(query: str, params: tuple = ()) -> int: """ Execute an INSERT, UPDATE, or DELETE query. Args: query: SQL query string params: Query parameters tuple Returns: int: Number of rows affected Raises: psycopg2.Error: If query fails """ conn = get_db_connection() try: cursor = conn.cursor() cursor.execute(query, params) rows_affected = cursor.rowcount conn.commit() cursor.close() return rows_affected except psycopg2.Error as e: conn.rollback() logger.error(f"Write query error: {e}") raise finally: close_connection(conn) def execute_many(query: str, params_list: List[tuple]) -> int: """ Execute multiple INSERT/UPDATE queries in a batch. Args: query: SQL query string params_list: List of parameter tuples Returns: int: Number of rows affected Raises: psycopg2.Error: If query fails """ conn = get_db_connection() try: cursor = conn.cursor() cursor.executemany(query, params_list) rows_affected = cursor.rowcount conn.commit() cursor.close() return rows_affected except psycopg2.Error as e: conn.rollback() logger.error(f"Batch write error: {e}") raise finally: close_connection(conn) def test_connection() -> bool: """ Test database connection. Returns: bool: True if connection successful, False otherwise """ try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT version();") version = cursor.fetchone() logger.info(f"PostgreSQL version: {version['version']}") cursor.close() close_connection(conn) return True except Exception as e: logger.error(f"Connection test failed: {e}") return False