mirror of https://github.com/kortix-ai/suna.git
feat(api): add health check endpoint for Docker and improve Redis connection handling
This commit is contained in:
parent
a59e8c8c8a
commit
909b51dfbe
|
@ -131,7 +131,7 @@ async def run_agent_run_stream(
|
|||
)
|
||||
stop_signal_received = True
|
||||
break
|
||||
await asyncio.sleep(0.5) # Short sleep to prevent tight loop
|
||||
await asyncio.sleep(5) # Short sleep to prevent tight loop
|
||||
except asyncio.CancelledError:
|
||||
logger.info(
|
||||
f"Stop signal checker cancelled for {agent_run_id} (Instance: {instance_id})"
|
||||
|
@ -140,7 +140,6 @@ async def run_agent_run_stream(
|
|||
logger.error(
|
||||
f"Error in stop signal checker for {agent_run_id}: {e}", exc_info=True
|
||||
)
|
||||
stop_signal_received = True # Stop the run if the checker fails
|
||||
|
||||
asyncio.create_task(check_for_stop_signal())
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
from fastapi import FastAPI, Request, HTTPException, Response, Depends, APIRouter
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from fastapi.responses import JSONResponse, StreamingResponse
|
||||
from services import redis
|
||||
import sentry
|
||||
from contextlib import asynccontextmanager
|
||||
from agentpress.thread_manager import ThreadManager
|
||||
|
@ -178,7 +179,7 @@ api_router.include_router(unified_oauth_api.router)
|
|||
# Add health check to API router
|
||||
@api_router.get("/health")
|
||||
async def health_check():
|
||||
"""Health check endpoint to verify API is working."""
|
||||
"""Health check endpoint to check if API server is up."""
|
||||
logger.info("Health check endpoint called")
|
||||
return {
|
||||
"status": "ok",
|
||||
|
@ -186,6 +187,29 @@ async def health_check():
|
|||
"instance_id": instance_id
|
||||
}
|
||||
|
||||
# Docker health check: unlike the plain /health endpoint (which only proves
# the API process is up), this verifies the backing services are reachable.
@api_router.get("/health-docker")
async def health_check_docker():
    """Deep health check used by the Docker HEALTHCHECK.

    Pings Redis and runs a minimal SELECT against the ``threads`` table to
    confirm both Redis and the database are reachable.

    Returns:
        dict: ``status`` ("ok"), an ISO-8601 UTC ``timestamp``, and this
        server's ``instance_id``.

    Raises:
        HTTPException: 500 if the Redis ping or the database query fails.
    """
    logger.info("Health docker check endpoint called")
    try:
        # Redis round-trip.
        client = await redis.get_client()
        await client.ping()

        # Database round-trip: initialize() is assumed idempotent if the
        # connection already exists — TODO confirm against DBConnection.
        db = DBConnection()
        await db.initialize()
        db_client = await db.client
        await db_client.table("threads").select("thread_id").limit(1).execute()

        logger.info("Health docker check complete")
        return {
            "status": "ok",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "instance_id": instance_id,
        }
    except Exception as e:
        # Deliberately broad: any backend failure must mark the container
        # unhealthy. Log the cause, return a generic 500 to the probe.
        logger.error(f"Failed health docker check: {e}")
        raise HTTPException(status_code=500, detail="Health check failed")
|
||||
|
||||
|
||||
# Include the main API router with /api prefix
|
||||
app.include_router(api_router, prefix="/api")
|
||||
|
||||
|
|
|
@ -28,20 +28,23 @@ def initialize():
|
|||
redis_port = int(os.getenv("REDIS_PORT", 6379))
|
||||
redis_password = os.getenv("REDIS_PASSWORD", "")
|
||||
|
||||
# Connection pool configuration
|
||||
max_connections = int(os.getenv("REDIS_MAX_CONNECTIONS", 2048))
|
||||
# Connection pool configuration - optimized for production
|
||||
max_connections = 50 # Reasonable limit for production
|
||||
socket_timeout = 15.0 # 15 seconds socket timeout
|
||||
connect_timeout = 10.0 # 10 seconds connection timeout
|
||||
retry_on_timeout = not (os.getenv("REDIS_RETRY_ON_TIMEOUT", "True").lower() != "true")
|
||||
|
||||
logger.info(f"Initializing Redis connection pool to {redis_host}:{redis_port} with max {max_connections} connections")
|
||||
|
||||
# Create connection pool
|
||||
# Create connection pool with production-optimized settings
|
||||
pool = redis.ConnectionPool(
|
||||
host=redis_host,
|
||||
port=redis_port,
|
||||
password=redis_password,
|
||||
decode_responses=True,
|
||||
socket_timeout=20.0,
|
||||
socket_connect_timeout=20.0,
|
||||
socket_timeout=socket_timeout,
|
||||
socket_connect_timeout=connect_timeout,
|
||||
socket_keepalive=True,
|
||||
retry_on_timeout=retry_on_timeout,
|
||||
health_check_interval=30,
|
||||
max_connections=max_connections,
|
||||
|
@ -63,9 +66,15 @@ async def initialize_async():
|
|||
initialize()
|
||||
|
||||
try:
|
||||
await client.ping()
|
||||
# Test connection with timeout
|
||||
await asyncio.wait_for(client.ping(), timeout=5.0)
|
||||
logger.info("Successfully connected to Redis")
|
||||
_initialized = True
|
||||
except asyncio.TimeoutError:
|
||||
logger.error("Redis connection timeout during initialization")
|
||||
client = None
|
||||
_initialized = False
|
||||
raise ConnectionError("Redis connection timeout")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to connect to Redis: {e}")
|
||||
client = None
|
||||
|
@ -80,12 +89,24 @@ async def close():
|
|||
global client, pool, _initialized
|
||||
if client:
|
||||
logger.info("Closing Redis connection")
|
||||
await client.aclose()
|
||||
try:
|
||||
await asyncio.wait_for(client.aclose(), timeout=5.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Redis close timeout, forcing close")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing Redis client: {e}")
|
||||
finally:
|
||||
client = None
|
||||
|
||||
if pool:
|
||||
logger.info("Closing Redis connection pool")
|
||||
await pool.aclose()
|
||||
try:
|
||||
await asyncio.wait_for(pool.aclose(), timeout=5.0)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning("Redis pool close timeout, forcing close")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing Redis pool: {e}")
|
||||
finally:
|
||||
pool = None
|
||||
|
||||
_initialized = False
|
||||
|
|
|
@ -9,17 +9,22 @@ from utils.config import config
|
|||
import base64
|
||||
import uuid
|
||||
from datetime import datetime
|
||||
import threading
|
||||
|
||||
class DBConnection:
|
||||
"""Singleton database connection manager using Supabase."""
|
||||
"""Thread-safe singleton database connection manager using Supabase."""
|
||||
|
||||
_instance: Optional['DBConnection'] = None
|
||||
_initialized = False
|
||||
_client: Optional[AsyncClient] = None
|
||||
_lock = threading.Lock()
|
||||
|
||||
def __new__(cls):
    """Return the process-wide singleton instance, creating it on first use."""
    if cls._instance is not None:
        return cls._instance
    with cls._lock:
        # Re-check under the lock: another thread may have created the
        # instance between our unlocked check and acquiring the lock.
        if cls._instance is None:
            instance = super().__new__(cls)
            instance._initialized = False
            instance._client = None
            cls._instance = instance
    return cls._instance
|
||||
|
||||
def __init__(self):
|
||||
|
@ -41,10 +46,17 @@ class DBConnection:
|
|||
raise RuntimeError("SUPABASE_URL and a key (SERVICE_ROLE_KEY or ANON_KEY) environment variables must be set.")
|
||||
|
||||
logger.debug("Initializing Supabase connection")
|
||||
self._client = await create_async_client(supabase_url, supabase_key)
|
||||
|
||||
# Create Supabase client with timeout configuration
|
||||
self._client = await create_async_client(
|
||||
supabase_url,
|
||||
supabase_key,
|
||||
)
|
||||
|
||||
self._initialized = True
|
||||
key_type = "SERVICE_ROLE_KEY" if config.SUPABASE_SERVICE_ROLE_KEY else "ANON_KEY"
|
||||
logger.debug(f"Database connection initialized with Supabase using {key_type}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Database initialization error: {e}")
|
||||
raise RuntimeError(f"Failed to initialize database connection: {str(e)}")
|
||||
|
@ -52,10 +64,18 @@ class DBConnection:
|
|||
@classmethod
|
||||
async def disconnect(cls):
|
||||
"""Disconnect from the database."""
|
||||
if cls._client:
|
||||
if cls._instance and cls._instance._client:
|
||||
logger.info("Disconnecting from Supabase database")
|
||||
await cls._client.close()
|
||||
cls._initialized = False
|
||||
try:
|
||||
# Close Supabase client
|
||||
if hasattr(cls._instance._client, 'close'):
|
||||
await cls._instance._client.close()
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error during disconnect: {e}")
|
||||
finally:
|
||||
cls._instance._initialized = False
|
||||
cls._instance._client = None
|
||||
logger.info("Database disconnected successfully")
|
||||
|
||||
@property
|
||||
|
|
Loading…
Reference in New Issue