diff --git a/backend/docker-compose.prod.yml b/backend/docker-compose.prod.yml index 7c3d63a7..be1ad790 100644 --- a/backend/docker-compose.prod.yml +++ b/backend/docker-compose.prod.yml @@ -10,7 +10,7 @@ services: memory: 32G worker: - command: python -m dramatiq --processes 20 --threads 16 run_agent_background + command: python -m dramatiq --processes 40 --threads 8 run_agent_background deploy: resources: limits: diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py index e957cf9c..2c3dcd48 100644 --- a/backend/run_agent_background.py +++ b/backend/run_agent_background.py @@ -124,6 +124,8 @@ async def run_agent_background( final_status = "running" error_message = None + pending_redis_operations = [] + async for response in agent_gen: if stop_signal_received: logger.info(f"Agent run {agent_run_id} stopped by signal.") @@ -133,8 +135,8 @@ async def run_agent_background( # Store response in Redis list and publish notification response_json = json.dumps(response) - asyncio.create_task(redis.rpush(response_list_key, response_json)) - asyncio.create_task(redis.publish(response_channel, "new")) + pending_redis_operations.append(redis.rpush(response_list_key, response_json)) + pending_redis_operations.append(redis.publish(response_channel, "new")) total_responses += 1 # Check for agent-signaled completion or error @@ -231,8 +233,11 @@ async def run_agent_background( # Remove the instance-specific active run key await _cleanup_redis_instance_key(agent_run_id) - # Wait for 5 seconds for any pending redis operations to complete - await asyncio.sleep(5) + # Wait for all pending redis operations to complete, with timeout + try: + await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=5.0) + except asyncio.TimeoutError: + logger.warning(f"Timeout waiting for pending Redis operations for {agent_run_id}") logger.info(f"Agent run background task fully completed for: {agent_run_id} (Instance: {instance_id}) with final status: {final_status}")