mirror of https://github.com/kortix-ai/suna.git
wip
This commit is contained in:
parent
831331bf3f
commit
8087d112b5
|
@ -157,11 +157,6 @@ async def stop_agent_run(agent_run_id: str, error_message: Optional[str] = None)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to find or signal active instances: {str(e)}")
|
logger.error(f"Failed to find or signal active instances: {str(e)}")
|
||||||
|
|
||||||
# Make sure to remove from active_agent_runs
|
|
||||||
if agent_run_id in active_agent_runs:
|
|
||||||
del active_agent_runs[agent_run_id]
|
|
||||||
logger.debug(f"Removed agent run {agent_run_id} from active_agent_runs during stop")
|
|
||||||
|
|
||||||
logger.info(f"Successfully initiated stop process for agent run: {agent_run_id}")
|
logger.info(f"Successfully initiated stop process for agent run: {agent_run_id}")
|
||||||
|
|
||||||
async def restore_running_agent_runs():
|
async def restore_running_agent_runs():
|
||||||
|
@ -290,8 +285,8 @@ async def start_agent(thread_id: str, user_id: str = Depends(get_current_user_id
|
||||||
'sandbox': {
|
'sandbox': {
|
||||||
'id': sandbox_id,
|
'id': sandbox_id,
|
||||||
'pass': sandbox_pass,
|
'pass': sandbox_pass,
|
||||||
'vnc_preview': sandbox.get_preview_link(6080),
|
'vnc_preview': str(sandbox.get_preview_link(6080)),
|
||||||
'sandbox_url': sandbox.get_preview_link(8080)
|
'sandbox_url': str(sandbox.get_preview_link(8080))
|
||||||
}
|
}
|
||||||
}).eq('project_id', project_id).execute()
|
}).eq('project_id', project_id).execute()
|
||||||
|
|
||||||
|
@ -395,79 +390,43 @@ async def stream_agent_run(
|
||||||
async def stream_generator():
|
async def stream_generator():
|
||||||
logger.debug(f"Streaming responses for agent run: {agent_run_id}")
|
logger.debug(f"Streaming responses for agent run: {agent_run_id}")
|
||||||
|
|
||||||
# Track if we've sent a completion message
|
# Check if this is an active run with stored responses
|
||||||
sent_completion = False
|
if agent_run_id in active_agent_runs:
|
||||||
|
# First, send all existing responses
|
||||||
|
stored_responses = active_agent_runs[agent_run_id]
|
||||||
|
logger.debug(f"Sending {len(stored_responses)} existing responses for agent run: {agent_run_id}")
|
||||||
|
|
||||||
try:
|
for response in stored_responses:
|
||||||
# Check if this is an active run with stored responses
|
yield f"data: {json.dumps(response)}\n\n"
|
||||||
if agent_run_id in active_agent_runs:
|
|
||||||
# First, send all existing responses
|
|
||||||
stored_responses = active_agent_runs[agent_run_id]
|
|
||||||
logger.debug(f"Sending {len(stored_responses)} existing responses for agent run: {agent_run_id}")
|
|
||||||
|
|
||||||
for response in stored_responses:
|
# If the run is still active (status is running), set up to stream new responses
|
||||||
yield f"data: {json.dumps(response)}\n\n"
|
if agent_run_data['status'] == 'running':
|
||||||
|
# Get the current length to know where to start watching for new responses
|
||||||
|
current_length = len(stored_responses)
|
||||||
|
|
||||||
# Check if this is a completion message
|
# Keep checking for new responses
|
||||||
if response.get('type') == 'status':
|
while agent_run_id in active_agent_runs:
|
||||||
if response.get('status') == 'completed' or response.get('status_type') == 'thread_run_end':
|
# Check if there are new responses
|
||||||
sent_completion = True
|
if len(active_agent_runs[agent_run_id]) > current_length:
|
||||||
|
# Send all new responses
|
||||||
|
for i in range(current_length, len(active_agent_runs[agent_run_id])):
|
||||||
|
response = active_agent_runs[agent_run_id][i]
|
||||||
|
yield f"data: {json.dumps(response)}\n\n"
|
||||||
|
|
||||||
# If the run is still active (status is running), set up to stream new responses
|
# Update current length
|
||||||
if agent_run_data['status'] == 'running':
|
current_length = len(active_agent_runs[agent_run_id])
|
||||||
# Get the current length to know where to start watching for new responses
|
|
||||||
current_length = len(stored_responses)
|
|
||||||
|
|
||||||
# Setup a timeout mechanism
|
# Brief pause before checking again
|
||||||
start_time = datetime.now(timezone.utc)
|
await asyncio.sleep(0.1)
|
||||||
timeout_seconds = 300 # 5 minutes max wait time
|
else:
|
||||||
|
# If the run is not active or we don't have stored responses,
|
||||||
|
# send a message indicating the run is not available for streaming
|
||||||
|
logger.warning(f"Agent run {agent_run_id} not found in active runs")
|
||||||
|
yield f"data: {json.dumps({'type': 'status', 'status': agent_run_data['status'], 'message': 'Run data not available for streaming'})}\n\n"
|
||||||
|
|
||||||
# Keep checking for new responses
|
# Always send a completion status at the end
|
||||||
while agent_run_id in active_agent_runs:
|
yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
|
||||||
# Check if there are new responses
|
logger.debug(f"Streaming complete for agent run: {agent_run_id}")
|
||||||
if len(active_agent_runs[agent_run_id]) > current_length:
|
|
||||||
# Send all new responses
|
|
||||||
for i in range(current_length, len(active_agent_runs[agent_run_id])):
|
|
||||||
response = active_agent_runs[agent_run_id][i]
|
|
||||||
yield f"data: {json.dumps(response)}\n\n"
|
|
||||||
|
|
||||||
# Check if this is a completion message
|
|
||||||
if response.get('type') == 'status':
|
|
||||||
if response.get('status') == 'completed' or response.get('status_type') == 'thread_run_end':
|
|
||||||
sent_completion = True
|
|
||||||
|
|
||||||
# Update current length
|
|
||||||
current_length = len(active_agent_runs[agent_run_id])
|
|
||||||
|
|
||||||
# Check for timeout
|
|
||||||
elapsed = (datetime.now(timezone.utc) - start_time).total_seconds()
|
|
||||||
if elapsed > timeout_seconds:
|
|
||||||
logger.warning(f"Stream timeout after {timeout_seconds}s for agent run: {agent_run_id}")
|
|
||||||
break
|
|
||||||
|
|
||||||
# Brief pause before checking again
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
else:
|
|
||||||
# If the run is not active or we don't have stored responses,
|
|
||||||
# send a message indicating the run is not available for streaming
|
|
||||||
logger.warning(f"Agent run {agent_run_id} not found in active runs")
|
|
||||||
yield f"data: {json.dumps({'type': 'status', 'status': agent_run_data['status'], 'message': 'Run data not available for streaming'})}\n\n"
|
|
||||||
|
|
||||||
# Always send a completion status at the end if we haven't already
|
|
||||||
if not sent_completion:
|
|
||||||
completion_status = 'completed'
|
|
||||||
# Use the actual status from database if available
|
|
||||||
if agent_run_data['status'] in ['failed', 'stopped']:
|
|
||||||
completion_status = agent_run_data['status']
|
|
||||||
|
|
||||||
yield f"data: {json.dumps({'type': 'status', 'status': completion_status, 'message': f'Stream ended with status: {completion_status}'})}\n\n"
|
|
||||||
|
|
||||||
logger.debug(f"Streaming complete for agent run: {agent_run_id}")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error in stream generator: {str(e)}", exc_info=True)
|
|
||||||
# Send error message if we encounter an exception
|
|
||||||
if not sent_completion:
|
|
||||||
yield f"data: {json.dumps({'type': 'status', 'status': 'error', 'message': f'Stream error: {str(e)}'})}\n\n"
|
|
||||||
|
|
||||||
# Return a streaming response
|
# Return a streaming response
|
||||||
return StreamingResponse(
|
return StreamingResponse(
|
||||||
|
@ -490,7 +449,6 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
|
||||||
# Tracking variables
|
# Tracking variables
|
||||||
total_responses = 0
|
total_responses = 0
|
||||||
start_time = datetime.now(timezone.utc)
|
start_time = datetime.now(timezone.utc)
|
||||||
thread_run_ended = False # Track if we received a thread_run_end signal
|
|
||||||
|
|
||||||
# Create a pubsub to listen for control messages
|
# Create a pubsub to listen for control messages
|
||||||
pubsub = None
|
pubsub = None
|
||||||
|
@ -624,11 +582,6 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
|
||||||
await update_agent_run_status(client, agent_run_id, "failed", error=error_msg, responses=all_responses)
|
await update_agent_run_status(client, agent_run_id, "failed", error=error_msg, responses=all_responses)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Check for thread_run_end signal from ResponseProcessor
|
|
||||||
if response.get('type') == 'status' and response.get('status_type') == 'thread_run_end':
|
|
||||||
logger.info(f"Received thread_run_end signal from ResponseProcessor for agent run: {agent_run_id}")
|
|
||||||
thread_run_ended = True
|
|
||||||
|
|
||||||
# Store response in memory
|
# Store response in memory
|
||||||
if agent_run_id in active_agent_runs:
|
if agent_run_id in active_agent_runs:
|
||||||
active_agent_runs[agent_run_id].append(response)
|
active_agent_runs[agent_run_id].append(response)
|
||||||
|
@ -723,9 +676,4 @@ async def run_agent_background(agent_run_id: str, thread_id: str, instance_id: s
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Error deleting active run key: {str(e)}")
|
logger.warning(f"Error deleting active run key: {str(e)}")
|
||||||
|
|
||||||
# Remove from active_agent_runs to ensure stream stops
|
logger.info(f"Agent run background task fully completed for: {agent_run_id} (instance: {instance_id})")
|
||||||
if agent_run_id in active_agent_runs:
|
|
||||||
del active_agent_runs[agent_run_id]
|
|
||||||
logger.debug(f"Removed agent run {agent_run_id} from active_agent_runs")
|
|
||||||
|
|
||||||
logger.info(f"Agent run background task fully completed for: {agent_run_id} (instance: {instance_id}, thread_run_ended: {thread_run_ended})")
|
|
Loading…
Reference in New Issue