from fastapi import APIRouter, HTTPException, Depends, Request, Body, File, UploadFile, Form
from fastapi.responses import StreamingResponse
import asyncio
import json
import traceback
from datetime import datetime, timezone
import uuid
from typing import Optional, List, Dict, Any
import jwt
from pydantic import BaseModel
import tempfile
import os

from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
from services import redis
from agent.run import run_agent
from utils.auth_utils import get_current_user_id, get_user_id_from_stream_auth, verify_thread_access
from utils.logger import logger
from utils.billing import check_billing_status, get_account_id_from_thread
from sandbox.sandbox import create_sandbox, get_or_start_sandbox
from services.llm import make_llm_api_call

# Initialize shared resources
router = APIRouter()
thread_manager = None
db = None
instance_id = None  # Global instance ID for this backend instance

# TTL for Redis response lists (24 hours)
REDIS_RESPONSE_LIST_TTL = 3600 * 24
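
# Redis key / channel layout used throughout this module:
#   active_run:{instance_id}:{agent_run_id}        - marker key for a run owned by this instance
#   agent_run:{agent_run_id}:responses             - list of JSON-encoded response objects
#   agent_run:{agent_run_id}:new_response          - pub/sub channel; "new" is published per response
#   agent_run:{agent_run_id}:control               - global control channel (STOP / END_STREAM / ERROR)
#   agent_run:{agent_run_id}:control:{instance_id} - instance-specific control channel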

MODEL_NAME_ALIASES = {
    "sonnet-3.7": "anthropic/claude-3-7-sonnet-latest",
    "gpt-4.1": "openai/gpt-4.1-2025-04-14",
    "gemini-flash-2.5": "openrouter/google/gemini-2.5-flash-preview",
    "grok-3": "xai/grok-3-fast-latest",
    "deepseek": "deepseek/deepseek-chat",
    "grok-3-mini": "xai/grok-3-mini-fast-beta",
}
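
# Aliases are resolved with MODEL_NAME_ALIASES.get(name, name), so an unknown or
# already fully-qualified model name passes through unchanged.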

class AgentStartRequest(BaseModel):
    model_name: Optional[str] = "anthropic/claude-3-7-sonnet-latest"
    enable_thinking: Optional[bool] = False
    reasoning_effort: Optional[str] = 'low'
    stream: Optional[bool] = True
    enable_context_manager: Optional[bool] = False
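
# Illustrative JSON body for POST /thread/{thread_id}/agent/start (every field is
# optional and falls back to the defaults above):
#   {"model_name": "sonnet-3.7", "enable_thinking": false, "reasoning_effort": "low",
#    "stream": true, "enable_context_manager": false}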

class InitiateAgentResponse(BaseModel):
    thread_id: str
    agent_run_id: Optional[str] = None

def initialize(
    _thread_manager: ThreadManager,
    _db: DBConnection,
    _instance_id: str = None
):
    """Initialize the agent API with resources from the main API."""
    global thread_manager, db, instance_id
    thread_manager = _thread_manager
    db = _db

    # Use the provided instance_id or generate a new one
    if _instance_id:
        instance_id = _instance_id
    else:
        instance_id = str(uuid.uuid4())[:8]

    logger.info(f"Initialized agent API with instance ID: {instance_id}")

# Note: Redis will be initialized in the lifespan function in api.py

async def cleanup():
    """Clean up resources and stop running agents on shutdown."""
    logger.info("Starting cleanup of agent API resources")

    # Use the instance_id to find and clean up this instance's keys
    try:
        if instance_id:  # Ensure instance_id is set
            running_keys = await redis.keys(f"active_run:{instance_id}:*")
            logger.info(f"Found {len(running_keys)} running agent runs for instance {instance_id} to clean up")
            for key in running_keys:
                # Key format: active_run:{instance_id}:{agent_run_id}
                parts = key.split(":")
                if len(parts) == 3:
                    agent_run_id = parts[2]
                    await stop_agent_run(agent_run_id, error_message=f"Instance {instance_id} shutting down")
                else:
                    logger.warning(f"Unexpected key format found: {key}")
        else:
            logger.warning("Instance ID not set, cannot clean up instance-specific agent runs.")
    except Exception as e:
        logger.error(f"Failed to clean up running agent runs: {str(e)}")

    # Close the Redis connection
    await redis.close()
    logger.info("Completed cleanup of agent API resources")

async def update_agent_run_status(
    client,
    agent_run_id: str,
    status: str,
    error: Optional[str] = None,
    responses: Optional[List[Any]] = None  # Expects a parsed list of dicts
) -> bool:
    """
    Centralized function to update agent run status.
    Returns True if the update was successful.
    """
    try:
        update_data = {
            "status": status,
            "completed_at": datetime.now(timezone.utc).isoformat()
        }

        if error:
            update_data["error"] = error

        if responses:
            # Responses are stored as JSONB
            update_data["responses"] = responses

        # Retry up to 3 times
        for retry in range(3):
            try:
                update_result = await client.table('agent_runs').update(update_data).eq("id", agent_run_id).execute()

                if hasattr(update_result, 'data') and update_result.data:
                    logger.info(f"Successfully updated agent run {agent_run_id} status to '{status}' (retry {retry})")

                    # Verify the update
                    verify_result = await client.table('agent_runs').select('status', 'completed_at').eq("id", agent_run_id).execute()
                    if verify_result.data:
                        actual_status = verify_result.data[0].get('status')
                        completed_at = verify_result.data[0].get('completed_at')
                        logger.info(f"Verified agent run update: status={actual_status}, completed_at={completed_at}")
                    return True
                else:
                    logger.warning(f"Database update returned no data for agent run {agent_run_id} on retry {retry}: {update_result}")
                    if retry == 2:  # Last retry
                        logger.error(f"Failed to update agent run status after all retries: {agent_run_id}")
                        return False
            except Exception as db_error:
                logger.error(f"Database error on retry {retry} updating status for {agent_run_id}: {str(db_error)}")
                if retry < 2:  # Not the last retry yet
                    await asyncio.sleep(0.5 * (2 ** retry))  # Exponential backoff
                else:
                    logger.error(f"Failed to update agent run status after all retries: {agent_run_id}", exc_info=True)
                    return False
    except Exception as e:
        logger.error(f"Unexpected error updating agent run status for {agent_run_id}: {str(e)}", exc_info=True)
        return False

    return False

async def stop_agent_run(agent_run_id: str, error_message: Optional[str] = None):
    """Update the database and publish a stop signal to Redis."""
    logger.info(f"Stopping agent run: {agent_run_id}")
    client = await db.client
    final_status = "failed" if error_message else "stopped"

    # Attempt to fetch final responses from Redis
    response_list_key = f"agent_run:{agent_run_id}:responses"
    all_responses = []
    try:
        all_responses_json = await redis.lrange(response_list_key, 0, -1)
        all_responses = [json.loads(r) for r in all_responses_json]
        logger.info(f"Fetched {len(all_responses)} responses from Redis for DB update on stop/fail: {agent_run_id}")
    except Exception as e:
        logger.error(f"Failed to fetch responses from Redis for {agent_run_id} during stop/fail: {e}")
        # Proceed without responses; fetching from the DB instead would be a possible fallback

    # Update the agent run status in the database
    update_success = await update_agent_run_status(
        client, agent_run_id, final_status, error=error_message, responses=all_responses
    )
    if not update_success:
        logger.error(f"Failed to update database status for stopped/failed run {agent_run_id}")

    # Send a STOP signal to the global control channel
    global_control_channel = f"agent_run:{agent_run_id}:control"
    try:
        await redis.publish(global_control_channel, "STOP")
        logger.debug(f"Published STOP signal to global channel {global_control_channel}")
    except Exception as e:
        logger.error(f"Failed to publish STOP signal to global channel {global_control_channel}: {str(e)}")

    # Find all instances handling this agent run and send STOP to their instance-specific channels
    try:
        instance_keys = await redis.keys(f"active_run:*:{agent_run_id}")
        logger.debug(f"Found {len(instance_keys)} active instance keys for agent run {agent_run_id}")
        for key in instance_keys:
            # Key format: active_run:{instance_id}:{agent_run_id}
            parts = key.split(":")
            if len(parts) == 3:
                instance_id_from_key = parts[1]
                instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id_from_key}"
                try:
                    await redis.publish(instance_control_channel, "STOP")
                    logger.debug(f"Published STOP signal to instance channel {instance_control_channel}")
                except Exception as e:
                    logger.warning(f"Failed to publish STOP signal to instance channel {instance_control_channel}: {str(e)}")
            else:
                logger.warning(f"Unexpected key format found: {key}")

        # Set a TTL on the response list immediately on stop/fail (it expires rather than being deleted)
        await _cleanup_redis_response_list(agent_run_id)
    except Exception as e:
        logger.error(f"Failed to find or signal active instances for {agent_run_id}: {str(e)}")

    logger.info(f"Successfully initiated stop process for agent run: {agent_run_id}")

async def _cleanup_redis_response_list(agent_run_id: str):
    """Set a TTL on the Redis response list."""
    response_list_key = f"agent_run:{agent_run_id}:responses"
    try:
        await redis.expire(response_list_key, REDIS_RESPONSE_LIST_TTL)
        logger.debug(f"Set TTL ({REDIS_RESPONSE_LIST_TTL}s) on response list: {response_list_key}")
    except Exception as e:
        logger.warning(f"Failed to set TTL on response list {response_list_key}: {str(e)}")

async def restore_running_agent_runs():
    """Mark agent runs that were still 'running' in the database as failed and clean up Redis resources."""
    logger.info("Restoring running agent runs after server restart")
    client = await db.client
    running_agent_runs = await client.table('agent_runs').select('id').eq("status", "running").execute()

    for run in running_agent_runs.data:
        agent_run_id = run['id']
        logger.warning(f"Found running agent run {agent_run_id} from before server restart")

        # Clean up Redis resources for this run
        try:
            # Clean up the active run key
            active_run_key = f"active_run:{instance_id}:{agent_run_id}"
            await redis.delete(active_run_key)
            # Clean up the response list
            response_list_key = f"agent_run:{agent_run_id}:responses"
            await redis.delete(response_list_key)
            # Clean up the control channels
            control_channel = f"agent_run:{agent_run_id}:control"
            instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id}"
            await redis.delete(control_channel)
            await redis.delete(instance_control_channel)
            logger.info(f"Cleaned up Redis resources for agent run {agent_run_id}")
        except Exception as e:
            logger.error(f"Error cleaning up Redis resources for agent run {agent_run_id}: {e}")

        # Call stop_agent_run to handle the status update and final cleanup
        await stop_agent_run(agent_run_id, error_message="Server restarted while agent was running")

async def check_for_active_project_agent_run(client, project_id: str):
    """
    Check if there is an active agent run for any thread in the given project.
    If found, returns the ID of the active run; otherwise returns None.
    """
    project_threads = await client.table('threads').select('thread_id').eq('project_id', project_id).execute()
    project_thread_ids = [t['thread_id'] for t in project_threads.data]

    if project_thread_ids:
        active_runs = await client.table('agent_runs').select('id').in_('thread_id', project_thread_ids).eq('status', 'running').execute()
        if active_runs.data and len(active_runs.data) > 0:
            return active_runs.data[0]['id']
    return None

async def get_agent_run_with_access_check(client, agent_run_id: str, user_id: str):
    """Get agent run data after verifying user access."""
    agent_run = await client.table('agent_runs').select('*').eq('id', agent_run_id).execute()
    if not agent_run.data:
        raise HTTPException(status_code=404, detail="Agent run not found")

    agent_run_data = agent_run.data[0]
    thread_id = agent_run_data['thread_id']
    await verify_thread_access(client, thread_id, user_id)
    return agent_run_data

async def _cleanup_redis_instance_key(agent_run_id: str):
    """Clean up the instance-specific Redis key for an agent run."""
    if not instance_id:
        logger.warning("Instance ID not set, cannot clean up instance key.")
        return
    key = f"active_run:{instance_id}:{agent_run_id}"
    logger.debug(f"Cleaning up Redis instance key: {key}")
    try:
        await redis.delete(key)
        logger.debug(f"Successfully cleaned up Redis key: {key}")
    except Exception as e:
        logger.warning(f"Failed to clean up Redis key {key}: {str(e)}")

async def get_or_create_project_sandbox(client, project_id: str):
    """Get the existing sandbox for a project, or create a new one."""
    project = await client.table('projects').select('*').eq('project_id', project_id).execute()
    if not project.data:
        raise ValueError(f"Project {project_id} not found")
    project_data = project.data[0]

    if project_data.get('sandbox', {}).get('id'):
        sandbox_id = project_data['sandbox']['id']
        sandbox_pass = project_data['sandbox']['pass']
        logger.info(f"Project {project_id} already has sandbox {sandbox_id}, retrieving it")
        try:
            sandbox = await get_or_start_sandbox(sandbox_id)
            return sandbox, sandbox_id, sandbox_pass
        except Exception as e:
            logger.error(f"Failed to retrieve existing sandbox {sandbox_id}: {str(e)}. Creating a new one.")

    logger.info(f"Creating new sandbox for project {project_id}")
    sandbox_pass = str(uuid.uuid4())
    sandbox = create_sandbox(sandbox_pass, project_id)
    sandbox_id = sandbox.id
    logger.info(f"Created new sandbox {sandbox_id}")

    # Extract preview URLs; fall back to parsing the string representation if there is no .url attribute
    vnc_link = sandbox.get_preview_link(6080)
    website_link = sandbox.get_preview_link(8080)
    vnc_url = vnc_link.url if hasattr(vnc_link, 'url') else str(vnc_link).split("url='")[1].split("'")[0]
    website_url = website_link.url if hasattr(website_link, 'url') else str(website_link).split("url='")[1].split("'")[0]
    token = None
    if hasattr(vnc_link, 'token'):
        token = vnc_link.token
    elif "token='" in str(vnc_link):
        token = str(vnc_link).split("token='")[1].split("'")[0]

    update_result = await client.table('projects').update({
        'sandbox': {
            'id': sandbox_id, 'pass': sandbox_pass, 'vnc_preview': vnc_url,
            'sandbox_url': website_url, 'token': token
        }
    }).eq('project_id', project_id).execute()
    if not update_result.data:
        logger.error(f"Failed to update project {project_id} with new sandbox {sandbox_id}")
        raise Exception("Database update failed")
    return sandbox, sandbox_id, sandbox_pass
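
# After creation, the project row's `sandbox` JSONB column has this shape (values illustrative):
#   {"id": "...", "pass": "...", "vnc_preview": "https://...", "sandbox_url": "https://...", "token": "..."}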

@router.post("/thread/{thread_id}/agent/start")
async def start_agent(
    thread_id: str,
    body: AgentStartRequest = Body(...),
    user_id: str = Depends(get_current_user_id)
):
    """Start an agent for a specific thread in the background."""
    global instance_id  # Ensure instance_id is accessible
    if not instance_id:
        raise HTTPException(status_code=500, detail="Agent API not initialized with instance ID")

    logger.info(f"Starting new agent for thread: {thread_id} with config: model={body.model_name}, thinking={body.enable_thinking}, effort={body.reasoning_effort}, stream={body.stream}, context_manager={body.enable_context_manager} (Instance: {instance_id})")
    client = await db.client

    await verify_thread_access(client, thread_id, user_id)
    thread_result = await client.table('threads').select('project_id', 'account_id').eq('thread_id', thread_id).execute()
    if not thread_result.data:
        raise HTTPException(status_code=404, detail="Thread not found")
    thread_data = thread_result.data[0]
    project_id = thread_data.get('project_id')
    account_id = thread_data.get('account_id')

    can_run, message, subscription = await check_billing_status(client, account_id)
    if not can_run:
        raise HTTPException(status_code=402, detail={"message": message, "subscription": subscription})

    # Only one agent run may be active per project; stop any existing run first
    active_run_id = await check_for_active_project_agent_run(client, project_id)
    if active_run_id:
        logger.info(f"Stopping existing agent run {active_run_id} for project {project_id}")
        await stop_agent_run(active_run_id)

    try:
        sandbox, sandbox_id, sandbox_pass = await get_or_create_project_sandbox(client, project_id)
    except Exception as e:
        logger.error(f"Failed to get/create sandbox for project {project_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to initialize sandbox: {str(e)}")

    agent_run = await client.table('agent_runs').insert({
        "thread_id": thread_id, "status": "running",
        "started_at": datetime.now(timezone.utc).isoformat()
    }).execute()
    agent_run_id = agent_run.data[0]['id']
    logger.info(f"Created new agent run: {agent_run_id}")

    # Register this run in Redis with a TTL, keyed by instance ID
    instance_key = f"active_run:{instance_id}:{agent_run_id}"
    try:
        await redis.set(instance_key, "running", ex=redis.REDIS_KEY_TTL)
    except Exception as e:
        logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")

    # Run the agent in the background
    task = asyncio.create_task(
        run_agent_background(
            agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
            project_id=project_id, sandbox=sandbox,
            model_name=MODEL_NAME_ALIASES.get(body.model_name, body.model_name),
            enable_thinking=body.enable_thinking, reasoning_effort=body.reasoning_effort,
            stream=body.stream, enable_context_manager=body.enable_context_manager
        )
    )

    # Set a callback to clean up the Redis instance key when the task is done
    task.add_done_callback(lambda _: asyncio.create_task(_cleanup_redis_instance_key(agent_run_id)))
    return {"agent_run_id": agent_run_id, "status": "running"}
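
# Hedged usage sketch (assumes a Supabase JWT in $TOKEN and that this router is
# mounted at the API root; adjust the path prefix to your deployment):
#   curl -X POST "$BASE_URL/thread/$THREAD_ID/agent/start" \
#        -H "Authorization: Bearer $TOKEN" -H "Content-Type: application/json" \
#        -d '{"model_name": "sonnet-3.7", "stream": true}'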

@router.post("/agent-run/{agent_run_id}/stop")
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id)):
    """Stop a running agent."""
    logger.info(f"Received request to stop agent run: {agent_run_id}")
    client = await db.client
    await get_agent_run_with_access_check(client, agent_run_id, user_id)
    await stop_agent_run(agent_run_id)
    return {"status": "stopped"}

@router.get("/thread/{thread_id}/agent-runs")
async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id)):
    """Get all agent runs for a thread."""
    logger.info(f"Fetching agent runs for thread: {thread_id}")
    client = await db.client
    await verify_thread_access(client, thread_id, user_id)
    agent_runs = await client.table('agent_runs').select('*').eq("thread_id", thread_id).order('created_at', desc=True).execute()
    logger.debug(f"Found {len(agent_runs.data)} agent runs for thread: {thread_id}")
    return {"agent_runs": agent_runs.data}

@router.get("/agent-run/{agent_run_id}")
async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id)):
    """Get agent run status and metadata."""
    logger.info(f"Fetching agent run details: {agent_run_id}")
    client = await db.client
    agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
    # Note: responses are not included here by default; they live in the stream or the DB
    return {
        "id": agent_run_data['id'],
        "threadId": agent_run_data['thread_id'],
        "status": agent_run_data['status'],
        "startedAt": agent_run_data['started_at'],
        "completedAt": agent_run_data['completed_at'],
        "error": agent_run_data['error']
    }

@router.get("/agent-run/{agent_run_id}/stream")
async def stream_agent_run(
    agent_run_id: str,
    token: Optional[str] = None,
    request: Request = None
):
    """Stream the responses of an agent run using Redis lists and Pub/Sub."""
    logger.info(f"Starting stream for agent run: {agent_run_id}")
    client = await db.client

    user_id = await get_user_id_from_stream_auth(request, token)
    agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)

    response_list_key = f"agent_run:{agent_run_id}:responses"
    response_channel = f"agent_run:{agent_run_id}:new_response"
    control_channel = f"agent_run:{agent_run_id}:control"  # Global control channel

    async def stream_generator():
        logger.debug(f"Streaming responses for {agent_run_id} using Redis list {response_list_key} and channel {response_channel}")
        last_processed_index = -1
        pubsub_response = None
        pubsub_control = None
        listener_task = None
        terminate_stream = False
        initial_yield_complete = False

        try:
            # 1. Fetch and yield initial responses from the Redis list
            initial_responses_json = await redis.lrange(response_list_key, 0, -1)
            initial_responses = []
            if initial_responses_json:
                initial_responses = [json.loads(r) for r in initial_responses_json]
                logger.debug(f"Sending {len(initial_responses)} initial responses for {agent_run_id}")
                for response in initial_responses:
                    yield f"data: {json.dumps(response)}\n\n"
                last_processed_index = len(initial_responses) - 1
            initial_yield_complete = True

            # 2. Check run status *after* yielding initial data
            run_status = await client.table('agent_runs').select('status').eq("id", agent_run_id).maybe_single().execute()
            current_status = run_status.data.get('status') if run_status.data else None

            if current_status != 'running':
                logger.info(f"Agent run {agent_run_id} is not running (status: {current_status}). Ending stream.")
                yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
                return

            # 3. Set up Pub/Sub listeners for new responses and control signals
            pubsub_response = await redis.create_pubsub()
            await pubsub_response.subscribe(response_channel)
            logger.debug(f"Subscribed to response channel: {response_channel}")

            pubsub_control = await redis.create_pubsub()
            await pubsub_control.subscribe(control_channel)
            logger.debug(f"Subscribed to control channel: {control_channel}")

            # Queue to communicate between the listener task and the main generator loop
            message_queue = asyncio.Queue()

            async def listen_messages():
                response_reader = pubsub_response.listen()
                control_reader = pubsub_control.listen()
                tasks = [asyncio.create_task(response_reader.__anext__()), asyncio.create_task(control_reader.__anext__())]

                while not terminate_stream:
                    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
                    for task in done:
                        message = None
                        try:
                            message = task.result()
                            if message and isinstance(message, dict) and message.get("type") == "message":
                                channel = message.get("channel")
                                data = message.get("data")
                                if isinstance(data, bytes):
                                    data = data.decode('utf-8')
                                if channel == response_channel and data == "new":
                                    await message_queue.put({"type": "new_response"})
                                elif channel == control_channel and data in ["STOP", "END_STREAM", "ERROR"]:
                                    logger.info(f"Received control signal '{data}' for {agent_run_id}")
                                    await message_queue.put({"type": "control", "data": data})
                                    return  # Stop listening on a control signal
                        except StopAsyncIteration:
                            logger.warning(f"Listener {task} stopped.")
                            # A listener stopping unexpectedly is treated as an error that ends the stream
                            await message_queue.put({"type": "error", "data": "Listener stopped unexpectedly"})
                            return
                        except Exception as e:
                            logger.error(f"Error in listener for {agent_run_id}: {e}")
                            await message_queue.put({"type": "error", "data": "Listener failed"})
                            return
                        finally:
                            # Reschedule the completed listener task
                            if task in tasks:
                                tasks.remove(task)
                                if message and isinstance(message, dict) and message.get("channel") == response_channel:
                                    tasks.append(asyncio.create_task(response_reader.__anext__()))
                                elif message and isinstance(message, dict) and message.get("channel") == control_channel:
                                    tasks.append(asyncio.create_task(control_reader.__anext__()))

                # Cancel pending listener tasks on exit
                for p_task in pending:
                    p_task.cancel()
                for task in tasks:
                    task.cancel()

            listener_task = asyncio.create_task(listen_messages())

            # 4. Main loop to process messages from the queue
            while not terminate_stream:
                try:
                    queue_item = await message_queue.get()

                    if queue_item["type"] == "new_response":
                        # Fetch new responses from the Redis list, starting after the last processed index
                        new_start_index = last_processed_index + 1
                        new_responses_json = await redis.lrange(response_list_key, new_start_index, -1)

                        if new_responses_json:
                            new_responses = [json.loads(r) for r in new_responses_json]
                            num_new = len(new_responses)
                            logger.debug(f"Received {num_new} new responses for {agent_run_id} (index {new_start_index} onwards)")
                            for response in new_responses:
                                yield f"data: {json.dumps(response)}\n\n"
                                # Check whether this response signals completion
                                if response.get('type') == 'status' and response.get('status') in ['completed', 'failed', 'stopped']:
                                    logger.info(f"Detected run completion via status message in stream: {response.get('status')}")
                                    terminate_stream = True
                                    break  # Stop processing further new responses
                            last_processed_index += num_new
                        if terminate_stream:
                            break

                    elif queue_item["type"] == "control":
                        control_signal = queue_item["data"]
                        terminate_stream = True  # Stop the stream on any control signal
                        yield f"data: {json.dumps({'type': 'status', 'status': control_signal})}\n\n"
                        break

                    elif queue_item["type"] == "error":
                        logger.error(f"Listener error for {agent_run_id}: {queue_item['data']}")
                        terminate_stream = True
                        yield f"data: {json.dumps({'type': 'status', 'status': 'error'})}\n\n"
                        break

                except asyncio.CancelledError:
                    logger.info(f"Stream generator main loop cancelled for {agent_run_id}")
                    terminate_stream = True
                    break
                except Exception as loop_err:
                    logger.error(f"Error in stream generator main loop for {agent_run_id}: {loop_err}", exc_info=True)
                    terminate_stream = True
                    yield f"data: {json.dumps({'type': 'status', 'status': 'error', 'message': f'Stream failed: {loop_err}'})}\n\n"
                    break

        except Exception as e:
            logger.error(f"Error setting up stream for agent run {agent_run_id}: {e}", exc_info=True)
            # Only yield an error if the initial yield didn't happen
            if not initial_yield_complete:
                yield f"data: {json.dumps({'type': 'status', 'status': 'error', 'message': f'Failed to start stream: {e}'})}\n\n"
        finally:
            terminate_stream = True
            # Graceful shutdown order: unsubscribe -> close -> cancel
            if pubsub_response:
                await pubsub_response.unsubscribe(response_channel)
            if pubsub_control:
                await pubsub_control.unsubscribe(control_channel)
            if pubsub_response:
                await pubsub_response.close()
            if pubsub_control:
                await pubsub_control.close()

            if listener_task:
                listener_task.cancel()
                try:
                    await listener_task  # Reap inner tasks and swallow their errors
                except asyncio.CancelledError:
                    pass
                except Exception as e:
                    logger.debug(f"listener_task ended with: {e}")
            # Wait briefly for tasks to cancel
            await asyncio.sleep(0.1)
            logger.debug(f"Streaming cleanup complete for agent run: {agent_run_id}")

    return StreamingResponse(stream_generator(), media_type="text/event-stream", headers={
        "Cache-Control": "no-cache, no-transform", "Connection": "keep-alive",
        "X-Accel-Buffering": "no", "Content-Type": "text/event-stream",
        "Access-Control-Allow-Origin": "*"
    })
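
# The endpoint emits standard SSE frames ("data: {...}\n\n"). A minimal client
# sketch (assumes the third-party `requests` and `sseclient-py` packages; the auth
# scheme and path prefix depend on your deployment):
#   import requests, sseclient
#   resp = requests.get(f"{BASE_URL}/agent-run/{run_id}/stream", params={"token": jwt}, stream=True)
#   for event in sseclient.SSEClient(resp).events():
#       print(event.data)  # one JSON-encoded response object per frame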

async def run_agent_background(
    agent_run_id: str,
    thread_id: str,
    instance_id: str,  # The global instance ID passed during initialization
    project_id: str,
    sandbox,
    model_name: str,
    enable_thinking: Optional[bool],
    reasoning_effort: Optional[str],
    stream: bool,
    enable_context_manager: bool
):
    """Run the agent in the background, using Redis for state."""
    logger.debug(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
    client = await db.client
    start_time = datetime.now(timezone.utc)
    total_responses = 0
    pubsub = None
    stop_checker = None
    stop_signal_received = False
    final_status = "running"
    error_message = None

    # Define Redis keys and channels
    response_list_key = f"agent_run:{agent_run_id}:responses"
    response_channel = f"agent_run:{agent_run_id}:new_response"
    instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id}"
    global_control_channel = f"agent_run:{agent_run_id}:control"
    instance_active_key = f"active_run:{instance_id}:{agent_run_id}"

    async def check_for_stop_signal():
        nonlocal stop_signal_received
        if not pubsub:
            return
        try:
            while not stop_signal_received:
                message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)
                if message and message.get("type") == "message":
                    data = message.get("data")
                    if isinstance(data, bytes):
                        data = data.decode('utf-8')
                    if data == "STOP":
                        logger.info(f"Received STOP signal for agent run {agent_run_id} (Instance: {instance_id})")
                        stop_signal_received = True
                        break
                # Periodically refresh the active run key's TTL
                if total_responses % 50 == 0:  # Refresh roughly every 50 responses
                    try:
                        await redis.expire(instance_active_key, redis.REDIS_KEY_TTL)
                    except Exception as ttl_err:
                        logger.warning(f"Failed to refresh TTL for {instance_active_key}: {ttl_err}")
                await asyncio.sleep(0.1)  # Short sleep to avoid a tight loop
        except asyncio.CancelledError:
            logger.info(f"Stop signal checker cancelled for {agent_run_id} (Instance: {instance_id})")
        except Exception as e:
            logger.error(f"Error in stop signal checker for {agent_run_id}: {e}", exc_info=True)
            stop_signal_received = True  # Stop the run if the checker fails

    try:
        # Set up the Pub/Sub listener for control signals
        pubsub = await redis.create_pubsub()
        await pubsub.subscribe(instance_control_channel, global_control_channel)
        logger.debug(f"Subscribed to control channels: {instance_control_channel}, {global_control_channel}")
        stop_checker = asyncio.create_task(check_for_stop_signal())

        # Ensure the active run key exists and has a TTL
        await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL)

        # Initialize the agent generator
        agent_gen = run_agent(
            thread_id=thread_id, project_id=project_id, stream=stream,
            thread_manager=thread_manager, model_name=model_name,
            enable_thinking=enable_thinking, reasoning_effort=reasoning_effort,
            enable_context_manager=enable_context_manager
        )

        async for response in agent_gen:
            if stop_signal_received:
                logger.info(f"Agent run {agent_run_id} stopped by signal.")
                final_status = "stopped"
                break

            # Store the response in the Redis list and publish a notification
            response_json = json.dumps(response)
            await redis.rpush(response_list_key, response_json)
            await redis.publish(response_channel, "new")
            total_responses += 1

            # Check for agent-signaled completion or error
            if response.get('type') == 'status':
                status_val = response.get('status')
                if status_val in ['completed', 'failed', 'stopped']:
                    logger.info(f"Agent run {agent_run_id} finished via status message: {status_val}")
                    final_status = status_val
                    if status_val in ('failed', 'stopped'):
                        error_message = response.get('message', f"Run ended with status: {status_val}")
                    break

        # If the loop finished without an explicit completion/error/stop signal, mark as completed
        if final_status == "running":
            final_status = "completed"
            duration = (datetime.now(timezone.utc) - start_time).total_seconds()
            logger.info(f"Agent run {agent_run_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})")
            completion_message = {"type": "status", "status": "completed", "message": "Agent run completed successfully"}
            await redis.rpush(response_list_key, json.dumps(completion_message))
            await redis.publish(response_channel, "new")  # Notify about the completion message

        # Fetch the final responses from Redis for the DB update
        all_responses_json = await redis.lrange(response_list_key, 0, -1)
        all_responses = [json.loads(r) for r in all_responses_json]

        # Update the DB status
        await update_agent_run_status(client, agent_run_id, final_status, error=error_message, responses=all_responses)

        # Publish the final control signal (END_STREAM or ERROR)
        control_signal = "END_STREAM" if final_status == "completed" else "ERROR" if final_status == "failed" else "STOP"
        try:
            await redis.publish(global_control_channel, control_signal)
            # No need to publish to the instance channel; the run is ending on this instance
            logger.debug(f"Published final control signal '{control_signal}' to {global_control_channel}")
        except Exception as e:
            logger.warning(f"Failed to publish final control signal {control_signal}: {str(e)}")

    except Exception as e:
        error_message = str(e)
        traceback_str = traceback.format_exc()
        duration = (datetime.now(timezone.utc) - start_time).total_seconds()
        logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
        final_status = "failed"

        # Push the error message to the Redis list
        error_response = {"type": "status", "status": "error", "message": error_message}
        try:
            await redis.rpush(response_list_key, json.dumps(error_response))
            await redis.publish(response_channel, "new")
        except Exception as redis_err:
            logger.error(f"Failed to push error response to Redis for {agent_run_id}: {redis_err}")

        # Fetch the final responses (including the error)
        all_responses = []
        try:
            all_responses_json = await redis.lrange(response_list_key, 0, -1)
            all_responses = [json.loads(r) for r in all_responses_json]
        except Exception as fetch_err:
            logger.error(f"Failed to fetch responses from Redis after error for {agent_run_id}: {fetch_err}")
            all_responses = [error_response]  # Fall back to the error message we tried to push

        # Update the DB status
        await update_agent_run_status(client, agent_run_id, "failed", error=f"{error_message}\n{traceback_str}", responses=all_responses)

        # Publish the ERROR signal
        try:
            await redis.publish(global_control_channel, "ERROR")
            logger.debug(f"Published ERROR signal to {global_control_channel}")
        except Exception as e:
            logger.warning(f"Failed to publish ERROR signal: {str(e)}")

    finally:
        # Clean up the stop checker task
        if stop_checker and not stop_checker.done():
            stop_checker.cancel()
            try:
                await stop_checker
            except asyncio.CancelledError:
                pass
            except Exception as e:
                logger.warning(f"Error during stop_checker cancellation: {e}")

        # Close the pubsub connection
        if pubsub:
            try:
                await pubsub.unsubscribe()
                await pubsub.close()
                logger.debug(f"Closed pubsub connection for {agent_run_id}")
            except Exception as e:
                logger.warning(f"Error closing pubsub for {agent_run_id}: {str(e)}")

        # Set a TTL on the response list in Redis
        await _cleanup_redis_response_list(agent_run_id)
        # Remove the instance-specific active run key
        await _cleanup_redis_instance_key(agent_run_id)
        logger.info(f"Agent run background task fully completed for: {agent_run_id} (Instance: {instance_id}) with final status: {final_status}")
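
# Response objects pushed to the Redis list are plain JSON dicts. The status events
# produced by this module look like:
#   {"type": "status", "status": "completed", "message": "Agent run completed successfully"}
#   {"type": "status", "status": "error", "message": "<exception text>"}
# Everything else is whatever shapes the run_agent() generator yields.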

async def generate_and_update_project_name(project_id: str, prompt: str):
    """Generate a project name using an LLM and update the database."""
    logger.info(f"Starting background task to generate name for project: {project_id}")
    try:
        db_conn = DBConnection()
        client = await db_conn.client

        model_name = "openai/gpt-4o-mini"
        system_prompt = "You are a helpful assistant that generates extremely concise titles (2-4 words maximum) for chat threads based on the user's message. Respond with only the title, no other text or punctuation."
        user_message = f"Generate an extremely brief title (2-4 words only) for a chat thread that starts with this message: \"{prompt}\""
        messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_message}]

        logger.debug(f"Calling LLM ({model_name}) for project {project_id} naming.")
        response = await make_llm_api_call(messages=messages, model_name=model_name, max_tokens=20, temperature=0.7)

        generated_name = None
        if response and response.get('choices') and response['choices'][0].get('message'):
            raw_name = response['choices'][0]['message'].get('content', '').strip()
            cleaned_name = raw_name.strip('\'" \n\t')
            if cleaned_name:
                generated_name = cleaned_name
                logger.info(f"LLM generated name for project {project_id}: '{generated_name}'")
            else:
                logger.warning(f"LLM returned an empty name for project {project_id}.")
        else:
            logger.warning(f"Failed to get a valid response from the LLM for project {project_id} naming. Response: {response}")

        if generated_name:
            update_result = await client.table('projects').update({"name": generated_name}).eq("project_id", project_id).execute()
            if hasattr(update_result, 'data') and update_result.data:
                logger.info(f"Successfully updated project {project_id} name to '{generated_name}'")
            else:
                logger.error(f"Failed to update project {project_id} name in database. Update result: {update_result}")
        else:
            logger.warning(f"No generated name, skipping database update for project {project_id}.")
    except Exception as e:
        logger.error(f"Error in background naming task for project {project_id}: {str(e)}\n{traceback.format_exc()}")
    finally:
        # No need to disconnect the DBConnection singleton instance here
        logger.info(f"Finished background naming task for project: {project_id}")

@router.post("/agent/initiate", response_model=InitiateAgentResponse)
async def initiate_agent_with_files(
    prompt: str = Form(...),
    model_name: Optional[str] = Form("anthropic/claude-3-7-sonnet-latest"),
    enable_thinking: Optional[bool] = Form(False),
    reasoning_effort: Optional[str] = Form("low"),
    stream: Optional[bool] = Form(True),
    enable_context_manager: Optional[bool] = Form(False),
    files: List[UploadFile] = File(default=[]),
    user_id: str = Depends(get_current_user_id)
):
    """Initiate a new agent session with optional file attachments."""
    global instance_id  # Ensure instance_id is accessible
    if not instance_id:
        raise HTTPException(status_code=500, detail="Agent API not initialized with instance ID")

    logger.info(f"[\033[91mDEBUG\033[0m] Initiating new agent with prompt and {len(files)} files (Instance: {instance_id}), model: {model_name}, enable_thinking: {enable_thinking}")
    client = await db.client
    account_id = user_id  # In Basejump, the personal account_id equals the user_id

    can_run, message, subscription = await check_billing_status(client, account_id)
    if not can_run:
        raise HTTPException(status_code=402, detail={"message": message, "subscription": subscription})

    try:
        # 1. Create Project
        placeholder_name = f"{prompt[:30]}..." if len(prompt) > 30 else prompt
        project = await client.table('projects').insert({
            "project_id": str(uuid.uuid4()), "account_id": account_id, "name": placeholder_name,
            "created_at": datetime.now(timezone.utc).isoformat()
        }).execute()
        project_id = project.data[0]['project_id']
        logger.info(f"Created new project: {project_id}")

        # 2. Create Thread
        thread = await client.table('threads').insert({
            "thread_id": str(uuid.uuid4()), "project_id": project_id, "account_id": account_id,
            "created_at": datetime.now(timezone.utc).isoformat()
        }).execute()
        thread_id = thread.data[0]['thread_id']
        logger.info(f"Created new thread: {thread_id}")

        # Trigger the background naming task
        asyncio.create_task(generate_and_update_project_name(project_id=project_id, prompt=prompt))

        # 3. Create Sandbox
        sandbox, sandbox_id, sandbox_pass = await get_or_create_project_sandbox(client, project_id)
        logger.info(f"Using sandbox {sandbox_id} for new project {project_id}")

        # 4. Upload files to the sandbox (if any)
        message_content = prompt
        if files:
            successful_uploads = []
            failed_uploads = []
            for file in files:
                if file.filename:
                    try:
                        safe_filename = file.filename.replace('/', '_').replace('\\', '_')
                        target_path = f"/workspace/{safe_filename}"
                        logger.info(f"Attempting to upload {safe_filename} to {target_path} in sandbox {sandbox_id}")
                        content = await file.read()
                        upload_successful = False
                        try:
                            if hasattr(sandbox, 'fs') and hasattr(sandbox.fs, 'upload_file'):
                                import inspect
                                # The sandbox SDK may expose either a sync or an async upload
                                if inspect.iscoroutinefunction(sandbox.fs.upload_file):
                                    await sandbox.fs.upload_file(target_path, content)
                                else:
                                    sandbox.fs.upload_file(target_path, content)
                                logger.debug(f"Called sandbox.fs.upload_file for {target_path}")
                                upload_successful = True
                            else:
                                raise NotImplementedError("Suitable upload method not found on sandbox object.")
                        except Exception as upload_error:
                            logger.error(f"Error during sandbox upload call for {safe_filename}: {str(upload_error)}", exc_info=True)

                        if upload_successful:
                            try:
                                # Verify the upload by listing the parent directory
                                await asyncio.sleep(0.2)
                                parent_dir = os.path.dirname(target_path)
                                files_in_dir = sandbox.fs.list_files(parent_dir)
                                file_names_in_dir = [f.name for f in files_in_dir]
                                if safe_filename in file_names_in_dir:
                                    successful_uploads.append(target_path)
                                    logger.info(f"Successfully uploaded and verified file {safe_filename} to sandbox path {target_path}")
                                else:
                                    logger.error(f"Verification failed for {safe_filename}: File not found in {parent_dir} after upload attempt.")
                                    failed_uploads.append(safe_filename)
                            except Exception as verify_error:
                                logger.error(f"Error verifying file {safe_filename} after upload: {str(verify_error)}", exc_info=True)
                                failed_uploads.append(safe_filename)
                        else:
                            failed_uploads.append(safe_filename)
                    except Exception as file_error:
                        logger.error(f"Error processing file {file.filename}: {str(file_error)}", exc_info=True)
                        failed_uploads.append(file.filename)
                    finally:
                        await file.close()

            if successful_uploads:
                message_content += "\n\n" if message_content else ""
                for file_path in successful_uploads:
                    message_content += f"[Uploaded File: {file_path}]\n"
            if failed_uploads:
                message_content += "\n\nThe following files failed to upload:\n"
                for failed_file in failed_uploads:
                    message_content += f"- {failed_file}\n"

        # 5. Add the initial user message to the thread
        message_id = str(uuid.uuid4())
        message_payload = {"role": "user", "content": message_content}
        await client.table('messages').insert({
            "message_id": message_id, "thread_id": thread_id, "type": "user",
            "is_llm_message": True, "content": json.dumps(message_payload),
            "created_at": datetime.now(timezone.utc).isoformat()
        }).execute()

        # 6. Start the agent run
        agent_run = await client.table('agent_runs').insert({
            "thread_id": thread_id, "status": "running",
            "started_at": datetime.now(timezone.utc).isoformat()
        }).execute()
        agent_run_id = agent_run.data[0]['id']
        logger.info(f"Created new agent run: {agent_run_id}")

        # Register the run in Redis
        instance_key = f"active_run:{instance_id}:{agent_run_id}"
        try:
            await redis.set(instance_key, "running", ex=redis.REDIS_KEY_TTL)
        except Exception as e:
            logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")

        # Run the agent in the background
        task = asyncio.create_task(
            run_agent_background(
                agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
                project_id=project_id, sandbox=sandbox,
                model_name=MODEL_NAME_ALIASES.get(model_name, model_name),
                enable_thinking=enable_thinking, reasoning_effort=reasoning_effort,
                stream=stream, enable_context_manager=enable_context_manager
            )
        )
        task.add_done_callback(lambda _: asyncio.create_task(_cleanup_redis_instance_key(agent_run_id)))

        return {"thread_id": thread_id, "agent_run_id": agent_run_id}

    except Exception as e:
        logger.error(f"Error in agent initiation: {str(e)}\n{traceback.format_exc()}")
        # TODO: Clean up the created project/thread if initiation fails midway
        raise HTTPException(status_code=500, detail=f"Failed to initiate agent session: {str(e)}")
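
# Hedged usage sketch for this multipart endpoint (assumes a Supabase JWT in $TOKEN
# and that the router is mounted at the API root; adjust the prefix to your deployment):
#   curl -X POST "$BASE_URL/agent/initiate" \
#        -H "Authorization: Bearer $TOKEN" \
#        -F "prompt=Build a landing page for my startup" \
#        -F "model_name=sonnet-3.7" \
#        -F "files=@./spec.pdf"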