chore(logging): integrate structlog for structured logging and update dependencies

sharath 2025-06-18 19:20:15 +00:00
parent 6be7556275
commit aed3c38fc9
12 changed files with 150 additions and 142 deletions
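
In broad strokes, the change set configures structlog once at import time, binds request- and run-scoped identifiers (request_id, thread_id, agent_run_id, user_id) into structlog's contextvars at the edges of the system, and relies on merge_contextvars to stamp those keys onto every log line. A minimal, self-contained sketch of that pattern — illustrative only, not the project's exact configuration (which follows in utils/logger.py below):

import uuid
import structlog

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,      # pull in bound context (request_id, thread_id, ...)
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)
logger = structlog.get_logger()

# Bind once per request or job; every later log call carries these keys automatically.
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(request_id=str(uuid.uuid4()))
logger.info("request_started", path="/agent/start")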

View File

@ -54,7 +54,7 @@ CMD ["sh", "-c", "gunicorn api:app \
--worker-connections $WORKER_CONNECTIONS \
--worker-tmp-dir /dev/shm \
--preload \
--log-level info \
--log-level critical \
--access-logfile - \
--error-logfile - \
--capture-output \

View File

@ -15,7 +15,7 @@ from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
from services import redis
from utils.auth_utils import get_current_user_id_from_jwt, get_user_id_from_stream_auth, verify_thread_access
from utils.logger import logger
from utils.logger import logger, structlog
from services.billing import check_billing_status, can_use_model
from utils.config import config
from sandbox.sandbox import create_sandbox, delete_sandbox, get_or_start_sandbox
@ -348,6 +348,9 @@ async def start_agent(
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Start an agent for a specific thread in the background."""
structlog.contextvars.bind_contextvars(
thread_id=thread_id,
)
global instance_id # Ensure instance_id is accessible
if not instance_id:
raise HTTPException(status_code=500, detail="Agent API not initialized with instance ID")
@ -379,6 +382,13 @@ async def start_agent(
account_id = thread_data.get('account_id')
thread_agent_id = thread_data.get('agent_id')
thread_metadata = thread_data.get('metadata', {})
structlog.contextvars.bind_contextvars(
project_id=project_id,
account_id=account_id,
thread_agent_id=thread_agent_id,
thread_metadata=thread_metadata,
)
# Check if this is an agent builder thread
is_agent_builder = thread_metadata.get('is_agent_builder', False)
@ -455,6 +465,9 @@ async def start_agent(
"started_at": datetime.now(timezone.utc).isoformat()
}).execute()
agent_run_id = agent_run.data[0]['id']
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
logger.info(f"Created new agent run: {agent_run_id}")
# Register this run in Redis with TTL using instance ID
@ -464,6 +477,8 @@ async def start_agent(
except Exception as e:
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
request_id = structlog.contextvars.get_contextvars().get('request_id')
# Run the agent in the background
run_agent_background.send(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
@ -473,7 +488,8 @@ async def start_agent(
stream=body.stream, enable_context_manager=body.enable_context_manager,
agent_config=agent_config, # Pass agent configuration
is_agent_builder=is_agent_builder,
target_agent_id=target_agent_id
target_agent_id=target_agent_id,
request_id=request_id,
)
return {"agent_run_id": agent_run_id, "status": "running"}
@ -481,6 +497,9 @@ async def start_agent(
@router.post("/agent-run/{agent_run_id}/stop")
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Stop a running agent."""
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
logger.info(f"Received request to stop agent run: {agent_run_id}")
client = await db.client
await get_agent_run_with_access_check(client, agent_run_id, user_id)
@ -490,6 +509,9 @@ async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_
@router.get("/thread/{thread_id}/agent-runs")
async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Get all agent runs for a thread."""
structlog.contextvars.bind_contextvars(
thread_id=thread_id,
)
logger.info(f"Fetching agent runs for thread: {thread_id}")
client = await db.client
await verify_thread_access(client, thread_id, user_id)
@ -500,6 +522,9 @@ async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user
@router.get("/agent-run/{agent_run_id}")
async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Get agent run status and responses."""
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
logger.info(f"Fetching agent run details: {agent_run_id}")
client = await db.client
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
@ -516,6 +541,9 @@ async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_us
@router.get("/thread/{thread_id}/agent", response_model=ThreadAgentResponse)
async def get_thread_agent(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Get the agent details for a specific thread."""
structlog.contextvars.bind_contextvars(
thread_id=thread_id,
)
logger.info(f"Fetching agent details for thread: {thread_id}")
client = await db.client
@ -605,6 +633,11 @@ async def stream_agent_run(
user_id = await get_user_id_from_stream_auth(request, token)
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
user_id=user_id,
)
response_list_key = f"agent_run:{agent_run_id}:responses"
response_channel = f"agent_run:{agent_run_id}:new_response"
control_channel = f"agent_run:{agent_run_id}:control" # Global control channel
@ -631,13 +664,17 @@ async def stream_agent_run(
initial_yield_complete = True
# 2. Check run status *after* yielding initial data
run_status = await client.table('agent_runs').select('status').eq("id", agent_run_id).maybe_single().execute()
run_status = await client.table('agent_runs').select('status', 'thread_id').eq("id", agent_run_id).maybe_single().execute()
current_status = run_status.data.get('status') if run_status.data else None
if current_status != 'running':
logger.info(f"Agent run {agent_run_id} is not running (status: {current_status}). Ending stream.")
yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
return
structlog.contextvars.bind_contextvars(
thread_id=run_status.data.get('thread_id'),
)
# 3. Set up Pub/Sub listeners for new responses and control signals
pubsub_response = await redis.create_pubsub()
@ -939,11 +976,20 @@ async def initiate_agent_with_files(
"account_id": account_id,
"created_at": datetime.now(timezone.utc).isoformat()
}
structlog.contextvars.bind_contextvars(
thread_id=thread_data["thread_id"],
project_id=project_id,
account_id=account_id,
)
# Store the agent_id in the thread if we have one
if agent_config:
thread_data["agent_id"] = agent_config['agent_id']
logger.info(f"Storing agent_id {agent_config['agent_id']} in thread")
structlog.contextvars.bind_contextvars(
agent_id=agent_config['agent_id'],
)
# Store agent builder metadata if this is an agent builder session
if is_agent_builder:
@ -952,6 +998,9 @@ async def initiate_agent_with_files(
"target_agent_id": target_agent_id
}
logger.info(f"Storing agent builder metadata in thread: target_agent_id={target_agent_id}")
structlog.contextvars.bind_contextvars(
target_agent_id=target_agent_id,
)
thread = await client.table('threads').insert(thread_data).execute()
thread_id = thread.data[0]['thread_id']
@ -1033,6 +1082,9 @@ async def initiate_agent_with_files(
}).execute()
agent_run_id = agent_run.data[0]['id']
logger.info(f"Created new agent run: {agent_run_id}")
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
# Register run in Redis
instance_key = f"active_run:{instance_id}:{agent_run_id}"
@ -1041,6 +1093,8 @@ async def initiate_agent_with_files(
except Exception as e:
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
request_id = structlog.contextvars.get_contextvars().get('request_id')
# Run agent in background
run_agent_background.send(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
@ -1050,7 +1104,8 @@ async def initiate_agent_with_files(
stream=stream, enable_context_manager=enable_context_manager,
agent_config=agent_config, # Pass agent configuration
is_agent_builder=is_agent_builder,
target_agent_id=target_agent_id
target_agent_id=target_agent_id,
request_id=request_id,
)
return {"thread_id": thread_id, "agent_run_id": agent_run_id}

View File

@ -1,7 +1,7 @@
from fastapi import FastAPI, Request, HTTPException, Response, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
import sentry
import sentry # Keep this import here, right after fastapi imports
from contextlib import asynccontextmanager
from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
@ -9,12 +9,13 @@ from datetime import datetime, timezone
from dotenv import load_dotenv
from utils.config import config, EnvMode
import asyncio
from utils.logger import logger
from utils.logger import logger, structlog
import time
from collections import OrderedDict
from typing import Dict, Any
from pydantic import BaseModel
import uuid
# Import the agent API module
from agent import api as agent_api
from sandbox import api as sandbox_api
@ -89,13 +90,23 @@ app = FastAPI(lifespan=lifespan)
@app.middleware("http")
async def log_requests_middleware(request: Request, call_next):
structlog.contextvars.clear_contextvars()
request_id = str(uuid.uuid4())
start_time = time.time()
client_ip = request.client.host
method = request.method
url = str(request.url)
path = request.url.path
query_params = str(request.query_params)
structlog.contextvars.bind_contextvars(
request_id=request_id,
client_ip=client_ip,
method=method,
path=path,
query_params=query_params
)
# Log the incoming request
logger.info(f"Request started: {method} {path} from {client_ip} | Query: {query_params}")

View File

@ -10,7 +10,7 @@ services:
memory: 32G
worker:
command: python -m dramatiq --processes 40 --threads 8 run_agent_background
command: python -m dramatiq --skip-logging --processes 40 --threads 8 run_agent_background
deploy:
resources:
limits:

View File

@ -42,7 +42,7 @@ services:
build:
context: .
dockerfile: Dockerfile
command: python -m dramatiq --processes 4 --threads 4 run_agent_background
command: python -m dramatiq --skip-logging --processes 4 --threads 4 run_agent_background
env_file:
- .env
volumes:

backend/poetry.lock (generated, 14 lines changed)
View File

@ -3209,6 +3209,18 @@ files = [
requests = {version = ">=2.20", markers = "python_version >= \"3.0\""}
typing_extensions = {version = ">=4.5.0", markers = "python_version >= \"3.7\""}
[[package]]
name = "structlog"
version = "25.4.0"
description = "Structured Logging for Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "structlog-25.4.0-py3-none-any.whl", hash = "sha256:fe809ff5c27e557d14e613f45ca441aabda051d119ee5a0102aaba6ce40eed2c"},
{file = "structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4"},
]
[[package]]
name = "supabase"
version = "2.15.0"
@ -3904,4 +3916,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "99368309d41431f2104a182b680e9361409121e7a6df3ead3f14173dbaf9d066"
content-hash = "22e38245da1c442147f6b870361b56f898ec3f3b87cbe09c3f82e9c96c621ae7"

View File

@ -60,6 +60,7 @@ httpx = "^0.28.0"
aiohttp = "^3.9.0"
email-validator = "^2.0.0"
mailtrap = "^2.0.1"
structlog = "^25.4.0"
[tool.poetry.scripts]
agentpress = "agentpress.cli:main"

View File

@ -43,3 +43,4 @@ mcp_use>=1.0.0
aiohttp>=3.9.0
email-validator>=2.0.0
mailtrap>=2.0.1
structlog==25.4.0

View File

@ -6,7 +6,7 @@ from datetime import datetime, timezone
from typing import Optional
from services import redis
from agent.run import run_agent
from utils.logger import logger
from utils.logger import logger, structlog
import dramatiq
import uuid
from agentpress.thread_manager import ThreadManager
@ -54,9 +54,18 @@ async def run_agent_background(
enable_context_manager: bool,
agent_config: Optional[dict] = None,
is_agent_builder: Optional[bool] = False,
target_agent_id: Optional[str] = None
target_agent_id: Optional[str] = None,
request_id: Optional[str] = None,
):
"""Run the agent in the background using Redis for state."""
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
thread_id=thread_id,
request_id=request_id,
)
try:
await initialize()
except Exception as e:
@ -85,6 +94,16 @@ async def run_agent_background(
sentry.sentry.set_tag("thread_id", thread_id)
logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
logger.info({
"model_name": model_name,
"enable_thinking": enable_thinking,
"reasoning_effort": reasoning_effort,
"stream": stream,
"enable_context_manager": enable_context_manager,
"agent_config": agent_config,
"is_agent_builder": is_agent_builder,
"target_agent_id": target_agent_id,
})
logger.info(f"🚀 Using model: {model_name} (thinking: {enable_thinking}, reasoning_effort: {reasoning_effort})")
if agent_config:
logger.info(f"Using custom agent: {agent_config.get('name', 'Unknown')}")

View File

@ -3,6 +3,7 @@ from fastapi import HTTPException, Request
from typing import Optional
import jwt
from jwt.exceptions import PyJWTError
from utils.logger import structlog
# This function extracts the user ID from Supabase JWT
async def get_current_user_id_from_jwt(request: Request) -> str:
@ -48,6 +49,9 @@ async def get_current_user_id_from_jwt(request: Request) -> str:
)
sentry.sentry.set_user({ "id": user_id })
structlog.contextvars.bind_contextvars(
user_id=user_id
)
return user_id
except PyJWTError:
@ -121,8 +125,11 @@ async def get_user_id_from_stream_auth(
# For Supabase JWT, we just need to decode and extract the user ID
payload = jwt.decode(token, options={"verify_signature": False})
user_id = payload.get('sub')
sentry.sentry.set_user({ "id": user_id })
if user_id:
sentry.sentry.set_user({ "id": user_id })
structlog.contextvars.bind_contextvars(
user_id=user_id
)
return user_id
except Exception:
pass
@ -213,6 +220,11 @@ async def get_optional_user_id(request: Request) -> Optional[str]:
# Supabase stores the user ID in the 'sub' claim
user_id = payload.get('sub')
if user_id:
sentry.sentry.set_user({ "id": user_id })
structlog.contextvars.bind_contextvars(
user_id=user_id
)
return user_id
except PyJWTError:
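
Across these helpers the pattern is the same: decode the Supabase JWT (signature verification is skipped here, as in the surrounding code), and if a sub claim is present, bind the user id to Sentry and to the structlog context so subsequent log lines are attributable to that user. An isolated sketch of just that step (hypothetical helper name, Sentry call omitted):

from typing import Optional

import jwt         # PyJWT
import structlog

def bind_user_from_token(token: str) -> Optional[str]:
    payload = jwt.decode(token, options={"verify_signature": False})
    user_id = payload.get("sub")   # Supabase stores the user id in the 'sub' claim
    if user_id:
        structlog.contextvars.bind_contextvars(user_id=user_id)
    return user_id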

View File

@ -1,131 +1,28 @@
"""
Centralized logging configuration for AgentPress.
import structlog, logging, os
This module provides a unified logging interface with:
- Structured JSON logging for better parsing
- Log levels for different environments
- Correlation IDs for request tracing
- Contextual information for debugging
"""
ENV_MODE = os.getenv("ENV_MODE", "LOCAL")
import logging
import json
import sys
import os
from datetime import datetime, timezone
from contextvars import ContextVar
from functools import wraps
import traceback
from logging.handlers import RotatingFileHandler
renderer = [structlog.processors.JSONRenderer()]
if ENV_MODE.lower() == "local".lower():
renderer = [structlog.dev.ConsoleRenderer()]
from utils.config import config, EnvMode
# Context variable for request correlation ID
request_id: ContextVar[str] = ContextVar('request_id', default='')
class JSONFormatter(logging.Formatter):
"""Custom JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
"""Format log record as JSON with contextual information."""
log_data = {
'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
'level': record.levelname,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
'request_id': request_id.get(),
'thread_id': getattr(record, 'thread_id', None),
'correlation_id': getattr(record, 'correlation_id', None)
}
# Add extra fields if present
if hasattr(record, 'extra'):
log_data.update(record.extra)
# Add exception info if present
if record.exc_info:
log_data['exception'] = {
'type': str(record.exc_info[0].__name__),
'message': str(record.exc_info[1]),
'traceback': traceback.format_exception(*record.exc_info)
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.dict_tracebacks,
structlog.processors.CallsiteParameterAdder(
{
structlog.processors.CallsiteParameter.FILENAME,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
}
return json.dumps(log_data)
),
structlog.processors.TimeStamper(fmt="iso"),
structlog.contextvars.merge_contextvars,
*renderer,
],
cache_logger_on_first_use=True,
)
def setup_logger(name: str = 'agentpress') -> logging.Logger:
"""
Set up a centralized logger with both file and console handlers.
Args:
name: The name of the logger
Returns:
logging.Logger: Configured logger instance
"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# Create logs directory if it doesn't exist
log_dir = os.path.join(os.getcwd(), 'logs')
try:
if not os.path.exists(log_dir):
os.makedirs(log_dir)
print(f"Created log directory at: {log_dir}")
except Exception as e:
print(f"Error creating log directory: {e}")
return logger
# File handler with rotation
try:
log_file = os.path.join(log_dir, f'{name}_{datetime.now().strftime("%Y%m%d")}.log')
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.DEBUG)
# Create formatters
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(file_formatter)
# Add file handler to logger
logger.addHandler(file_handler)
print(f"Added file handler for: {log_file}")
except Exception as e:
print(f"Error setting up file handler: {e}")
# Console handler - WARNING in production, DEBUG in other environments
try:
console_handler = logging.StreamHandler(sys.stdout)
if config.ENV_MODE == EnvMode.PRODUCTION:
console_handler.setLevel(logging.WARNING)
else:
console_handler.setLevel(logging.DEBUG)
console_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
# Add console handler to logger
logger.addHandler(console_handler)
logger.info(f"Added console handler with level: {console_handler.level}")
logger.info(f"Log file will be created at: {log_dir}")
except Exception as e:
print(f"Error setting up console handler: {e}")
# # Test logging
# logger.debug("Logger setup complete - DEBUG test")
# logger.info("Logger setup complete - INFO test")
# logger.warning("Logger setup complete - WARNING test")
return logger
# Create default logger instance
logger = setup_logger()
logger: structlog.stdlib.BoundLogger = structlog.get_logger(level=logging.DEBUG)
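
With this module in place, callers import the shared logger and pass structured key/value pairs rather than pre-formatted strings; in LOCAL mode the ConsoleRenderer pretty-prints, otherwise the JSONRenderer emits one JSON object per line with the bound context merged in. A short usage sketch (event names and values are illustrative):

from utils.logger import logger, structlog

structlog.contextvars.bind_contextvars(thread_id="thread-abc", agent_run_id="run-123")

logger.info("Created new agent run")                                       # bound context merged in
logger.info("agent_run_config", model_name="example-model", stream=True)   # extra structured fields
# LOCAL:     pretty console line including thread_id, agent_run_id, filename, func_name, lineno
# otherwise: {"event": "agent_run_config", "model_name": "example-model", "stream": true,
#             "thread_id": "thread-abc", "agent_run_id": "run-123", "timestamp": "...", ...}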

View File

@ -57,7 +57,7 @@ services:
build:
context: ./backend
dockerfile: Dockerfile
command: python -m dramatiq run_agent_background
command: python -m dramatiq --skip-logging run_agent_background
volumes:
- ./backend/.env:/app/.env:ro
env_file: