chore(dramatiq): cleanup code

This commit is contained in:
sharath 2025-05-15 06:34:17 +00:00
parent 3a4debd407
commit 1e62257ab1
No known key found for this signature in database
1 changed files with 1 additions and 263 deletions

View File

@ -21,7 +21,7 @@ from services.billing import check_billing_status
from utils.config import config
from sandbox.sandbox import create_sandbox, get_or_start_sandbox
from services.llm import make_llm_api_call
from run_agent_background import run_agent_background
from run_agent_background import run_agent_background, _cleanup_redis_response_list, update_agent_run_status
# Initialize shared resources
router = APIRouter()
@ -117,63 +117,6 @@ async def cleanup():
await redis.close()
logger.info("Completed cleanup of agent API resources")
async def update_agent_run_status(
client,
agent_run_id: str,
status: str,
error: Optional[str] = None,
responses: Optional[List[Any]] = None # Expects parsed list of dicts
) -> bool:
"""
Centralized function to update agent run status.
Returns True if update was successful.
"""
try:
update_data = {
"status": status,
"completed_at": datetime.now(timezone.utc).isoformat()
}
if error:
update_data["error"] = error
if responses:
# Ensure responses are stored correctly as JSONB
update_data["responses"] = responses
# Retry up to 3 times
for retry in range(3):
try:
update_result = await client.table('agent_runs').update(update_data).eq("id", agent_run_id).execute()
if hasattr(update_result, 'data') and update_result.data:
logger.info(f"Successfully updated agent run {agent_run_id} status to '{status}' (retry {retry})")
# Verify the update
verify_result = await client.table('agent_runs').select('status', 'completed_at').eq("id", agent_run_id).execute()
if verify_result.data:
actual_status = verify_result.data[0].get('status')
completed_at = verify_result.data[0].get('completed_at')
logger.info(f"Verified agent run update: status={actual_status}, completed_at={completed_at}")
return True
else:
logger.warning(f"Database update returned no data for agent run {agent_run_id} on retry {retry}: {update_result}")
if retry == 2: # Last retry
logger.error(f"Failed to update agent run status after all retries: {agent_run_id}")
return False
except Exception as db_error:
logger.error(f"Database error on retry {retry} updating status for {agent_run_id}: {str(db_error)}")
if retry < 2: # Not the last retry yet
await asyncio.sleep(0.5 * (2 ** retry)) # Exponential backoff
else:
logger.error(f"Failed to update agent run status after all retries: {agent_run_id}", exc_info=True)
return False
except Exception as e:
logger.error(f"Unexpected error updating agent run status for {agent_run_id}: {str(e)}", exc_info=True)
return False
return False
async def stop_agent_run(agent_run_id: str, error_message: Optional[str] = None):
"""Update database and publish stop signal to Redis."""
logger.info(f"Stopping agent run: {agent_run_id}")
@ -234,16 +177,6 @@ async def stop_agent_run(agent_run_id: str, error_message: Optional[str] = None)
logger.info(f"Successfully initiated stop process for agent run: {agent_run_id}")
async def _cleanup_redis_response_list(agent_run_id: str):
"""Set TTL on the Redis response list."""
response_list_key = f"agent_run:{agent_run_id}:responses"
try:
await redis.expire(response_list_key, REDIS_RESPONSE_LIST_TTL)
logger.debug(f"Set TTL ({REDIS_RESPONSE_LIST_TTL}s) on response list: {response_list_key}")
except Exception as e:
logger.warning(f"Failed to set TTL on response list {response_list_key}: {str(e)}")
# async def restore_running_agent_runs():
# """Mark agent runs that were still 'running' in the database as failed and clean up Redis resources."""
# logger.info("Restoring running agent runs after server restart")
@ -302,20 +235,6 @@ async def get_agent_run_with_access_check(client, agent_run_id: str, user_id: st
await verify_thread_access(client, thread_id, user_id)
return agent_run_data
async def _cleanup_redis_instance_key(agent_run_id: str):
"""Clean up the instance-specific Redis key for an agent run."""
if not instance_id:
logger.warning("Instance ID not set, cannot clean up instance key.")
return
key = f"active_run:{instance_id}:{agent_run_id}"
logger.debug(f"Cleaning up Redis instance key: {key}")
try:
await redis.delete(key)
logger.debug(f"Successfully cleaned up Redis key: {key}")
except Exception as e:
logger.warning(f"Failed to clean up Redis key {key}: {str(e)}")
async def get_or_create_project_sandbox(client, project_id: str):
"""Get or create a sandbox for a project."""
project = await client.table('projects').select('*').eq('project_id', project_id).execute()
@ -658,187 +577,6 @@ async def stream_agent_run(
"Access-Control-Allow-Origin": "*"
})
# @dramatiq.actor
# async def run_agent_background(
# agent_run_id: str,
# thread_id: str,
# instance_id: str, # Use the global instance ID passed during initialization
# project_id: str,
# model_name: str,
# enable_thinking: Optional[bool],
# reasoning_effort: Optional[str],
# stream: bool,
# enable_context_manager: bool
# ):
# """Run the agent in the background using Redis for state."""
# logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
# logger.info(f"🚀 Using model: {model_name} (thinking: {enable_thinking}, reasoning_effort: {reasoning_effort})")
# client = await db.client
# start_time = datetime.now(timezone.utc)
# total_responses = 0
# pubsub = None
# stop_checker = None
# stop_signal_received = False
# # Define Redis keys and channels
# response_list_key = f"agent_run:{agent_run_id}:responses"
# response_channel = f"agent_run:{agent_run_id}:new_response"
# instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id}"
# global_control_channel = f"agent_run:{agent_run_id}:control"
# instance_active_key = f"active_run:{instance_id}:{agent_run_id}"
# async def check_for_stop_signal():
# nonlocal stop_signal_received
# if not pubsub: return
# try:
# while not stop_signal_received:
# message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)
# if message and message.get("type") == "message":
# data = message.get("data")
# if isinstance(data, bytes): data = data.decode('utf-8')
# if data == "STOP":
# logger.info(f"Received STOP signal for agent run {agent_run_id} (Instance: {instance_id})")
# stop_signal_received = True
# break
# # Periodically refresh the active run key TTL
# if total_responses % 50 == 0: # Refresh every 50 responses or so
# try: await redis.expire(instance_active_key, redis.REDIS_KEY_TTL)
# except Exception as ttl_err: logger.warning(f"Failed to refresh TTL for {instance_active_key}: {ttl_err}")
# await asyncio.sleep(0.1) # Short sleep to prevent tight loop
# except asyncio.CancelledError:
# logger.info(f"Stop signal checker cancelled for {agent_run_id} (Instance: {instance_id})")
# except Exception as e:
# logger.error(f"Error in stop signal checker for {agent_run_id}: {e}", exc_info=True)
# stop_signal_received = True # Stop the run if the checker fails
# try:
# # Setup Pub/Sub listener for control signals
# pubsub = await redis.create_pubsub()
# await pubsub.subscribe(instance_control_channel, global_control_channel)
# logger.debug(f"Subscribed to control channels: {instance_control_channel}, {global_control_channel}")
# stop_checker = asyncio.create_task(check_for_stop_signal())
# # Ensure active run key exists and has TTL
# await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL)
# # Initialize agent generator
# agent_gen = run_agent(
# thread_id=thread_id, project_id=project_id, stream=stream,
# thread_manager=thread_manager, model_name=model_name,
# enable_thinking=enable_thinking, reasoning_effort=reasoning_effort,
# enable_context_manager=enable_context_manager
# )
# final_status = "running"
# error_message = None
# async for response in agent_gen:
# if stop_signal_received:
# logger.info(f"Agent run {agent_run_id} stopped by signal.")
# final_status = "stopped"
# break
# # Store response in Redis list and publish notification
# response_json = json.dumps(response)
# await redis.rpush(response_list_key, response_json)
# await redis.publish(response_channel, "new")
# total_responses += 1
# # Check for agent-signaled completion or error
# if response.get('type') == 'status':
# status_val = response.get('status')
# if status_val in ['completed', 'failed', 'stopped']:
# logger.info(f"Agent run {agent_run_id} finished via status message: {status_val}")
# final_status = status_val
# if status_val == 'failed' or status_val == 'stopped':
# error_message = response.get('message', f"Run ended with status: {status_val}")
# break
# # If loop finished without explicit completion/error/stop signal, mark as completed
# if final_status == "running":
# final_status = "completed"
# duration = (datetime.now(timezone.utc) - start_time).total_seconds()
# logger.info(f"Agent run {agent_run_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})")
# completion_message = {"type": "status", "status": "completed", "message": "Agent run completed successfully"}
# await redis.rpush(response_list_key, json.dumps(completion_message))
# await redis.publish(response_channel, "new") # Notify about the completion message
# # Fetch final responses from Redis for DB update
# all_responses_json = await redis.lrange(response_list_key, 0, -1)
# all_responses = [json.loads(r) for r in all_responses_json]
# # Update DB status
# await update_agent_run_status(client, agent_run_id, final_status, error=error_message, responses=all_responses)
# # Publish final control signal (END_STREAM or ERROR)
# control_signal = "END_STREAM" if final_status == "completed" else "ERROR" if final_status == "failed" else "STOP"
# try:
# await redis.publish(global_control_channel, control_signal)
# # No need to publish to instance channel as the run is ending on this instance
# logger.debug(f"Published final control signal '{control_signal}' to {global_control_channel}")
# except Exception as e:
# logger.warning(f"Failed to publish final control signal {control_signal}: {str(e)}")
# except Exception as e:
# error_message = str(e)
# traceback_str = traceback.format_exc()
# duration = (datetime.now(timezone.utc) - start_time).total_seconds()
# logger.error(f"Error in agent run {agent_run_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
# final_status = "failed"
# # Push error message to Redis list
# error_response = {"type": "status", "status": "error", "message": error_message}
# try:
# await redis.rpush(response_list_key, json.dumps(error_response))
# await redis.publish(response_channel, "new")
# except Exception as redis_err:
# logger.error(f"Failed to push error response to Redis for {agent_run_id}: {redis_err}")
# # Fetch final responses (including the error)
# all_responses = []
# try:
# all_responses_json = await redis.lrange(response_list_key, 0, -1)
# all_responses = [json.loads(r) for r in all_responses_json]
# except Exception as fetch_err:
# logger.error(f"Failed to fetch responses from Redis after error for {agent_run_id}: {fetch_err}")
# all_responses = [error_response] # Use the error message we tried to push
# # Update DB status
# await update_agent_run_status(client, agent_run_id, "failed", error=f"{error_message}\n{traceback_str}", responses=all_responses)
# # Publish ERROR signal
# try:
# await redis.publish(global_control_channel, "ERROR")
# logger.debug(f"Published ERROR signal to {global_control_channel}")
# except Exception as e:
# logger.warning(f"Failed to publish ERROR signal: {str(e)}")
# finally:
# # Cleanup stop checker task
# if stop_checker and not stop_checker.done():
# stop_checker.cancel()
# try: await stop_checker
# except asyncio.CancelledError: pass
# except Exception as e: logger.warning(f"Error during stop_checker cancellation: {e}")
# # Close pubsub connection
# if pubsub:
# try:
# await pubsub.unsubscribe()
# await pubsub.close()
# logger.debug(f"Closed pubsub connection for {agent_run_id}")
# except Exception as e:
# logger.warning(f"Error closing pubsub for {agent_run_id}: {str(e)}")
# # Set TTL on the response list in Redis
# await _cleanup_redis_response_list(agent_run_id)
# # Remove the instance-specific active run key
# await _cleanup_redis_instance_key(agent_run_id)
# logger.info(f"Agent run background task fully completed for: {agent_run_id} (Instance: {instance_id}) with final status: {final_status}")
async def generate_and_update_project_name(project_id: str, prompt: str):
"""Generates a project name using an LLM and updates the database."""
logger.info(f"Starting background task to generate name for project: {project_id}")