diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py
index 56eb5e23..cbae733c 100644
--- a/backend/run_agent_background.py
+++ b/backend/run_agent_background.py
@@ -68,6 +68,27 @@ async def run_agent_background(
         logger.critical(f"Failed to initialize Redis connection: {e}")
         raise e
 
+    # Idempotency check: prevent duplicate runs. SET NX is atomic, so only one
+    # instance can create the key; the TTL bounds how long a stale lock survives.
+    run_lock_key = f"agent_run_lock:{agent_run_id}"
+
+    # Try to acquire a lock for this agent run
+    lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
+
+    if not lock_acquired:
+        # Check if the run is already being handled by another instance
+        existing_instance = await redis.get(run_lock_key)
+        if existing_instance:
+            logger.info(f"Agent run {agent_run_id} is already being processed by instance {existing_instance.decode() if isinstance(existing_instance, bytes) else existing_instance}. Skipping duplicate execution.")
+            return
+        else:
+            # SET NX failed but GET found nothing: the key disappeared in between
+            # (expired or released), so try to acquire again
+            lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
+            if not lock_acquired:
+                logger.info(f"Agent run {agent_run_id} is already being processed by another instance. Skipping duplicate execution.")
+                return
+
     sentry.sentry.set_tag("thread_id", thread_id)
 
     logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
@@ -249,6 +270,9 @@ async def run_agent_background(
         # Remove the instance-specific active run key
         await _cleanup_redis_instance_key(agent_run_id)
 
+        # Clean up the run lock
+        await _cleanup_redis_run_lock(agent_run_id)
+
         # Wait for all pending redis operations to complete, with timeout
         try:
             await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=30.0)
@@ -270,6 +294,20 @@ async def _cleanup_redis_instance_key(agent_run_id: str):
     except Exception as e:
         logger.warning(f"Failed to clean up Redis key {key}: {str(e)}")
 
+async def _cleanup_redis_run_lock(agent_run_id: str):
+    """Delete the run lock Redis key for an agent run (best effort).
+
+    Failures are logged as warnings rather than raised.
+    NOTE(review): the key is deleted without checking which instance holds it.
+    """
+    run_lock_key = f"agent_run_lock:{agent_run_id}"
+    logger.debug(f"Cleaning up Redis run lock key: {run_lock_key}")
+    try:
+        await redis.delete(run_lock_key)
+        logger.debug(f"Successfully cleaned up Redis run lock key: {run_lock_key}")
+    except Exception as e:
+        logger.warning(f"Failed to clean up Redis run lock key {run_lock_key}: {str(e)}")
+
 # TTL for Redis response lists (24 hours)
 REDIS_RESPONSE_LIST_TTL = 3600 * 24
 
diff --git a/backend/services/redis.py b/backend/services/redis.py
index dc956b05..67a1df0a 100644
--- a/backend/services/redis.py
+++ b/backend/services/redis.py
@@ -96,10 +96,14 @@ async def get_client():
 
 
 # Basic Redis operations
-async def set(key: str, value: str, ex: int = None):
-    """Set a Redis key."""
+async def set(key: str, value: str, ex: int = None, nx: bool = False):
+    """Set a Redis key.
+
+    ``ex`` is an optional expiry in seconds; with ``nx=True`` the key is only
+    written if it does not already exist, and the result is falsy otherwise.
+    """
     redis_client = await get_client()
-    return await redis_client.set(key, value, ex=ex)
+    return await redis_client.set(key, value, ex=ex, nx=nx)
 
 
 async def get(key: str, default: str = None):