diff --git a/backend/triggers/api.py b/backend/triggers/api.py index 7a4c5089..f28ffd88 100644 --- a/backend/triggers/api.py +++ b/backend/triggers/api.py @@ -80,7 +80,7 @@ async def get_services() -> tuple[TriggerService, TriggerExecutionService, Provi async def verify_agent_access(agent_id: str, user_id: str): client = await db.client - result = await client.table('agents').select('agent_id').eq('agent_id', agent_id).eq('created_by', user_id).execute() + result = await client.table('agents').select('agent_id').eq('agent_id', agent_id).eq('account_id', user_id).execute() if not result.data: raise HTTPException(status_code=404, detail="Agent not found or access denied") @@ -362,6 +362,9 @@ async def trigger_webhook( result = await trigger_svc.process_trigger_event(trigger_id, raw_data) + logger.info(f"Webhook trigger result: success={result.success}, should_execute_agent={result.should_execute_agent}, should_execute_workflow={result.should_execute_workflow}") + logger.info(f"Trigger result details: workflow_id={result.workflow_id}, agent_prompt={result.agent_prompt}") + if not result.success: return JSONResponse( status_code=400, @@ -371,6 +374,8 @@ async def trigger_webhook( if result.should_execute_agent or result.should_execute_workflow: trigger = await trigger_svc.get_trigger(trigger_id) if trigger: + logger.info(f"Executing agent {trigger.agent_id} for trigger {trigger_id}") + from .domain.entities import TriggerEvent event = TriggerEvent( trigger_id=trigger_id, @@ -385,15 +390,29 @@ async def trigger_webhook( trigger_event=event ) + logger.info(f"Agent execution result: {execution_result}") + return JSONResponse(content={ "success": True, - "message": "Trigger processed successfully", - "execution": execution_result + "message": "Trigger processed and agent execution started", + "execution": execution_result, + "trigger_result": { + "should_execute_agent": result.should_execute_agent, + "should_execute_workflow": result.should_execute_workflow, + "agent_prompt": result.agent_prompt + } }) + else: + logger.warning(f"Trigger {trigger_id} not found for execution") + logger.info(f"Webhook processed but no execution needed (should_execute_agent={result.should_execute_agent})") return JSONResponse(content={ "success": True, - "message": "Trigger processed successfully" + "message": "Trigger processed successfully (no execution needed)", + "trigger_result": { + "should_execute_agent": result.should_execute_agent, + "should_execute_workflow": result.should_execute_workflow + } }) except Exception as e: diff --git a/backend/triggers/infrastructure/providers/schedule_provider.py b/backend/triggers/infrastructure/providers/schedule_provider.py index 322b0604..50616bb1 100644 --- a/backend/triggers/infrastructure/providers/schedule_provider.py +++ b/backend/triggers/infrastructure/providers/schedule_provider.py @@ -72,7 +72,7 @@ class ScheduleTriggerProvider(TriggerProvider): } schedule_id = await asyncio.to_thread( - self._qstash.schedules.create, + self._qstash.schedule.create, destination=webhook_url, cron=cron_expression, body=json.dumps(payload), @@ -94,13 +94,13 @@ class ScheduleTriggerProvider(TriggerProvider): return True try: - schedules = await asyncio.to_thread(self._qstash.schedules.list) + schedules = await asyncio.to_thread(self._qstash.schedule.list) webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook" for schedule in schedules: if schedule.get('destination') == webhook_url: - await asyncio.to_thread(self._qstash.schedules.delete, schedule['scheduleId']) + await asyncio.to_thread(self._qstash.schedule.delete, schedule['scheduleId']) logger.info(f"Deleted QStash schedule {schedule['scheduleId']} for trigger {trigger.trigger_id}") break @@ -160,7 +160,7 @@ class ScheduleTriggerProvider(TriggerProvider): try: webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook" - schedules = await asyncio.to_thread(self._qstash.schedules.list) + schedules = await asyncio.to_thread(self._qstash.schedule.list) for schedule in schedules: if schedule.get('destination') == webhook_url: @@ -192,7 +192,7 @@ class ScheduleTriggerProvider(TriggerProvider): return [] try: - return await asyncio.to_thread(self._qstash.schedules.list) + return await asyncio.to_thread(self._qstash.schedule.list) except Exception as e: logger.error(f"Failed to list QStash schedules: {e}") return [] \ No newline at end of file diff --git a/backend/triggers/infrastructure/providers/webhook_provider.py b/backend/triggers/infrastructure/providers/webhook_provider.py index df470741..aee4f650 100644 --- a/backend/triggers/infrastructure/providers/webhook_provider.py +++ b/backend/triggers/infrastructure/providers/webhook_provider.py @@ -32,13 +32,21 @@ class GenericWebhookProvider(TriggerProvider): agent_prompt = self._create_agent_prompt(event.raw_data, execution_variables.variables) - return TriggerResult( + # Force agent execution only - never workflows for webhook triggers + result = TriggerResult( success=True, should_execute_agent=True, + should_execute_workflow=False, agent_prompt=agent_prompt, execution_variables=execution_variables ) + # Double-check the values are set correctly + assert result.should_execute_agent == True + assert result.should_execute_workflow == False + + return result + except Exception as e: return TriggerResult( success=False, diff --git a/backend/triggers/repositories/implementations.py b/backend/triggers/repositories/implementations.py index 017857fe..7de32522 100644 --- a/backend/triggers/repositories/implementations.py +++ b/backend/triggers/repositories/implementations.py @@ -16,15 +16,17 @@ class SupabaseTriggerRepository(TriggerRepository): async def save(self, trigger: Trigger) -> None: try: client = await self._db.client + + config_with_provider = {**trigger.config.config, "provider_id": trigger.provider_id} + await client.table('agent_triggers').insert({ 'trigger_id': trigger.trigger_id, 'agent_id': trigger.agent_id, - 'provider_id': trigger.provider_id, 'trigger_type': trigger.trigger_type.value, 'name': trigger.config.name, 'description': trigger.config.description, 'is_active': trigger.config.is_active, - 'config': trigger.config.config, + 'config': config_with_provider, 'created_at': trigger.metadata.created_at.isoformat(), 'updated_at': trigger.metadata.updated_at.isoformat() }).execute() @@ -64,22 +66,29 @@ class SupabaseTriggerRepository(TriggerRepository): async def find_by_provider_id(self, provider_id: str) -> List[Trigger]: try: client = await self._db.client - result = await client.table('agent_triggers').select('*').eq('provider_id', provider_id).execute() + result = await client.table('agent_triggers').select('*').execute() - return [self._map_to_trigger(data) for data in result.data] + triggers = [] + for data in result.data: + if data.get('config', {}).get('provider_id') == provider_id: + triggers.append(self._map_to_trigger(data)) + + return triggers except Exception as e: raise RepositoryError(f"Failed to find triggers by provider ID: {str(e)}") async def update(self, trigger: Trigger) -> None: try: client = await self._db.client + + config_with_provider = {**trigger.config.config, "provider_id": trigger.provider_id} + result = await client.table('agent_triggers').update({ - 'provider_id': trigger.provider_id, 'trigger_type': trigger.trigger_type.value, 'name': trigger.config.name, 'description': trigger.config.description, 'is_active': trigger.config.is_active, - 'config': trigger.config.config, + 'config': config_with_provider, 'updated_at': trigger.metadata.updated_at.isoformat() }).eq('trigger_id', trigger.trigger_id).execute() @@ -123,10 +132,15 @@ class SupabaseTriggerRepository(TriggerRepository): agent_id=data['agent_id'] ) + config_data = data.get('config', {}) + provider_id = config_data.get('provider_id', data['trigger_type']) + + clean_config = {k: v for k, v in config_data.items() if k != 'provider_id'} + config = TriggerConfig( name=data['name'], description=data.get('description'), - config=data.get('config', {}), + config=clean_config, is_active=data.get('is_active', True) ) @@ -139,7 +153,7 @@ class SupabaseTriggerRepository(TriggerRepository): return Trigger( identity=identity, - provider_id=data['provider_id'], + provider_id=provider_id, trigger_type=trigger_type, config=config, metadata=metadata @@ -165,7 +179,7 @@ class SupabaseTriggerEventLogRepository(TriggerEventLogRepository): 'log_id': log_id, 'trigger_id': event.trigger_id, 'agent_id': event.agent_id, - 'trigger_type': event.trigger_type.value, + 'trigger_type': event.trigger_type.value if hasattr(event.trigger_type, 'value') else str(event.trigger_type), 'event_data': event.raw_data, 'success': result.success, 'should_execute_agent': result.should_execute_agent, @@ -173,7 +187,7 @@ class SupabaseTriggerEventLogRepository(TriggerEventLogRepository): 'agent_prompt': result.agent_prompt, 'workflow_id': result.workflow_id, 'workflow_input': result.workflow_input, - 'execution_variables': result.execution_variables.variables, + 'execution_variables': result.execution_variables.variables if result.execution_variables else {}, 'error_message': result.error_message, 'metadata': result.metadata, 'execution_time_ms': execution_time_ms, diff --git a/backend/triggers/services/execution_service.py b/backend/triggers/services/execution_service.py index 5002c928..26093b52 100644 --- a/backend/triggers/services/execution_service.py +++ b/backend/triggers/services/execution_service.py @@ -22,11 +22,27 @@ class TriggerExecutionService: trigger_event: TriggerEvent ) -> Dict[str, Any]: try: + logger.info(f"Trigger execution: should_execute_agent={trigger_result.should_execute_agent}, should_execute_workflow={trigger_result.should_execute_workflow}") + logger.info(f"Trigger result type: {type(trigger_result)}") + logger.info(f"Workflow ID: {trigger_result.workflow_id}") + + # FORCE AGENT EXECUTION ONLY - disable workflows for webhook triggers + if trigger_event.trigger_type.value == "webhook": + logger.info(f"Webhook trigger detected - forcing agent execution for agent: {agent_id}") + # Override any workflow settings for webhook triggers + trigger_result.should_execute_workflow = False + trigger_result.should_execute_agent = True + + return await self._agent_executor.execute_triggered_agent( + agent_id=agent_id, + trigger_result=trigger_result, + trigger_event=trigger_event + ) + if trigger_result.should_execute_workflow: + logger.info(f"Executing workflow: {trigger_result.workflow_id}") workflow_id = trigger_result.workflow_id workflow_input = trigger_result.workflow_input or {} - - logger.info(f"Executing workflow {workflow_id} for agent {agent_id}") return await self._workflow_executor.execute_triggered_workflow( agent_id=agent_id, workflow_id=workflow_id, @@ -35,7 +51,7 @@ class TriggerExecutionService: trigger_event=trigger_event ) else: - logger.info(f"Executing agent {agent_id}") + logger.info(f"Executing agent: {agent_id}") return await self._agent_executor.execute_triggered_agent( agent_id=agent_id, trigger_result=trigger_result, @@ -52,7 +68,6 @@ class TriggerExecutionService: class AgentExecutor: - def __init__(self, db_connection: DBConnection): self._db = db_connection @@ -77,7 +92,9 @@ class AgentExecutor: await self._create_initial_message( thread_id=thread_id, prompt=trigger_result.agent_prompt, - trigger_data=trigger_result.execution_variables.variables + trigger_data=trigger_result.execution_variables.variables, + agent_id=agent_id, + agent_config=agent_config ) agent_run_id = await self._start_agent_execution( @@ -116,22 +133,68 @@ class AgentExecutor: ) -> tuple[str, str]: client = await self._db.client + project_id = str(uuid.uuid4()) + thread_id = str(uuid.uuid4()) + account_id = agent_config.get('account_id') + + placeholder_name = f"Trigger: {agent_config.get('name', 'Agent')} - {trigger_event.trigger_id[:8]}" + project = await client.table('projects').insert({ + "project_id": project_id, + "account_id": account_id, + "name": placeholder_name, + "created_at": datetime.now(timezone.utc).isoformat() + }).execute() + logger.info(f"Created new project for trigger: {project_id}") + + try: + from sandbox.sandbox import create_sandbox, delete_sandbox + sandbox_pass = str(uuid.uuid4()) + sandbox = await create_sandbox(sandbox_pass, project_id) + sandbox_id = sandbox.id + logger.info(f"Created new sandbox {sandbox_id} for trigger project {project_id}") + + vnc_link = await sandbox.get_preview_link(6080) + website_link = await sandbox.get_preview_link(8080) + vnc_url = vnc_link.url if hasattr(vnc_link, 'url') else str(vnc_link).split("url='")[1].split("'")[0] + website_url = website_link.url if hasattr(website_link, 'url') else str(website_link).split("url='")[1].split("'")[0] + token = None + if hasattr(vnc_link, 'token'): + token = vnc_link.token + elif "token='" in str(vnc_link): + token = str(vnc_link).split("token='")[1].split("'")[0] + + update_result = await client.table('projects').update({ + 'sandbox': { + 'id': sandbox_id, + 'pass': sandbox_pass, + 'vnc_preview': vnc_url, + 'sandbox_url': website_url, + 'token': token + } + }).eq('project_id', project_id).execute() + + if not update_result.data: + logger.error(f"Failed to update trigger project {project_id} with sandbox {sandbox_id}") + try: + await delete_sandbox(sandbox_id) + except Exception as e: + logger.error(f"Error deleting sandbox: {str(e)}") + raise Exception("Database update failed") + + except Exception as e: + logger.error(f"Error creating sandbox for trigger: {str(e)}") + await client.table('projects').delete().eq('project_id', project_id).execute() + raise Exception(f"Failed to create sandbox: {str(e)}") + thread_data = { - 'thread_id': str(uuid.uuid4()), - 'agent_id': agent_id, - 'project_id': agent_config.get('project_id'), - 'title': f"Trigger: {trigger_event.trigger_id}", - 'created_at': datetime.now(timezone.utc).isoformat(), - 'metadata': { - 'trigger_id': trigger_event.trigger_id, - 'trigger_type': trigger_event.trigger_type.value, - 'created_by_trigger': True - } + "thread_id": thread_id, + "project_id": project_id, + "account_id": account_id, + "created_at": datetime.now(timezone.utc).isoformat() } thread = await client.table('threads').insert(thread_data).execute() - thread_id = thread.data[0]['thread_id'] - project_id = thread.data[0]['project_id'] + logger.info(f"Created new thread for trigger: {thread_id}") return thread_id, project_id @@ -139,20 +202,21 @@ class AgentExecutor: self, thread_id: str, prompt: str, - trigger_data: Dict[str, Any] + trigger_data: Dict[str, Any], + agent_id: str, + agent_config: Dict[str, Any] ) -> str: client = await self._db.client - + + import json + message_payload = {"role": "user", "content": prompt} message_data = { - 'message_id': str(uuid.uuid4()), - 'thread_id': thread_id, - 'role': 'user', - 'content': prompt, - 'created_at': datetime.now(timezone.utc).isoformat(), - 'metadata': { - 'trigger_data': trigger_data, - 'created_by_trigger': True - } + "message_id": str(uuid.uuid4()), + "thread_id": thread_id, + "type": "user", + "is_llm_message": True, + "content": json.dumps(message_payload), + "created_at": datetime.now(timezone.utc).isoformat() } message = await client.table('messages').insert(message_data).execute() @@ -167,14 +231,16 @@ class AgentExecutor: ) -> str: client = await self._db.client + logger.info(f"Using project {project_id} with pre-created sandbox for trigger execution") + model_name = "anthropic/claude-sonnet-4-20250514" - agent_run_data = { + agent_run = await client.table('agent_runs').insert({ "thread_id": thread_id, - "agent_id": agent_config['agent_id'], - "agent_version_id": agent_config.get('current_version_id'), "status": "running", "started_at": datetime.now(timezone.utc).isoformat(), + "agent_id": agent_config.get('agent_id') if agent_config else None, + "agent_version_id": agent_config.get('current_version_id') if agent_config else None, "metadata": { "model_name": model_name, "enable_thinking": False, @@ -183,9 +249,7 @@ class AgentExecutor: "trigger_execution": True, "trigger_variables": trigger_variables } - } - - agent_run = await client.table('agent_runs').insert(agent_run_data).execute() + }).execute() agent_run_id = agent_run.data[0]['id'] instance_id = "trigger_executor" @@ -218,7 +282,6 @@ class AgentExecutor: class WorkflowExecutor: - def __init__(self, db_connection: DBConnection): self._db = db_connection @@ -262,7 +325,9 @@ class WorkflowExecutor: thread_id=thread_id, workflow_config=workflow_config, workflow_input=workflow_input, - trigger_data=trigger_result.execution_variables.variables + trigger_data=trigger_result.execution_variables.variables, + agent_id=agent_id, + agent_config=agent_config ) return { @@ -282,7 +347,8 @@ class WorkflowExecutor: async def _get_workflow_config(self, workflow_id: str) -> Dict[str, Any]: client = await self._db.client - result = await client.table('workflows').select('*').eq('workflow_id', workflow_id).execute() + # Use agent_workflows table like in workflows.py + result = await client.table('agent_workflows').select('*').eq('id', workflow_id).execute() return result.data[0] if result.data else None async def _get_agent_config(self, agent_id: str) -> Dict[str, Any]: @@ -302,17 +368,10 @@ class WorkflowExecutor: thread_data = { 'thread_id': str(uuid.uuid4()), - 'agent_id': agent_id, + 'account_id': agent_config.get('account_id'), 'project_id': agent_config.get('project_id'), - 'title': f"Workflow: {workflow_config.get('name', workflow_id)}", - 'created_at': datetime.now(timezone.utc).isoformat(), - 'metadata': { - 'workflow_id': workflow_id, - 'trigger_id': trigger_event.trigger_id, - 'trigger_type': trigger_event.trigger_type.value, - 'created_by_trigger': True, - 'is_workflow_execution': True - } + 'is_public': False, + 'created_at': datetime.now(timezone.utc).isoformat() } thread = await client.table('threads').insert(thread_data).execute() @@ -354,7 +413,9 @@ class WorkflowExecutor: thread_id: str, workflow_config: Dict[str, Any], workflow_input: Dict[str, Any], - trigger_data: Dict[str, Any] + trigger_data: Dict[str, Any], + agent_id: str, + agent_config: Dict[str, Any] ) -> str: client = await self._db.client @@ -363,8 +424,13 @@ class WorkflowExecutor: message_data = { 'message_id': str(uuid.uuid4()), 'thread_id': thread_id, - 'role': 'user', - 'content': prompt, + 'type': 'human', + 'is_llm_message': False, + 'content': { + 'role': 'user', + 'message': prompt + }, + 'agent_id': agent_id, 'created_at': datetime.now(timezone.utc).isoformat(), 'metadata': { 'workflow_input': workflow_input, @@ -374,5 +440,8 @@ class WorkflowExecutor: } } + if agent_config.get('current_version_id'): + message_data['agent_version_id'] = agent_config['current_version_id'] + message = await client.table('messages').insert(message_data).execute() return message.data[0]['message_id'] \ No newline at end of file diff --git a/backend/triggers/services/trigger_service.py b/backend/triggers/services/trigger_service.py index d886572e..6734f11b 100644 --- a/backend/triggers/services/trigger_service.py +++ b/backend/triggers/services/trigger_service.py @@ -9,7 +9,6 @@ from ..repositories.interfaces import TriggerRepository, TriggerEventLogReposito class TriggerService: - def __init__( self, trigger_repository: TriggerRepository, @@ -180,12 +179,7 @@ class TriggerService: raw_data=raw_data ) - start_time = datetime.now() result = await self._domain_service.process_trigger_event(trigger, event) - execution_time = int((datetime.now() - start_time).total_seconds() * 1000) - - await self._event_log_repo.log_event(event, result, execution_time) - return result async def get_trigger_logs( diff --git a/frontend/src/app/api/triggers/[triggerId]/webhook/route.ts b/frontend/src/app/api/triggers/[triggerId]/webhook/route.ts new file mode 100644 index 00000000..c9f119ce --- /dev/null +++ b/frontend/src/app/api/triggers/[triggerId]/webhook/route.ts @@ -0,0 +1,66 @@ +import { NextRequest, NextResponse } from 'next/server'; + +export async function POST( + request: NextRequest, + { params }: { params: Promise<{ triggerId: string }> } +) { + try { + const { triggerId } = await params; + const body = await request.arrayBuffer(); + const headers: Record = {}; + request.headers.forEach((value, key) => { + if (!['host', 'content-length', 'transfer-encoding', 'connection'].includes(key.toLowerCase())) { + headers[key] = value; + } + }); + + const backendUrl = process.env.BACKEND_URL || 'http://localhost:8000'; + const targetUrl = `${backendUrl}/triggers/${triggerId}/webhook`; + const response = await fetch(targetUrl, { + method: 'POST', + headers: { + ...headers, + 'Content-Type': headers['content-type'] || 'application/json', + }, + body: body, + }); + + const responseData = await response.text(); + + return new NextResponse(responseData, { + status: response.status, + headers: { + 'Content-Type': response.headers.get('Content-Type') || 'application/json', + }, + }); + + } catch (error) { + return NextResponse.json( + { + error: 'Internal server error', + details: error instanceof Error ? error.message : 'Unknown error', + timestamp: new Date().toISOString() + }, + { status: 500 } + ); + } +} + +export async function GET( + request: NextRequest, + { params }: { params: Promise<{ triggerId: string }> } +) { + try { + const { triggerId } = await params; + return NextResponse.json({ + status: 'ok', + service: 'trigger-webhook-proxy', + triggerId: triggerId + }); + } catch (error) { + return NextResponse.json( + { error: 'Internal server error' }, + { status: 500 } + ); + } +} \ No newline at end of file