diff --git a/backend/agent/api.py b/backend/agent/api.py index 47464b24..bea70dac 100644 --- a/backend/agent/api.py +++ b/backend/agent/api.py @@ -13,6 +13,7 @@ from services.supabase import DBConnection from services import redis from agent.run import run_agent from services.auth_utils import get_current_user_id, get_user_id_from_stream_auth, verify_thread_access, verify_agent_run_access +from agentpress.logger import logger # Initialize shared resources router = APIRouter() @@ -31,16 +32,19 @@ def initialize( # Generate instance ID instance_id = str(uuid.uuid4())[:8] + logger.info(f"Initialized agent API with instance ID: {instance_id}") # Note: Redis will be initialized in the lifespan function in api.py async def cleanup(): """Clean up resources and stop running agents on shutdown.""" + logger.info("Starting cleanup of agent API resources") # Get Redis client redis_client = await redis.get_client() # Use the instance_id to find and clean up this instance's keys running_keys = await redis_client.keys(f"active_run:{instance_id}:*") + logger.info(f"Found {len(running_keys)} running agent runs to clean up") for key in running_keys: agent_run_id = key.split(":")[-1] @@ -48,9 +52,11 @@ async def cleanup(): # Close Redis connection await redis.close() + logger.info("Completed cleanup of agent API resources") async def stop_agent_run(agent_run_id: str): """Update database and publish stop signal to Redis.""" + logger.info(f"Stopping agent run: {agent_run_id}") client = await db.client redis_client = await redis.get_client() @@ -62,13 +68,16 @@ async def stop_agent_run(agent_run_id: str): # Publish stop signal to the agent run channel as a string await redis_client.publish(f"agent_run:{agent_run_id}:control", "STOP") + logger.info(f"Successfully stopped agent run: {agent_run_id}") async def restore_running_agent_runs(): """Restore any agent runs that were still marked as running in the database.""" + logger.info("Restoring running agent runs after server restart") client = await db.client running_agent_runs = await client.table('agent_runs').select('*').eq("status", "running").execute() for run in running_agent_runs.data: + logger.warning(f"Found running agent run {run['id']} from before server restart") await client.table('agent_runs').update({ "status": "failed", "error": "Server restarted while agent was running", @@ -78,6 +87,7 @@ async def restore_running_agent_runs(): @router.post("/thread/{thread_id}/agent/start") async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id)): """Start an agent for a specific thread in the background.""" + logger.info(f"Starting new agent for thread: {thread_id}") client = await db.client redis_client = await redis.get_client() @@ -93,6 +103,7 @@ async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id }).execute() agent_run_id = agent_run.data[0]['id'] + logger.info(f"Created new agent run: {agent_run_id}") # Register this run in Redis with TTL await redis_client.set( @@ -117,12 +128,14 @@ async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id async def _cleanup_agent_run(agent_run_id: str): """Clean up Redis keys when an agent run is done.""" + logger.debug(f"Cleaning up Redis keys for agent run: {agent_run_id}") redis_client = await redis.get_client() await redis_client.delete(f"active_run:{instance_id}:{agent_run_id}") @router.post("/agent-run/{agent_run_id}/stop") async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id)): """Stop a running agent.""" + logger.info(f"Stopping agent run: {agent_run_id}") client = await db.client # Verify user has access to the agent run @@ -140,6 +153,7 @@ async def stream_agent_run( request: Request = None ): """Stream the responses of an agent run from where they left off.""" + logger.info(f"Starting stream for agent run: {agent_run_id}") client = await db.client redis_client = await redis.get_client() @@ -150,6 +164,7 @@ async def stream_agent_run( agent_run_data = await verify_agent_run_access(client, agent_run_id, user_id) responses = json.loads(agent_run_data['responses']) if agent_run_data['responses'] else [] + logger.debug(f"Found {len(responses)} existing responses for agent run: {agent_run_id}") # Create a pubsub to listen for new responses pubsub = redis_client.pubsub() @@ -171,6 +186,7 @@ async def stream_agent_run( # Check if this is the end marker end_stream_marker = "END_STREAM" if data == end_stream_marker or data == end_stream_marker.encode('utf-8'): + logger.debug(f"Received end stream marker for agent run: {agent_run_id}") break # Handle both string and bytes data @@ -192,7 +208,7 @@ async def stream_agent_run( await asyncio.sleep(0.01) # Minimal sleep to prevent CPU spinning except asyncio.CancelledError: - pass + logger.info(f"Stream cancelled for agent run: {agent_run_id}") finally: await pubsub.unsubscribe() @@ -212,23 +228,27 @@ async def stream_agent_run( @router.get("/thread/{thread_id}/agent-runs") async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id)): """Get all agent runs for a thread.""" + logger.info(f"Fetching agent runs for thread: {thread_id}") client = await db.client # Verify user has access to this thread await verify_thread_access(client, thread_id, user_id) agent_runs = await client.table('agent_runs').select('*').eq("thread_id", thread_id).execute() + logger.debug(f"Found {len(agent_runs.data)} agent runs for thread: {thread_id}") return {"agent_runs": agent_runs.data} @router.get("/agent-run/{agent_run_id}") async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id)): """Get agent run status and responses.""" + logger.info(f"Fetching agent run details: {agent_run_id}") client = await db.client # Verify user has access to the agent run and get run data agent_run_data = await verify_agent_run_access(client, agent_run_id, user_id) responses = json.loads(agent_run_data['responses']) if agent_run_data['responses'] else [] + logger.debug(f"Found {len(responses)} responses for agent run: {agent_run_id}") return { "id": agent_run_data['id'], @@ -242,6 +262,7 @@ async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_us async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: str): """Run the agent in the background and store responses.""" + logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id}") client = await db.client redis_client = await redis.get_client() @@ -249,10 +270,13 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s responses = [] batch = [] last_db_update = datetime.now(timezone.utc) + total_responses = 0 + start_time = datetime.now(timezone.utc) # Create a pubsub to listen for control messages pubsub = redis_client.pubsub() await pubsub.subscribe(f"agent_run:{agent_run_id}:control") + logger.debug(f"Subscribed to control channel for agent run: {agent_run_id}") # Start a background task to check for stop signals stop_signal_received = False @@ -264,6 +288,7 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s if message and message["type"] == "message": stop_signal = "STOP" if message["data"] == stop_signal or message["data"] == stop_signal.encode('utf-8'): + logger.info(f"Received stop signal for agent run: {agent_run_id}") stop_signal_received = True break await asyncio.sleep(0.01) # Minimal sleep @@ -272,15 +297,18 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s # Start the stop signal checker stop_checker = asyncio.create_task(check_for_stop_signal()) + logger.debug(f"Started stop signal checker for agent run: {agent_run_id}") try: # Run the agent and collect responses + logger.debug(f"Initializing agent generator for thread: {thread_id}") agent_gen = run_agent(thread_id, stream=True, thread_manager=thread_manager) async for response in agent_gen: # Check if stop signal received if stop_signal_received: + logger.info(f"Agent run stopped due to stop signal: {agent_run_id}") break # Format the response properly @@ -300,6 +328,10 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s # Add response to batch and responses list responses.append(formatted_response) batch.append(formatted_response) + total_responses += 1 + + # Log response type for debugging + # logger.debug(f"Received response type '{formatted_response.get('type', 'unknown')}' for agent run: {agent_run_id}") # Immediately publish the response to Redis await redis_client.publish( @@ -310,6 +342,7 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s # Update database less frequently to reduce overhead now = datetime.now(timezone.utc) if (now - last_db_update).total_seconds() >= 2.0 and batch: # Increased interval + # logger.debug(f"Batch update for agent run {agent_run_id}: {len(batch)} responses") await client.table('agent_runs').update({ "responses": json.dumps(responses) }).eq("id", agent_run_id).execute() @@ -321,12 +354,15 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s # Final update to database with all responses if batch: + logger.debug(f"Final batch update for agent run {agent_run_id}: {len(batch)} responses") await client.table('agent_runs').update({ "responses": json.dumps(responses) }).eq("id", agent_run_id).execute() # Signal all done if we weren't stopped if not stop_signal_received: + duration = (datetime.now(timezone.utc) - start_time).total_seconds() + logger.info(f"Agent run completed successfully: {agent_run_id} (duration: {duration:.2f}s, total responses: {total_responses})") await client.table('agent_runs').update({ "status": "completed", "completed_at": datetime.now(timezone.utc).isoformat() @@ -338,12 +374,14 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s f"agent_run:{agent_run_id}:responses", end_stream_marker ) + logger.debug(f"Sent END_STREAM signal for agent run: {agent_run_id}") except Exception as e: # Log the error and update the agent run error_message = str(e) traceback_str = traceback.format_exc() - print(f"Error in agent run {agent_run_id}: {error_message}\n{traceback_str}") + duration = (datetime.now(timezone.utc) - start_time).total_seconds() + logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str}") # Update the agent run with the error await client.table('agent_runs').update({ @@ -358,15 +396,19 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s f"agent_run:{agent_run_id}:responses", end_stream_marker ) + logger.debug(f"Sent END_STREAM signal after error for agent run: {agent_run_id}") finally: # Ensure we always clean up the pubsub and stop checker stop_checker.cancel() await pubsub.unsubscribe() + logger.debug(f"Cleaned up pubsub and stop checker for agent run: {agent_run_id}") # Make sure we mark the run as completed or failed if it was still running current_run = await client.table('agent_runs').select('status').eq("id", agent_run_id).execute() if current_run.data and current_run.data[0]['status'] == 'running': + final_status = "failed" if stop_signal_received else "completed" + logger.info(f"Marking agent run {agent_run_id} as {final_status} in cleanup") await client.table('agent_runs').update({ - "status": "failed" if stop_signal_received else "completed", + "status": final_status, "completed_at": datetime.now(timezone.utc).isoformat() }).eq("id", agent_run_id).execute() diff --git a/backend/agent/workspace/assets/custom-ui.js b/backend/agent/workspace/assets/custom-ui.js index b1e33ca0..8c32ae6c 100644 --- a/backend/agent/workspace/assets/custom-ui.js +++ b/backend/agent/workspace/assets/custom-ui.js @@ -292,6 +292,80 @@ function addTrafficToggle() { document.querySelector('.map-container').appendChild(trafficToggle); } +// Add voice command functionality +function addVoiceCommandButton() { + const voiceButton = document.createElement('div'); + voiceButton.className = 'voice-command-button'; + voiceButton.innerHTML = ''; + voiceButton.style.position = 'absolute'; + voiceButton.style.bottom = '170px'; + voiceButton.style.right = '20px'; + voiceButton.style.width = '48px'; + voiceButton.style.height = '48px'; + voiceButton.style.borderRadius = '50%'; + voiceButton.style.backgroundColor = 'white'; + voiceButton.style.color = '#276EF1'; + voiceButton.style.display = 'flex'; + voiceButton.style.alignItems = 'center'; + voiceButton.style.justifyContent = 'center'; + voiceButton.style.boxShadow = '0 2px 6px rgba(0,0,0,0.2)'; + voiceButton.style.cursor = 'pointer'; + voiceButton.style.zIndex = '100'; + voiceButton.style.transition = 'transform 0.2s, background-color 0.2s'; + + // Add hover effect + voiceButton.addEventListener('mouseenter', () => { + voiceButton.style.transform = 'scale(1.1)'; + }); + + voiceButton.addEventListener('mouseleave', () => { + voiceButton.style.transform = 'scale(1)'; + }); + + // Add click functionality + voiceButton.addEventListener('click', () => { + // Simulate voice recognition + voiceButton.style.backgroundColor = '#276EF1'; + voiceButton.style.color = 'white'; + voiceButton.innerHTML = ''; + + showToast('Listening for commands...'); + + // Simulate processing + setTimeout(() => { + voiceButton.style.backgroundColor = 'white'; + voiceButton.style.color = '#276EF1'; + voiceButton.innerHTML = ''; + + // Simulate a random command + const commands = [ + 'Take me home', + 'Take me to work', + 'Show my trip history', + 'Schedule a ride' + ]; + + const randomCommand = commands[Math.floor(Math.random() * commands.length)]; + showToast(`Command recognized: "${randomCommand}"`); + + // Execute the simulated command + setTimeout(() => { + if (randomCommand === 'Take me home') { + selectSavedPlace('Home'); + } else if (randomCommand === 'Take me to work') { + selectSavedPlace('Work'); + } else if (randomCommand === 'Show my trip history') { + document.getElementById('tripHistoryBtn').click(); + } else if (randomCommand === 'Schedule a ride') { + document.getElementById('scheduleRideBtn').click(); + } + }, 500); + }, 2000); + }); + + document.querySelector('.map-container').appendChild(voiceButton); +} + // Initialize enhanced UI function initEnhancedUI() { // Add with a slight delay to ensure the map is loaded @@ -299,6 +373,7 @@ function initEnhancedUI() { addFloatingActionButton(); addMapStyleSwitcher(); addTrafficToggle(); + addVoiceCommandButton(); }, 1500); } diff --git a/backend/agentpress/logger.py b/backend/agentpress/logger.py index 43eecb8e..80063207 100644 --- a/backend/agentpress/logger.py +++ b/backend/agentpress/logger.py @@ -103,96 +103,5 @@ def setup_logger(name: str = 'agentpress') -> logging.Logger: return logger -def log_with_context(logger: logging.Logger, level: str = 'INFO'): - """Decorator to add contextual information to log messages. - - Args: - logger: Logger instance to use - level: Logging level for the decorator - """ - def decorator(func): - @wraps(func) - async def async_wrapper(*args, **kwargs): - # Extract thread_id from args if available - thread_id = None - if len(args) > 1 and isinstance(args[1], str): - thread_id = args[1] - elif 'thread_id' in kwargs: - thread_id = kwargs['thread_id'] - - # Create extra context - extra = { - 'thread_id': thread_id, - 'correlation_id': request_id.get(), - 'function': func.__name__, - 'module': func.__module__ - } - - # Log function entry - getattr(logger, level.lower())( - f"Entering {func.__name__}", - extra=extra - ) - - try: - result = await func(*args, **kwargs) - # Log successful completion - getattr(logger, level.lower())( - f"Completed {func.__name__} successfully", - extra=extra - ) - return result - except Exception as e: - # Log error with full context - logger.error( - f"Error in {func.__name__}: {str(e)}", - exc_info=True, - extra=extra - ) - raise - - @wraps(func) - def sync_wrapper(*args, **kwargs): - # Extract thread_id from args if available - thread_id = None - if len(args) > 1 and isinstance(args[1], str): - thread_id = args[1] - elif 'thread_id' in kwargs: - thread_id = kwargs['thread_id'] - - # Create extra context - extra = { - 'thread_id': thread_id, - 'correlation_id': request_id.get(), - 'function': func.__name__, - 'module': func.__module__ - } - - # Log function entry - getattr(logger, level.lower())( - f"Entering {func.__name__}", - extra=extra - ) - - try: - result = func(*args, **kwargs) - # Log successful completion - getattr(logger, level.lower())( - f"Completed {func.__name__} successfully", - extra=extra - ) - return result - except Exception as e: - # Log error with full context - logger.error( - f"Error in {func.__name__}: {str(e)}", - exc_info=True, - extra=extra - ) - raise - - return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper - return decorator - # Create default logger instance logger = setup_logger() \ No newline at end of file