diff --git a/backend/agent/api.py b/backend/agent/api.py index b09a5e28..edb54c35 100644 --- a/backend/agent/api.py +++ b/backend/agent/api.py @@ -157,11 +157,6 @@ async def stop_agent_run(agent_run_id: str, error_message: Optional[str] = None) except Exception as e: logger.error(f"Failed to find or signal active instances: {str(e)}") - # Make sure to remove from active_agent_runs - if agent_run_id in active_agent_runs: - del active_agent_runs[agent_run_id] - logger.debug(f"Removed agent run {agent_run_id} from active_agent_runs during stop") - logger.info(f"Successfully initiated stop process for agent run: {agent_run_id}") async def restore_running_agent_runs(): @@ -290,8 +285,8 @@ async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id 'sandbox': { 'id': sandbox_id, 'pass': sandbox_pass, - 'vnc_preview': sandbox.get_preview_link(6080), - 'sandbox_url': sandbox.get_preview_link(8080) + 'vnc_preview': str(sandbox.get_preview_link(6080)), + 'sandbox_url': str(sandbox.get_preview_link(8080)) } }).eq('project_id', project_id).execute() @@ -395,79 +390,43 @@ async def stream_agent_run( async def stream_generator(): logger.debug(f"Streaming responses for agent run: {agent_run_id}") - # Track if we've sent a completion message - sent_completion = False - - try: - # Check if this is an active run with stored responses - if agent_run_id in active_agent_runs: - # First, send all existing responses - stored_responses = active_agent_runs[agent_run_id] - logger.debug(f"Sending {len(stored_responses)} existing responses for agent run: {agent_run_id}") + # Check if this is an active run with stored responses + if agent_run_id in active_agent_runs: + # First, send all existing responses + stored_responses = active_agent_runs[agent_run_id] + logger.debug(f"Sending {len(stored_responses)} existing responses for agent run: {agent_run_id}") + + for response in stored_responses: + yield f"data: {json.dumps(response)}\n\n" + + # If the run is still active (status is running), set up to stream new responses + if agent_run_data['status'] == 'running': + # Get the current length to know where to start watching for new responses + current_length = len(stored_responses) - for response in stored_responses: - yield f"data: {json.dumps(response)}\n\n" - - # Check if this is a completion message - if response.get('type') == 'status': - if response.get('status') == 'completed' or response.get('status_type') == 'thread_run_end': - sent_completion = True - - # If the run is still active (status is running), set up to stream new responses - if agent_run_data['status'] == 'running': - # Get the current length to know where to start watching for new responses - current_length = len(stored_responses) - - # Setup a timeout mechanism - start_time = datetime.now(timezone.utc) - timeout_seconds = 300 # 5 minutes max wait time - - # Keep checking for new responses - while agent_run_id in active_agent_runs: - # Check if there are new responses - if len(active_agent_runs[agent_run_id]) > current_length: - # Send all new responses - for i in range(current_length, len(active_agent_runs[agent_run_id])): - response = active_agent_runs[agent_run_id][i] - yield f"data: {json.dumps(response)}\n\n" - - # Check if this is a completion message - if response.get('type') == 'status': - if response.get('status') == 'completed' or response.get('status_type') == 'thread_run_end': - sent_completion = True - - # Update current length - current_length = len(active_agent_runs[agent_run_id]) + # Keep checking for new responses + while agent_run_id in active_agent_runs: + # Check if there are new responses + if len(active_agent_runs[agent_run_id]) > current_length: + # Send all new responses + for i in range(current_length, len(active_agent_runs[agent_run_id])): + response = active_agent_runs[agent_run_id][i] + yield f"data: {json.dumps(response)}\n\n" - # Check for timeout - elapsed = (datetime.now(timezone.utc) - start_time).total_seconds() - if elapsed > timeout_seconds: - logger.warning(f"Stream timeout after {timeout_seconds}s for agent run: {agent_run_id}") - break - - # Brief pause before checking again - await asyncio.sleep(0.1) - else: - # If the run is not active or we don't have stored responses, - # send a message indicating the run is not available for streaming - logger.warning(f"Agent run {agent_run_id} not found in active runs") - yield f"data: {json.dumps({'type': 'status', 'status': agent_run_data['status'], 'message': 'Run data not available for streaming'})}\n\n" - - # Always send a completion status at the end if we haven't already - if not sent_completion: - completion_status = 'completed' - # Use the actual status from database if available - if agent_run_data['status'] in ['failed', 'stopped']: - completion_status = agent_run_data['status'] + # Update current length + current_length = len(active_agent_runs[agent_run_id]) - yield f"data: {json.dumps({'type': 'status', 'status': completion_status, 'message': f'Stream ended with status: {completion_status}'})}\n\n" - - logger.debug(f"Streaming complete for agent run: {agent_run_id}") - except Exception as e: - logger.error(f"Error in stream generator: {str(e)}", exc_info=True) - # Send error message if we encounter an exception - if not sent_completion: - yield f"data: {json.dumps({'type': 'status', 'status': 'error', 'message': f'Stream error: {str(e)}'})}\n\n" + # Brief pause before checking again + await asyncio.sleep(0.1) + else: + # If the run is not active or we don't have stored responses, + # send a message indicating the run is not available for streaming + logger.warning(f"Agent run {agent_run_id} not found in active runs") + yield f"data: {json.dumps({'type': 'status', 'status': agent_run_data['status'], 'message': 'Run data not available for streaming'})}\n\n" + + # Always send a completion status at the end + yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n" + logger.debug(f"Streaming complete for agent run: {agent_run_id}") # Return a streaming response return StreamingResponse( @@ -490,7 +449,6 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s # Tracking variables total_responses = 0 start_time = datetime.now(timezone.utc) - thread_run_ended = False # Track if we received a thread_run_end signal # Create a pubsub to listen for control messages pubsub = None @@ -624,11 +582,6 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s await update_agent_run_status(client, agent_run_id, "failed", error=error_msg, responses=all_responses) break - # Check for thread_run_end signal from ResponseProcessor - if response.get('type') == 'status' and response.get('status_type') == 'thread_run_end': - logger.info(f"Received thread_run_end signal from ResponseProcessor for agent run: {agent_run_id}") - thread_run_ended = True - # Store response in memory if agent_run_id in active_agent_runs: active_agent_runs[agent_run_id].append(response) @@ -722,10 +675,5 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s logger.debug(f"Deleted active run key for agent run: {agent_run_id} (instance: {instance_id})") except Exception as e: logger.warning(f"Error deleting active run key: {str(e)}") - - # Remove from active_agent_runs to ensure stream stops - if agent_run_id in active_agent_runs: - del active_agent_runs[agent_run_id] - logger.debug(f"Removed agent run {agent_run_id} from active_agent_runs") - logger.info(f"Agent run background task fully completed for: {agent_run_id} (instance: {instance_id}, thread_run_ended: {thread_run_ended})") \ No newline at end of file + logger.info(f"Agent run background task fully completed for: {agent_run_id} (instance: {instance_id})") \ No newline at end of file