From a152777b2d93b6998383b06f6d4676279b8e6924 Mon Sep 17 00:00:00 2001 From: Soumyadas15 Date: Mon, 23 Jun 2025 19:03:32 +0530 Subject: [PATCH] fix: RabbitMQ connection fix --- backend/run_agent_background.py | 250 ++++++++++++++++++++++- backend/run_workflow_background.py | 307 ----------------------------- backend/scheduling/api.py | 2 +- backend/webhooks/api.py | 2 +- backend/workflows/api.py | 2 +- docker-compose.yaml | 2 +- 6 files changed, 244 insertions(+), 321 deletions(-) delete mode 100644 backend/run_workflow_background.py diff --git a/backend/run_agent_background.py b/backend/run_agent_background.py index 9f681c52..53121184 100644 --- a/backend/run_agent_background.py +++ b/backend/run_agent_background.py @@ -16,16 +16,17 @@ from dramatiq.brokers.rabbitmq import RabbitmqBroker import os from services.langfuse import langfuse from utils.retry import retry +from workflows.executor import WorkflowExecutor +from workflows.deterministic_executor import DeterministicWorkflowExecutor +from workflows.models import WorkflowDefinition +import sentry_sdk +from typing import Dict, Any + +rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq') +rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672)) +rabbitmq_broker = RabbitmqBroker(host=rabbitmq_host, port=rabbitmq_port, middleware=[dramatiq.middleware.AsyncIO()]) +dramatiq.set_broker(rabbitmq_broker) -try: - broker = dramatiq.get_broker() - if not any(isinstance(m, dramatiq.middleware.AsyncIO) for m in broker.middleware): - broker.add_middleware(dramatiq.middleware.AsyncIO()) -except RuntimeError: - rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq') - rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672)) - rabbitmq_broker = RabbitmqBroker(host=rabbitmq_host, port=rabbitmq_port, middleware=[dramatiq.middleware.AsyncIO()]) - dramatiq.set_broker(rabbitmq_broker) _initialized = False db = DBConnection() @@ -33,7 +34,7 @@ instance_id = "single" async def initialize(): """Initialize the agent API with resources from the main API.""" - global db, instance_id, _initialized + global db, instance_id, _initialized, workflow_executor, deterministic_executor if not instance_id: instance_id = str(uuid.uuid4())[:8] @@ -392,3 +393,232 @@ async def update_agent_run_status( return False return False + +@dramatiq.actor +async def run_workflow_background( + execution_id: str, + workflow_id: str, + workflow_name: str, + workflow_definition: Dict[str, Any], + variables: Optional[Dict[str, Any]] = None, + triggered_by: str = "MANUAL", + project_id: Optional[str] = None, + thread_id: Optional[str] = None, + agent_run_id: Optional[str] = None, + deterministic: bool = True +): + """Run a workflow in the background using Dramatiq.""" + try: + await initialize() + except Exception as e: + logger.critical(f"Failed to initialize workflow worker: {e}") + raise e + + run_lock_key = f"workflow_run_lock:{execution_id}" + + lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL) + + if not lock_acquired: + existing_instance = await redis.get(run_lock_key) + if existing_instance: + logger.info(f"Workflow execution {execution_id} is already being processed by instance {existing_instance.decode() if isinstance(existing_instance, bytes) else existing_instance}. Skipping duplicate execution.") + return + else: + lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL) + if not lock_acquired: + logger.info(f"Workflow execution {execution_id} is already being processed by another instance. Skipping duplicate execution.") + return + + sentry_sdk.set_tag("workflow_id", workflow_id) + sentry_sdk.set_tag("execution_id", execution_id) + + logger.info(f"Starting background workflow execution: {execution_id} for workflow: {workflow_name} (Instance: {instance_id})") + logger.info(f"🔄 Triggered by: {triggered_by}") + + client = await db.client + start_time = datetime.now(timezone.utc) + total_responses = 0 + pubsub = None + stop_checker = None + stop_signal_received = False + + # Define Redis keys and channels - use agent_run pattern if agent_run_id provided for frontend compatibility + if agent_run_id: + response_list_key = f"agent_run:{agent_run_id}:responses" + response_channel = f"agent_run:{agent_run_id}:new_response" + instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id}" + global_control_channel = f"agent_run:{agent_run_id}:control" + instance_active_key = f"active_run:{instance_id}:{agent_run_id}" + else: + # Fallback to workflow execution pattern + response_list_key = f"workflow_execution:{execution_id}:responses" + response_channel = f"workflow_execution:{execution_id}:new_response" + instance_control_channel = f"workflow_execution:{execution_id}:control:{instance_id}" + global_control_channel = f"workflow_execution:{execution_id}:control" + instance_active_key = f"active_workflow:{instance_id}:{execution_id}" + + async def check_for_stop_signal(): + nonlocal stop_signal_received + if not pubsub: return + try: + while not stop_signal_received: + message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5) + if message and message.get("type") == "message": + data = message.get("data") + if isinstance(data, bytes): data = data.decode('utf-8') + if data == "STOP": + logger.info(f"Received STOP signal for workflow execution {execution_id} (Instance: {instance_id})") + stop_signal_received = True + break + if total_responses % 50 == 0: + try: await redis.expire(instance_active_key, redis.REDIS_KEY_TTL) + except Exception as ttl_err: logger.warning(f"Failed to refresh TTL for {instance_active_key}: {ttl_err}") + await asyncio.sleep(0.1) + except asyncio.CancelledError: + logger.info(f"Stop signal checker cancelled for {execution_id} (Instance: {instance_id})") + except Exception as e: + logger.error(f"Error in stop signal checker for {execution_id}: {e}", exc_info=True) + stop_signal_received = True + + try: + pubsub = await redis.create_pubsub() + try: + await retry(lambda: pubsub.subscribe(instance_control_channel, global_control_channel)) + except Exception as e: + logger.error(f"Redis failed to subscribe to control channels: {e}", exc_info=True) + raise e + + logger.debug(f"Subscribed to control channels: {instance_control_channel}, {global_control_channel}") + stop_checker = asyncio.create_task(check_for_stop_signal()) + await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL) + + await client.table('workflow_executions').update({ + "status": "running", + "started_at": start_time.isoformat() + }).eq('id', execution_id).execute() + + workflow = WorkflowDefinition(**workflow_definition) + + if not thread_id: + thread_id = str(uuid.uuid4()) + + final_status = "running" + error_message = None + pending_redis_operations = [] + + if deterministic: + executor = deterministic_executor + logger.info(f"Using deterministic executor for workflow {execution_id}") + else: + executor = workflow_executor + logger.info(f"Using legacy executor for workflow {execution_id}") + + async for response in executor.execute_workflow( + workflow=workflow, + variables=variables, + thread_id=thread_id, + project_id=project_id + ): + if stop_signal_received: + logger.info(f"Workflow execution {execution_id} stopped by signal.") + final_status = "stopped" + break + + response_json = json.dumps(response) + pending_redis_operations.append(asyncio.create_task(redis.rpush(response_list_key, response_json))) + pending_redis_operations.append(asyncio.create_task(redis.publish(response_channel, "new"))) + total_responses += 1 + + if response.get('type') == 'workflow_status': + status_val = response.get('status') + if status_val in ['completed', 'failed', 'stopped']: + logger.info(f"Workflow execution {execution_id} finished via status message: {status_val}") + final_status = status_val + if status_val == 'failed' or status_val == 'stopped': + error_message = response.get('error', f"Workflow ended with status: {status_val}") + break + + if final_status == "running": + final_status = "completed" + duration = (datetime.now(timezone.utc) - start_time).total_seconds() + logger.info(f"Workflow execution {execution_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})") + completion_message = {"type": "workflow_status", "status": "completed", "message": "Workflow execution completed successfully"} + await redis.rpush(response_list_key, json.dumps(completion_message)) + await redis.publish(response_channel, "new") + + await update_workflow_execution_status(client, execution_id, final_status, error=error_message, agent_run_id=agent_run_id) + + control_signal = "END_STREAM" if final_status == "completed" else "ERROR" if final_status == "failed" else "STOP" + try: + await redis.publish(global_control_channel, control_signal) + logger.debug(f"Published final control signal '{control_signal}' to {global_control_channel}") + except Exception as e: + logger.warning(f"Failed to publish final control signal {control_signal}: {str(e)}") + + except Exception as e: + error_message = str(e) + traceback_str = traceback.format_exc() + duration = (datetime.now(timezone.utc) - start_time).total_seconds() + logger.error(f"Error in workflow execution {execution_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})") + final_status = "failed" + + error_response = {"type": "workflow_status", "status": "error", "message": error_message} + try: + await redis.rpush(response_list_key, json.dumps(error_response)) + await redis.publish(response_channel, "new") + except Exception as redis_err: + logger.error(f"Failed to push error response to Redis for {execution_id}: {redis_err}") + + await update_workflow_execution_status(client, execution_id, "failed", error=f"{error_message}\n{traceback_str}", agent_run_id=agent_run_id) + try: + await redis.publish(global_control_channel, "ERROR") + logger.debug(f"Published ERROR signal to {global_control_channel}") + except Exception as e: + logger.warning(f"Failed to publish ERROR signal: {str(e)}") + + finally: + if stop_checker and not stop_checker.done(): + stop_checker.cancel() + try: await stop_checker + except asyncio.CancelledError: pass + except Exception as e: logger.warning(f"Error during stop_checker cancellation: {e}") + + if pubsub: + try: + await pubsub.unsubscribe() + await pubsub.close() + logger.debug(f"Closed pubsub connection for {execution_id}") + except Exception as e: + logger.warning(f"Error closing pubsub for {execution_id}: {str(e)}") + + await _cleanup_redis_response_list(execution_id, agent_run_id) + await _cleanup_redis_instance_key(execution_id, agent_run_id) + await _cleanup_redis_run_lock(execution_id) + + try: + await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=30.0) + except asyncio.TimeoutError: + logger.warning(f"Timeout waiting for pending Redis operations for {execution_id}") + + logger.info(f"Workflow execution background task fully completed for: {execution_id} (Instance: {instance_id}) with final status: {final_status}") + + +async def update_workflow_execution_status(client, execution_id: str, status: str, error: Optional[str] = None, agent_run_id: Optional[str] = None): + """Update workflow execution status in database.""" + try: + update_data = { + "status": status, + "completed_at": datetime.now(timezone.utc).isoformat() if status in ['completed', 'failed', 'stopped'] else None, + "error": error + } + + await client.table('workflow_executions').update(update_data).eq('id', execution_id).execute() + logger.info(f"Updated workflow execution {execution_id} status to {status}") + + # Also update agent_runs table if agent_run_id provided (for frontend streaming compatibility) + if agent_run_id: + await client.table('agent_runs').update(update_data).eq('id', agent_run_id).execute() + logger.info(f"Updated agent run {agent_run_id} status to {status}") + + except Exception as e: + logger.error(f"Failed to update workflow execution status: {e}") \ No newline at end of file diff --git a/backend/run_workflow_background.py b/backend/run_workflow_background.py deleted file mode 100644 index 942d611c..00000000 --- a/backend/run_workflow_background.py +++ /dev/null @@ -1,307 +0,0 @@ -import sentry_sdk -import asyncio -import json -import traceback -from datetime import datetime, timezone -from typing import Optional, Dict, Any -from services import redis -from workflows.executor import WorkflowExecutor -from workflows.deterministic_executor import DeterministicWorkflowExecutor -from workflows.models import WorkflowDefinition -from utils.logger import logger -import dramatiq -import uuid -from services.supabase import DBConnection -from dramatiq.brokers.rabbitmq import RabbitmqBroker -import os -from utils.retry import retry - -try: - broker = dramatiq.get_broker() - if not any(isinstance(m, dramatiq.middleware.AsyncIO) for m in broker.middleware): - broker.add_middleware(dramatiq.middleware.AsyncIO()) -except RuntimeError: - rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq') - rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672)) - rabbitmq_broker = RabbitmqBroker(host=rabbitmq_host, port=rabbitmq_port, middleware=[dramatiq.middleware.AsyncIO()]) - dramatiq.set_broker(rabbitmq_broker) - -_initialized = False -db = DBConnection() -workflow_executor = WorkflowExecutor(db) -deterministic_executor = DeterministicWorkflowExecutor(db) -instance_id = "workflow_worker" - -async def initialize(): - """Initialize the workflow worker with resources.""" - global db, workflow_executor, instance_id, _initialized - - if not instance_id: - instance_id = str(uuid.uuid4())[:8] - - await retry(lambda: redis.initialize_async()) - await db.initialize() - - _initialized = True - logger.info(f"Initialized workflow worker with instance ID: {instance_id}") - -@dramatiq.actor -async def run_workflow_background( - execution_id: str, - workflow_id: str, - workflow_name: str, - workflow_definition: Dict[str, Any], - variables: Optional[Dict[str, Any]] = None, - triggered_by: str = "MANUAL", - project_id: Optional[str] = None, - thread_id: Optional[str] = None, - agent_run_id: Optional[str] = None, - deterministic: bool = True -): - """Run a workflow in the background using Dramatiq.""" - try: - await initialize() - except Exception as e: - logger.critical(f"Failed to initialize workflow worker: {e}") - raise e - - run_lock_key = f"workflow_run_lock:{execution_id}" - - lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL) - - if not lock_acquired: - existing_instance = await redis.get(run_lock_key) - if existing_instance: - logger.info(f"Workflow execution {execution_id} is already being processed by instance {existing_instance.decode() if isinstance(existing_instance, bytes) else existing_instance}. Skipping duplicate execution.") - return - else: - lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL) - if not lock_acquired: - logger.info(f"Workflow execution {execution_id} is already being processed by another instance. Skipping duplicate execution.") - return - - sentry_sdk.set_tag("workflow_id", workflow_id) - sentry_sdk.set_tag("execution_id", execution_id) - - logger.info(f"Starting background workflow execution: {execution_id} for workflow: {workflow_name} (Instance: {instance_id})") - logger.info(f"🔄 Triggered by: {triggered_by}") - - client = await db.client - start_time = datetime.now(timezone.utc) - total_responses = 0 - pubsub = None - stop_checker = None - stop_signal_received = False - - # Define Redis keys and channels - use agent_run pattern if agent_run_id provided for frontend compatibility - if agent_run_id: - response_list_key = f"agent_run:{agent_run_id}:responses" - response_channel = f"agent_run:{agent_run_id}:new_response" - instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id}" - global_control_channel = f"agent_run:{agent_run_id}:control" - instance_active_key = f"active_run:{instance_id}:{agent_run_id}" - else: - # Fallback to workflow execution pattern - response_list_key = f"workflow_execution:{execution_id}:responses" - response_channel = f"workflow_execution:{execution_id}:new_response" - instance_control_channel = f"workflow_execution:{execution_id}:control:{instance_id}" - global_control_channel = f"workflow_execution:{execution_id}:control" - instance_active_key = f"active_workflow:{instance_id}:{execution_id}" - - async def check_for_stop_signal(): - nonlocal stop_signal_received - if not pubsub: return - try: - while not stop_signal_received: - message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5) - if message and message.get("type") == "message": - data = message.get("data") - if isinstance(data, bytes): data = data.decode('utf-8') - if data == "STOP": - logger.info(f"Received STOP signal for workflow execution {execution_id} (Instance: {instance_id})") - stop_signal_received = True - break - if total_responses % 50 == 0: - try: await redis.expire(instance_active_key, redis.REDIS_KEY_TTL) - except Exception as ttl_err: logger.warning(f"Failed to refresh TTL for {instance_active_key}: {ttl_err}") - await asyncio.sleep(0.1) - except asyncio.CancelledError: - logger.info(f"Stop signal checker cancelled for {execution_id} (Instance: {instance_id})") - except Exception as e: - logger.error(f"Error in stop signal checker for {execution_id}: {e}", exc_info=True) - stop_signal_received = True - - try: - pubsub = await redis.create_pubsub() - try: - await retry(lambda: pubsub.subscribe(instance_control_channel, global_control_channel)) - except Exception as e: - logger.error(f"Redis failed to subscribe to control channels: {e}", exc_info=True) - raise e - - logger.debug(f"Subscribed to control channels: {instance_control_channel}, {global_control_channel}") - stop_checker = asyncio.create_task(check_for_stop_signal()) - await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL) - - await client.table('workflow_executions').update({ - "status": "running", - "started_at": start_time.isoformat() - }).eq('id', execution_id).execute() - - workflow = WorkflowDefinition(**workflow_definition) - - if not thread_id: - thread_id = str(uuid.uuid4()) - - final_status = "running" - error_message = None - pending_redis_operations = [] - - if deterministic: - executor = deterministic_executor - logger.info(f"Using deterministic executor for workflow {execution_id}") - else: - executor = workflow_executor - logger.info(f"Using legacy executor for workflow {execution_id}") - - async for response in executor.execute_workflow( - workflow=workflow, - variables=variables, - thread_id=thread_id, - project_id=project_id - ): - if stop_signal_received: - logger.info(f"Workflow execution {execution_id} stopped by signal.") - final_status = "stopped" - break - - response_json = json.dumps(response) - pending_redis_operations.append(asyncio.create_task(redis.rpush(response_list_key, response_json))) - pending_redis_operations.append(asyncio.create_task(redis.publish(response_channel, "new"))) - total_responses += 1 - - if response.get('type') == 'workflow_status': - status_val = response.get('status') - if status_val in ['completed', 'failed', 'stopped']: - logger.info(f"Workflow execution {execution_id} finished via status message: {status_val}") - final_status = status_val - if status_val == 'failed' or status_val == 'stopped': - error_message = response.get('error', f"Workflow ended with status: {status_val}") - break - - if final_status == "running": - final_status = "completed" - duration = (datetime.now(timezone.utc) - start_time).total_seconds() - logger.info(f"Workflow execution {execution_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})") - completion_message = {"type": "workflow_status", "status": "completed", "message": "Workflow execution completed successfully"} - await redis.rpush(response_list_key, json.dumps(completion_message)) - await redis.publish(response_channel, "new") - - await update_workflow_execution_status(client, execution_id, final_status, error=error_message, agent_run_id=agent_run_id) - - control_signal = "END_STREAM" if final_status == "completed" else "ERROR" if final_status == "failed" else "STOP" - try: - await redis.publish(global_control_channel, control_signal) - logger.debug(f"Published final control signal '{control_signal}' to {global_control_channel}") - except Exception as e: - logger.warning(f"Failed to publish final control signal {control_signal}: {str(e)}") - - except Exception as e: - error_message = str(e) - traceback_str = traceback.format_exc() - duration = (datetime.now(timezone.utc) - start_time).total_seconds() - logger.error(f"Error in workflow execution {execution_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})") - final_status = "failed" - - error_response = {"type": "workflow_status", "status": "error", "message": error_message} - try: - await redis.rpush(response_list_key, json.dumps(error_response)) - await redis.publish(response_channel, "new") - except Exception as redis_err: - logger.error(f"Failed to push error response to Redis for {execution_id}: {redis_err}") - - await update_workflow_execution_status(client, execution_id, "failed", error=f"{error_message}\n{traceback_str}", agent_run_id=agent_run_id) - try: - await redis.publish(global_control_channel, "ERROR") - logger.debug(f"Published ERROR signal to {global_control_channel}") - except Exception as e: - logger.warning(f"Failed to publish ERROR signal: {str(e)}") - - finally: - if stop_checker and not stop_checker.done(): - stop_checker.cancel() - try: await stop_checker - except asyncio.CancelledError: pass - except Exception as e: logger.warning(f"Error during stop_checker cancellation: {e}") - - if pubsub: - try: - await pubsub.unsubscribe() - await pubsub.close() - logger.debug(f"Closed pubsub connection for {execution_id}") - except Exception as e: - logger.warning(f"Error closing pubsub for {execution_id}: {str(e)}") - - await _cleanup_redis_response_list(execution_id, agent_run_id) - await _cleanup_redis_instance_key(execution_id, agent_run_id) - await _cleanup_redis_run_lock(execution_id) - - try: - await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=30.0) - except asyncio.TimeoutError: - logger.warning(f"Timeout waiting for pending Redis operations for {execution_id}") - - logger.info(f"Workflow execution background task fully completed for: {execution_id} (Instance: {instance_id}) with final status: {final_status}") - -async def update_workflow_execution_status(client, execution_id: str, status: str, error: Optional[str] = None, agent_run_id: Optional[str] = None): - """Update workflow execution status in database.""" - try: - update_data = { - "status": status, - "completed_at": datetime.now(timezone.utc).isoformat() if status in ['completed', 'failed', 'stopped'] else None, - "error": error - } - - await client.table('workflow_executions').update(update_data).eq('id', execution_id).execute() - logger.info(f"Updated workflow execution {execution_id} status to {status}") - - # Also update agent_runs table if agent_run_id provided (for frontend streaming compatibility) - if agent_run_id: - await client.table('agent_runs').update(update_data).eq('id', agent_run_id).execute() - logger.info(f"Updated agent run {agent_run_id} status to {status}") - - except Exception as e: - logger.error(f"Failed to update workflow execution status: {e}") - -async def _cleanup_redis_response_list(execution_id: str, agent_run_id: Optional[str] = None): - """Set TTL on workflow execution response list.""" - try: - if agent_run_id: - response_list_key = f"agent_run:{agent_run_id}:responses" - else: - response_list_key = f"workflow_execution:{execution_id}:responses" - await redis.expire(response_list_key, redis.REDIS_KEY_TTL) - logger.debug(f"Set TTL on {response_list_key}") - except Exception as e: - logger.warning(f"Failed to set TTL on response list for {execution_id}: {e}") - -async def _cleanup_redis_instance_key(execution_id: str, agent_run_id: Optional[str] = None): - """Remove instance-specific active run key.""" - try: - if agent_run_id: - instance_active_key = f"active_run:{instance_id}:{agent_run_id}" - else: - instance_active_key = f"active_workflow:{instance_id}:{execution_id}" - await redis.delete(instance_active_key) - logger.debug(f"Cleaned up instance key {instance_active_key}") - except Exception as e: - logger.warning(f"Failed to clean up instance key for {execution_id}: {e}") - -async def _cleanup_redis_run_lock(execution_id: str): - """Remove workflow execution lock.""" - try: - run_lock_key = f"workflow_run_lock:{execution_id}" - await redis.delete(run_lock_key) - logger.debug(f"Cleaned up run lock {run_lock_key}") - except Exception as e: - logger.warning(f"Failed to clean up run lock for {execution_id}: {e}") \ No newline at end of file diff --git a/backend/scheduling/api.py b/backend/scheduling/api.py index b08dcfcd..61e30fae 100644 --- a/backend/scheduling/api.py +++ b/backend/scheduling/api.py @@ -403,7 +403,7 @@ async def execute_scheduled_workflow( workflow_dict['updated_at'] = workflow_dict['updated_at'].isoformat() # Send workflow to background worker - from run_workflow_background import run_workflow_background + from run_agent_background import run_workflow_background run_workflow_background.send( execution_id=execution_id, workflow_id=workflow_id, diff --git a/backend/webhooks/api.py b/backend/webhooks/api.py index d3f8018d..bd46f7a6 100644 --- a/backend/webhooks/api.py +++ b/backend/webhooks/api.py @@ -168,7 +168,7 @@ async def trigger_workflow_webhook( result = await _handle_generic_webhook(workflow, data) if result.get("should_execute", False): - from run_workflow_background import run_workflow_background + from run_agent_background import run_workflow_background execution_id = str(uuid.uuid4()) diff --git a/backend/workflows/api.py b/backend/workflows/api.py index 3ca898e7..97213f70 100644 --- a/backend/workflows/api.py +++ b/backend/workflows/api.py @@ -438,7 +438,7 @@ async def execute_workflow( agent_run_id = agent_run.data[0]['id'] logger.info(f"Created agent run for workflow: {agent_run_id}") - from run_workflow_background import run_workflow_background + from run_agent_background import run_workflow_background if hasattr(workflow, 'model_dump'): workflow_dict = workflow.model_dump(mode='json') else: diff --git a/docker-compose.yaml b/docker-compose.yaml index ad4999a9..3fffa4e0 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -57,7 +57,7 @@ services: build: context: ./backend dockerfile: Dockerfile - command: python -m dramatiq --skip-logging run_agent_background run_workflow_background + command: python -m dramatiq --skip-logging run_agent_background volumes: - ./backend/.env:/app/.env:ro env_file: