mirror of https://github.com/kortix-ai/suna.git
Merge pull request #769 from tnfssc/sharath/suna-299-logging-refactor-to-json-including-thread_id-project_id-etc
This commit is contained in:
commit
1de2f6cc46
|
@ -54,7 +54,7 @@ CMD ["sh", "-c", "gunicorn api:app \
|
||||||
--worker-connections $WORKER_CONNECTIONS \
|
--worker-connections $WORKER_CONNECTIONS \
|
||||||
--worker-tmp-dir /dev/shm \
|
--worker-tmp-dir /dev/shm \
|
||||||
--preload \
|
--preload \
|
||||||
--log-level info \
|
--log-level critical \
|
||||||
--access-logfile - \
|
--access-logfile - \
|
||||||
--error-logfile - \
|
--error-logfile - \
|
||||||
--capture-output \
|
--capture-output \
|
||||||
|
|
|
@ -15,7 +15,7 @@ from agentpress.thread_manager import ThreadManager
|
||||||
from services.supabase import DBConnection
|
from services.supabase import DBConnection
|
||||||
from services import redis
|
from services import redis
|
||||||
from utils.auth_utils import get_current_user_id_from_jwt, get_user_id_from_stream_auth, verify_thread_access
|
from utils.auth_utils import get_current_user_id_from_jwt, get_user_id_from_stream_auth, verify_thread_access
|
||||||
from utils.logger import logger
|
from utils.logger import logger, structlog
|
||||||
from services.billing import check_billing_status, can_use_model
|
from services.billing import check_billing_status, can_use_model
|
||||||
from utils.config import config
|
from utils.config import config
|
||||||
from sandbox.sandbox import create_sandbox, delete_sandbox, get_or_start_sandbox
|
from sandbox.sandbox import create_sandbox, delete_sandbox, get_or_start_sandbox
|
||||||
|
@ -348,6 +348,9 @@ async def start_agent(
|
||||||
user_id: str = Depends(get_current_user_id_from_jwt)
|
user_id: str = Depends(get_current_user_id_from_jwt)
|
||||||
):
|
):
|
||||||
"""Start an agent for a specific thread in the background."""
|
"""Start an agent for a specific thread in the background."""
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
thread_id=thread_id,
|
||||||
|
)
|
||||||
global instance_id # Ensure instance_id is accessible
|
global instance_id # Ensure instance_id is accessible
|
||||||
if not instance_id:
|
if not instance_id:
|
||||||
raise HTTPException(status_code=500, detail="Agent API not initialized with instance ID")
|
raise HTTPException(status_code=500, detail="Agent API not initialized with instance ID")
|
||||||
|
@ -380,6 +383,13 @@ async def start_agent(
|
||||||
thread_agent_id = thread_data.get('agent_id')
|
thread_agent_id = thread_data.get('agent_id')
|
||||||
thread_metadata = thread_data.get('metadata', {})
|
thread_metadata = thread_data.get('metadata', {})
|
||||||
|
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
project_id=project_id,
|
||||||
|
account_id=account_id,
|
||||||
|
thread_agent_id=thread_agent_id,
|
||||||
|
thread_metadata=thread_metadata,
|
||||||
|
)
|
||||||
|
|
||||||
# Check if this is an agent builder thread
|
# Check if this is an agent builder thread
|
||||||
is_agent_builder = thread_metadata.get('is_agent_builder', False)
|
is_agent_builder = thread_metadata.get('is_agent_builder', False)
|
||||||
target_agent_id = thread_metadata.get('target_agent_id')
|
target_agent_id = thread_metadata.get('target_agent_id')
|
||||||
|
@ -455,6 +465,9 @@ async def start_agent(
|
||||||
"started_at": datetime.now(timezone.utc).isoformat()
|
"started_at": datetime.now(timezone.utc).isoformat()
|
||||||
}).execute()
|
}).execute()
|
||||||
agent_run_id = agent_run.data[0]['id']
|
agent_run_id = agent_run.data[0]['id']
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
agent_run_id=agent_run_id,
|
||||||
|
)
|
||||||
logger.info(f"Created new agent run: {agent_run_id}")
|
logger.info(f"Created new agent run: {agent_run_id}")
|
||||||
|
|
||||||
# Register this run in Redis with TTL using instance ID
|
# Register this run in Redis with TTL using instance ID
|
||||||
|
@ -464,6 +477,8 @@ async def start_agent(
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
|
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
|
||||||
|
|
||||||
|
request_id = structlog.contextvars.get_contextvars().get('request_id')
|
||||||
|
|
||||||
# Run the agent in the background
|
# Run the agent in the background
|
||||||
run_agent_background.send(
|
run_agent_background.send(
|
||||||
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
|
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
|
||||||
|
@ -473,7 +488,8 @@ async def start_agent(
|
||||||
stream=body.stream, enable_context_manager=body.enable_context_manager,
|
stream=body.stream, enable_context_manager=body.enable_context_manager,
|
||||||
agent_config=agent_config, # Pass agent configuration
|
agent_config=agent_config, # Pass agent configuration
|
||||||
is_agent_builder=is_agent_builder,
|
is_agent_builder=is_agent_builder,
|
||||||
target_agent_id=target_agent_id
|
target_agent_id=target_agent_id,
|
||||||
|
request_id=request_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
return {"agent_run_id": agent_run_id, "status": "running"}
|
return {"agent_run_id": agent_run_id, "status": "running"}
|
||||||
|
@ -481,6 +497,9 @@ async def start_agent(
|
||||||
@router.post("/agent-run/{agent_run_id}/stop")
|
@router.post("/agent-run/{agent_run_id}/stop")
|
||||||
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
||||||
"""Stop a running agent."""
|
"""Stop a running agent."""
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
agent_run_id=agent_run_id,
|
||||||
|
)
|
||||||
logger.info(f"Received request to stop agent run: {agent_run_id}")
|
logger.info(f"Received request to stop agent run: {agent_run_id}")
|
||||||
client = await db.client
|
client = await db.client
|
||||||
await get_agent_run_with_access_check(client, agent_run_id, user_id)
|
await get_agent_run_with_access_check(client, agent_run_id, user_id)
|
||||||
|
@ -490,6 +509,9 @@ async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_
|
||||||
@router.get("/thread/{thread_id}/agent-runs")
|
@router.get("/thread/{thread_id}/agent-runs")
|
||||||
async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
||||||
"""Get all agent runs for a thread."""
|
"""Get all agent runs for a thread."""
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
thread_id=thread_id,
|
||||||
|
)
|
||||||
logger.info(f"Fetching agent runs for thread: {thread_id}")
|
logger.info(f"Fetching agent runs for thread: {thread_id}")
|
||||||
client = await db.client
|
client = await db.client
|
||||||
await verify_thread_access(client, thread_id, user_id)
|
await verify_thread_access(client, thread_id, user_id)
|
||||||
|
@ -500,6 +522,9 @@ async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user
|
||||||
@router.get("/agent-run/{agent_run_id}")
|
@router.get("/agent-run/{agent_run_id}")
|
||||||
async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
||||||
"""Get agent run status and responses."""
|
"""Get agent run status and responses."""
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
agent_run_id=agent_run_id,
|
||||||
|
)
|
||||||
logger.info(f"Fetching agent run details: {agent_run_id}")
|
logger.info(f"Fetching agent run details: {agent_run_id}")
|
||||||
client = await db.client
|
client = await db.client
|
||||||
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
|
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
|
||||||
|
@ -516,6 +541,9 @@ async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_us
|
||||||
@router.get("/thread/{thread_id}/agent", response_model=ThreadAgentResponse)
|
@router.get("/thread/{thread_id}/agent", response_model=ThreadAgentResponse)
|
||||||
async def get_thread_agent(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
async def get_thread_agent(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
|
||||||
"""Get the agent details for a specific thread."""
|
"""Get the agent details for a specific thread."""
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
thread_id=thread_id,
|
||||||
|
)
|
||||||
logger.info(f"Fetching agent details for thread: {thread_id}")
|
logger.info(f"Fetching agent details for thread: {thread_id}")
|
||||||
client = await db.client
|
client = await db.client
|
||||||
|
|
||||||
|
@ -605,6 +633,11 @@ async def stream_agent_run(
|
||||||
user_id = await get_user_id_from_stream_auth(request, token)
|
user_id = await get_user_id_from_stream_auth(request, token)
|
||||||
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
|
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
|
||||||
|
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
agent_run_id=agent_run_id,
|
||||||
|
user_id=user_id,
|
||||||
|
)
|
||||||
|
|
||||||
response_list_key = f"agent_run:{agent_run_id}:responses"
|
response_list_key = f"agent_run:{agent_run_id}:responses"
|
||||||
response_channel = f"agent_run:{agent_run_id}:new_response"
|
response_channel = f"agent_run:{agent_run_id}:new_response"
|
||||||
control_channel = f"agent_run:{agent_run_id}:control" # Global control channel
|
control_channel = f"agent_run:{agent_run_id}:control" # Global control channel
|
||||||
|
@ -631,7 +664,7 @@ async def stream_agent_run(
|
||||||
initial_yield_complete = True
|
initial_yield_complete = True
|
||||||
|
|
||||||
# 2. Check run status *after* yielding initial data
|
# 2. Check run status *after* yielding initial data
|
||||||
run_status = await client.table('agent_runs').select('status').eq("id", agent_run_id).maybe_single().execute()
|
run_status = await client.table('agent_runs').select('status', 'thread_id').eq("id", agent_run_id).maybe_single().execute()
|
||||||
current_status = run_status.data.get('status') if run_status.data else None
|
current_status = run_status.data.get('status') if run_status.data else None
|
||||||
|
|
||||||
if current_status != 'running':
|
if current_status != 'running':
|
||||||
|
@ -639,6 +672,10 @@ async def stream_agent_run(
|
||||||
yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
|
yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
|
||||||
return
|
return
|
||||||
|
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
thread_id=run_status.data.get('thread_id'),
|
||||||
|
)
|
||||||
|
|
||||||
# 3. Set up Pub/Sub listeners for new responses and control signals
|
# 3. Set up Pub/Sub listeners for new responses and control signals
|
||||||
pubsub_response = await redis.create_pubsub()
|
pubsub_response = await redis.create_pubsub()
|
||||||
await pubsub_response.subscribe(response_channel)
|
await pubsub_response.subscribe(response_channel)
|
||||||
|
@ -940,10 +977,19 @@ async def initiate_agent_with_files(
|
||||||
"created_at": datetime.now(timezone.utc).isoformat()
|
"created_at": datetime.now(timezone.utc).isoformat()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
thread_id=thread_data["thread_id"],
|
||||||
|
project_id=project_id,
|
||||||
|
account_id=account_id,
|
||||||
|
)
|
||||||
|
|
||||||
# Store the agent_id in the thread if we have one
|
# Store the agent_id in the thread if we have one
|
||||||
if agent_config:
|
if agent_config:
|
||||||
thread_data["agent_id"] = agent_config['agent_id']
|
thread_data["agent_id"] = agent_config['agent_id']
|
||||||
logger.info(f"Storing agent_id {agent_config['agent_id']} in thread")
|
logger.info(f"Storing agent_id {agent_config['agent_id']} in thread")
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
agent_id=agent_config['agent_id'],
|
||||||
|
)
|
||||||
|
|
||||||
# Store agent builder metadata if this is an agent builder session
|
# Store agent builder metadata if this is an agent builder session
|
||||||
if is_agent_builder:
|
if is_agent_builder:
|
||||||
|
@ -952,6 +998,9 @@ async def initiate_agent_with_files(
|
||||||
"target_agent_id": target_agent_id
|
"target_agent_id": target_agent_id
|
||||||
}
|
}
|
||||||
logger.info(f"Storing agent builder metadata in thread: target_agent_id={target_agent_id}")
|
logger.info(f"Storing agent builder metadata in thread: target_agent_id={target_agent_id}")
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
target_agent_id=target_agent_id,
|
||||||
|
)
|
||||||
|
|
||||||
thread = await client.table('threads').insert(thread_data).execute()
|
thread = await client.table('threads').insert(thread_data).execute()
|
||||||
thread_id = thread.data[0]['thread_id']
|
thread_id = thread.data[0]['thread_id']
|
||||||
|
@ -1033,6 +1082,9 @@ async def initiate_agent_with_files(
|
||||||
}).execute()
|
}).execute()
|
||||||
agent_run_id = agent_run.data[0]['id']
|
agent_run_id = agent_run.data[0]['id']
|
||||||
logger.info(f"Created new agent run: {agent_run_id}")
|
logger.info(f"Created new agent run: {agent_run_id}")
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
agent_run_id=agent_run_id,
|
||||||
|
)
|
||||||
|
|
||||||
# Register run in Redis
|
# Register run in Redis
|
||||||
instance_key = f"active_run:{instance_id}:{agent_run_id}"
|
instance_key = f"active_run:{instance_id}:{agent_run_id}"
|
||||||
|
@ -1041,6 +1093,8 @@ async def initiate_agent_with_files(
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
|
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
|
||||||
|
|
||||||
|
request_id = structlog.contextvars.get_contextvars().get('request_id')
|
||||||
|
|
||||||
# Run agent in background
|
# Run agent in background
|
||||||
run_agent_background.send(
|
run_agent_background.send(
|
||||||
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
|
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
|
||||||
|
@ -1050,7 +1104,8 @@ async def initiate_agent_with_files(
|
||||||
stream=stream, enable_context_manager=enable_context_manager,
|
stream=stream, enable_context_manager=enable_context_manager,
|
||||||
agent_config=agent_config, # Pass agent configuration
|
agent_config=agent_config, # Pass agent configuration
|
||||||
is_agent_builder=is_agent_builder,
|
is_agent_builder=is_agent_builder,
|
||||||
target_agent_id=target_agent_id
|
target_agent_id=target_agent_id,
|
||||||
|
request_id=request_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
return {"thread_id": thread_id, "agent_run_id": agent_run_id}
|
return {"thread_id": thread_id, "agent_run_id": agent_run_id}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
from fastapi import FastAPI, Request, HTTPException, Response, Depends
|
from fastapi import FastAPI, Request, HTTPException, Response, Depends
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.responses import JSONResponse, StreamingResponse
|
from fastapi.responses import JSONResponse, StreamingResponse
|
||||||
import sentry
|
import sentry # Keep this import here, right after fastapi imports
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from agentpress.thread_manager import ThreadManager
|
from agentpress.thread_manager import ThreadManager
|
||||||
from services.supabase import DBConnection
|
from services.supabase import DBConnection
|
||||||
|
@ -9,12 +9,13 @@ from datetime import datetime, timezone
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
from utils.config import config, EnvMode
|
from utils.config import config, EnvMode
|
||||||
import asyncio
|
import asyncio
|
||||||
from utils.logger import logger
|
from utils.logger import logger, structlog
|
||||||
import time
|
import time
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from typing import Dict, Any
|
from typing import Dict, Any
|
||||||
|
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
import uuid
|
||||||
# Import the agent API module
|
# Import the agent API module
|
||||||
from agent import api as agent_api
|
from agent import api as agent_api
|
||||||
from sandbox import api as sandbox_api
|
from sandbox import api as sandbox_api
|
||||||
|
@ -89,13 +90,23 @@ app = FastAPI(lifespan=lifespan)
|
||||||
|
|
||||||
@app.middleware("http")
|
@app.middleware("http")
|
||||||
async def log_requests_middleware(request: Request, call_next):
|
async def log_requests_middleware(request: Request, call_next):
|
||||||
|
structlog.contextvars.clear_contextvars()
|
||||||
|
|
||||||
|
request_id = str(uuid.uuid4())
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
client_ip = request.client.host
|
client_ip = request.client.host
|
||||||
method = request.method
|
method = request.method
|
||||||
url = str(request.url)
|
|
||||||
path = request.url.path
|
path = request.url.path
|
||||||
query_params = str(request.query_params)
|
query_params = str(request.query_params)
|
||||||
|
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
request_id=request_id,
|
||||||
|
client_ip=client_ip,
|
||||||
|
method=method,
|
||||||
|
path=path,
|
||||||
|
query_params=query_params
|
||||||
|
)
|
||||||
|
|
||||||
# Log the incoming request
|
# Log the incoming request
|
||||||
logger.info(f"Request started: {method} {path} from {client_ip} | Query: {query_params}")
|
logger.info(f"Request started: {method} {path} from {client_ip} | Query: {query_params}")
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ services:
|
||||||
memory: 32G
|
memory: 32G
|
||||||
|
|
||||||
worker:
|
worker:
|
||||||
command: python -m dramatiq --processes 40 --threads 8 run_agent_background
|
command: python -m dramatiq --skip-logging --processes 40 --threads 8 run_agent_background
|
||||||
deploy:
|
deploy:
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
|
|
|
@ -42,7 +42,7 @@ services:
|
||||||
build:
|
build:
|
||||||
context: .
|
context: .
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
command: python -m dramatiq --processes 4 --threads 4 run_agent_background
|
command: python -m dramatiq --skip-logging --processes 4 --threads 4 run_agent_background
|
||||||
env_file:
|
env_file:
|
||||||
- .env
|
- .env
|
||||||
volumes:
|
volumes:
|
||||||
|
|
|
@ -3209,6 +3209,18 @@ files = [
|
||||||
requests = {version = ">=2.20", markers = "python_version >= \"3.0\""}
|
requests = {version = ">=2.20", markers = "python_version >= \"3.0\""}
|
||||||
typing_extensions = {version = ">=4.5.0", markers = "python_version >= \"3.7\""}
|
typing_extensions = {version = ">=4.5.0", markers = "python_version >= \"3.7\""}
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "structlog"
|
||||||
|
version = "25.4.0"
|
||||||
|
description = "Structured Logging for Python"
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.8"
|
||||||
|
groups = ["main"]
|
||||||
|
files = [
|
||||||
|
{file = "structlog-25.4.0-py3-none-any.whl", hash = "sha256:fe809ff5c27e557d14e613f45ca441aabda051d119ee5a0102aaba6ce40eed2c"},
|
||||||
|
{file = "structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4"},
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "supabase"
|
name = "supabase"
|
||||||
version = "2.15.0"
|
version = "2.15.0"
|
||||||
|
@ -3904,4 +3916,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
|
||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.1"
|
lock-version = "2.1"
|
||||||
python-versions = "^3.11"
|
python-versions = "^3.11"
|
||||||
content-hash = "99368309d41431f2104a182b680e9361409121e7a6df3ead3f14173dbaf9d066"
|
content-hash = "22e38245da1c442147f6b870361b56f898ec3f3b87cbe09c3f82e9c96c621ae7"
|
||||||
|
|
|
@ -60,6 +60,7 @@ httpx = "^0.28.0"
|
||||||
aiohttp = "^3.9.0"
|
aiohttp = "^3.9.0"
|
||||||
email-validator = "^2.0.0"
|
email-validator = "^2.0.0"
|
||||||
mailtrap = "^2.0.1"
|
mailtrap = "^2.0.1"
|
||||||
|
structlog = "^25.4.0"
|
||||||
|
|
||||||
[tool.poetry.scripts]
|
[tool.poetry.scripts]
|
||||||
agentpress = "agentpress.cli:main"
|
agentpress = "agentpress.cli:main"
|
||||||
|
|
|
@ -43,3 +43,4 @@ mcp_use>=1.0.0
|
||||||
aiohttp>=3.9.0
|
aiohttp>=3.9.0
|
||||||
email-validator>=2.0.0
|
email-validator>=2.0.0
|
||||||
mailtrap>=2.0.1
|
mailtrap>=2.0.1
|
||||||
|
structlog==25.4.0
|
||||||
|
|
|
@ -6,7 +6,7 @@ from datetime import datetime, timezone
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from services import redis
|
from services import redis
|
||||||
from agent.run import run_agent
|
from agent.run import run_agent
|
||||||
from utils.logger import logger
|
from utils.logger import logger, structlog
|
||||||
import dramatiq
|
import dramatiq
|
||||||
import uuid
|
import uuid
|
||||||
from agentpress.thread_manager import ThreadManager
|
from agentpress.thread_manager import ThreadManager
|
||||||
|
@ -54,9 +54,18 @@ async def run_agent_background(
|
||||||
enable_context_manager: bool,
|
enable_context_manager: bool,
|
||||||
agent_config: Optional[dict] = None,
|
agent_config: Optional[dict] = None,
|
||||||
is_agent_builder: Optional[bool] = False,
|
is_agent_builder: Optional[bool] = False,
|
||||||
target_agent_id: Optional[str] = None
|
target_agent_id: Optional[str] = None,
|
||||||
|
request_id: Optional[str] = None,
|
||||||
):
|
):
|
||||||
"""Run the agent in the background using Redis for state."""
|
"""Run the agent in the background using Redis for state."""
|
||||||
|
structlog.contextvars.clear_contextvars()
|
||||||
|
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
agent_run_id=agent_run_id,
|
||||||
|
thread_id=thread_id,
|
||||||
|
request_id=request_id,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await initialize()
|
await initialize()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -85,6 +94,16 @@ async def run_agent_background(
|
||||||
sentry.sentry.set_tag("thread_id", thread_id)
|
sentry.sentry.set_tag("thread_id", thread_id)
|
||||||
|
|
||||||
logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
|
logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
|
||||||
|
logger.info({
|
||||||
|
"model_name": model_name,
|
||||||
|
"enable_thinking": enable_thinking,
|
||||||
|
"reasoning_effort": reasoning_effort,
|
||||||
|
"stream": stream,
|
||||||
|
"enable_context_manager": enable_context_manager,
|
||||||
|
"agent_config": agent_config,
|
||||||
|
"is_agent_builder": is_agent_builder,
|
||||||
|
"target_agent_id": target_agent_id,
|
||||||
|
})
|
||||||
logger.info(f"🚀 Using model: {model_name} (thinking: {enable_thinking}, reasoning_effort: {reasoning_effort})")
|
logger.info(f"🚀 Using model: {model_name} (thinking: {enable_thinking}, reasoning_effort: {reasoning_effort})")
|
||||||
if agent_config:
|
if agent_config:
|
||||||
logger.info(f"Using custom agent: {agent_config.get('name', 'Unknown')}")
|
logger.info(f"Using custom agent: {agent_config.get('name', 'Unknown')}")
|
||||||
|
|
|
@ -3,6 +3,7 @@ from fastapi import HTTPException, Request
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
import jwt
|
import jwt
|
||||||
from jwt.exceptions import PyJWTError
|
from jwt.exceptions import PyJWTError
|
||||||
|
from utils.logger import structlog
|
||||||
|
|
||||||
# This function extracts the user ID from Supabase JWT
|
# This function extracts the user ID from Supabase JWT
|
||||||
async def get_current_user_id_from_jwt(request: Request) -> str:
|
async def get_current_user_id_from_jwt(request: Request) -> str:
|
||||||
|
@ -48,6 +49,9 @@ async def get_current_user_id_from_jwt(request: Request) -> str:
|
||||||
)
|
)
|
||||||
|
|
||||||
sentry.sentry.set_user({ "id": user_id })
|
sentry.sentry.set_user({ "id": user_id })
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
user_id=user_id
|
||||||
|
)
|
||||||
return user_id
|
return user_id
|
||||||
|
|
||||||
except PyJWTError:
|
except PyJWTError:
|
||||||
|
@ -121,8 +125,11 @@ async def get_user_id_from_stream_auth(
|
||||||
# For Supabase JWT, we just need to decode and extract the user ID
|
# For Supabase JWT, we just need to decode and extract the user ID
|
||||||
payload = jwt.decode(token, options={"verify_signature": False})
|
payload = jwt.decode(token, options={"verify_signature": False})
|
||||||
user_id = payload.get('sub')
|
user_id = payload.get('sub')
|
||||||
sentry.sentry.set_user({ "id": user_id })
|
|
||||||
if user_id:
|
if user_id:
|
||||||
|
sentry.sentry.set_user({ "id": user_id })
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
user_id=user_id
|
||||||
|
)
|
||||||
return user_id
|
return user_id
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
@ -213,6 +220,11 @@ async def get_optional_user_id(request: Request) -> Optional[str]:
|
||||||
|
|
||||||
# Supabase stores the user ID in the 'sub' claim
|
# Supabase stores the user ID in the 'sub' claim
|
||||||
user_id = payload.get('sub')
|
user_id = payload.get('sub')
|
||||||
|
if user_id:
|
||||||
|
sentry.sentry.set_user({ "id": user_id })
|
||||||
|
structlog.contextvars.bind_contextvars(
|
||||||
|
user_id=user_id
|
||||||
|
)
|
||||||
|
|
||||||
return user_id
|
return user_id
|
||||||
except PyJWTError:
|
except PyJWTError:
|
||||||
|
|
|
@ -1,131 +1,28 @@
|
||||||
"""
|
import structlog, logging, os
|
||||||
Centralized logging configuration for AgentPress.
|
|
||||||
|
|
||||||
This module provides a unified logging interface with:
|
ENV_MODE = os.getenv("ENV_MODE", "LOCAL")
|
||||||
- Structured JSON logging for better parsing
|
|
||||||
- Log levels for different environments
|
|
||||||
- Correlation IDs for request tracing
|
|
||||||
- Contextual information for debugging
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
renderer = [structlog.processors.JSONRenderer()]
|
||||||
import json
|
if ENV_MODE.lower() == "local".lower():
|
||||||
import sys
|
renderer = [structlog.dev.ConsoleRenderer()]
|
||||||
import os
|
|
||||||
from datetime import datetime, timezone
|
|
||||||
from contextvars import ContextVar
|
|
||||||
from functools import wraps
|
|
||||||
import traceback
|
|
||||||
from logging.handlers import RotatingFileHandler
|
|
||||||
|
|
||||||
from utils.config import config, EnvMode
|
structlog.configure(
|
||||||
|
processors=[
|
||||||
# Context variable for request correlation ID
|
structlog.stdlib.add_log_level,
|
||||||
request_id: ContextVar[str] = ContextVar('request_id', default='')
|
structlog.stdlib.PositionalArgumentsFormatter(),
|
||||||
|
structlog.processors.dict_tracebacks,
|
||||||
class JSONFormatter(logging.Formatter):
|
structlog.processors.CallsiteParameterAdder(
|
||||||
"""Custom JSON formatter for structured logging."""
|
{
|
||||||
|
structlog.processors.CallsiteParameter.FILENAME,
|
||||||
def format(self, record: logging.LogRecord) -> str:
|
structlog.processors.CallsiteParameter.FUNC_NAME,
|
||||||
"""Format log record as JSON with contextual information."""
|
structlog.processors.CallsiteParameter.LINENO,
|
||||||
log_data = {
|
|
||||||
'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
|
|
||||||
'level': record.levelname,
|
|
||||||
'message': record.getMessage(),
|
|
||||||
'module': record.module,
|
|
||||||
'function': record.funcName,
|
|
||||||
'line': record.lineno,
|
|
||||||
'request_id': request_id.get(),
|
|
||||||
'thread_id': getattr(record, 'thread_id', None),
|
|
||||||
'correlation_id': getattr(record, 'correlation_id', None)
|
|
||||||
}
|
|
||||||
|
|
||||||
# Add extra fields if present
|
|
||||||
if hasattr(record, 'extra'):
|
|
||||||
log_data.update(record.extra)
|
|
||||||
|
|
||||||
# Add exception info if present
|
|
||||||
if record.exc_info:
|
|
||||||
log_data['exception'] = {
|
|
||||||
'type': str(record.exc_info[0].__name__),
|
|
||||||
'message': str(record.exc_info[1]),
|
|
||||||
'traceback': traceback.format_exception(*record.exc_info)
|
|
||||||
}
|
}
|
||||||
|
),
|
||||||
|
structlog.processors.TimeStamper(fmt="iso"),
|
||||||
|
structlog.contextvars.merge_contextvars,
|
||||||
|
*renderer,
|
||||||
|
],
|
||||||
|
cache_logger_on_first_use=True,
|
||||||
|
)
|
||||||
|
|
||||||
return json.dumps(log_data)
|
logger: structlog.stdlib.BoundLogger = structlog.get_logger(level=logging.DEBUG)
|
||||||
|
|
||||||
def setup_logger(name: str = 'agentpress') -> logging.Logger:
|
|
||||||
"""
|
|
||||||
Set up a centralized logger with both file and console handlers.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name: The name of the logger
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
logging.Logger: Configured logger instance
|
|
||||||
"""
|
|
||||||
logger = logging.getLogger(name)
|
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
# Create logs directory if it doesn't exist
|
|
||||||
log_dir = os.path.join(os.getcwd(), 'logs')
|
|
||||||
try:
|
|
||||||
if not os.path.exists(log_dir):
|
|
||||||
os.makedirs(log_dir)
|
|
||||||
print(f"Created log directory at: {log_dir}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error creating log directory: {e}")
|
|
||||||
return logger
|
|
||||||
|
|
||||||
# File handler with rotation
|
|
||||||
try:
|
|
||||||
log_file = os.path.join(log_dir, f'{name}_{datetime.now().strftime("%Y%m%d")}.log')
|
|
||||||
file_handler = RotatingFileHandler(
|
|
||||||
log_file,
|
|
||||||
maxBytes=10*1024*1024, # 10MB
|
|
||||||
backupCount=5,
|
|
||||||
encoding='utf-8'
|
|
||||||
)
|
|
||||||
file_handler.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
# Create formatters
|
|
||||||
file_formatter = logging.Formatter(
|
|
||||||
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
|
|
||||||
)
|
|
||||||
file_handler.setFormatter(file_formatter)
|
|
||||||
|
|
||||||
# Add file handler to logger
|
|
||||||
logger.addHandler(file_handler)
|
|
||||||
print(f"Added file handler for: {log_file}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error setting up file handler: {e}")
|
|
||||||
|
|
||||||
# Console handler - WARNING in production, DEBUG in other environments
|
|
||||||
try:
|
|
||||||
console_handler = logging.StreamHandler(sys.stdout)
|
|
||||||
if config.ENV_MODE == EnvMode.PRODUCTION:
|
|
||||||
console_handler.setLevel(logging.WARNING)
|
|
||||||
else:
|
|
||||||
console_handler.setLevel(logging.DEBUG)
|
|
||||||
|
|
||||||
console_formatter = logging.Formatter(
|
|
||||||
'%(asctime)s - %(levelname)s - %(name)s - %(message)s'
|
|
||||||
)
|
|
||||||
console_handler.setFormatter(console_formatter)
|
|
||||||
|
|
||||||
# Add console handler to logger
|
|
||||||
logger.addHandler(console_handler)
|
|
||||||
logger.info(f"Added console handler with level: {console_handler.level}")
|
|
||||||
logger.info(f"Log file will be created at: {log_dir}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error setting up console handler: {e}")
|
|
||||||
|
|
||||||
# # Test logging
|
|
||||||
# logger.debug("Logger setup complete - DEBUG test")
|
|
||||||
# logger.info("Logger setup complete - INFO test")
|
|
||||||
# logger.warning("Logger setup complete - WARNING test")
|
|
||||||
|
|
||||||
return logger
|
|
||||||
|
|
||||||
# Create default logger instance
|
|
||||||
logger = setup_logger()
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ services:
|
||||||
build:
|
build:
|
||||||
context: ./backend
|
context: ./backend
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
command: python -m dramatiq run_agent_background
|
command: python -m dramatiq --skip-logging run_agent_background
|
||||||
volumes:
|
volumes:
|
||||||
- ./backend/.env:/app/.env:ro
|
- ./backend/.env:/app/.env:ro
|
||||||
env_file:
|
env_file:
|
||||||
|
|
Loading…
Reference in New Issue