"""FastAPI application entry point.

Builds the module-level ``app``: startup/shutdown is driven by ``lifespan``,
an HTTP middleware logs every request, CORS is configured per environment, and
all feature routers are mounted together under the ``/api`` prefix.
"""

from fastapi import FastAPI, Request, HTTPException, Response, Depends, APIRouter
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from services import redis
# NOTE(review): `sentry` is not referenced by name in this module; presumably
# it is imported for its side effects -- confirm before removing.
import sentry
from contextlib import asynccontextmanager
from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
from datetime import datetime, timezone
from dotenv import load_dotenv
from utils.config import config, EnvMode
import asyncio
from utils.logger import logger, structlog
import time
from collections import OrderedDict
from typing import Dict, Any

from pydantic import BaseModel
import uuid
# Import the agent API module
from agent import api as agent_api
from sandbox import api as sandbox_api
from services import billing as billing_api
from flags import api as feature_flags_api
from services import transcription as transcription_api
import sys
from services import email_api
from triggers import api as triggers_api

load_dotenv()

if sys.platform == "win32":
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

# Initialize managers
db = DBConnection()
instance_id = "single"

# Rate limiter state
# NOTE(review): neither name is read anywhere in this module; kept because
# other modules may import them -- confirm before removing.
ip_tracker = OrderedDict()
MAX_CONCURRENT_IPS = 25


async def _shutdown_services() -> None:
    """Release agent, Redis and database resources, in that order.

    The Redis close is best-effort (errors are logged and swallowed). The
    database disconnect is attempted even when agent cleanup raises, so one
    failing step does not leave the remaining connections open.
    """
    try:
        logger.info("Cleaning up agent resources")
        await agent_api.cleanup()
    finally:
        # Clean up Redis connection
        try:
            logger.info("Closing Redis connection")
            await redis.close()
            logger.info("Redis connection closed successfully")
        except Exception as e:
            logger.error(f"Error closing Redis connection: {e}")

        # Clean up database connection
        logger.info("Disconnecting from database")
        await db.disconnect()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run application startup before serving and teardown afterwards.

    Startup initializes the database connection, the agent, sandbox, triggers
    and unified-OAuth APIs, and Redis. A Redis initialization failure is logged
    and startup continues; any other startup failure is logged and re-raised.

    Teardown runs in a ``finally`` so it also executes when the serving phase
    ends with an exception; teardown failures are logged and re-raised.

    Args:
        app: The FastAPI application this lifespan is attached to (unused).
    """
    logger.info(f"Starting up FastAPI application with instance ID: {instance_id} in {config.ENV_MODE.value} mode")
    try:
        await db.initialize()

        agent_api.initialize(
            db,
            instance_id
        )

        sandbox_api.initialize(db)

        # Initialize Redis connection
        try:
            await redis.initialize_async()
            logger.info("Redis connection initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Redis connection: {e}")
            # Continue without Redis - the application will handle Redis failures gracefully

        # Start background tasks
        # asyncio.create_task(agent_api.restore_running_agent_runs())

        # Initialize triggers API
        triggers_api.initialize(db)
        # `unified_oauth_api` is imported further down in this module; the
        # name is resolved when this coroutine runs, after the module has
        # been fully imported.
        unified_oauth_api.initialize(db)
    except Exception as e:
        logger.error(f"Error during application startup: {e}")
        raise

    try:
        yield
    finally:
        try:
            await _shutdown_services()
        except Exception as e:
            logger.error(f"Error during application shutdown: {e}")
            raise


app = FastAPI(lifespan=lifespan)


@app.middleware("http")
async def log_requests_middleware(request: Request, call_next):
    """Log the start, outcome and duration of every HTTP request.

    A fresh request id plus the client IP, method, path and query string are
    bound to structlog's context variables for the duration of the request.

    Args:
        request: The incoming request.
        call_next: Callable that passes the request down the middleware chain.

    Returns:
        The response produced by ``call_next``.

    Raises:
        Exception: Whatever ``call_next`` raises, re-raised after logging.
    """
    # Drop context left over from a previous request before binding new values.
    structlog.contextvars.clear_contextvars()

    request_id = str(uuid.uuid4())
    start_time = time.time()
    # `request.client` can be None, hence the explicit fallback.
    client_ip = request.client.host if request.client else "unknown"
    method = request.method
    path = request.url.path
    query_params = str(request.query_params)

    structlog.contextvars.bind_contextvars(
        request_id=request_id,
        client_ip=client_ip,
        method=method,
        path=path,
        query_params=query_params
    )

    # Log the incoming request
    logger.info(f"Request started: {method} {path} from {client_ip} | Query: {query_params}")

    try:
        response = await call_next(request)
        process_time = time.time() - start_time
        logger.debug(f"Request completed: {method} {path} | Status: {response.status_code} | Time: {process_time:.2f}s")
        return response
    except Exception as e:
        process_time = time.time() - start_time
        logger.error(f"Request failed: {method} {path} | Error: {str(e)} | Time: {process_time:.2f}s")
        raise


# Define allowed origins based on environment
allowed_origins = ["https://www.suna.so", "https://suna.so"]
allow_origin_regex = None

# Add local-development origins
if config.ENV_MODE == EnvMode.LOCAL:
    allowed_origins.append("http://localhost:3000")

# Add staging-specific origins
if config.ENV_MODE == EnvMode.STAGING:
    allowed_origins.append("https://staging.suna.so")
    allowed_origins.append("http://localhost:3000")
    allow_origin_regex = r"https://suna-.*-prjcts\.vercel\.app"

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_origin_regex=allow_origin_regex,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    allow_headers=["Content-Type", "Authorization", "X-Project-Id"],
)

# Create a main API router
api_router = APIRouter()

# Include all API routers without individual prefixes
api_router.include_router(agent_api.router)
api_router.include_router(sandbox_api.router)
api_router.include_router(billing_api.router)
api_router.include_router(feature_flags_api.router)

from mcp_service import api as mcp_api
from mcp_service import secure_api as secure_mcp_api
from mcp_service import template_api as template_api

api_router.include_router(mcp_api.router)
api_router.include_router(secure_mcp_api.router, prefix="/secure-mcp")
api_router.include_router(template_api.router, prefix="/templates")

api_router.include_router(transcription_api.router)
api_router.include_router(email_api.router)

from knowledge_base import api as knowledge_base_api
api_router.include_router(knowledge_base_api.router)

from triggers import api as triggers_api
from triggers import unified_oauth_api
api_router.include_router(triggers_api.router)
api_router.include_router(unified_oauth_api.router)


# Add health check to API router
@api_router.get("/health")
async def health_check():
    """Health check endpoint to check if API server is up."""
    logger.info("Health check endpoint called")
    return {
        "status": "ok",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "instance_id": instance_id
    }


# Add dependency-aware health check to API router.
# The handler has its own function name so it no longer redefines
# `health_check`; `name="health_check"` keeps the route name (and everything
# FastAPI derives from it) the same as before the rename.
@api_router.get("/health-docker", name="health_check")
async def health_check_docker():
    """Health check endpoint to verify API is working."""
    # Pings Redis and runs a one-row query against the "threads" table; any
    # failure is logged and reported to the caller as HTTP 500.
    logger.info("Health docker check endpoint called")
    try:
        client = await redis.get_client()
        await client.ping()

        # Local name chosen so it does not shadow the module-level `db`.
        probe_db = DBConnection()
        await probe_db.initialize()
        db_client = await probe_db.client
        await db_client.table("threads").select("thread_id").limit(1).execute()

        logger.info("Health docker check complete")
        return {
            "status": "ok",
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "instance_id": instance_id
        }
    except Exception as e:
        logger.error(f"Failed health docker check: {e}")
        # Chain the cause so the original failure stays visible in tracebacks.
        raise HTTPException(status_code=500, detail="Health check failed") from e


# Include the main API router with /api prefix
app.include_router(api_router, prefix="/api")


if __name__ == "__main__":
    import uvicorn

    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

    workers = 4

    logger.info(f"Starting server on 0.0.0.0:8000 with {workers} workers")
    uvicorn.run(
        "api:app",
        host="0.0.0.0",
        port=8000,
        workers=workers,
        loop="asyncio"
    )