diff --git a/backend/agent/run_agent.py b/backend/agent/run_agent.py
index 5b759c52..5766b466 100644
--- a/backend/agent/run_agent.py
+++ b/backend/agent/run_agent.py
@@ -131,7 +131,7 @@ async def run_agent_run_stream(
                         )
                         stop_signal_received = True
                         break
-                await asyncio.sleep(0.5)  # Short sleep to prevent tight loop
+                await asyncio.sleep(5)  # Sleep between checks to avoid a tight loop
         except asyncio.CancelledError:
             logger.info(
                 f"Stop signal checker cancelled for {agent_run_id} (Instance: {instance_id})"
             )
@@ -140,7 +140,6 @@ async def run_agent_run_stream(
             logger.error(
                 f"Error in stop signal checker for {agent_run_id}: {e}", exc_info=True
             )
-            stop_signal_received = True  # Stop the run if the checker fails

     asyncio.create_task(check_for_stop_signal())

diff --git a/backend/api.py b/backend/api.py
index 6920b960..6e635e2f 100644
--- a/backend/api.py
+++ b/backend/api.py
@@ -1,6 +1,7 @@
 from fastapi import FastAPI, Request, HTTPException, Response, Depends, APIRouter
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import JSONResponse, StreamingResponse
+from services import redis
 import sentry
 from contextlib import asynccontextmanager
 from agentpress.thread_manager import ThreadManager
@@ -178,7 +179,7 @@ api_router.include_router(unified_oauth_api.router)
 # Add health check to API router
 @api_router.get("/health")
 async def health_check():
-    """Health check endpoint to verify API is working."""
+    """Health check endpoint to check if API server is up."""
     logger.info("Health check endpoint called")
     return {
         "status": "ok",
@@ -186,6 +187,29 @@ async def health_check():
         "instance_id": instance_id
     }

+# Add Docker health check to API router
+@api_router.get("/health-docker")
+async def health_check_docker():
+    """Health check endpoint that verifies Redis and database connectivity."""
+    logger.info("Health docker check endpoint called")
+    try:
+        client = await redis.get_client()
+        await client.ping()
+        db = DBConnection()
+        await db.initialize()
+        db_client = await db.client
+        await db_client.table("threads").select("thread_id").limit(1).execute()
+        logger.info("Health docker check complete")
+        return {
+            "status": "ok",
+            "timestamp": datetime.now(timezone.utc).isoformat(),
+            "instance_id": instance_id
+        }
+    except Exception as e:
+        logger.error(f"Failed health docker check: {e}")
+        raise HTTPException(status_code=500, detail="Health check failed")
+
+
 # Include the main API router with /api prefix
 app.include_router(api_router, prefix="/api")

diff --git a/backend/services/redis.py b/backend/services/redis.py
index f19fcc10..8edf0fc3 100644
--- a/backend/services/redis.py
+++ b/backend/services/redis.py
@@ -28,20 +28,23 @@ def initialize():
     redis_port = int(os.getenv("REDIS_PORT", 6379))
     redis_password = os.getenv("REDIS_PASSWORD", "")

-    # Connection pool configuration
-    max_connections = int(os.getenv("REDIS_MAX_CONNECTIONS", 2048))
+    # Connection pool configuration - optimized for production
+    max_connections = 50  # Reasonable limit for production
+    socket_timeout = 15.0  # 15 seconds socket timeout
+    connect_timeout = 10.0  # 10 seconds connection timeout
     retry_on_timeout = not (os.getenv("REDIS_RETRY_ON_TIMEOUT", "True").lower() != "true")

     logger.info(f"Initializing Redis connection pool to {redis_host}:{redis_port} with max {max_connections} connections")

-    # Create connection pool
+    # Create connection pool with production-optimized settings
     pool = redis.ConnectionPool(
         host=redis_host,
         port=redis_port,
         password=redis_password,
         decode_responses=True,
-        socket_timeout=20.0,
-        socket_connect_timeout=20.0,
+        socket_timeout=socket_timeout,
+        socket_connect_timeout=connect_timeout,
+        socket_keepalive=True,
         retry_on_timeout=retry_on_timeout,
         health_check_interval=30,
         max_connections=max_connections,
@@ -63,9 +66,15 @@ async def initialize_async():
         initialize()

     try:
-        await client.ping()
+        # Test connection with timeout
+        await asyncio.wait_for(client.ping(), timeout=5.0)
         logger.info("Successfully connected to Redis")
         _initialized = True
+    except asyncio.TimeoutError:
+        logger.error("Redis connection timeout during initialization")
+        client = None
+        _initialized = False
+        raise ConnectionError("Redis connection timeout")
     except Exception as e:
         logger.error(f"Failed to connect to Redis: {e}")
         client = None
@@ -80,13 +89,25 @@ async def close():
     global client, pool, _initialized
     if client:
         logger.info("Closing Redis connection")
-        await client.aclose()
-        client = None
+        try:
+            await asyncio.wait_for(client.aclose(), timeout=5.0)
+        except asyncio.TimeoutError:
+            logger.warning("Redis close timeout, forcing close")
+        except Exception as e:
+            logger.warning(f"Error closing Redis client: {e}")
+        finally:
+            client = None

     if pool:
         logger.info("Closing Redis connection pool")
-        await pool.aclose()
-        pool = None
+        try:
+            await asyncio.wait_for(pool.aclose(), timeout=5.0)
+        except asyncio.TimeoutError:
+            logger.warning("Redis pool close timeout, forcing close")
+        except Exception as e:
+            logger.warning(f"Error closing Redis pool: {e}")
+        finally:
+            pool = None

     _initialized = False
     logger.info("Redis connection and pool closed")
diff --git a/backend/services/supabase.py b/backend/services/supabase.py
index 0a3f8558..096f7027 100644
--- a/backend/services/supabase.py
+++ b/backend/services/supabase.py
@@ -9,17 +9,22 @@ from utils.config import config
 import base64
 import uuid
 from datetime import datetime
+import threading


 class DBConnection:
-    """Singleton database connection manager using Supabase."""
+    """Thread-safe singleton database connection manager using Supabase."""
     _instance: Optional['DBConnection'] = None
-    _initialized = False
-    _client: Optional[AsyncClient] = None
+    _lock = threading.Lock()

     def __new__(cls):
         if cls._instance is None:
-            cls._instance = super().__new__(cls)
+            with cls._lock:
+                # Double-checked locking pattern
+                if cls._instance is None:
+                    cls._instance = super().__new__(cls)
+                    cls._instance._initialized = False
+                    cls._instance._client = None
         return cls._instance

     def __init__(self):
@@ -41,10 +46,17 @@ class DBConnection:
                 raise RuntimeError("SUPABASE_URL and a key (SERVICE_ROLE_KEY or ANON_KEY) environment variables must be set.")

             logger.debug("Initializing Supabase connection")
-            self._client = await create_async_client(supabase_url, supabase_key)
+
+            # Create Supabase client
+            self._client = await create_async_client(
+                supabase_url,
+                supabase_key,
+            )
+
             self._initialized = True
             key_type = "SERVICE_ROLE_KEY" if config.SUPABASE_SERVICE_ROLE_KEY else "ANON_KEY"
             logger.debug(f"Database connection initialized with Supabase using {key_type}")
+
         except Exception as e:
             logger.error(f"Database initialization error: {e}")
             raise RuntimeError(f"Failed to initialize database connection: {str(e)}")
@@ -52,11 +64,19 @@
     @classmethod
     async def disconnect(cls):
         """Disconnect from the database."""
-        if cls._client:
+        if cls._instance and cls._instance._client:
             logger.info("Disconnecting from Supabase database")
-            await cls._client.close()
-            cls._initialized = False
-            logger.info("Database disconnected successfully")
+            try:
+                # Close Supabase client
+                if hasattr(cls._instance._client, 'close'):
+                    await cls._instance._client.close()
+
+            except Exception as e:
+                logger.warning(f"Error during disconnect: {e}")
+            finally:
+                cls._instance._initialized = False
+                cls._instance._client = None
+                logger.info("Database disconnected successfully")

     @property
     async def client(self) -> AsyncClient:
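The new `/health-docker` route is only useful if something probes it, and the Dockerfile or compose healthcheck is not part of this diff. The following is a minimal sketch of how a container healthcheck might call the endpoint; the script name, host port, and 10-second timeout are assumptions, while the `/api` prefix comes from `app.include_router(api_router, prefix="/api")` in the patch above.

```python
# healthcheck.py -- hypothetical probe script for a Docker HEALTHCHECK.
# Exits 0 when /health-docker reports ok, non-zero otherwise, so the
# container is marked unhealthy when Redis or the database is unreachable.
import json
import sys
import urllib.request

URL = "http://localhost:8000/api/health-docker"  # assumed host and port

try:
    with urllib.request.urlopen(URL, timeout=10) as resp:
        body = json.loads(resp.read().decode("utf-8"))
        sys.exit(0 if body.get("status") == "ok" else 1)
except Exception as exc:  # connection refused, timeout, HTTP 500, bad JSON
    print(f"health probe failed: {exc}", file=sys.stderr)
    sys.exit(1)
```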
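For context on the `supabase.py` change: the original `__new__` could, in principle, let two threads pass the `if cls._instance is None` check simultaneously and build two objects. The snippet below is a standalone illustration of the double-checked locking idiom the patch adopts, reduced to a runnable form; it is not the project's `DBConnection` class itself.

```python
import threading

class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        # First check without the lock keeps the common path cheap.
        if cls._instance is None:
            with cls._lock:
                # Second check: another thread may have won the race
                # between our first check and acquiring the lock.
                if cls._instance is None:
                    cls._instance = super().__new__(cls)
        return cls._instance

# Hammer the constructor from many threads; every call must yield the same object.
results = []
threads = [threading.Thread(target=lambda: results.append(Singleton())) for _ in range(32)]
for t in threads:
    t.start()
for t in threads:
    t.join()
assert len({id(obj) for obj in results}) == 1
```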