mirror of https://github.com/kortix-ai/suna.git

fix: RabbitMQ connection fix

parent c18541433c
commit a152777b2d

@@ -16,16 +16,17 @@ from dramatiq.brokers.rabbitmq import RabbitmqBroker
 import os
 from services.langfuse import langfuse
 from utils.retry import retry
+from workflows.executor import WorkflowExecutor
+from workflows.deterministic_executor import DeterministicWorkflowExecutor
+from workflows.models import WorkflowDefinition
 import sentry_sdk
+from typing import Dict, Any
+
+rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq')
+rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672))
+rabbitmq_broker = RabbitmqBroker(host=rabbitmq_host, port=rabbitmq_port, middleware=[dramatiq.middleware.AsyncIO()])
+dramatiq.set_broker(rabbitmq_broker)
+
-try:
-    broker = dramatiq.get_broker()
-    if not any(isinstance(m, dramatiq.middleware.AsyncIO) for m in broker.middleware):
-        broker.add_middleware(dramatiq.middleware.AsyncIO())
-except RuntimeError:
-    rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq')
-    rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672))
-    rabbitmq_broker = RabbitmqBroker(host=rabbitmq_host, port=rabbitmq_port, middleware=[dramatiq.middleware.AsyncIO()])
-    dramatiq.set_broker(rabbitmq_broker)
 
 _initialized = False
 db = DBConnection()
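
Context for the fix: dramatiq.get_broker() does not fail fast when no broker has been configured; it lazily creates a default RabbitmqBroker pointing at localhost (RuntimeError is only raised when no broker backend is installed at all). The old probe therefore rarely reached the except branch that carried the real RABBITMQ_HOST, which is consistent with the connection problems this commit addresses. The new code builds the broker eagerly and registers it before any actor is declared. A minimal sketch of the same wiring, with a hypothetical ping actor that is not part of the commit:

import os

import dramatiq
from dramatiq.brokers.rabbitmq import RabbitmqBroker

# Same env-driven defaults as the diff above.
broker = RabbitmqBroker(
    host=os.getenv('RABBITMQ_HOST', 'rabbitmq'),
    port=int(os.getenv('RABBITMQ_PORT', 5672)),
    middleware=[dramatiq.middleware.AsyncIO()],  # lets `async def` actors run
)
dramatiq.set_broker(broker)

@dramatiq.actor
async def ping(message: str):
    # Hypothetical actor for illustration only.
    print(f"pong: {message}")

# Producer side: ping.send("hello") enqueues a message on RabbitMQ.
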
@@ -33,7 +34,7 @@ instance_id = "single"
 
 async def initialize():
     """Initialize the agent API with resources from the main API."""
-    global db, instance_id, _initialized
+    global db, instance_id, _initialized, workflow_executor, deterministic_executor
 
     if not instance_id:
         instance_id = str(uuid.uuid4())[:8]
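
The widened global statement lets initialize() rebind the module-level executor singletons alongside the database handle. The surrounding pattern is a one-time async initialization shared by every actor invocation; a minimal sketch of that pattern, with hypothetical resource names:

import uuid

_initialized = False
instance_id = "single"

async def initialize():
    # One-time async setup shared by all actor invocations in this process.
    global _initialized, instance_id
    if _initialized:
        return  # idempotent: later calls are no-ops
    if not instance_id:
        instance_id = str(uuid.uuid4())[:8]  # short per-process id used in Redis keys
    # ... connect Redis / the database here ...
    _initialized = True
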
@@ -392,3 +393,232 @@ async def update_agent_run_status(
         return False
 
     return False
+
+@dramatiq.actor
+async def run_workflow_background(
+    execution_id: str,
+    workflow_id: str,
+    workflow_name: str,
+    workflow_definition: Dict[str, Any],
+    variables: Optional[Dict[str, Any]] = None,
+    triggered_by: str = "MANUAL",
+    project_id: Optional[str] = None,
+    thread_id: Optional[str] = None,
+    agent_run_id: Optional[str] = None,
+    deterministic: bool = True
+):
+    """Run a workflow in the background using Dramatiq."""
+    try:
+        await initialize()
+    except Exception as e:
+        logger.critical(f"Failed to initialize workflow worker: {e}")
+        raise e
+
+    run_lock_key = f"workflow_run_lock:{execution_id}"
+
+    lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
+
+    if not lock_acquired:
+        existing_instance = await redis.get(run_lock_key)
+        if existing_instance:
+            logger.info(f"Workflow execution {execution_id} is already being processed by instance {existing_instance.decode() if isinstance(existing_instance, bytes) else existing_instance}. Skipping duplicate execution.")
+            return
+        else:
+            lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
+            if not lock_acquired:
+                logger.info(f"Workflow execution {execution_id} is already being processed by another instance. Skipping duplicate execution.")
+                return
+
+    sentry_sdk.set_tag("workflow_id", workflow_id)
+    sentry_sdk.set_tag("execution_id", execution_id)
+
+    logger.info(f"Starting background workflow execution: {execution_id} for workflow: {workflow_name} (Instance: {instance_id})")
+    logger.info(f"🔄 Triggered by: {triggered_by}")
+
+    client = await db.client
+    start_time = datetime.now(timezone.utc)
+    total_responses = 0
+    pubsub = None
+    stop_checker = None
+    stop_signal_received = False
+
+    # Define Redis keys and channels - use agent_run pattern if agent_run_id provided for frontend compatibility
+    if agent_run_id:
+        response_list_key = f"agent_run:{agent_run_id}:responses"
+        response_channel = f"agent_run:{agent_run_id}:new_response"
+        instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id}"
+        global_control_channel = f"agent_run:{agent_run_id}:control"
+        instance_active_key = f"active_run:{instance_id}:{agent_run_id}"
+    else:
+        # Fallback to workflow execution pattern
+        response_list_key = f"workflow_execution:{execution_id}:responses"
+        response_channel = f"workflow_execution:{execution_id}:new_response"
+        instance_control_channel = f"workflow_execution:{execution_id}:control:{instance_id}"
+        global_control_channel = f"workflow_execution:{execution_id}:control"
+        instance_active_key = f"active_workflow:{instance_id}:{execution_id}"
+
+    async def check_for_stop_signal():
+        nonlocal stop_signal_received
+        if not pubsub: return
+        try:
+            while not stop_signal_received:
+                message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)
+                if message and message.get("type") == "message":
+                    data = message.get("data")
+                    if isinstance(data, bytes): data = data.decode('utf-8')
+                    if data == "STOP":
+                        logger.info(f"Received STOP signal for workflow execution {execution_id} (Instance: {instance_id})")
+                        stop_signal_received = True
+                        break
+                if total_responses % 50 == 0:
+                    try: await redis.expire(instance_active_key, redis.REDIS_KEY_TTL)
+                    except Exception as ttl_err: logger.warning(f"Failed to refresh TTL for {instance_active_key}: {ttl_err}")
+                await asyncio.sleep(0.1)
+        except asyncio.CancelledError:
+            logger.info(f"Stop signal checker cancelled for {execution_id} (Instance: {instance_id})")
+        except Exception as e:
+            logger.error(f"Error in stop signal checker for {execution_id}: {e}", exc_info=True)
+            stop_signal_received = True
+
+    try:
+        pubsub = await redis.create_pubsub()
+        try:
+            await retry(lambda: pubsub.subscribe(instance_control_channel, global_control_channel))
+        except Exception as e:
+            logger.error(f"Redis failed to subscribe to control channels: {e}", exc_info=True)
+            raise e
+
+        logger.debug(f"Subscribed to control channels: {instance_control_channel}, {global_control_channel}")
+        stop_checker = asyncio.create_task(check_for_stop_signal())
+        await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL)
+
+        await client.table('workflow_executions').update({
+            "status": "running",
+            "started_at": start_time.isoformat()
+        }).eq('id', execution_id).execute()
+
+        workflow = WorkflowDefinition(**workflow_definition)
+
+        if not thread_id:
+            thread_id = str(uuid.uuid4())
+
+        final_status = "running"
+        error_message = None
+        pending_redis_operations = []
+
+        if deterministic:
+            executor = deterministic_executor
+            logger.info(f"Using deterministic executor for workflow {execution_id}")
+        else:
+            executor = workflow_executor
+            logger.info(f"Using legacy executor for workflow {execution_id}")
+
+        async for response in executor.execute_workflow(
+            workflow=workflow,
+            variables=variables,
+            thread_id=thread_id,
+            project_id=project_id
+        ):
+            if stop_signal_received:
+                logger.info(f"Workflow execution {execution_id} stopped by signal.")
+                final_status = "stopped"
+                break
+
+            response_json = json.dumps(response)
+            pending_redis_operations.append(asyncio.create_task(redis.rpush(response_list_key, response_json)))
+            pending_redis_operations.append(asyncio.create_task(redis.publish(response_channel, "new")))
+            total_responses += 1
+
+            if response.get('type') == 'workflow_status':
+                status_val = response.get('status')
+                if status_val in ['completed', 'failed', 'stopped']:
+                    logger.info(f"Workflow execution {execution_id} finished via status message: {status_val}")
+                    final_status = status_val
+                    if status_val == 'failed' or status_val == 'stopped':
+                        error_message = response.get('error', f"Workflow ended with status: {status_val}")
+                    break
+
+        if final_status == "running":
+            final_status = "completed"
+            duration = (datetime.now(timezone.utc) - start_time).total_seconds()
+            logger.info(f"Workflow execution {execution_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})")
+            completion_message = {"type": "workflow_status", "status": "completed", "message": "Workflow execution completed successfully"}
+            await redis.rpush(response_list_key, json.dumps(completion_message))
+            await redis.publish(response_channel, "new")
+
+        await update_workflow_execution_status(client, execution_id, final_status, error=error_message, agent_run_id=agent_run_id)
+
+        control_signal = "END_STREAM" if final_status == "completed" else "ERROR" if final_status == "failed" else "STOP"
+        try:
+            await redis.publish(global_control_channel, control_signal)
+            logger.debug(f"Published final control signal '{control_signal}' to {global_control_channel}")
+        except Exception as e:
+            logger.warning(f"Failed to publish final control signal {control_signal}: {str(e)}")
+
+    except Exception as e:
+        error_message = str(e)
+        traceback_str = traceback.format_exc()
+        duration = (datetime.now(timezone.utc) - start_time).total_seconds()
+        logger.error(f"Error in workflow execution {execution_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
+        final_status = "failed"
+
+        error_response = {"type": "workflow_status", "status": "error", "message": error_message}
+        try:
+            await redis.rpush(response_list_key, json.dumps(error_response))
+            await redis.publish(response_channel, "new")
+        except Exception as redis_err:
+            logger.error(f"Failed to push error response to Redis for {execution_id}: {redis_err}")
+
+        await update_workflow_execution_status(client, execution_id, "failed", error=f"{error_message}\n{traceback_str}", agent_run_id=agent_run_id)
+        try:
+            await redis.publish(global_control_channel, "ERROR")
+            logger.debug(f"Published ERROR signal to {global_control_channel}")
+        except Exception as e:
+            logger.warning(f"Failed to publish ERROR signal: {str(e)}")
+
+    finally:
+        if stop_checker and not stop_checker.done():
+            stop_checker.cancel()
+            try: await stop_checker
+            except asyncio.CancelledError: pass
+            except Exception as e: logger.warning(f"Error during stop_checker cancellation: {e}")
+
+        if pubsub:
+            try:
+                await pubsub.unsubscribe()
+                await pubsub.close()
+                logger.debug(f"Closed pubsub connection for {execution_id}")
+            except Exception as e:
+                logger.warning(f"Error closing pubsub for {execution_id}: {str(e)}")
+
+        await _cleanup_redis_response_list(execution_id, agent_run_id)
+        await _cleanup_redis_instance_key(execution_id, agent_run_id)
+        await _cleanup_redis_run_lock(execution_id)
+
+        try:
+            await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=30.0)
+        except asyncio.TimeoutError:
+            logger.warning(f"Timeout waiting for pending Redis operations for {execution_id}")
+
+        logger.info(f"Workflow execution background task fully completed for: {execution_id} (Instance: {instance_id}) with final status: {final_status}")
+
+
+async def update_workflow_execution_status(client, execution_id: str, status: str, error: Optional[str] = None, agent_run_id: Optional[str] = None):
+    """Update workflow execution status in database."""
+    try:
+        update_data = {
+            "status": status,
+            "completed_at": datetime.now(timezone.utc).isoformat() if status in ['completed', 'failed', 'stopped'] else None,
+            "error": error
+        }
+
+        await client.table('workflow_executions').update(update_data).eq('id', execution_id).execute()
+        logger.info(f"Updated workflow execution {execution_id} status to {status}")
+
+        # Also update agent_runs table if agent_run_id provided (for frontend streaming compatibility)
+        if agent_run_id:
+            await client.table('agent_runs').update(update_data).eq('id', agent_run_id).execute()
+            logger.info(f"Updated agent run {agent_run_id} status to {status}")
+
+    except Exception as e:
+        logger.error(f"Failed to update workflow execution status: {e}")
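
The duplicate-execution guard at the top of the actor leans on Redis's atomic SET with nx=True (set only if absent) and ex (TTL): exactly one worker instance can claim an execution id, and a crashed worker's claim expires on its own. A standalone sketch of the pattern, assuming redis.asyncio; the key name and TTL are illustrative:

import asyncio
import redis.asyncio as aioredis

async def try_claim(execution_id: str, instance_id: str, ttl_seconds: int = 3600) -> bool:
    r = aioredis.Redis(host="localhost")
    # SET key value NX EX ttl is a single atomic command, so at most one
    # caller can win the claim for a given execution id.
    claimed = await r.set(f"workflow_run_lock:{execution_id}", instance_id, nx=True, ex=ttl_seconds)
    await r.aclose()
    return bool(claimed)

# asyncio.run(try_claim("exec-123", "worker-a")) -> True only for the first claimant.
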
@@ -1,307 +0,0 @@
-import sentry_sdk
-import asyncio
-import json
-import traceback
-from datetime import datetime, timezone
-from typing import Optional, Dict, Any
-from services import redis
-from workflows.executor import WorkflowExecutor
-from workflows.deterministic_executor import DeterministicWorkflowExecutor
-from workflows.models import WorkflowDefinition
-from utils.logger import logger
-import dramatiq
-import uuid
-from services.supabase import DBConnection
-from dramatiq.brokers.rabbitmq import RabbitmqBroker
-import os
-from utils.retry import retry
-
-try:
-    broker = dramatiq.get_broker()
-    if not any(isinstance(m, dramatiq.middleware.AsyncIO) for m in broker.middleware):
-        broker.add_middleware(dramatiq.middleware.AsyncIO())
-except RuntimeError:
-    rabbitmq_host = os.getenv('RABBITMQ_HOST', 'rabbitmq')
-    rabbitmq_port = int(os.getenv('RABBITMQ_PORT', 5672))
-    rabbitmq_broker = RabbitmqBroker(host=rabbitmq_host, port=rabbitmq_port, middleware=[dramatiq.middleware.AsyncIO()])
-    dramatiq.set_broker(rabbitmq_broker)
-
-_initialized = False
-db = DBConnection()
-workflow_executor = WorkflowExecutor(db)
-deterministic_executor = DeterministicWorkflowExecutor(db)
-instance_id = "workflow_worker"
-
-async def initialize():
-    """Initialize the workflow worker with resources."""
-    global db, workflow_executor, instance_id, _initialized
-
-    if not instance_id:
-        instance_id = str(uuid.uuid4())[:8]
-
-    await retry(lambda: redis.initialize_async())
-    await db.initialize()
-
-    _initialized = True
-    logger.info(f"Initialized workflow worker with instance ID: {instance_id}")
-
-@dramatiq.actor
-async def run_workflow_background(
-    execution_id: str,
-    workflow_id: str,
-    workflow_name: str,
-    workflow_definition: Dict[str, Any],
-    variables: Optional[Dict[str, Any]] = None,
-    triggered_by: str = "MANUAL",
-    project_id: Optional[str] = None,
-    thread_id: Optional[str] = None,
-    agent_run_id: Optional[str] = None,
-    deterministic: bool = True
-):
-    """Run a workflow in the background using Dramatiq."""
-    try:
-        await initialize()
-    except Exception as e:
-        logger.critical(f"Failed to initialize workflow worker: {e}")
-        raise e
-
-    run_lock_key = f"workflow_run_lock:{execution_id}"
-
-    lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
-
-    if not lock_acquired:
-        existing_instance = await redis.get(run_lock_key)
-        if existing_instance:
-            logger.info(f"Workflow execution {execution_id} is already being processed by instance {existing_instance.decode() if isinstance(existing_instance, bytes) else existing_instance}. Skipping duplicate execution.")
-            return
-        else:
-            lock_acquired = await redis.set(run_lock_key, instance_id, nx=True, ex=redis.REDIS_KEY_TTL)
-            if not lock_acquired:
-                logger.info(f"Workflow execution {execution_id} is already being processed by another instance. Skipping duplicate execution.")
-                return
-
-    sentry_sdk.set_tag("workflow_id", workflow_id)
-    sentry_sdk.set_tag("execution_id", execution_id)
-
-    logger.info(f"Starting background workflow execution: {execution_id} for workflow: {workflow_name} (Instance: {instance_id})")
-    logger.info(f"🔄 Triggered by: {triggered_by}")
-
-    client = await db.client
-    start_time = datetime.now(timezone.utc)
-    total_responses = 0
-    pubsub = None
-    stop_checker = None
-    stop_signal_received = False
-
-    # Define Redis keys and channels - use agent_run pattern if agent_run_id provided for frontend compatibility
-    if agent_run_id:
-        response_list_key = f"agent_run:{agent_run_id}:responses"
-        response_channel = f"agent_run:{agent_run_id}:new_response"
-        instance_control_channel = f"agent_run:{agent_run_id}:control:{instance_id}"
-        global_control_channel = f"agent_run:{agent_run_id}:control"
-        instance_active_key = f"active_run:{instance_id}:{agent_run_id}"
-    else:
-        # Fallback to workflow execution pattern
-        response_list_key = f"workflow_execution:{execution_id}:responses"
-        response_channel = f"workflow_execution:{execution_id}:new_response"
-        instance_control_channel = f"workflow_execution:{execution_id}:control:{instance_id}"
-        global_control_channel = f"workflow_execution:{execution_id}:control"
-        instance_active_key = f"active_workflow:{instance_id}:{execution_id}"
-
-    async def check_for_stop_signal():
-        nonlocal stop_signal_received
-        if not pubsub: return
-        try:
-            while not stop_signal_received:
-                message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)
-                if message and message.get("type") == "message":
-                    data = message.get("data")
-                    if isinstance(data, bytes): data = data.decode('utf-8')
-                    if data == "STOP":
-                        logger.info(f"Received STOP signal for workflow execution {execution_id} (Instance: {instance_id})")
-                        stop_signal_received = True
-                        break
-                if total_responses % 50 == 0:
-                    try: await redis.expire(instance_active_key, redis.REDIS_KEY_TTL)
-                    except Exception as ttl_err: logger.warning(f"Failed to refresh TTL for {instance_active_key}: {ttl_err}")
-                await asyncio.sleep(0.1)
-        except asyncio.CancelledError:
-            logger.info(f"Stop signal checker cancelled for {execution_id} (Instance: {instance_id})")
-        except Exception as e:
-            logger.error(f"Error in stop signal checker for {execution_id}: {e}", exc_info=True)
-            stop_signal_received = True
-
-    try:
-        pubsub = await redis.create_pubsub()
-        try:
-            await retry(lambda: pubsub.subscribe(instance_control_channel, global_control_channel))
-        except Exception as e:
-            logger.error(f"Redis failed to subscribe to control channels: {e}", exc_info=True)
-            raise e
-
-        logger.debug(f"Subscribed to control channels: {instance_control_channel}, {global_control_channel}")
-        stop_checker = asyncio.create_task(check_for_stop_signal())
-        await redis.set(instance_active_key, "running", ex=redis.REDIS_KEY_TTL)
-
-        await client.table('workflow_executions').update({
-            "status": "running",
-            "started_at": start_time.isoformat()
-        }).eq('id', execution_id).execute()
-
-        workflow = WorkflowDefinition(**workflow_definition)
-
-        if not thread_id:
-            thread_id = str(uuid.uuid4())
-
-        final_status = "running"
-        error_message = None
-        pending_redis_operations = []
-
-        if deterministic:
-            executor = deterministic_executor
-            logger.info(f"Using deterministic executor for workflow {execution_id}")
-        else:
-            executor = workflow_executor
-            logger.info(f"Using legacy executor for workflow {execution_id}")
-
-        async for response in executor.execute_workflow(
-            workflow=workflow,
-            variables=variables,
-            thread_id=thread_id,
-            project_id=project_id
-        ):
-            if stop_signal_received:
-                logger.info(f"Workflow execution {execution_id} stopped by signal.")
-                final_status = "stopped"
-                break
-
-            response_json = json.dumps(response)
-            pending_redis_operations.append(asyncio.create_task(redis.rpush(response_list_key, response_json)))
-            pending_redis_operations.append(asyncio.create_task(redis.publish(response_channel, "new")))
-            total_responses += 1
-
-            if response.get('type') == 'workflow_status':
-                status_val = response.get('status')
-                if status_val in ['completed', 'failed', 'stopped']:
-                    logger.info(f"Workflow execution {execution_id} finished via status message: {status_val}")
-                    final_status = status_val
-                    if status_val == 'failed' or status_val == 'stopped':
-                        error_message = response.get('error', f"Workflow ended with status: {status_val}")
-                    break
-
-        if final_status == "running":
-            final_status = "completed"
-            duration = (datetime.now(timezone.utc) - start_time).total_seconds()
-            logger.info(f"Workflow execution {execution_id} completed normally (duration: {duration:.2f}s, responses: {total_responses})")
-            completion_message = {"type": "workflow_status", "status": "completed", "message": "Workflow execution completed successfully"}
-            await redis.rpush(response_list_key, json.dumps(completion_message))
-            await redis.publish(response_channel, "new")
-
-        await update_workflow_execution_status(client, execution_id, final_status, error=error_message, agent_run_id=agent_run_id)
-
-        control_signal = "END_STREAM" if final_status == "completed" else "ERROR" if final_status == "failed" else "STOP"
-        try:
-            await redis.publish(global_control_channel, control_signal)
-            logger.debug(f"Published final control signal '{control_signal}' to {global_control_channel}")
-        except Exception as e:
-            logger.warning(f"Failed to publish final control signal {control_signal}: {str(e)}")
-
-    except Exception as e:
-        error_message = str(e)
-        traceback_str = traceback.format_exc()
-        duration = (datetime.now(timezone.utc) - start_time).total_seconds()
-        logger.error(f"Error in workflow execution {execution_id} after {duration:.2f}s: {error_message}\n{traceback_str} (Instance: {instance_id})")
-        final_status = "failed"
-
-        error_response = {"type": "workflow_status", "status": "error", "message": error_message}
-        try:
-            await redis.rpush(response_list_key, json.dumps(error_response))
-            await redis.publish(response_channel, "new")
-        except Exception as redis_err:
-            logger.error(f"Failed to push error response to Redis for {execution_id}: {redis_err}")
-
-        await update_workflow_execution_status(client, execution_id, "failed", error=f"{error_message}\n{traceback_str}", agent_run_id=agent_run_id)
-        try:
-            await redis.publish(global_control_channel, "ERROR")
-            logger.debug(f"Published ERROR signal to {global_control_channel}")
-        except Exception as e:
-            logger.warning(f"Failed to publish ERROR signal: {str(e)}")
-
-    finally:
-        if stop_checker and not stop_checker.done():
-            stop_checker.cancel()
-            try: await stop_checker
-            except asyncio.CancelledError: pass
-            except Exception as e: logger.warning(f"Error during stop_checker cancellation: {e}")
-
-        if pubsub:
-            try:
-                await pubsub.unsubscribe()
-                await pubsub.close()
-                logger.debug(f"Closed pubsub connection for {execution_id}")
-            except Exception as e:
-                logger.warning(f"Error closing pubsub for {execution_id}: {str(e)}")
-
-        await _cleanup_redis_response_list(execution_id, agent_run_id)
-        await _cleanup_redis_instance_key(execution_id, agent_run_id)
-        await _cleanup_redis_run_lock(execution_id)
-
-        try:
-            await asyncio.wait_for(asyncio.gather(*pending_redis_operations), timeout=30.0)
-        except asyncio.TimeoutError:
-            logger.warning(f"Timeout waiting for pending Redis operations for {execution_id}")
-
-        logger.info(f"Workflow execution background task fully completed for: {execution_id} (Instance: {instance_id}) with final status: {final_status}")
-
-async def update_workflow_execution_status(client, execution_id: str, status: str, error: Optional[str] = None, agent_run_id: Optional[str] = None):
-    """Update workflow execution status in database."""
-    try:
-        update_data = {
-            "status": status,
-            "completed_at": datetime.now(timezone.utc).isoformat() if status in ['completed', 'failed', 'stopped'] else None,
-            "error": error
-        }
-
-        await client.table('workflow_executions').update(update_data).eq('id', execution_id).execute()
-        logger.info(f"Updated workflow execution {execution_id} status to {status}")
-
-        # Also update agent_runs table if agent_run_id provided (for frontend streaming compatibility)
-        if agent_run_id:
-            await client.table('agent_runs').update(update_data).eq('id', agent_run_id).execute()
-            logger.info(f"Updated agent run {agent_run_id} status to {status}")
-
-    except Exception as e:
-        logger.error(f"Failed to update workflow execution status: {e}")
-
-async def _cleanup_redis_response_list(execution_id: str, agent_run_id: Optional[str] = None):
-    """Set TTL on workflow execution response list."""
-    try:
-        if agent_run_id:
-            response_list_key = f"agent_run:{agent_run_id}:responses"
-        else:
-            response_list_key = f"workflow_execution:{execution_id}:responses"
-        await redis.expire(response_list_key, redis.REDIS_KEY_TTL)
-        logger.debug(f"Set TTL on {response_list_key}")
-    except Exception as e:
-        logger.warning(f"Failed to set TTL on response list for {execution_id}: {e}")
-
-async def _cleanup_redis_instance_key(execution_id: str, agent_run_id: Optional[str] = None):
-    """Remove instance-specific active run key."""
-    try:
-        if agent_run_id:
-            instance_active_key = f"active_run:{instance_id}:{agent_run_id}"
-        else:
-            instance_active_key = f"active_workflow:{instance_id}:{execution_id}"
-        await redis.delete(instance_active_key)
-        logger.debug(f"Cleaned up instance key {instance_active_key}")
-    except Exception as e:
-        logger.warning(f"Failed to clean up instance key for {execution_id}: {e}")
-
-async def _cleanup_redis_run_lock(execution_id: str):
-    """Remove workflow execution lock."""
-    try:
-        run_lock_key = f"workflow_run_lock:{execution_id}"
-        await redis.delete(run_lock_key)
-        logger.debug(f"Cleaned up run lock {run_lock_key}")
-    except Exception as e:
-        logger.warning(f"Failed to clean up run lock for {execution_id}: {e}")
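
The deleted module and its replacement share the cancellation mechanism: the actor subscribes to per-run control channels and a background task polls them for a STOP message while the executor streams responses. A trimmed-down sketch of that listener, assuming redis.asyncio and an illustrative channel name:

import asyncio
import redis.asyncio as aioredis

async def listen_for_stop(channel: str) -> None:
    r = aioredis.Redis(host="localhost")
    pubsub = r.pubsub()
    await pubsub.subscribe(channel)
    try:
        while True:
            # A short timeout keeps the loop responsive to task cancellation.
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.5)
            if message and message.get("type") == "message":
                data = message["data"]
                if isinstance(data, bytes):
                    data = data.decode("utf-8")
                if data == "STOP":
                    print(f"STOP received on {channel}")
                    return
            await asyncio.sleep(0.1)
    finally:
        await pubsub.unsubscribe(channel)
        await r.aclose()
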
@@ -403,7 +403,7 @@ async def execute_scheduled_workflow(
         workflow_dict['updated_at'] = workflow_dict['updated_at'].isoformat()
 
         # Send workflow to background worker
-        from run_workflow_background import run_workflow_background
+        from run_agent_background import run_workflow_background
         run_workflow_background.send(
             execution_id=execution_id,
             workflow_id=workflow_id,
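
Only the import line changes here: the actor now lives in run_agent_background, and the .send() call is unaffected. For reference, .send() does not execute the function in-process; it serializes the keyword arguments into a RabbitMQ message that a worker importing run_agent_background later consumes. A hedged sketch of the producer side, with illustrative argument values:

from run_agent_background import run_workflow_background

run_workflow_background.send(
    execution_id="exec-123",
    workflow_id="wf-456",
    workflow_name="nightly-report",
    workflow_definition={"steps": []},  # real shape comes from WorkflowDefinition
    triggered_by="SCHEDULE",
)
# .send() returns as soon as the broker accepts the message; the workflow
# itself runs later inside a dramatiq worker process.
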
@@ -168,7 +168,7 @@ async def trigger_workflow_webhook(
         result = await _handle_generic_webhook(workflow, data)
 
         if result.get("should_execute", False):
-            from run_workflow_background import run_workflow_background
+            from run_agent_background import run_workflow_background
 
             execution_id = str(uuid.uuid4())
 
@@ -438,7 +438,7 @@ async def execute_workflow(
     agent_run_id = agent_run.data[0]['id']
     logger.info(f"Created agent run for workflow: {agent_run_id}")
 
-    from run_workflow_background import run_workflow_background
+    from run_agent_background import run_workflow_background
     if hasattr(workflow, 'model_dump'):
         workflow_dict = workflow.model_dump(mode='json')
     else:
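
The hasattr(workflow, 'model_dump') check distinguishes Pydantic v2 models from older objects (the fallback sits in the else branch outside this hunk). The mode='json' argument matters because the definition must survive broker serialization: it coerces non-JSON types such as datetime to strings. A small sketch with a hypothetical model standing in for WorkflowDefinition:

from datetime import datetime, timezone
from pydantic import BaseModel

class Workflow(BaseModel):  # hypothetical stand-in for WorkflowDefinition
    name: str
    updated_at: datetime

wf = Workflow(name="demo", updated_at=datetime.now(timezone.utc))
print(wf.model_dump(mode="json"))  # datetime rendered as an ISO-8601 string
print(wf.model_dump())             # datetime stays a datetime object
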
@@ -57,7 +57,7 @@ services:
     build:
       context: ./backend
      dockerfile: Dockerfile
-    command: python -m dramatiq --skip-logging run_agent_background run_workflow_background
+    command: python -m dramatiq --skip-logging run_agent_background
     volumes:
       - ./backend/.env:/app/.env:ro
     env_file:
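
The compose change follows from the module merge: the dramatiq CLI imports each module listed on its command line and serves the actors registered there, so once run_workflow_background.py is deleted the old command would fail at import. A hypothetical sanity check that the single remaining module exposes both actors:

import dramatiq
import run_agent_background  # assumes RABBITMQ_HOST/RABBITMQ_PORT are set

# Actor names registered on the shared broker; expected to include both the
# agent actor and the merged run_workflow_background actor.
print(sorted(dramatiq.get_broker().actors))
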