Merge pull request #627 from tnfssc/fix/redis-connection-issue

This commit is contained in:
Sharath 2025-06-05 06:55:08 +05:30 committed by GitHub
commit 7c3e06496d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 10 additions and 5 deletions

View File

@ -10,7 +10,7 @@ services:
memory: 32G
worker:
command: python -m dramatiq --processes 20 --threads 16 run_agent_background
command: python -m dramatiq --processes 40 --threads 8 run_agent_background
deploy:
resources:
limits:

View File

@ -124,6 +124,8 @@ async def run_agent_background(
final_status = "running"
error_message = None
pending_redis_operations = []
async for response in agent_gen:
if stop_signal_received:
logger.info(f"Agent run {agent_run_id} stopped by signal.")
@ -133,8 +135,8 @@ async def run_agent_background(
# Store response in Redis list and publish notification
response_json = json.dumps(response)
asyncio.create_task(redis.rpush(response_list_key, response_json))
asyncio.create_task(redis.publish(response_channel, "new"))
pending_redis_operations.append(redis.rpush(response_list_key, response_json))
pending_redis_operations.append(redis.publish(response_channel, "new"))
total_responses += 1
# Check for agent-signaled completion or error
@ -231,8 +233,11 @@ async def run_agent_background(
# Remove the instance-specific active run key
await _cleanup_redis_instance_key(agent_run_id)
# Wait for 5 seconds for any pending redis operations to complete
await asyncio.sleep(5)
# Wait for all pending redis operations to complete, with timeout
try:
await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=5.0)
except asyncio.TimeoutError:
logger.warning(f"Timeout waiting for pending Redis operations for {agent_run_id}")
logger.info(f"Agent run background task fully completed for: {agent_run_id} (Instance: {instance_id}) with final status: {final_status}")