from fastapi import APIRouter, HTTPException, Depends, Request, Body
from fastapi.responses import StreamingResponse
import asyncio
import json
import traceback
from datetime import datetime, timezone
import uuid
from typing import Optional, List, Dict, Any
import jwt
from pydantic import BaseModel

from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
from services import redis
from agent.run import run_agent
from utils.auth_utils import get_current_user_id, get_user_id_from_stream_auth, verify_thread_access
from utils.logger import logger
from utils.billing import check_billing_status, get_account_id_from_thread
from sandbox.sandbox import create_sandbox, get_or_start_sandbox

# Initialize shared resources
router = APIRouter()
thread_manager = None
db = None
instance_id = None  # Set in initialize(); identifies this API instance in Redis keys

# In-memory storage for active agent runs and their responses
active_agent_runs: Dict[str, List[Any]] = {}


class AgentStartRequest(BaseModel):
    model_name: Optional[str] = "anthropic/claude-3-7-sonnet-latest"
    enable_thinking: Optional[bool] = False
    reasoning_effort: Optional[str] = 'low'
    stream: Optional[bool] = False  # Default stream to False for API
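
# Example request body for POST /thread/{thread_id}/agent/start (illustrative only;
# every field is optional and falls back to the defaults declared above):
#
#   {
#       "model_name": "anthropic/claude-3-7-sonnet-latest",
#       "enable_thinking": false,
#       "reasoning_effort": "low",
#       "stream": true
#   }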

def initialize(
    _thread_manager: ThreadManager,
    _db: DBConnection,
    _instance_id: Optional[str] = None
):
    """Initialize the agent API with resources from the main API."""
    global thread_manager, db, instance_id
    thread_manager = _thread_manager
    db = _db

    # Use the provided instance_id or generate a new one
    if _instance_id:
        instance_id = _instance_id
    else:
        # Generate a short random instance ID
        instance_id = str(uuid.uuid4())[:8]

    logger.info(f"Initialized agent API with instance ID: {instance_id}")

# Note: Redis will be initialized in the lifespan function in api.py
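# Illustrative wiring sketch (the real lifespan hook lives in api.py and may differ;
# the thread_manager/db objects and the instance id shown here are placeholders):
#
#   from contextlib import asynccontextmanager
#   from fastapi import FastAPI
#   import agent.api as agent_api
#
#   @asynccontextmanager
#   async def lifespan(app: FastAPI):
#       agent_api.initialize(thread_manager, db, "my-instance-id")
#       yield
#       await agent_api.cleanup()
#
#   app = FastAPI(lifespan=lifespan)
#   app.include_router(agent_api.router)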

async def cleanup():
    """Clean up resources and stop running agents on shutdown."""
    logger.info("Starting cleanup of agent API resources")

    # Use the instance_id to find and clean up this instance's keys
    try:
        running_keys = await redis.keys(f"active_run:{instance_id}:*")
        logger.info(f"Found {len(running_keys)} running agent runs to clean up")
        for key in running_keys:
            agent_run_id = key.split(":")[-1]
            await stop_agent_run(agent_run_id)
    except Exception as e:
        logger.error(f"Failed to clean up running agent runs: {str(e)}")

    # Close the Redis connection
    await redis.close()
    logger.info("Completed cleanup of agent API resources")

async def update_agent_run_status(
    client,
    agent_run_id: str,
    status: str,
    error: Optional[str] = None,
    responses: Optional[List[Any]] = None
) -> bool:
    """
    Centralized function to update agent run status.
    Returns True if the update was successful.
    """
    try:
        update_data = {
            "status": status,
            "completed_at": datetime.now(timezone.utc).isoformat()
        }
        if error:
            update_data["error"] = error
        if responses:
            update_data["responses"] = responses

        # Retry up to 3 times
        for retry in range(3):
            try:
                update_result = await client.table('agent_runs').update(update_data).eq("id", agent_run_id).execute()
                if hasattr(update_result, 'data') and update_result.data:
                    logger.info(f"Successfully updated agent run status to '{status}' (retry {retry}): {agent_run_id}")

                    # Verify the update
                    verify_result = await client.table('agent_runs').select('status', 'completed_at').eq("id", agent_run_id).execute()
                    if verify_result.data:
                        actual_status = verify_result.data[0].get('status')
                        completed_at = verify_result.data[0].get('completed_at')
                        logger.info(f"Verified agent run update: status={actual_status}, completed_at={completed_at}")
                    return True
                else:
                    logger.warning(f"Database update returned no data on retry {retry}: {update_result}")
                    if retry == 2:  # Last retry
                        logger.error(f"Failed to update agent run status after all retries: {agent_run_id}")
                        return False
            except Exception as db_error:
                logger.error(f"Database error on retry {retry} updating status: {str(db_error)}")
                if retry < 2:  # Not the last retry yet
                    await asyncio.sleep(0.5 * (2 ** retry))  # Exponential backoff
                else:
                    logger.error(f"Failed to update agent run status after all retries: {agent_run_id}", exc_info=True)
                    return False
    except Exception as e:
        logger.error(f"Unexpected error updating agent run status: {str(e)}", exc_info=True)
        return False

    return False

async def stop_agent_run(agent_run_id: str, error_message: Optional[str] = None):
    """Update database and publish stop signal to Redis."""
    logger.info(f"Stopping agent run: {agent_run_id}")
    client = await db.client

    # Update the agent run status
    status = "failed" if error_message else "stopped"
    await update_agent_run_status(client, agent_run_id, status, error=error_message)

    # Send stop signal to global channel
    try:
        await redis.publish(f"agent_run:{agent_run_id}:control", "STOP")
        logger.debug(f"Published STOP signal to global channel for agent run {agent_run_id}")
    except Exception as e:
        logger.error(f"Failed to publish STOP signal to global channel: {str(e)}")

    # Find all instances handling this agent run
    try:
        instance_keys = await redis.keys(f"active_run:*:{agent_run_id}")
        logger.debug(f"Found {len(instance_keys)} active instances for agent run {agent_run_id}")
        for key in instance_keys:
            # Extract instance ID from the key pattern: active_run:{instance_id}:{agent_run_id}
            parts = key.split(":")
            if len(parts) >= 3:
                instance_id = parts[1]
                try:
                    # Send stop signal to instance-specific channel
                    await redis.publish(f"agent_run:{agent_run_id}:control:{instance_id}", "STOP")
                    logger.debug(f"Published STOP signal to instance {instance_id} for agent run {agent_run_id}")
                except Exception as e:
                    logger.warning(f"Failed to publish STOP signal to instance {instance_id}: {str(e)}")
    except Exception as e:
        logger.error(f"Failed to find or signal active instances: {str(e)}")

    logger.info(f"Successfully initiated stop process for agent run: {agent_run_id}")
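
# Redis conventions used for run coordination throughout this module:
#   - Key "active_run:{instance_id}:{agent_run_id}" marks a run owned by one instance (kept alive with a TTL).
#   - Channel "agent_run:{agent_run_id}:control" carries global STOP / END_STREAM / ERROR signals.
#   - Channel "agent_run:{agent_run_id}:control:{instance_id}" carries the same signals to a single instance.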

async def restore_running_agent_runs():
    """Mark any agent runs still flagged as running in the database as failed after a server restart."""
    logger.info("Restoring running agent runs after server restart")
    client = await db.client
    running_agent_runs = await client.table('agent_runs').select('*').eq("status", "running").execute()

    for run in running_agent_runs.data:
        logger.warning(f"Found running agent run {run['id']} from before server restart")
        await client.table('agent_runs').update({
            "status": "failed",
            "error": "Server restarted while agent was running",
            "completed_at": datetime.now(timezone.utc).isoformat()
        }).eq("id", run['id']).execute()

async def check_for_active_project_agent_run(client, project_id: str):
    """
    Check if there is an active agent run for any thread in the given project.
    If found, returns the ID of the active run, otherwise returns None.

    Args:
        client: The Supabase client
        project_id: The project ID to check

    Returns:
        str or None: The ID of the active agent run if found, None otherwise
    """
    # Get all threads from this project
    project_threads = await client.table('threads').select('thread_id').eq('project_id', project_id).execute()
    project_thread_ids = [t['thread_id'] for t in project_threads.data]

    # Check if there are any active agent runs for any thread in this project
    if project_thread_ids:
        active_runs = await client.table('agent_runs').select('id').in_('thread_id', project_thread_ids).eq('status', 'running').execute()
        if active_runs.data and len(active_runs.data) > 0:
            return active_runs.data[0]['id']
    return None

async def get_agent_run_with_access_check(client, agent_run_id: str, user_id: str):
    """
    Get an agent run's data after verifying the user has access to it through account membership.

    Args:
        client: The Supabase client
        agent_run_id: The agent run ID to check access for
        user_id: The user ID to check permissions for

    Returns:
        dict: The agent run data if access is granted

    Raises:
        HTTPException: If the user doesn't have access or the agent run doesn't exist
    """
    agent_run = await client.table('agent_runs').select('*').eq('id', agent_run_id).execute()
    if not agent_run.data or len(agent_run.data) == 0:
        raise HTTPException(status_code=404, detail="Agent run not found")

    agent_run_data = agent_run.data[0]
    thread_id = agent_run_data['thread_id']

    # Verify the user has access to this thread
    await verify_thread_access(client, thread_id, user_id)
    return agent_run_data

async def _cleanup_agent_run(agent_run_id: str):
    """Clean up Redis keys when an agent run is done."""
    logger.debug(f"Cleaning up Redis keys for agent run: {agent_run_id}")
    try:
        await redis.delete(f"active_run:{instance_id}:{agent_run_id}")
        logger.debug(f"Successfully cleaned up Redis keys for agent run: {agent_run_id}")
    except Exception as e:
        logger.warning(f"Failed to clean up Redis keys for agent run {agent_run_id}: {str(e)}")
        # Non-fatal error; we can continue

@router.post("/thread/{thread_id}/agent/start")
async def start_agent(
    thread_id: str,
    body: AgentStartRequest = Body(...),  # Accept configuration in the request body
    user_id: str = Depends(get_current_user_id)
):
    """Start an agent for a specific thread in the background."""
    logger.info(f"Starting new agent for thread: {thread_id} with config: model={body.model_name}, thinking={body.enable_thinking}, effort={body.reasoning_effort}, stream={body.stream}")
    client = await db.client

    # Verify the user has access to this thread
    await verify_thread_access(client, thread_id, user_id)

    # Get the project_id and account_id for this thread
    thread_result = await client.table('threads').select('project_id', 'account_id').eq('thread_id', thread_id).execute()
    if not thread_result.data:
        raise HTTPException(status_code=404, detail="Thread not found")

    thread_data = thread_result.data[0]
    project_id = thread_data.get('project_id')
    account_id = thread_data.get('account_id')

    # Check billing status
    can_run, message, subscription = await check_billing_status(client, account_id)
    if not can_run:
        raise HTTPException(status_code=402, detail={
            "message": message,
            "subscription": subscription
        })

    # If there is already an active agent run for this project, stop it first
    active_run_id = await check_for_active_project_agent_run(client, project_id)
    if active_run_id:
        logger.info(f"Stopping existing agent run {active_run_id} before starting new one")
        await stop_agent_run(active_run_id)

    # Initialize or get the sandbox for this project
    project = await client.table('projects').select('*').eq('project_id', project_id).execute()
    if project.data[0].get('sandbox', {}).get('id'):
        sandbox_id = project.data[0]['sandbox']['id']
        sandbox_pass = project.data[0]['sandbox']['pass']
        sandbox = await get_or_start_sandbox(sandbox_id)
    else:
        sandbox_pass = str(uuid.uuid4())
        sandbox = create_sandbox(sandbox_pass)
        logger.info(f"Created new sandbox with preview: {sandbox.get_preview_link(6080)}/vnc_lite.html?password={sandbox_pass}")
        sandbox_id = sandbox.id
        await client.table('projects').update({
            'sandbox': {
                'id': sandbox_id,
                'pass': sandbox_pass,
                'vnc_preview': str(sandbox.get_preview_link(6080)),
                'sandbox_url': str(sandbox.get_preview_link(8080))
            }
        }).eq('project_id', project_id).execute()

    agent_run = await client.table('agent_runs').insert({
        "thread_id": thread_id,
        "status": "running",
        "started_at": datetime.now(timezone.utc).isoformat()
    }).execute()
    agent_run_id = agent_run.data[0]['id']
    logger.info(f"Created new agent run: {agent_run_id}")

    # Initialize in-memory storage for this agent run
    active_agent_runs[agent_run_id] = []

    # Register this run in Redis with a TTL
    try:
        await redis.set(
            f"active_run:{instance_id}:{agent_run_id}",
            "running",
            ex=redis.REDIS_KEY_TTL
        )
    except Exception as e:
        logger.warning(f"Failed to register agent run in Redis, continuing without Redis tracking: {str(e)}")

    # Run the agent in the background
    task = asyncio.create_task(
        run_agent_background(
            agent_run_id=agent_run_id,
            thread_id=thread_id,
            instance_id=instance_id,
            project_id=project_id,
            sandbox=sandbox,
            model_name=body.model_name,
            enable_thinking=body.enable_thinking,
            reasoning_effort=body.reasoning_effort,
            stream=body.stream
        )
    )

    # Set a callback to clean up when the task is done
    task.add_done_callback(
        lambda _: asyncio.create_task(
            _cleanup_agent_run(agent_run_id)
        )
    )

    return {"agent_run_id": agent_run_id, "status": "running"}

@router.post("/agent-run/{agent_run_id}/stop")
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id)):
    """Stop a running agent."""
    logger.info(f"Stopping agent run: {agent_run_id}")
    client = await db.client

    # Verify the user has access to the agent run
    await get_agent_run_with_access_check(client, agent_run_id, user_id)

    # Stop the agent run
    await stop_agent_run(agent_run_id)
    return {"status": "stopped"}

@router.get("/thread/{thread_id}/agent-runs")
async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id)):
    """Get all agent runs for a thread."""
    logger.info(f"Fetching agent runs for thread: {thread_id}")
    client = await db.client

    # Verify the user has access to this thread
    await verify_thread_access(client, thread_id, user_id)

    agent_runs = await client.table('agent_runs').select('*').eq("thread_id", thread_id).execute()
    logger.debug(f"Found {len(agent_runs.data)} agent runs for thread: {thread_id}")
    return {"agent_runs": agent_runs.data}

@router.get("/agent-run/{agent_run_id}")
async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id)):
    """Get agent run status and responses."""
    logger.info(f"Fetching agent run details: {agent_run_id}")
    client = await db.client

    agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
    return {
        "id": agent_run_data['id'],
        "threadId": agent_run_data['thread_id'],
        "status": agent_run_data['status'],
        "startedAt": agent_run_data['started_at'],
        "completedAt": agent_run_data['completed_at'],
        "error": agent_run_data['error']
    }

@router.get("/agent-run/{agent_run_id}/stream")
async def stream_agent_run(
    agent_run_id: str,
    token: Optional[str] = None,
    request: Request = None
):
    """Stream the responses of an agent run from in-memory storage, or reconnect to an ongoing run."""
    logger.info(f"Starting stream for agent run: {agent_run_id}")
    client = await db.client

    # Get the user ID using the streaming auth function
    user_id = await get_user_id_from_stream_auth(request, token)

    # Verify the user has access to the agent run and get the run data
    agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)

    # Define a streaming generator that uses in-memory responses
    async def stream_generator():
        logger.debug(f"Streaming responses for agent run: {agent_run_id}")

        # Check if this is an active run with stored responses
        if agent_run_id in active_agent_runs:
            # First, send all existing responses
            stored_responses = active_agent_runs[agent_run_id]
            logger.debug(f"Sending {len(stored_responses)} existing responses for agent run: {agent_run_id}")
            for response in stored_responses:
                yield f"data: {json.dumps(response)}\n\n"

            # If the run is still active (status is running), stream new responses as they arrive
            if agent_run_data['status'] == 'running':
                # Remember the current length so we know where new responses start
                current_length = len(stored_responses)

                # Keep checking for new responses
                while agent_run_id in active_agent_runs:
                    # Check if there are new responses
                    if len(active_agent_runs[agent_run_id]) > current_length:
                        # Send all new responses
                        for i in range(current_length, len(active_agent_runs[agent_run_id])):
                            response = active_agent_runs[agent_run_id][i]
                            yield f"data: {json.dumps(response)}\n\n"
                        # Update the current length
                        current_length = len(active_agent_runs[agent_run_id])

                    # Brief pause before checking again
                    await asyncio.sleep(0.1)
        else:
            # If the run is not active or we don't have stored responses,
            # send a message indicating the run is not available for streaming
            logger.warning(f"Agent run {agent_run_id} not found in active runs")
            yield f"data: {json.dumps({'type': 'status', 'status': agent_run_data['status'], 'message': 'Run data not available for streaming'})}\n\n"

        # Always send a completion status at the end
        yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
        logger.debug(f"Streaming complete for agent run: {agent_run_id}")

    # Return a streaming response
    return StreamingResponse(
        stream_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache, no-transform",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Content-Type": "text/event-stream",
            "Access-Control-Allow-Origin": "*"
        }
    )
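
# Sketch of consuming this SSE endpoint from a shell (illustrative; the host, route prefix
# and token handling depend on how the router is mounted and on get_user_id_from_stream_auth):
#
#   curl -N "http://localhost:8000/agent-run/{agent_run_id}/stream?token={jwt}"
#
# Each frame arrives as "data: {json}\n\n"; the stream always ends with a
# {"type": "status", "status": "completed"} frame.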

async def run_agent_background(
    agent_run_id: str,
    thread_id: str,
    instance_id: str,
    project_id: str,
    sandbox,
    model_name: str,
    enable_thinking: Optional[bool],
    reasoning_effort: Optional[str],
    stream: bool
):
    """Run the agent in the background and handle status updates."""
    logger.debug(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (instance: {instance_id}) with model={model_name}, thinking={enable_thinking}, effort={reasoning_effort}, stream={stream}")
    client = await db.client

    # Tracking variables
    total_responses = 0
    start_time = datetime.now(timezone.utc)

    # Create a pubsub to listen for control messages
    pubsub = None
    try:
        pubsub = await redis.create_pubsub()

        # Use an instance-specific control channel to avoid cross-talk between instances
        control_channel = f"agent_run:{agent_run_id}:control:{instance_id}"

        # Use a backoff retry pattern for the pubsub subscription
        retry_count = 0
        while retry_count < 3:
            try:
                await pubsub.subscribe(control_channel)
                logger.debug(f"Subscribed to control channel: {control_channel}")
                break
            except Exception as e:
                retry_count += 1
                if retry_count >= 3:
                    logger.error(f"Failed to subscribe to control channel after 3 attempts: {str(e)}")
                    raise
                wait_time = 0.5 * (2 ** (retry_count - 1))
                logger.warning(f"Failed to subscribe to control channel (attempt {retry_count}/3): {str(e)}. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)

        # Also subscribe to the global control channel for cross-instance control
        global_control_channel = f"agent_run:{agent_run_id}:control"
        retry_count = 0
        while retry_count < 3:
            try:
                await pubsub.subscribe(global_control_channel)
                logger.debug(f"Subscribed to global control channel: {global_control_channel}")
                break
            except Exception as e:
                retry_count += 1
                if retry_count >= 3:
                    logger.error(f"Failed to subscribe to global control channel after 3 attempts: {str(e)}")
                    # We can continue with just the instance-specific channel
                    break
                wait_time = 0.5 * (2 ** (retry_count - 1))
                logger.warning(f"Failed to subscribe to global control channel (attempt {retry_count}/3): {str(e)}. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
    except Exception as e:
        logger.error(f"Failed to initialize Redis pubsub: {str(e)}")
        pubsub = None

    # Refresh the TTL on the active run key to prevent expiration during long runs
    try:
        await redis.set(
            f"active_run:{instance_id}:{agent_run_id}",
            "running",
            ex=redis.REDIS_KEY_TTL
        )
    except Exception as e:
        logger.warning(f"Failed to refresh active run key TTL: {str(e)}")

    # Start a background task to check for stop signals
    stop_signal_received = False
    stop_checker = None

    async def check_for_stop_signal():
        nonlocal stop_signal_received
        if not pubsub:
            logger.warning("Stop signal checker not started - pubsub not available")
            return
        try:
            while True:
                try:
                    message = await pubsub.get_message(timeout=0.5)
                    if message and message["type"] == "message":
                        stop_signal = "STOP"
                        if message["data"] == stop_signal or message["data"] == stop_signal.encode('utf-8'):
                            logger.info(f"Received stop signal for agent run: {agent_run_id} (instance: {instance_id})")
                            stop_signal_received = True
                            break
                except Exception as e:
                    logger.warning(f"Error checking for stop signals: {str(e)}")
                    # Brief pause before retrying
                    await asyncio.sleep(1)

                # Exit if a stop was signalled elsewhere
                if stop_signal_received:
                    break

                # Periodically refresh the active run key's TTL
                try:
                    if total_responses % 100 == 0:
                        await redis.set(
                            f"active_run:{instance_id}:{agent_run_id}",
                            "running",
                            ex=redis.REDIS_KEY_TTL
                        )
                except Exception as e:
                    logger.warning(f"Failed to refresh active run key TTL: {str(e)}")

                await asyncio.sleep(0.1)
        except asyncio.CancelledError:
            logger.info(f"Stop signal checker task cancelled (instance: {instance_id})")
        except Exception as e:
            logger.error(f"Unexpected error in stop signal checker: {str(e)}", exc_info=True)

    # Start the stop signal checker if pubsub is available
    if pubsub:
        stop_checker = asyncio.create_task(check_for_stop_signal())
        logger.debug(f"Started stop signal checker for agent run: {agent_run_id} (instance: {instance_id})")
    else:
        logger.warning(f"No stop signal checker for agent run: {agent_run_id} - pubsub unavailable")

    try:
        # Run the agent
        logger.debug(f"Initializing agent generator for thread: {thread_id} (instance: {instance_id})")
        agent_gen = run_agent(
            thread_id=thread_id,
            project_id=project_id,
            stream=stream,
            thread_manager=thread_manager,
            sandbox=sandbox,
            model_name=model_name,
            enable_thinking=enable_thinking,
            reasoning_effort=reasoning_effort
        )

        # Collect all responses to save to the database
        all_responses = []

        async for response in agent_gen:
            # Stop if a stop signal was received
            if stop_signal_received:
                logger.info(f"Agent run stopped due to stop signal: {agent_run_id} (instance: {instance_id})")
                await update_agent_run_status(client, agent_run_id, "stopped", responses=all_responses)
                break

            # Check for an error status (e.g. billing errors)
            if response.get('type') == 'status' and response.get('status') == 'error':
                error_msg = response.get('message', '')
                logger.info(f"Agent run failed with error: {error_msg} (instance: {instance_id})")
                await update_agent_run_status(client, agent_run_id, "failed", error=error_msg, responses=all_responses)
                break

            # Store the response in memory for streaming clients
            if agent_run_id in active_agent_runs:
                active_agent_runs[agent_run_id].append(response)
            all_responses.append(response)
            total_responses += 1

        # Signal that we are done if we weren't stopped
        if not stop_signal_received:
            duration = (datetime.now(timezone.utc) - start_time).total_seconds()
            logger.info(f"Agent run completed successfully: {agent_run_id} (duration: {duration:.2f}s, total responses: {total_responses}, instance: {instance_id})")

            # Add a completion message to the stream
            completion_message = {
                "type": "status",
                "status": "completed",
                "message": "Agent run completed successfully"
            }
            if agent_run_id in active_agent_runs:
                active_agent_runs[agent_run_id].append(completion_message)
            all_responses.append(completion_message)

            # Update the agent run status
            await update_agent_run_status(client, agent_run_id, "completed", responses=all_responses)

            # Notify any clients monitoring the control channels that we're done
            try:
                if pubsub:
                    await redis.publish(f"agent_run:{agent_run_id}:control:{instance_id}", "END_STREAM")
                    await redis.publish(f"agent_run:{agent_run_id}:control", "END_STREAM")
                    logger.debug(f"Sent END_STREAM signals for agent run: {agent_run_id} (instance: {instance_id})")
            except Exception as e:
                logger.warning(f"Failed to publish END_STREAM signals: {str(e)}")

    except Exception as e:
        # Log the error and update the agent run
        error_message = str(e)
        traceback_str = traceback.format_exc()
        duration = (datetime.now(timezone.utc) - start_time).total_seconds()
        logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (instance: {instance_id})")

        # Add an error message to the stream
        error_response = {
            "type": "status",
            "status": "error",
            "message": error_message
        }
        if agent_run_id in active_agent_runs:
            active_agent_runs[agent_run_id].append(error_response)
        if 'all_responses' in locals():
            all_responses.append(error_response)
        else:
            all_responses = [error_response]

        # Update the agent run with the error
        await update_agent_run_status(
            client,
            agent_run_id,
            "failed",
            error=f"{error_message}\n{traceback_str}",
            responses=all_responses
        )

        # Notify any clients of the error
        try:
            if pubsub:
                await redis.publish(f"agent_run:{agent_run_id}:control:{instance_id}", "ERROR")
                await redis.publish(f"agent_run:{agent_run_id}:control", "ERROR")
                logger.debug(f"Sent ERROR signals for agent run: {agent_run_id} (instance: {instance_id})")
        except Exception as e:
            logger.warning(f"Failed to publish ERROR signals: {str(e)}")

    finally:
        # Ensure we always clean up the pubsub and stop checker
        if stop_checker:
            try:
                stop_checker.cancel()
                logger.debug(f"Cancelled stop signal checker task for agent run: {agent_run_id} (instance: {instance_id})")
            except Exception as e:
                logger.warning(f"Error cancelling stop checker: {str(e)}")

        if pubsub:
            try:
                await pubsub.unsubscribe()
                logger.debug(f"Successfully unsubscribed from pubsub for agent run: {agent_run_id} (instance: {instance_id})")
            except Exception as e:
                logger.warning(f"Error unsubscribing from pubsub: {str(e)}")

        # Clean up the Redis key
        try:
            await redis.delete(f"active_run:{instance_id}:{agent_run_id}")
            logger.debug(f"Deleted active run key for agent run: {agent_run_id} (instance: {instance_id})")
        except Exception as e:
            logger.warning(f"Error deleting active run key: {str(e)}")

        logger.info(f"Agent run background task fully completed for: {agent_run_id} (instance: {instance_id})")