logger v1

This commit is contained in:
marko-kraemer 2025-03-31 19:27:06 -07:00
parent cbf7ca141e
commit c86d7d687a
3 changed files with 120 additions and 94 deletions

View File

@ -13,6 +13,7 @@ from services.supabase import DBConnection
from services import redis from services import redis
from agent.run import run_agent from agent.run import run_agent
from services.auth_utils import get_current_user_id, get_user_id_from_stream_auth, verify_thread_access, verify_agent_run_access from services.auth_utils import get_current_user_id, get_user_id_from_stream_auth, verify_thread_access, verify_agent_run_access
from agentpress.logger import logger
# Initialize shared resources # Initialize shared resources
router = APIRouter() router = APIRouter()
@ -31,16 +32,19 @@ def initialize(
# Generate instance ID # Generate instance ID
instance_id = str(uuid.uuid4())[:8] instance_id = str(uuid.uuid4())[:8]
logger.info(f"Initialized agent API with instance ID: {instance_id}")
# Note: Redis will be initialized in the lifespan function in api.py # Note: Redis will be initialized in the lifespan function in api.py
async def cleanup(): async def cleanup():
"""Clean up resources and stop running agents on shutdown.""" """Clean up resources and stop running agents on shutdown."""
logger.info("Starting cleanup of agent API resources")
# Get Redis client # Get Redis client
redis_client = await redis.get_client() redis_client = await redis.get_client()
# Use the instance_id to find and clean up this instance's keys # Use the instance_id to find and clean up this instance's keys
running_keys = await redis_client.keys(f"active_run:{instance_id}:*") running_keys = await redis_client.keys(f"active_run:{instance_id}:*")
logger.info(f"Found {len(running_keys)} running agent runs to clean up")
for key in running_keys: for key in running_keys:
agent_run_id = key.split(":")[-1] agent_run_id = key.split(":")[-1]
@ -48,9 +52,11 @@ async def cleanup():
# Close Redis connection # Close Redis connection
await redis.close() await redis.close()
logger.info("Completed cleanup of agent API resources")
async def stop_agent_run(agent_run_id: str): async def stop_agent_run(agent_run_id: str):
"""Update database and publish stop signal to Redis.""" """Update database and publish stop signal to Redis."""
logger.info(f"Stopping agent run: {agent_run_id}")
client = await db.client client = await db.client
redis_client = await redis.get_client() redis_client = await redis.get_client()
@ -62,13 +68,16 @@ async def stop_agent_run(agent_run_id: str):
# Publish stop signal to the agent run channel as a string # Publish stop signal to the agent run channel as a string
await redis_client.publish(f"agent_run:{agent_run_id}:control", "STOP") await redis_client.publish(f"agent_run:{agent_run_id}:control", "STOP")
logger.info(f"Successfully stopped agent run: {agent_run_id}")
async def restore_running_agent_runs(): async def restore_running_agent_runs():
"""Restore any agent runs that were still marked as running in the database.""" """Restore any agent runs that were still marked as running in the database."""
logger.info("Restoring running agent runs after server restart")
client = await db.client client = await db.client
running_agent_runs = await client.table('agent_runs').select('*').eq("status", "running").execute() running_agent_runs = await client.table('agent_runs').select('*').eq("status", "running").execute()
for run in running_agent_runs.data: for run in running_agent_runs.data:
logger.warning(f"Found running agent run {run['id']} from before server restart")
await client.table('agent_runs').update({ await client.table('agent_runs').update({
"status": "failed", "status": "failed",
"error": "Server restarted while agent was running", "error": "Server restarted while agent was running",
@ -78,6 +87,7 @@ async def restore_running_agent_runs():
@router.post("/thread/{thread_id}/agent/start") @router.post("/thread/{thread_id}/agent/start")
async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id)): async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id)):
"""Start an agent for a specific thread in the background.""" """Start an agent for a specific thread in the background."""
logger.info(f"Starting new agent for thread: {thread_id}")
client = await db.client client = await db.client
redis_client = await redis.get_client() redis_client = await redis.get_client()
@ -93,6 +103,7 @@ async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id
}).execute() }).execute()
agent_run_id = agent_run.data[0]['id'] agent_run_id = agent_run.data[0]['id']
logger.info(f"Created new agent run: {agent_run_id}")
# Register this run in Redis with TTL # Register this run in Redis with TTL
await redis_client.set( await redis_client.set(
@ -117,12 +128,14 @@ async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id
async def _cleanup_agent_run(agent_run_id: str): async def _cleanup_agent_run(agent_run_id: str):
"""Clean up Redis keys when an agent run is done.""" """Clean up Redis keys when an agent run is done."""
logger.debug(f"Cleaning up Redis keys for agent run: {agent_run_id}")
redis_client = await redis.get_client() redis_client = await redis.get_client()
await redis_client.delete(f"active_run:{instance_id}:{agent_run_id}") await redis_client.delete(f"active_run:{instance_id}:{agent_run_id}")
@router.post("/agent-run/{agent_run_id}/stop") @router.post("/agent-run/{agent_run_id}/stop")
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id)): async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id)):
"""Stop a running agent.""" """Stop a running agent."""
logger.info(f"Stopping agent run: {agent_run_id}")
client = await db.client client = await db.client
# Verify user has access to the agent run # Verify user has access to the agent run
@ -140,6 +153,7 @@ async def stream_agent_run(
request: Request = None request: Request = None
): ):
"""Stream the responses of an agent run from where they left off.""" """Stream the responses of an agent run from where they left off."""
logger.info(f"Starting stream for agent run: {agent_run_id}")
client = await db.client client = await db.client
redis_client = await redis.get_client() redis_client = await redis.get_client()
@ -150,6 +164,7 @@ async def stream_agent_run(
agent_run_data = await verify_agent_run_access(client, agent_run_id, user_id) agent_run_data = await verify_agent_run_access(client, agent_run_id, user_id)
responses = json.loads(agent_run_data['responses']) if agent_run_data['responses'] else [] responses = json.loads(agent_run_data['responses']) if agent_run_data['responses'] else []
logger.debug(f"Found {len(responses)} existing responses for agent run: {agent_run_id}")
# Create a pubsub to listen for new responses # Create a pubsub to listen for new responses
pubsub = redis_client.pubsub() pubsub = redis_client.pubsub()
@ -171,6 +186,7 @@ async def stream_agent_run(
# Check if this is the end marker # Check if this is the end marker
end_stream_marker = "END_STREAM" end_stream_marker = "END_STREAM"
if data == end_stream_marker or data == end_stream_marker.encode('utf-8'): if data == end_stream_marker or data == end_stream_marker.encode('utf-8'):
logger.debug(f"Received end stream marker for agent run: {agent_run_id}")
break break
# Handle both string and bytes data # Handle both string and bytes data
@ -192,7 +208,7 @@ async def stream_agent_run(
await asyncio.sleep(0.01) # Minimal sleep to prevent CPU spinning await asyncio.sleep(0.01) # Minimal sleep to prevent CPU spinning
except asyncio.CancelledError: except asyncio.CancelledError:
pass logger.info(f"Stream cancelled for agent run: {agent_run_id}")
finally: finally:
await pubsub.unsubscribe() await pubsub.unsubscribe()
@ -212,23 +228,27 @@ async def stream_agent_run(
@router.get("/thread/{thread_id}/agent-runs") @router.get("/thread/{thread_id}/agent-runs")
async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id)): async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id)):
"""Get all agent runs for a thread.""" """Get all agent runs for a thread."""
logger.info(f"Fetching agent runs for thread: {thread_id}")
client = await db.client client = await db.client
# Verify user has access to this thread # Verify user has access to this thread
await verify_thread_access(client, thread_id, user_id) await verify_thread_access(client, thread_id, user_id)
agent_runs = await client.table('agent_runs').select('*').eq("thread_id", thread_id).execute() agent_runs = await client.table('agent_runs').select('*').eq("thread_id", thread_id).execute()
logger.debug(f"Found {len(agent_runs.data)} agent runs for thread: {thread_id}")
return {"agent_runs": agent_runs.data} return {"agent_runs": agent_runs.data}
@router.get("/agent-run/{agent_run_id}") @router.get("/agent-run/{agent_run_id}")
async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id)): async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id)):
"""Get agent run status and responses.""" """Get agent run status and responses."""
logger.info(f"Fetching agent run details: {agent_run_id}")
client = await db.client client = await db.client
# Verify user has access to the agent run and get run data # Verify user has access to the agent run and get run data
agent_run_data = await verify_agent_run_access(client, agent_run_id, user_id) agent_run_data = await verify_agent_run_access(client, agent_run_id, user_id)
responses = json.loads(agent_run_data['responses']) if agent_run_data['responses'] else [] responses = json.loads(agent_run_data['responses']) if agent_run_data['responses'] else []
logger.debug(f"Found {len(responses)} responses for agent run: {agent_run_id}")
return { return {
"id": agent_run_data['id'], "id": agent_run_data['id'],
@ -242,6 +262,7 @@ async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_us
async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: str): async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: str):
"""Run the agent in the background and store responses.""" """Run the agent in the background and store responses."""
logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id}")
client = await db.client client = await db.client
redis_client = await redis.get_client() redis_client = await redis.get_client()
@ -249,10 +270,13 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
responses = [] responses = []
batch = [] batch = []
last_db_update = datetime.now(timezone.utc) last_db_update = datetime.now(timezone.utc)
total_responses = 0
start_time = datetime.now(timezone.utc)
# Create a pubsub to listen for control messages # Create a pubsub to listen for control messages
pubsub = redis_client.pubsub() pubsub = redis_client.pubsub()
await pubsub.subscribe(f"agent_run:{agent_run_id}:control") await pubsub.subscribe(f"agent_run:{agent_run_id}:control")
logger.debug(f"Subscribed to control channel for agent run: {agent_run_id}")
# Start a background task to check for stop signals # Start a background task to check for stop signals
stop_signal_received = False stop_signal_received = False
@ -264,6 +288,7 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
if message and message["type"] == "message": if message and message["type"] == "message":
stop_signal = "STOP" stop_signal = "STOP"
if message["data"] == stop_signal or message["data"] == stop_signal.encode('utf-8'): if message["data"] == stop_signal or message["data"] == stop_signal.encode('utf-8'):
logger.info(f"Received stop signal for agent run: {agent_run_id}")
stop_signal_received = True stop_signal_received = True
break break
await asyncio.sleep(0.01) # Minimal sleep await asyncio.sleep(0.01) # Minimal sleep
@ -272,15 +297,18 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
# Start the stop signal checker # Start the stop signal checker
stop_checker = asyncio.create_task(check_for_stop_signal()) stop_checker = asyncio.create_task(check_for_stop_signal())
logger.debug(f"Started stop signal checker for agent run: {agent_run_id}")
try: try:
# Run the agent and collect responses # Run the agent and collect responses
logger.debug(f"Initializing agent generator for thread: {thread_id}")
agent_gen = run_agent(thread_id, stream=True, agent_gen = run_agent(thread_id, stream=True,
thread_manager=thread_manager) thread_manager=thread_manager)
async for response in agent_gen: async for response in agent_gen:
# Check if stop signal received # Check if stop signal received
if stop_signal_received: if stop_signal_received:
logger.info(f"Agent run stopped due to stop signal: {agent_run_id}")
break break
# Format the response properly # Format the response properly
@ -300,6 +328,10 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
# Add response to batch and responses list # Add response to batch and responses list
responses.append(formatted_response) responses.append(formatted_response)
batch.append(formatted_response) batch.append(formatted_response)
total_responses += 1
# Log response type for debugging
# logger.debug(f"Received response type '{formatted_response.get('type', 'unknown')}' for agent run: {agent_run_id}")
# Immediately publish the response to Redis # Immediately publish the response to Redis
await redis_client.publish( await redis_client.publish(
@ -310,6 +342,7 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
# Update database less frequently to reduce overhead # Update database less frequently to reduce overhead
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
if (now - last_db_update).total_seconds() >= 2.0 and batch: # Increased interval if (now - last_db_update).total_seconds() >= 2.0 and batch: # Increased interval
# logger.debug(f"Batch update for agent run {agent_run_id}: {len(batch)} responses")
await client.table('agent_runs').update({ await client.table('agent_runs').update({
"responses": json.dumps(responses) "responses": json.dumps(responses)
}).eq("id", agent_run_id).execute() }).eq("id", agent_run_id).execute()
@ -321,12 +354,15 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
# Final update to database with all responses # Final update to database with all responses
if batch: if batch:
logger.debug(f"Final batch update for agent run {agent_run_id}: {len(batch)} responses")
await client.table('agent_runs').update({ await client.table('agent_runs').update({
"responses": json.dumps(responses) "responses": json.dumps(responses)
}).eq("id", agent_run_id).execute() }).eq("id", agent_run_id).execute()
# Signal all done if we weren't stopped # Signal all done if we weren't stopped
if not stop_signal_received: if not stop_signal_received:
duration = (datetime.now(timezone.utc) - start_time).total_seconds()
logger.info(f"Agent run completed successfully: {agent_run_id} (duration: {duration:.2f}s, total responses: {total_responses})")
await client.table('agent_runs').update({ await client.table('agent_runs').update({
"status": "completed", "status": "completed",
"completed_at": datetime.now(timezone.utc).isoformat() "completed_at": datetime.now(timezone.utc).isoformat()
@ -338,12 +374,14 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
f"agent_run:{agent_run_id}:responses", f"agent_run:{agent_run_id}:responses",
end_stream_marker end_stream_marker
) )
logger.debug(f"Sent END_STREAM signal for agent run: {agent_run_id}")
except Exception as e: except Exception as e:
# Log the error and update the agent run # Log the error and update the agent run
error_message = str(e) error_message = str(e)
traceback_str = traceback.format_exc() traceback_str = traceback.format_exc()
print(f"Error in agent run {agent_run_id}: {error_message}\n{traceback_str}") duration = (datetime.now(timezone.utc) - start_time).total_seconds()
logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str}")
# Update the agent run with the error # Update the agent run with the error
await client.table('agent_runs').update({ await client.table('agent_runs').update({
@ -358,15 +396,19 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
f"agent_run:{agent_run_id}:responses", f"agent_run:{agent_run_id}:responses",
end_stream_marker end_stream_marker
) )
logger.debug(f"Sent END_STREAM signal after error for agent run: {agent_run_id}")
finally: finally:
# Ensure we always clean up the pubsub and stop checker # Ensure we always clean up the pubsub and stop checker
stop_checker.cancel() stop_checker.cancel()
await pubsub.unsubscribe() await pubsub.unsubscribe()
logger.debug(f"Cleaned up pubsub and stop checker for agent run: {agent_run_id}")
# Make sure we mark the run as completed or failed if it was still running # Make sure we mark the run as completed or failed if it was still running
current_run = await client.table('agent_runs').select('status').eq("id", agent_run_id).execute() current_run = await client.table('agent_runs').select('status').eq("id", agent_run_id).execute()
if current_run.data and current_run.data[0]['status'] == 'running': if current_run.data and current_run.data[0]['status'] == 'running':
final_status = "failed" if stop_signal_received else "completed"
logger.info(f"Marking agent run {agent_run_id} as {final_status} in cleanup")
await client.table('agent_runs').update({ await client.table('agent_runs').update({
"status": "failed" if stop_signal_received else "completed", "status": final_status,
"completed_at": datetime.now(timezone.utc).isoformat() "completed_at": datetime.now(timezone.utc).isoformat()
}).eq("id", agent_run_id).execute() }).eq("id", agent_run_id).execute()

View File

@ -292,6 +292,80 @@ function addTrafficToggle() {
document.querySelector('.map-container').appendChild(trafficToggle); document.querySelector('.map-container').appendChild(trafficToggle);
} }
// Add voice command functionality
function addVoiceCommandButton() {
const voiceButton = document.createElement('div');
voiceButton.className = 'voice-command-button';
voiceButton.innerHTML = '<i class="fas fa-microphone"></i>';
voiceButton.style.position = 'absolute';
voiceButton.style.bottom = '170px';
voiceButton.style.right = '20px';
voiceButton.style.width = '48px';
voiceButton.style.height = '48px';
voiceButton.style.borderRadius = '50%';
voiceButton.style.backgroundColor = 'white';
voiceButton.style.color = '#276EF1';
voiceButton.style.display = 'flex';
voiceButton.style.alignItems = 'center';
voiceButton.style.justifyContent = 'center';
voiceButton.style.boxShadow = '0 2px 6px rgba(0,0,0,0.2)';
voiceButton.style.cursor = 'pointer';
voiceButton.style.zIndex = '100';
voiceButton.style.transition = 'transform 0.2s, background-color 0.2s';
// Add hover effect
voiceButton.addEventListener('mouseenter', () => {
voiceButton.style.transform = 'scale(1.1)';
});
voiceButton.addEventListener('mouseleave', () => {
voiceButton.style.transform = 'scale(1)';
});
// Add click functionality
voiceButton.addEventListener('click', () => {
// Simulate voice recognition
voiceButton.style.backgroundColor = '#276EF1';
voiceButton.style.color = 'white';
voiceButton.innerHTML = '<i class="fas fa-microphone-alt"></i>';
showToast('Listening for commands...');
// Simulate processing
setTimeout(() => {
voiceButton.style.backgroundColor = 'white';
voiceButton.style.color = '#276EF1';
voiceButton.innerHTML = '<i class="fas fa-microphone"></i>';
// Simulate a random command
const commands = [
'Take me home',
'Take me to work',
'Show my trip history',
'Schedule a ride'
];
const randomCommand = commands[Math.floor(Math.random() * commands.length)];
showToast(`Command recognized: "${randomCommand}"`);
// Execute the simulated command
setTimeout(() => {
if (randomCommand === 'Take me home') {
selectSavedPlace('Home');
} else if (randomCommand === 'Take me to work') {
selectSavedPlace('Work');
} else if (randomCommand === 'Show my trip history') {
document.getElementById('tripHistoryBtn').click();
} else if (randomCommand === 'Schedule a ride') {
document.getElementById('scheduleRideBtn').click();
}
}, 500);
}, 2000);
});
document.querySelector('.map-container').appendChild(voiceButton);
}
// Initialize enhanced UI // Initialize enhanced UI
function initEnhancedUI() { function initEnhancedUI() {
// Add with a slight delay to ensure the map is loaded // Add with a slight delay to ensure the map is loaded
@ -299,6 +373,7 @@ function initEnhancedUI() {
addFloatingActionButton(); addFloatingActionButton();
addMapStyleSwitcher(); addMapStyleSwitcher();
addTrafficToggle(); addTrafficToggle();
addVoiceCommandButton();
}, 1500); }, 1500);
} }

View File

@ -103,96 +103,5 @@ def setup_logger(name: str = 'agentpress') -> logging.Logger:
return logger return logger
def log_with_context(logger: logging.Logger, level: str = 'INFO'):
"""Decorator to add contextual information to log messages.
Args:
logger: Logger instance to use
level: Logging level for the decorator
"""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
# Extract thread_id from args if available
thread_id = None
if len(args) > 1 and isinstance(args[1], str):
thread_id = args[1]
elif 'thread_id' in kwargs:
thread_id = kwargs['thread_id']
# Create extra context
extra = {
'thread_id': thread_id,
'correlation_id': request_id.get(),
'function': func.__name__,
'module': func.__module__
}
# Log function entry
getattr(logger, level.lower())(
f"Entering {func.__name__}",
extra=extra
)
try:
result = await func(*args, **kwargs)
# Log successful completion
getattr(logger, level.lower())(
f"Completed {func.__name__} successfully",
extra=extra
)
return result
except Exception as e:
# Log error with full context
logger.error(
f"Error in {func.__name__}: {str(e)}",
exc_info=True,
extra=extra
)
raise
@wraps(func)
def sync_wrapper(*args, **kwargs):
# Extract thread_id from args if available
thread_id = None
if len(args) > 1 and isinstance(args[1], str):
thread_id = args[1]
elif 'thread_id' in kwargs:
thread_id = kwargs['thread_id']
# Create extra context
extra = {
'thread_id': thread_id,
'correlation_id': request_id.get(),
'function': func.__name__,
'module': func.__module__
}
# Log function entry
getattr(logger, level.lower())(
f"Entering {func.__name__}",
extra=extra
)
try:
result = func(*args, **kwargs)
# Log successful completion
getattr(logger, level.lower())(
f"Completed {func.__name__} successfully",
extra=extra
)
return result
except Exception as e:
# Log error with full context
logger.error(
f"Error in {func.__name__}: {str(e)}",
exc_info=True,
extra=extra
)
raise
return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper
return decorator
# Create default logger instance # Create default logger instance
logger = setup_logger() logger = setup_logger()