From ad91ddf248edbfe54fc7eaffa494ab2a54c219a0 Mon Sep 17 00:00:00 2001 From: sharath <29162020+tnfssc@users.noreply.github.com> Date: Mon, 9 Jun 2025 01:51:11 +0000 Subject: [PATCH 1/7] feat(redis): implement idempotency check for agent runs and add cleanup for run locks --- backend/run_agent_background.py | 32 ++++++++++++++++++++++++++++++++ backend/services/redis.py | 4 ++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py index 56eb5e23..cbae733c 100644 --- a/backend/run_agent_background.py +++ b/backend/run_agent_background.py @@ -68,6 +68,25 @@ async def run_agent_background( logger.critical(f"Failed to initialize Redis connection: {e}") raise e + # Idempotency check: prevent duplicate runs + run_lock_key = f"agent_run_lock:{agent_run_id}" + + # Try to acquire a lock for this agent run + lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL) + + if not lock_acquired: + # Check if the run is already being handled by another instance + existing_instance = await redis.get(run_lock_key) + if existing_instance: + logger.info(f"Agent run {agent_run_id} is already being processed by instance {existing_instance.decode() if isinstance(existing_instance, bytes) else existing_instance}. Skipping duplicate execution.") + return + else: + # Lock exists but no value, try to acquire again + lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL) + if not lock_acquired: + logger.info(f"Agent run {agent_run_id} is already being processed by another instance. 
Skipping duplicate execution.") + return + sentry.sentry.set_tag("thread_id", thread_id) logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})") @@ -249,6 +268,9 @@ async def run_agent_background( # Remove the instance-specific active run key await _cleanup_redis_instance_key(agent_run_id) + # Clean up the run lock + await _cleanup_redis_run_lock(agent_run_id) + # Wait for all pending redis operations to complete, with timeout try: await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=30.0) @@ -270,6 +292,16 @@ async def _cleanup_redis_instance_key(agent_run_id: str): except Exception as e: logger.warning(f"Failed to clean up Redis key {key}: {str(e)}") +async def _cleanup_redis_run_lock(agent_run_id: str): + """Clean up the run lock Redis key for an agent run.""" + run_lock_key = f"agent_run_lock:{agent_run_id}" + logger.debug(f"Cleaning up Redis run lock key: {run_lock_key}") + try: + await redis.delete(run_lock_key) + logger.debug(f"Successfully cleaned up Redis run lock key: {run_lock_key}") + except Exception as e: + logger.warning(f"Failed to clean up Redis run lock key {run_lock_key}: {str(e)}") + # TTL for Redis response lists (24 hours) REDIS_RESPONSE_LIST_TTL = 3600 * 24 diff --git a/backend/services/redis.py b/backend/services/redis.py index dc956b05..67a1df0a 100644 --- a/backend/services/redis.py +++ b/backend/services/redis.py @@ -96,10 +96,10 @@ async def get_client(): # Basic Redis operations -async def set(key: str, value: str, ex: int = None): +async def set(key: str, value: str, ex: int = None, nx: bool = False): """Set a Redis key.""" redis_client = await get_client() - return await redis_client.set(key, value, ex=ex) + return await redis_client.set(key, value, ex=ex, nx=nx) async def get(key: str, default: str = None): From 6c539325bcefaf16497609b05774c1efe187a058 Mon Sep 17 00:00:00 2001 From: sharath <29162020+tnfssc@users.noreply.github.com> Date: Tue, 10 Jun 2025 
04:52:09 +0000 Subject: [PATCH 2/7] fix(redis): retry on connection errors --- backend/run_agent_background.py | 16 ++++----- backend/services/redis.py | 44 +++++++++++-------------- backend/utils/retry.py | 58 +++++++++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 33 deletions(-) create mode 100644 backend/utils/retry.py diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py index 56eb5e23..51691dc0 100644 --- a/backend/run_agent_background.py +++ b/backend/run_agent_background.py @@ -15,6 +15,7 @@ from services import redis from dramatiq.brokers.rabbitmq import RabbitmqBroker import os from services.langfuse import langfuse +from utils.retry import retry rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq') rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672)) @@ -28,18 +29,12 @@ instance_id = "single" async def initialize(): """Initialize the agent API with resources from the main API.""" global db, instance_id, _initialized - if _initialized: - try: await redis.client.ping() - except Exception as e: - logger.warning(f"Redis connection failed, re-initializing: {e}") - await redis.initialize_async(force=True) - return # Use provided instance_id or generate a new one if not instance_id: # Generate instance ID instance_id = str(uuid.uuid4())[:8] - await redis.initialize_async() + await retry(lambda: redis.initialize_async()) await db.initialize() _initialized = True @@ -117,7 +112,12 @@ async def run_agent_background( try: # Setup Pub/Sub listener for control signals pubsub = await redis.create_pubsub() - await pubsub.subscribe(instance_control_channel, global_control_channel) + try: + await retry(lambda: pubsub.subscribe(instance_control_channel, global_control_channel)) + except Exception as e: + logger.error(f"Redis failed to subscribe to control channels: {e}", exc_info=True) + raise e + logger.debug(f"Subscribed to control channels: {instance_control_channel}, {global_control_channel}") stop_checker = 
asyncio.create_task(check_for_stop_signal()) diff --git a/backend/services/redis.py b/backend/services/redis.py index dc956b05..108e36bd 100644 --- a/backend/services/redis.py +++ b/backend/services/redis.py @@ -4,6 +4,7 @@ from dotenv import load_dotenv import asyncio from utils.logger import logger from typing import List, Any +from utils.retry import retry # Redis client client: redis.Redis | None = None @@ -22,12 +23,12 @@ def initialize(): load_dotenv() # Get Redis configuration - redis_host = os.getenv('REDIS_HOST', 'redis') - redis_port = int(os.getenv('REDIS_PORT', 6379)) - redis_password = os.getenv('REDIS_PASSWORD', '') + redis_host = os.getenv("REDIS_HOST", "redis") + redis_port = int(os.getenv("REDIS_PORT", 6379)) + redis_password = os.getenv("REDIS_PASSWORD", "") # Convert string 'True'/'False' to boolean - redis_ssl_str = os.getenv('REDIS_SSL', 'False') - redis_ssl = redis_ssl_str.lower() == 'true' + redis_ssl_str = os.getenv("REDIS_SSL", "False") + redis_ssl = redis_ssl_str.lower() == "true" logger.info(f"Initializing Redis connection to {redis_host}:{redis_port}") @@ -41,37 +42,30 @@ def initialize(): socket_timeout=5.0, socket_connect_timeout=5.0, retry_on_timeout=True, - health_check_interval=30 + health_check_interval=30, ) return client -async def initialize_async(force: bool = False): +async def initialize_async(): """Initialize Redis connection asynchronously.""" global client, _initialized async with _init_lock: - if _initialized and force: - logger.info("Redis connection already initialized, closing and re-initializing") - _initialized = False - try: - await close() - except Exception as e: - logger.warning(f"Failed to close Redis connection, proceeding with re-initialization anyway: {e}") - if not _initialized: logger.info("Initializing Redis connection") initialize() - try: - await client.ping() - logger.info("Successfully connected to Redis") - _initialized = True - except Exception as e: - logger.error(f"Failed to connect to Redis: {e}") 
- client = None - raise + try: + await client.ping() + logger.info("Successfully connected to Redis") + _initialized = True + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + client = None + _initialized = False + raise return client @@ -91,7 +85,7 @@ async def get_client(): """Get the Redis client, initializing if necessary.""" global client, _initialized if client is None or not _initialized: - await initialize_async() + await retry(lambda: initialize_async()) return client @@ -156,4 +150,4 @@ async def expire(key: str, time: int): async def keys(pattern: str) -> List[str]: """Get keys matching a pattern.""" redis_client = await get_client() - return await redis_client.keys(pattern) \ No newline at end of file + return await redis_client.keys(pattern) diff --git a/backend/utils/retry.py b/backend/utils/retry.py new file mode 100644 index 00000000..992c0454 --- /dev/null +++ b/backend/utils/retry.py @@ -0,0 +1,58 @@ +import asyncio +from typing import TypeVar, Callable, Awaitable, Optional + +T = TypeVar("T") + + +async def retry( + fn: Callable[[], Awaitable[T]], + max_attempts: int = 3, + delay_seconds: int = 1, +) -> T: + """ + Retry an async function with a fixed delay between attempts.
+ + Args: + fn: The async function to retry + max_attempts: Maximum number of attempts + delay_seconds: Delay between attempts in seconds + + Returns: + The result of the function call + + Raises: + The last exception if all attempts fail + + Example: + ```python + async def fetch_data(): + # Some operation that might fail + return await api_call() + + try: + result = await retry(fetch_data, max_attempts=3, delay_seconds=2) + print(f"Success: {result}") + except Exception as e: + print(f"Failed after all retries: {e}") + ``` + """ + if max_attempts <= 0: + raise ValueError("max_attempts must be greater than zero") + + last_error: Optional[Exception] = None + + for attempt in range(1, max_attempts + 1): + try: + return await fn() + except Exception as error: + last_error = error + + if attempt == max_attempts: + break + + await asyncio.sleep(delay_seconds) + + if last_error: + raise last_error + + raise RuntimeError("Unexpected: last_error is None") From d7f873a4242f4e49ad7c51272c7f9618998f546a Mon Sep 17 00:00:00 2001 From: Vukasin Date: Tue, 10 Jun 2025 21:06:18 +0200 Subject: [PATCH 3/7] fix: mobile responsive text overlap --- .../thread/chat-input/chat-input.tsx | 18 +++++++++--------- .../thread/chat-input/message-input.tsx | 19 ++++++++++++------- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/frontend/src/components/thread/chat-input/chat-input.tsx b/frontend/src/components/thread/chat-input/chat-input.tsx index dbe4ad41..c9a46c65 100644 --- a/frontend/src/components/thread/chat-input/chat-input.tsx +++ b/frontend/src/components/thread/chat-input/chat-input.tsx @@ -249,15 +249,15 @@ export const ChatInput = forwardRef( } }} > -
- - + + ( }; return ( -
-
+
+ +