This commit is contained in:
marko-kraemer 2025-07-07 03:50:42 +02:00
parent 945ff8820c
commit 75dc15f936
1 changed files with 18 additions and 44 deletions

View File

@ -26,8 +26,8 @@ from flags.flags import is_enabled
# Initialize shared resources
router = APIRouter()
db: Optional[DBConnection] = None
instance_id: Optional[str] = None # Global instance ID for this backend instance
db = None
instance_id = None # Global instance ID for this backend instance
# TTL for Redis response lists (24 hours)
REDIS_RESPONSE_LIST_TTL = 3600 * 24
@ -173,8 +173,6 @@ async def cleanup():
async def stop_agent_run(agent_run_id: str, error_message: Optional[str] = None):
"""Update database and publish stop signal to Redis."""
logger.info(f"Stopping agent run: {agent_run_id}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
final_status = "failed" if error_message else "stopped"
@ -280,15 +278,13 @@ async def start_agent(
logger.info(f"Using model from config: {model_name}")
# Log the model name after alias resolution
resolved_model = MODEL_NAME_ALIASES.get(model_name, model_name) if model_name else config.MODEL_TO_USE
resolved_model = MODEL_NAME_ALIASES.get(model_name, model_name)
logger.info(f"Resolved model name: {resolved_model}")
# Update model_name to use the resolved version
model_name = resolved_model
logger.info(f"Starting new agent for thread: {thread_id} with config: model={model_name}, thinking={body.enable_thinking}, effort={body.reasoning_effort}, stream={body.stream}, context_manager={body.enable_context_manager} (Instance: {instance_id})")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
await verify_thread_access(client, thread_id, user_id)
@ -383,10 +379,7 @@ async def start_agent(
if body.agent_id and body.agent_id != thread_agent_id and agent_config:
logger.info(f"Using agent {agent_config['agent_id']} for this agent run (thread remains agent-agnostic)")
# Ensure model_name is not None at this point
final_model_name = model_name if model_name is not None else config.MODEL_TO_USE
can_use, model_message, allowed_models = await can_use_model(client, account_id, final_model_name)
can_use, model_message, allowed_models = await can_use_model(client, account_id, model_name)
if not can_use:
raise HTTPException(status_code=403, detail={"message": model_message, "allowed_models": allowed_models})
@ -442,9 +435,9 @@ async def start_agent(
run_agent_background.send(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
project_id=project_id,
model_name=final_model_name, # Use the validated model name
enable_thinking=body.enable_thinking or False, reasoning_effort=body.reasoning_effort or "low",
stream=body.stream or True, enable_context_manager=body.enable_context_manager or False,
model_name=model_name, # Already resolved above
enable_thinking=body.enable_thinking, reasoning_effort=body.reasoning_effort,
stream=body.stream, enable_context_manager=body.enable_context_manager,
agent_config=agent_config, # Pass agent configuration
is_agent_builder=is_agent_builder,
target_agent_id=target_agent_id,
@ -460,8 +453,6 @@ async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_
agent_run_id=agent_run_id,
)
logger.info(f"Received request to stop agent run: {agent_run_id}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
await get_agent_run_with_access_check(client, agent_run_id, user_id)
await stop_agent_run(agent_run_id)
@ -474,8 +465,6 @@ async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user
thread_id=thread_id,
)
logger.info(f"Fetching agent runs for thread: {thread_id}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
await verify_thread_access(client, thread_id, user_id)
agent_runs = await client.table('agent_runs').select('id, thread_id, status, started_at, completed_at, error, created_at, updated_at').eq("thread_id", thread_id).order('created_at', desc=True).execute()
@ -489,8 +478,6 @@ async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_us
agent_run_id=agent_run_id,
)
logger.info(f"Fetching agent run details: {agent_run_id}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
# Note: Responses are not included here by default, they are in the stream or DB
@ -511,8 +498,6 @@ async def get_thread_agent(thread_id: str, user_id: str = Depends(get_current_us
thread_id=thread_id,
)
logger.info(f"Fetching agent details for thread: {thread_id}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
try:
@ -625,13 +610,11 @@ async def get_thread_agent(thread_id: str, user_id: str = Depends(get_current_us
@router.get("/agent-run/{agent_run_id}/stream")
async def stream_agent_run(
agent_run_id: str,
request: Request,
token: Optional[str] = None
token: Optional[str] = None,
request: Request = None
):
"""Stream the responses of an agent run using Redis Lists and Pub/Sub."""
logger.info(f"Starting stream for agent run: {agent_run_id}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
user_id = await get_user_id_from_stream_auth(request, token)
@ -885,7 +868,7 @@ async def initiate_agent_with_files(
logger.info(f"Using model from config: {model_name}")
# Log the model name after alias resolution
resolved_model = MODEL_NAME_ALIASES.get(model_name, model_name) if model_name is not None else config.MODEL_TO_USE
resolved_model = MODEL_NAME_ALIASES.get(model_name, model_name)
logger.info(f"Resolved model name: {resolved_model}")
# Update model_name to use the resolved version
@ -894,8 +877,6 @@ async def initiate_agent_with_files(
logger.info(f"Starting new agent in agent builder mode: {is_agent_builder}, target_agent_id: {target_agent_id}")
logger.info(f"[\033[91mDEBUG\033[0m] Initiating new agent with prompt and {len(files)} files (Instance: {instance_id}), model: {model_name}, enable_thinking: {enable_thinking}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
account_id = user_id # In Basejump, personal account_id is the same as user_id
@ -914,10 +895,7 @@ async def initiate_agent_with_files(
agent_config = default_agent_result.data[0]
logger.info(f"Using default agent: {agent_config['name']} ({agent_config['agent_id']})")
# Ensure model_name is not None
final_model_name = model_name if model_name is not None else config.MODEL_TO_USE
can_use, model_message, allowed_models = await can_use_model(client, account_id, final_model_name)
can_use, model_message, allowed_models = await can_use_model(client, account_id, model_name)
if not can_use:
raise HTTPException(status_code=403, detail={"message": model_message, "allowed_models": allowed_models})
@ -1105,9 +1083,9 @@ async def initiate_agent_with_files(
run_agent_background.send(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
project_id=project_id,
model_name=final_model_name, # Use the validated model name
enable_thinking=enable_thinking or False, reasoning_effort=reasoning_effort or "low",
stream=stream or True, enable_context_manager=enable_context_manager or False,
model_name=model_name, # Already resolved above
enable_thinking=enable_thinking, reasoning_effort=reasoning_effort,
stream=stream, enable_context_manager=enable_context_manager,
agent_config=agent_config, # Pass agent configuration
is_agent_builder=is_agent_builder,
target_agent_id=target_agent_id,
@ -1143,18 +1121,14 @@ async def get_agents(
detail="Custom agents currently disabled. This feature is not available at the moment."
)
logger.info(f"Fetching agents for user: {user_id} with page={page}, limit={limit}, search='{search}', sort_by={sort_by}, sort_order={sort_order}")
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
client = await db.client
try:
# Calculate offset - ensure page and limit are integers
page_num = page if page is not None else 1
limit_num = limit if limit is not None else 20
offset = (page_num - 1) * limit_num
# Calculate offset
offset = (page - 1) * limit
# Start building the query - include version data
query = client.table('agents').select('*, agent_versions!current_version_id(*)').eq("account_id", user_id)
query = client.table('agents').select('*, agent_versions!current_version_id(*)', count='exact').eq("account_id", user_id)
# Apply search filter
if search:
@ -1990,4 +1964,4 @@ async def get_agent_version(
if not version_result.data:
raise HTTPException(status_code=404, detail="Version not found")
return version_result.data[0]
return version_result.data[0]