fix(run_agent_background): optimize Redis operation handling and increase worker processes

This commit is contained in:
sharath 2025-06-05 01:23:57 +00:00
parent a10a4c711b
commit 72759e06c7
No known key found for this signature in database
2 changed files with 10 additions and 5 deletions

View File

@ -10,7 +10,7 @@ services:
memory: 32G memory: 32G
worker: worker:
command: python -m dramatiq --processes 20 --threads 16 run_agent_background command: python -m dramatiq --processes 40 --threads 8 run_agent_background
deploy: deploy:
resources: resources:
limits: limits:

View File

@ -124,6 +124,8 @@ async def run_agent_background(
final_status = "running" final_status = "running"
error_message = None error_message = None
pending_redis_operations = []
async for response in agent_gen: async for response in agent_gen:
if stop_signal_received: if stop_signal_received:
logger.info(f"Agent run {agent_run_id} stopped by signal.") logger.info(f"Agent run {agent_run_id} stopped by signal.")
@ -133,8 +135,8 @@ async def run_agent_background(
# Store response in Redis list and publish notification # Store response in Redis list and publish notification
response_json = json.dumps(response) response_json = json.dumps(response)
asyncio.create_task(redis.rpush(response_list_key, response_json)) pending_redis_operations.append(redis.rpush(response_list_key, response_json))
asyncio.create_task(redis.publish(response_channel, "new")) pending_redis_operations.append(redis.publish(response_channel, "new"))
total_responses += 1 total_responses += 1
# Check for agent-signaled completion or error # Check for agent-signaled completion or error
@ -231,8 +233,11 @@ async def run_agent_background(
# Remove the instance-specific active run key # Remove the instance-specific active run key
await _cleanup_redis_instance_key(agent_run_id) await _cleanup_redis_instance_key(agent_run_id)
# Wait for 5 seconds for any pending redis operations to complete # Wait for all pending redis operations to complete, with timeout
await asyncio.sleep(5) try:
await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=5.0)
except asyncio.TimeoutError:
logger.warning(f"Timeout waiting for pending Redis operations for {agent_run_id}")
logger.info(f"Agent run background task fully completed for: {agent_run_id} (Instance: {instance_id}) with final status: {final_status}") logger.info(f"Agent run background task fully completed for: {agent_run_id} (Instance: {instance_id}) with final status: {final_status}")