diff --git a/backend/Dockerfile b/backend/Dockerfile
index 28e0bcfe..433a4823 100644
--- a/backend/Dockerfile
+++ b/backend/Dockerfile
@@ -54,7 +54,7 @@ CMD ["sh", "-c", "gunicorn api:app \
     --worker-connections $WORKER_CONNECTIONS \
     --worker-tmp-dir /dev/shm \
     --preload \
-    --log-level info \
+    --log-level critical \
     --access-logfile - \
     --error-logfile - \
     --capture-output \
diff --git a/backend/agent/api.py b/backend/agent/api.py
index 3be04065..66c8c900 100644
--- a/backend/agent/api.py
+++ b/backend/agent/api.py
@@ -15,7 +15,7 @@ from agentpress.thread_manager import ThreadManager
 from services.supabase import DBConnection
 from services import redis
 from utils.auth_utils import get_current_user_id_from_jwt, get_user_id_from_stream_auth, verify_thread_access
-from utils.logger import logger
+from utils.logger import logger, structlog
 from services.billing import check_billing_status, can_use_model
 from utils.config import config
 from sandbox.sandbox import create_sandbox, delete_sandbox, get_or_start_sandbox
@@ -348,6 +348,9 @@ async def start_agent(
     user_id: str = Depends(get_current_user_id_from_jwt)
 ):
     """Start an agent for a specific thread in the background."""
+    structlog.contextvars.bind_contextvars(
+        thread_id=thread_id,
+    )
     global instance_id # Ensure instance_id is accessible
     if not instance_id:
         raise HTTPException(status_code=500, detail="Agent API not initialized with instance ID")
@@ -379,6 +382,13 @@
     account_id = thread_data.get('account_id')
     thread_agent_id = thread_data.get('agent_id')
     thread_metadata = thread_data.get('metadata', {})
+
+    structlog.contextvars.bind_contextvars(
+        project_id=project_id,
+        account_id=account_id,
+        thread_agent_id=thread_agent_id,
+        thread_metadata=thread_metadata,
+    )
 
     # Check if this is an agent builder thread
     is_agent_builder = thread_metadata.get('is_agent_builder', False)
@@ -455,6 +465,9 @@
         "started_at": datetime.now(timezone.utc).isoformat()
     }).execute()
     agent_run_id = agent_run.data[0]['id']
+    structlog.contextvars.bind_contextvars(
+        agent_run_id=agent_run_id,
+    )
     logger.info(f"Created new agent run: {agent_run_id}")
 
     # Register this run in Redis with TTL using instance ID
@@ -464,6 +477,8 @@
     except Exception as e:
         logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
 
+    request_id = structlog.contextvars.get_contextvars().get('request_id')
+
     # Run the agent in the background
     run_agent_background.send(
         agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
@@ -473,7 +488,8 @@
         stream=body.stream, enable_context_manager=body.enable_context_manager,
         agent_config=agent_config,  # Pass agent configuration
         is_agent_builder=is_agent_builder,
-        target_agent_id=target_agent_id
+        target_agent_id=target_agent_id,
+        request_id=request_id,
     )
 
     return {"agent_run_id": agent_run_id, "status": "running"}
@@ -481,6 +497,9 @@
 @router.post("/agent-run/{agent_run_id}/stop")
 async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
     """Stop a running agent."""
+    structlog.contextvars.bind_contextvars(
+        agent_run_id=agent_run_id,
+    )
     logger.info(f"Received request to stop agent run: {agent_run_id}")
     client = await db.client
     await get_agent_run_with_access_check(client, agent_run_id, user_id)
@@ -490,6 +509,9 @@
 @router.get("/thread/{thread_id}/agent-runs")
 async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
     """Get all agent runs for a thread."""
+    structlog.contextvars.bind_contextvars(
+        thread_id=thread_id,
+    )
     logger.info(f"Fetching agent runs for thread: {thread_id}")
     client = await db.client
     await verify_thread_access(client, thread_id, user_id)
@@ -500,6 +522,9 @@
 @router.get("/agent-run/{agent_run_id}")
 async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
     """Get agent run status and responses."""
+    structlog.contextvars.bind_contextvars(
+        agent_run_id=agent_run_id,
+    )
     logger.info(f"Fetching agent run details: {agent_run_id}")
     client = await db.client
     agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
@@ -516,6 +541,9 @@
 @router.get("/thread/{thread_id}/agent", response_model=ThreadAgentResponse)
 async def get_thread_agent(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
     """Get the agent details for a specific thread."""
+    structlog.contextvars.bind_contextvars(
+        thread_id=thread_id,
+    )
     logger.info(f"Fetching agent details for thread: {thread_id}")
     client = await db.client
 
@@ -605,6 +633,11 @@
     user_id = await get_user_id_from_stream_auth(request, token)
     agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
 
+    structlog.contextvars.bind_contextvars(
+        agent_run_id=agent_run_id,
+        user_id=user_id,
+    )
+
     response_list_key = f"agent_run:{agent_run_id}:responses"
     response_channel = f"agent_run:{agent_run_id}:new_response"
     control_channel = f"agent_run:{agent_run_id}:control" # Global control channel
@@ -631,13 +664,17 @@
         initial_yield_complete = True
 
         # 2. Check run status *after* yielding initial data
-        run_status = await client.table('agent_runs').select('status').eq("id", agent_run_id).maybe_single().execute()
+        run_status = await client.table('agent_runs').select('status', 'thread_id').eq("id", agent_run_id).maybe_single().execute()
         current_status = run_status.data.get('status') if run_status.data else None
 
         if current_status != 'running':
             logger.info(f"Agent run {agent_run_id} is not running (status: {current_status}). Ending stream.")
             yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
             return
+
+        structlog.contextvars.bind_contextvars(
+            thread_id=run_status.data.get('thread_id'),
+        )
 
         # 3. Set up Pub/Sub listeners for new responses and control signals
         pubsub_response = await redis.create_pubsub()
@@ -939,11 +976,20 @@
         "account_id": account_id,
         "created_at": datetime.now(timezone.utc).isoformat()
     }
+
+    structlog.contextvars.bind_contextvars(
+        thread_id=thread_data["thread_id"],
+        project_id=project_id,
+        account_id=account_id,
+    )
 
     # Store the agent_id in the thread if we have one
     if agent_config:
         thread_data["agent_id"] = agent_config['agent_id']
         logger.info(f"Storing agent_id {agent_config['agent_id']} in thread")
+        structlog.contextvars.bind_contextvars(
+            agent_id=agent_config['agent_id'],
+        )
 
     # Store agent builder metadata if this is an agent builder session
     if is_agent_builder:
@@ -952,6 +998,9 @@
             "target_agent_id": target_agent_id
         }
         logger.info(f"Storing agent builder metadata in thread: target_agent_id={target_agent_id}")
+        structlog.contextvars.bind_contextvars(
+            target_agent_id=target_agent_id,
+        )
 
     thread = await client.table('threads').insert(thread_data).execute()
     thread_id = thread.data[0]['thread_id']
@@ -1033,6 +1082,9 @@
     }).execute()
     agent_run_id = agent_run.data[0]['id']
     logger.info(f"Created new agent run: {agent_run_id}")
+    structlog.contextvars.bind_contextvars(
+        agent_run_id=agent_run_id,
+    )
 
     # Register run in Redis
     instance_key = f"active_run:{instance_id}:{agent_run_id}"
@@ -1041,6 +1093,8 @@
     except Exception as e:
         logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
 
+    request_id = structlog.contextvars.get_contextvars().get('request_id')
+
    # Run agent in background
     run_agent_background.send(
         agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
@@ -1050,7 +1104,8 @@
         stream=stream, enable_context_manager=enable_context_manager,
         agent_config=agent_config,  # Pass agent configuration
         is_agent_builder=is_agent_builder,
-        target_agent_id=target_agent_id
+        target_agent_id=target_agent_id,
+        request_id=request_id,
     )
 
     return {"thread_id": thread_id, "agent_run_id": agent_run_id}
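Note on the agent/api.py hunks above: they lean on structlog's contextvars support, where anything passed to `bind_contextvars` is merged into every later log event emitted on the same request or task, so the existing `logger.info(...)` calls pick up `thread_id`/`agent_run_id` without changing their call sites. A minimal, self-contained sketch of that pattern (identifiers below are illustrative, not taken from the codebase):

```python
# Sketch of the structlog contextvars pattern used in the endpoints above.
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # fold bound context into every event
        structlog.processors.JSONRenderer(),
    ]
)

logger = structlog.get_logger()

def handle_request(thread_id: str, agent_run_id: str) -> None:
    # Bind once; every subsequent log line on this request inherits the keys.
    structlog.contextvars.bind_contextvars(thread_id=thread_id, agent_run_id=agent_run_id)
    logger.info("Created new agent run")
    # -> {"thread_id": "...", "agent_run_id": "...", "event": "Created new agent run"}

handle_request("thread-123", "run-456")
```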
diff --git a/backend/api.py b/backend/api.py
index c8d1b3a1..9d74fbb2 100644
--- a/backend/api.py
+++ b/backend/api.py
@@ -1,7 +1,7 @@
 from fastapi import FastAPI, Request, HTTPException, Response, Depends
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.responses import JSONResponse, StreamingResponse
-import sentry
+import sentry # Keep this import here, right after fastapi imports
 from contextlib import asynccontextmanager
 from agentpress.thread_manager import ThreadManager
 from services.supabase import DBConnection
@@ -9,12 +9,13 @@
 from datetime import datetime, timezone
 from dotenv import load_dotenv
 from utils.config import config, EnvMode
 import asyncio
-from utils.logger import logger
+from utils.logger import logger, structlog
 import time
 from collections import OrderedDict
 from typing import Dict, Any
 from pydantic import BaseModel
+import uuid
 # Import the agent API module
 from agent import api as agent_api
 from sandbox import api as sandbox_api
@@ -89,13 +90,23 @@
 app = FastAPI(lifespan=lifespan)
 
 @app.middleware("http")
 async def log_requests_middleware(request: Request, call_next):
+    structlog.contextvars.clear_contextvars()
+
+    request_id = str(uuid.uuid4())
     start_time = time.time()
     client_ip = request.client.host
     method = request.method
-    url = str(request.url)
     path = request.url.path
     query_params = str(request.query_params)
-
+
+    structlog.contextvars.bind_contextvars(
+        request_id=request_id,
+        client_ip=client_ip,
+        method=method,
+        path=path,
+        query_params=query_params
+    )
+
     # Log the incoming request
     logger.info(f"Request started: {method} {path} from {client_ip} | Query: {query_params}")
diff --git a/backend/docker-compose.prod.yml b/backend/docker-compose.prod.yml
index be1ad790..f04d3182 100644
--- a/backend/docker-compose.prod.yml
+++ b/backend/docker-compose.prod.yml
@@ -10,7 +10,7 @@ services:
           memory: 32G
 
   worker:
-    command: python -m dramatiq --processes 40 --threads 8 run_agent_background
+    command: python -m dramatiq --skip-logging --processes 40 --threads 8 run_agent_background
     deploy:
       resources:
         limits:
diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml
index f9431106..a20f2b10 100644
--- a/backend/docker-compose.yml
+++ b/backend/docker-compose.yml
@@ -42,7 +42,7 @@ services:
     build:
       context: .
       dockerfile: Dockerfile
-    command: python -m dramatiq --processes 4 --threads 4 run_agent_background
+    command: python -m dramatiq --skip-logging --processes 4 --threads 4 run_agent_background
     env_file:
       - .env
     volumes:
diff --git a/backend/poetry.lock b/backend/poetry.lock
index b7e35b61..f33c143c 100644
--- a/backend/poetry.lock
+++ b/backend/poetry.lock
@@ -3209,6 +3209,18 @@ files = [
 requests = {version = ">=2.20", markers = "python_version >= \"3.0\""}
 typing_extensions = {version = ">=4.5.0", markers = "python_version >= \"3.7\""}
 
+[[package]]
+name = "structlog"
+version = "25.4.0"
+description = "Structured Logging for Python"
+optional = false
+python-versions = ">=3.8"
+groups = ["main"]
+files = [
+    {file = "structlog-25.4.0-py3-none-any.whl", hash = "sha256:fe809ff5c27e557d14e613f45ca441aabda051d119ee5a0102aaba6ce40eed2c"},
+    {file = "structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4"},
+]
+
 [[package]]
 name = "supabase"
 version = "2.15.0"
@@ -3904,4 +3916,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
 [metadata]
 lock-version = "2.1"
 python-versions = "^3.11"
-content-hash = "99368309d41431f2104a182b680e9361409121e7a6df3ead3f14173dbaf9d066"
+content-hash = "22e38245da1c442147f6b870361b56f898ec3f3b87cbe09c3f82e9c96c621ae7"
diff --git a/backend/pyproject.toml b/backend/pyproject.toml
index f9ae6b02..884a1771 100644
--- a/backend/pyproject.toml
+++ b/backend/pyproject.toml
@@ -60,6 +60,7 @@ httpx = "^0.28.0"
 aiohttp = "^3.9.0"
 email-validator = "^2.0.0"
 mailtrap = "^2.0.1"
+structlog = "^25.4.0"
 
 [tool.poetry.scripts]
 agentpress = "agentpress.cli:main"
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 58ebed1f..da78d4cb 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -43,3 +43,4 @@ mcp_use>=1.0.0
 aiohttp>=3.9.0
 email-validator>=2.0.0
 mailtrap>=2.0.1
+structlog==25.4.0
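The backend/api.py middleware above gives every HTTP request a fresh `request_id` and clears any context left over from a previous request handled on the same worker. A trimmed-down sketch of that lifecycle (it assumes the structlog configuration, here or the library default, includes `merge_contextvars`; names are illustrative):

```python
# Per-request context binding, in the spirit of log_requests_middleware above.
import time
import uuid

import structlog
from fastapi import FastAPI, Request

app = FastAPI()
logger = structlog.get_logger()

@app.middleware("http")
async def bind_request_context(request: Request, call_next):
    # Drop anything bound by an earlier request, then bind a fresh correlation ID
    # plus a few request attributes for every log line emitted below.
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=str(uuid.uuid4()),
        method=request.method,
        path=request.url.path,
    )
    start = time.time()
    response = await call_next(request)
    logger.info("Request finished", duration=time.time() - start, status_code=response.status_code)
    return response
```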
diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py
index 547eafcb..ccacbe2d 100644
--- a/backend/run_agent_background.py
+++ b/backend/run_agent_background.py
@@ -6,7 +6,7 @@ from datetime import datetime, timezone
 from typing import Optional
 from services import redis
 from agent.run import run_agent
-from utils.logger import logger
+from utils.logger import logger, structlog
 import dramatiq
 import uuid
 from agentpress.thread_manager import ThreadManager
@@ -54,9 +54,18 @@
     enable_context_manager: bool,
     agent_config: Optional[dict] = None,
     is_agent_builder: Optional[bool] = False,
-    target_agent_id: Optional[str] = None
+    target_agent_id: Optional[str] = None,
+    request_id: Optional[str] = None,
 ):
     """Run the agent in the background using Redis for state."""
+    structlog.contextvars.clear_contextvars()
+
+    structlog.contextvars.bind_contextvars(
+        agent_run_id=agent_run_id,
+        thread_id=thread_id,
+        request_id=request_id,
+    )
+
     try:
         await initialize()
     except Exception as e:
@@ -85,6 +94,16 @@
     sentry.sentry.set_tag("thread_id", thread_id)
 
     logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
+    logger.info({
+        "model_name": model_name,
+        "enable_thinking": enable_thinking,
+        "reasoning_effort": reasoning_effort,
+        "stream": stream,
+        "enable_context_manager": enable_context_manager,
+        "agent_config": agent_config,
+        "is_agent_builder": is_agent_builder,
+        "target_agent_id": target_agent_id,
+    })
     logger.info(f"🚀 Using model: {model_name} (thinking: {enable_thinking}, reasoning_effort: {reasoning_effort})")
     if agent_config:
         logger.info(f"Using custom agent: {agent_config.get('name', 'Unknown')}")
diff --git a/backend/utils/auth_utils.py b/backend/utils/auth_utils.py
index 83229b04..62de8205 100644
--- a/backend/utils/auth_utils.py
+++ b/backend/utils/auth_utils.py
@@ -3,6 +3,7 @@
 from fastapi import HTTPException, Request
 from typing import Optional
 import jwt
 from jwt.exceptions import PyJWTError
+from utils.logger import structlog
 # This function extracts the user ID from Supabase JWT
 async def get_current_user_id_from_jwt(request: Request) -> str:
@@ -48,6 +49,9 @@
             )
 
         sentry.sentry.set_user({ "id": user_id })
+        structlog.contextvars.bind_contextvars(
+            user_id=user_id
+        )
 
         return user_id
     except PyJWTError:
@@ -121,8 +125,11 @@
         # For Supabase JWT, we just need to decode and extract the user ID
         payload = jwt.decode(token, options={"verify_signature": False})
         user_id = payload.get('sub')
-        sentry.sentry.set_user({ "id": user_id })
         if user_id:
+            sentry.sentry.set_user({ "id": user_id })
+            structlog.contextvars.bind_contextvars(
+                user_id=user_id
+            )
             return user_id
     except Exception:
         pass
@@ -213,6 +220,11 @@
         # Supabase stores the user ID in the 'sub' claim
         user_id = payload.get('sub')
 
+        if user_id:
+            sentry.sentry.set_user({ "id": user_id })
+            structlog.contextvars.bind_contextvars(
+                user_id=user_id
+            )
         return user_id
 
     except PyJWTError:
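run_agent_background.py covers the other half of request correlation: Dramatiq workers run in separate processes, so bound context does not travel with the message. The API process reads its bound `request_id` and ships it as an explicit actor argument, and the actor rebinds it after clearing whatever the previous message left behind. A minimal sketch under those assumptions (the `StubBroker`, `run_job`, and `job_id` names are illustrative):

```python
# Carrying a correlation ID across the Dramatiq process boundary.
from typing import Optional

import dramatiq
import structlog
from dramatiq.brokers.stub import StubBroker

dramatiq.set_broker(StubBroker())  # in-memory broker so the sketch runs standalone
logger = structlog.get_logger()

@dramatiq.actor
def run_job(job_id: str, request_id: Optional[str] = None):
    # Worker threads are reused across messages, so start from a clean slate
    # and rebind the correlation ID captured by the API process.
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(job_id=job_id, request_id=request_id)
    logger.info("Background job started")

# In the API process: read the bound request_id and pass it along with the message.
request_id = structlog.contextvars.get_contextvars().get("request_id")
run_job.send("job-123", request_id=request_id)
```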
diff --git a/backend/utils/logger.py b/backend/utils/logger.py
index 32fae2fd..6b36fa41 100644
--- a/backend/utils/logger.py
+++ b/backend/utils/logger.py
@@ -1,131 +1,28 @@
-"""
-Centralized logging configuration for AgentPress.
+import structlog, logging, os
 
-This module provides a unified logging interface with:
-- Structured JSON logging for better parsing
-- Log levels for different environments
-- Correlation IDs for request tracing
-- Contextual information for debugging
-"""
+ENV_MODE = os.getenv("ENV_MODE", "LOCAL")
 
-import logging
-import json
-import sys
-import os
-from datetime import datetime, timezone
-from contextvars import ContextVar
-from functools import wraps
-import traceback
-from logging.handlers import RotatingFileHandler
+renderer = [structlog.processors.JSONRenderer()]
+if ENV_MODE.lower() == "local".lower():
+    renderer = [structlog.dev.ConsoleRenderer()]
 
-from utils.config import config, EnvMode
-
-# Context variable for request correlation ID
-request_id: ContextVar[str] = ContextVar('request_id', default='')
-
-class JSONFormatter(logging.Formatter):
-    """Custom JSON formatter for structured logging."""
-
-    def format(self, record: logging.LogRecord) -> str:
-        """Format log record as JSON with contextual information."""
-        log_data = {
-            'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
-            'level': record.levelname,
-            'message': record.getMessage(),
-            'module': record.module,
-            'function': record.funcName,
-            'line': record.lineno,
-            'request_id': request_id.get(),
-            'thread_id': getattr(record, 'thread_id', None),
-            'correlation_id': getattr(record, 'correlation_id', None)
-        }
-
-        # Add extra fields if present
-        if hasattr(record, 'extra'):
-            log_data.update(record.extra)
-
-        # Add exception info if present
-        if record.exc_info:
-            log_data['exception'] = {
-                'type': str(record.exc_info[0].__name__),
-                'message': str(record.exc_info[1]),
-                'traceback': traceback.format_exception(*record.exc_info)
+structlog.configure(
+    processors=[
+        structlog.stdlib.add_log_level,
+        structlog.stdlib.PositionalArgumentsFormatter(),
+        structlog.processors.dict_tracebacks,
+        structlog.processors.CallsiteParameterAdder(
+            {
+                structlog.processors.CallsiteParameter.FILENAME,
+                structlog.processors.CallsiteParameter.FUNC_NAME,
+                structlog.processors.CallsiteParameter.LINENO,
             }
-
-        return json.dumps(log_data)
+        ),
+        structlog.processors.TimeStamper(fmt="iso"),
+        structlog.contextvars.merge_contextvars,
+        *renderer,
+    ],
+    cache_logger_on_first_use=True,
+)
 
-def setup_logger(name: str = 'agentpress') -> logging.Logger:
-    """
-    Set up a centralized logger with both file and console handlers.
-
-    Args:
-        name: The name of the logger
-
-    Returns:
-        logging.Logger: Configured logger instance
-    """
-    logger = logging.getLogger(name)
-    logger.setLevel(logging.DEBUG)
-
-    # Create logs directory if it doesn't exist
-    log_dir = os.path.join(os.getcwd(), 'logs')
-    try:
-        if not os.path.exists(log_dir):
-            os.makedirs(log_dir)
-            print(f"Created log directory at: {log_dir}")
-    except Exception as e:
-        print(f"Error creating log directory: {e}")
-        return logger
-
-    # File handler with rotation
-    try:
-        log_file = os.path.join(log_dir, f'{name}_{datetime.now().strftime("%Y%m%d")}.log')
-        file_handler = RotatingFileHandler(
-            log_file,
-            maxBytes=10*1024*1024,  # 10MB
-            backupCount=5,
-            encoding='utf-8'
-        )
-        file_handler.setLevel(logging.DEBUG)
-
-        # Create formatters
-        file_formatter = logging.Formatter(
-            '%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
-        )
-        file_handler.setFormatter(file_formatter)
-
-        # Add file handler to logger
-        logger.addHandler(file_handler)
-        print(f"Added file handler for: {log_file}")
-    except Exception as e:
-        print(f"Error setting up file handler: {e}")
-
-    # Console handler - WARNING in production, DEBUG in other environments
-    try:
-        console_handler = logging.StreamHandler(sys.stdout)
-        if config.ENV_MODE == EnvMode.PRODUCTION:
-            console_handler.setLevel(logging.WARNING)
-        else:
-            console_handler.setLevel(logging.DEBUG)
-
-        console_formatter = logging.Formatter(
-            '%(asctime)s - %(levelname)s - %(name)s - %(message)s'
-        )
-        console_handler.setFormatter(console_formatter)
-
-        # Add console handler to logger
-        logger.addHandler(console_handler)
-        logger.info(f"Added console handler with level: {console_handler.level}")
-        logger.info(f"Log file will be created at: {log_dir}")
-    except Exception as e:
-        print(f"Error setting up console handler: {e}")
-
-    # # Test logging
-    # logger.debug("Logger setup complete - DEBUG test")
-    # logger.info("Logger setup complete - INFO test")
-    # logger.warning("Logger setup complete - WARNING test")
-
-    return logger
-
-# Create default logger instance
-logger = setup_logger()
\ No newline at end of file
+logger: structlog.stdlib.BoundLogger = structlog.get_logger(level=logging.DEBUG)
diff --git a/docker-compose.yaml b/docker-compose.yaml
index 8078f586..3fffa4e0 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -57,7 +57,7 @@ services:
     build:
       context: ./backend
       dockerfile: Dockerfile
-    command: python -m dramatiq run_agent_background
+    command: python -m dramatiq --skip-logging run_agent_background
    volumes:
      - ./backend/.env:/app/.env:ro
    env_file:
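Taken together with the gunicorn `--log-level critical` and dramatiq `--skip-logging` changes, the apparent intent is that the structlog logger configured in utils/logger.py becomes the primary log stream: pretty console output when ENV_MODE is local, one JSON object per line everywhere else, with the bound contextvars folded in. A self-contained sketch of that renderer switch (a simplified stand-in for the module above, not a copy of it):

```python
# Environment-driven renderer selection, mirroring the new utils/logger.py.
import os

import structlog

ENV_MODE = os.getenv("ENV_MODE", "LOCAL")
renderer = (
    structlog.dev.ConsoleRenderer()          # human-friendly output for local dev
    if ENV_MODE.lower() == "local"
    else structlog.processors.JSONRenderer()  # one JSON object per line elsewhere
)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        renderer,
    ],
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()
structlog.contextvars.bind_contextvars(request_id="req-abc")
logger.info("Request started", path="/api/agent")
# ENV_MODE=LOCAL      -> colorized console line
# ENV_MODE=production -> {"request_id": "req-abc", "path": "/api/agent", "event": "Request started", ...}
```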