feat(redis): implement idempotency check for agent runs and add cleanup for run locks

This commit is contained in:
sharath 2025-06-09 01:51:11 +00:00
parent d40e2e3829
commit ad91ddf248
No known key found for this signature in database
2 changed files with 34 additions and 2 deletions

View File

@ -68,6 +68,25 @@ async def run_agent_background(
logger.critical(f"Failed to initialize Redis connection: {e}")
raise e
# Idempotency check: prevent duplicate runs
run_lock_key = f"agent_run_lock:{agent_run_id}"
# Try to acquire a lock for this agent run
lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
if not lock_acquired:
# Check if the run is already being handled by another instance
existing_instance = await redis.get(run_lock_key)
if existing_instance:
logger.info(f"Agent run {agent_run_id} is already being processed by instance {existing_instance.decode() if isinstance(existing_instance, bytes) else existing_instance}. Skipping duplicate execution.")
return
else:
# Lock exists but no value, try to acquire again
lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
if not lock_acquired:
logger.info(f"Agent run {agent_run_id} is already being processed by another instance. Skipping duplicate execution.")
return
sentry.sentry.set_tag("thread_id", thread_id)
logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
@ -249,6 +268,9 @@ async def run_agent_background(
# Remove the instance-specific active run key
await _cleanup_redis_instance_key(agent_run_id)
# Clean up the run lock
await _cleanup_redis_run_lock(agent_run_id)
# Wait for all pending redis operations to complete, with timeout
try:
await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=30.0)
@ -270,6 +292,16 @@ async def _cleanup_redis_instance_key(agent_run_id: str):
except Exception as e:
logger.warning(f"Failed to clean up Redis key {key}: {str(e)}")
async def _cleanup_redis_run_lock(agent_run_id: str):
"""Clean up the run lock Redis key for an agent run."""
run_lock_key = f"agent_run_lock:{agent_run_id}"
logger.debug(f"Cleaning up Redis run lock key: {run_lock_key}")
try:
await redis.delete(run_lock_key)
logger.debug(f"Successfully cleaned up Redis run lock key: {run_lock_key}")
except Exception as e:
logger.warning(f"Failed to clean up Redis run lock key {run_lock_key}: {str(e)}")
# TTL for Redis response lists (24 hours)
REDIS_RESPONSE_LIST_TTL = 3600 * 24

View File

@ -96,10 +96,10 @@ async def get_client():
# Basic Redis operations
async def set(key: str, value: str, ex: int = None):
async def set(key: str, value: str, ex: int = None, nx: bool = False):
"""Set a Redis key."""
redis_client = await get_client()
return await redis_client.set(key, value, ex=ex)
return await redis_client.set(key, value, ex=ex, nx=nx)
async def get(key: str, default: str = None):