# suna/backend/triggers/provider_service.py
import asyncio
import json
import os
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
import croniter
import pytz
import httpx

from services.supabase import DBConnection
from utils.logger import logger
from utils.config import config, EnvMode
from .trigger_service import Trigger, TriggerEvent, TriggerResult, TriggerType
class TriggerProvider(ABC):
    """Abstract base class for trigger providers.

    A provider knows how to validate a trigger's configuration, provision and
    release any external machinery (cron jobs, remote webhook instances), and
    translate an incoming TriggerEvent into a TriggerResult.
    """

    def __init__(self, provider_id: str, trigger_type: TriggerType):
        self.provider_id = provider_id
        self.trigger_type = trigger_type

    @abstractmethod
    async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate/normalize the trigger config; raise ValueError when invalid."""
        ...

    @abstractmethod
    async def setup_trigger(self, trigger: Trigger) -> bool:
        """Provision external resources for the trigger; return success."""
        ...

    @abstractmethod
    async def teardown_trigger(self, trigger: Trigger) -> bool:
        """Release external resources for the trigger; return success."""
        ...

    @abstractmethod
    async def process_event(self, trigger: Trigger, event: TriggerEvent) -> TriggerResult:
        """Turn an incoming event into an execution decision."""
        ...

    # Optional override for providers that manage remote trigger instances
    async def delete_remote_trigger(self, trigger: Trigger) -> bool:
        return True
class ScheduleProvider(TriggerProvider):
    """Cron-based trigger provider backed by Supabase Cron.

    Jobs are created through the `schedule_trigger_http` RPC helper; Supabase
    Cron then POSTs the stored payload back to the backend webhook on each
    firing.
    """

    def __init__(self):
        super().__init__("schedule", TriggerType.SCHEDULE)
        # This should point to your backend base URL since Supabase Cron will POST to backend
        self._webhook_base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
        self._db = DBConnection()

    async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate schedule config.

        Raises:
            ValueError: on a missing cron expression, unknown execution type,
                missing agent prompt / workflow id, unknown timezone, or an
                invalid cron expression.
        """
        if 'cron_expression' not in config:
            raise ValueError("cron_expression is required for scheduled triggers")
        execution_type = config.get('execution_type', 'agent')
        if execution_type not in ['agent', 'workflow']:
            raise ValueError("execution_type must be either 'agent' or 'workflow'")
        if execution_type == 'agent' and 'agent_prompt' not in config:
            raise ValueError("agent_prompt is required for agent execution")
        elif execution_type == 'workflow' and 'workflow_id' not in config:
            raise ValueError("workflow_id is required for workflow execution")
        user_timezone = config.get('timezone', 'UTC')
        if user_timezone != 'UTC':
            try:
                pytz.timezone(user_timezone)
            except pytz.UnknownTimeZoneError:
                raise ValueError(f"Invalid timezone: {user_timezone}")
        try:
            croniter.croniter(config['cron_expression'])
        except Exception as e:
            raise ValueError(f"Invalid cron expression: {str(e)}")
        return config

    async def setup_trigger(self, trigger: Trigger) -> bool:
        """Create the Supabase Cron job for this trigger; returns success."""
        try:
            webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook"
            cron_expression = trigger.config['cron_expression']
            execution_type = trigger.config.get('execution_type', 'agent')
            user_timezone = trigger.config.get('timezone', 'UTC')
            if user_timezone != 'UTC':
                cron_expression = self._convert_cron_to_utc(cron_expression, user_timezone)
            payload = {
                "trigger_id": trigger.trigger_id,
                "agent_id": trigger.agent_id,
                "execution_type": execution_type,
                "agent_prompt": trigger.config.get('agent_prompt'),
                "workflow_id": trigger.config.get('workflow_id'),
                "workflow_input": trigger.config.get('workflow_input', {}),
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
            # All header values are strings; was loosely typed Dict[str, Any].
            headers: Dict[str, str] = {
                "Content-Type": "application/json",
                "X-Trigger-Source": "schedule"
            }
            # Include simple shared secret header for backend auth
            secret = os.getenv("TRIGGER_WEBHOOK_SECRET")
            if secret:
                headers["X-Trigger-Secret"] = secret
            if config.ENV_MODE == EnvMode.STAGING:
                vercel_bypass_key = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "")
                if vercel_bypass_key:
                    headers["X-Vercel-Protection-Bypass"] = vercel_bypass_key
            # Supabase Cron job names are case-sensitive; we keep a stable name per trigger
            job_name = f"trigger_{trigger.trigger_id}"
            # Schedule via Supabase Cron RPC helper
            client = await self._db.client
            try:
                result = await client.rpc(
                    "schedule_trigger_http",
                    {
                        "job_name": job_name,
                        "schedule": cron_expression,
                        "url": webhook_url,
                        "headers": headers,
                        "body": payload,
                        "timeout_ms": 8000,
                    },
                ).execute()
            except Exception as rpc_err:
                logger.error(f"Failed to schedule Supabase Cron job via RPC: {rpc_err}")
                return False
            trigger.config['cron_job_name'] = job_name
            # result.data shape depends on the RPC; tolerate anything.
            try:
                trigger.config['cron_job_id'] = result.data
            except Exception:
                trigger.config['cron_job_id'] = None
            logger.info(f"Created Supabase Cron job '{job_name}' for trigger {trigger.trigger_id}")
            return True
        except Exception as e:
            logger.error(f"Failed to setup Supabase Cron schedule for trigger {trigger.trigger_id}: {e}")
            return False

    async def teardown_trigger(self, trigger: Trigger) -> bool:
        """Unschedule the Supabase Cron job for this trigger; returns success."""
        try:
            job_name = trigger.config.get('cron_job_name') or f"trigger_{trigger.trigger_id}"
            client = await self._db.client
            try:
                await client.rpc(
                    "unschedule_job_by_name",
                    {"job_name": job_name},
                ).execute()
                logger.info(f"Unschedule requested for Supabase Cron job '{job_name}' (trigger {trigger.trigger_id})")
                return True
            except Exception as rpc_err:
                logger.warning(f"Failed to unschedule job '{job_name}' via RPC: {rpc_err}")
                return False
        except Exception as e:
            logger.error(f"Failed to teardown Supabase Cron schedule for trigger {trigger.trigger_id}: {e}")
            return False

    async def process_event(self, trigger: Trigger, event: TriggerEvent) -> TriggerResult:
        """Route a fired schedule event to agent or workflow execution."""
        try:
            raw_data = event.raw_data
            execution_type = raw_data.get('execution_type', 'agent')
            execution_variables = {
                'scheduled_time': raw_data.get('timestamp'),
                'trigger_id': event.trigger_id,
                'agent_id': event.agent_id
            }
            if execution_type == 'workflow':
                workflow_id = raw_data.get('workflow_id')
                workflow_input = raw_data.get('workflow_input', {})
                if not workflow_id:
                    raise ValueError("workflow_id is required for workflow execution")
                return TriggerResult(
                    success=True,
                    should_execute_workflow=True,
                    workflow_id=workflow_id,
                    workflow_input=workflow_input,
                    execution_variables=execution_variables
                )
            else:
                agent_prompt = raw_data.get('agent_prompt')
                if not agent_prompt:
                    raise ValueError("agent_prompt is required for agent execution")
                return TriggerResult(
                    success=True,
                    should_execute_agent=True,
                    agent_prompt=agent_prompt,
                    execution_variables=execution_variables
                )
        except Exception as e:
            return TriggerResult(
                success=False,
                error_message=f"Error processing schedule event: {str(e)}"
            )

    def _convert_cron_to_utc(self, cron_expression: str, user_timezone: str) -> str:
        """Best-effort conversion of a 5-field cron expression to UTC.

        Only expressions with fixed numeric hour and minute are shifted;
        wildcard fields (including */N steps) are returned unchanged, as is
        anything that fails to parse.

        NOTE(review): only hour/minute are shifted using today's UTC offset —
        day/month/weekday are not rolled across midnight and the result will
        drift across DST transitions; confirm this is acceptable.
        """
        try:
            parts = cron_expression.split()
            if len(parts) != 5:
                return cron_expression
            minute, hour, day, month, weekday = parts
            # A wildcard hour or minute cannot be shifted by a fixed offset.
            # (This also covers the former redundant `*/N` special case.)
            if hour == '*' or minute == '*':
                return cron_expression
            try:
                user_tz = pytz.timezone(user_timezone)
                utc_tz = pytz.UTC
                now = datetime.now(user_tz)
                if hour.isdigit() and minute.isdigit():
                    user_time = user_tz.localize(datetime(now.year, now.month, now.day, int(hour), int(minute)))
                    utc_time = user_time.astimezone(utc_tz)
                    return f"{utc_time.minute} {utc_time.hour} {day} {month} {weekday}"
            except Exception as e:
                logger.warning(f"Failed to convert timezone for cron expression: {e}")
            return cron_expression
        except Exception as e:
            logger.error(f"Error converting cron expression to UTC: {e}")
            return cron_expression
class WebhookProvider(TriggerProvider):
    """Generic webhook provider: no remote setup; every event runs the agent."""

    def __init__(self):
        super().__init__("webhook", TriggerType.WEBHOOK)

    async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        # Webhook triggers accept any configuration as-is.
        return config

    async def setup_trigger(self, trigger: Trigger) -> bool:
        # Nothing to provision; the webhook endpoint already exists.
        return True

    async def teardown_trigger(self, trigger: Trigger) -> bool:
        # Nothing to clean up.
        return True

    async def process_event(self, trigger: Trigger, event: TriggerEvent) -> TriggerResult:
        """Wrap the raw webhook payload into an agent execution request."""
        try:
            context = {
                'webhook_data': event.raw_data,
                'trigger_id': event.trigger_id,
                'agent_id': event.agent_id
            }
            prompt = f"Process webhook data: {json.dumps(event.raw_data)}"
            return TriggerResult(
                success=True,
                should_execute_agent=True,
                agent_prompt=prompt,
                execution_variables=context
            )
        except Exception as e:
            return TriggerResult(
                success=False,
                error_message=f"Error processing webhook event: {str(e)}"
            )
class ProviderService:
    """Registry and dispatch layer over the available trigger providers."""

    def __init__(self, db_connection: DBConnection):
        self._db = db_connection
        self._providers: Dict[str, TriggerProvider] = {}
        self._initialize_providers()

    def _initialize_providers(self):
        # ComposioEventProvider is defined later in this module; the lookup
        # happens at call time, so the forward reference is safe.
        self._providers["schedule"] = ScheduleProvider()
        self._providers["webhook"] = WebhookProvider()
        self._providers["composio"] = ComposioEventProvider()

    async def get_available_providers(self) -> List[Dict[str, Any]]:
        """Describe every registered provider, including its config schema."""
        return [
            {
                "provider_id": pid,
                "name": pid.title(),
                "description": f"{pid.title()} trigger provider",
                "trigger_type": prov.trigger_type.value,
                "webhook_enabled": True,
                "config_schema": self._get_provider_schema(pid),
            }
            for pid, prov in self._providers.items()
        ]

    def _get_provider_schema(self, provider_id: str) -> Dict[str, Any]:
        """Return the JSON schema for a provider's config (empty schema if unknown)."""
        schemas: Dict[str, Dict[str, Any]] = {
            "schedule": {
                "type": "object",
                "properties": {
                    "cron_expression": {
                        "type": "string",
                        "description": "Cron expression for scheduling"
                    },
                    "execution_type": {
                        "type": "string",
                        "enum": ["agent", "workflow"],
                        "description": "Type of execution"
                    },
                    "agent_prompt": {
                        "type": "string",
                        "description": "Prompt for agent execution"
                    },
                    "workflow_id": {
                        "type": "string",
                        "description": "ID of workflow to execute"
                    },
                    "workflow_input": {
                        "type": "object",
                        "description": "JSON input variables for the selected workflow/playbook",
                        "additionalProperties": True
                    },
                    "timezone": {
                        "type": "string",
                        "description": "Timezone for cron expression"
                    }
                },
                "required": ["cron_expression", "execution_type"]
            },
            "webhook": {
                "type": "object",
                "properties": {
                    "webhook_secret": {
                        "type": "string",
                        "description": "Secret for webhook validation"
                    }
                },
                "required": []
            },
            "composio": {
                "type": "object",
                "properties": {
                    "composio_trigger_id": {
                        "type": "string",
                        "description": "Composio trigger instance ID (nano id from payload.id)"
                    },
                    "trigger_slug": {
                        "type": "string",
                        "description": "Composio trigger slug (e.g., GITHUB_COMMIT_EVENT)"
                    },
                    "execution_type": {
                        "type": "string",
                        "enum": ["agent", "workflow"],
                        "description": "How to route the event"
                    },
                    "agent_prompt": {
                        "type": "string",
                        "description": "Prompt template for agent execution"
                    },
                    "workflow_id": {
                        "type": "string",
                        "description": "Workflow ID to execute for workflow routing"
                    },
                    "workflow_input": {
                        "type": "object",
                        "description": "Optional static input object for workflow execution",
                        "additionalProperties": True
                    }
                },
                "required": ["composio_trigger_id", "execution_type"]
            },
        }
        return schemas.get(provider_id, {"type": "object", "properties": {}, "required": []})

    async def validate_trigger_config(self, provider_id: str, config: Dict[str, Any]) -> Dict[str, Any]:
        """Delegate config validation to the named provider; raises ValueError if unknown."""
        provider = self._providers.get(provider_id)
        if provider is None:
            raise ValueError(f"Unknown provider: {provider_id}")
        return await provider.validate_config(config)

    async def get_provider_trigger_type(self, provider_id: str) -> TriggerType:
        """Return the TriggerType for a provider; raises ValueError if unknown."""
        provider = self._providers.get(provider_id)
        if provider is None:
            raise ValueError(f"Unknown provider: {provider_id}")
        return provider.trigger_type

    async def setup_trigger(self, trigger: Trigger) -> bool:
        """Provision the trigger via its provider; False if the provider is unknown."""
        provider = self._providers.get(trigger.provider_id)
        if provider is None:
            logger.error(f"Unknown provider: {trigger.provider_id}")
            return False
        return await provider.setup_trigger(trigger)

    async def teardown_trigger(self, trigger: Trigger) -> bool:
        """Tear down the trigger via its provider; False if the provider is unknown."""
        provider = self._providers.get(trigger.provider_id)
        if provider is None:
            logger.error(f"Unknown provider: {trigger.provider_id}")
            return False
        return await provider.teardown_trigger(trigger)

    async def delete_remote_trigger(self, trigger: Trigger) -> bool:
        """Ask the provider to delete its remote trigger instance (best effort)."""
        provider = self._providers.get(trigger.provider_id)
        if provider is None:
            logger.error(f"Unknown provider: {trigger.provider_id}")
            return False
        try:
            return await provider.delete_remote_trigger(trigger)
        except Exception as e:
            logger.warning(f"Provider delete_remote_trigger failed for {trigger.provider_id}: {e}")
            return False

    async def process_event(self, trigger: Trigger, event: TriggerEvent) -> TriggerResult:
        """Dispatch an event to the trigger's provider for processing."""
        provider = self._providers.get(trigger.provider_id)
        if provider is None:
            return TriggerResult(
                success=False,
                error_message=f"Unknown provider: {trigger.provider_id}"
            )
        return await provider.process_event(trigger, event)
class ComposioEventProvider(TriggerProvider):
    """Provider for Composio-managed trigger instances.

    setup/teardown toggle the remote instance's status through the Composio
    v3 API and are strictly best effort (they always return True so local
    trigger state is never blocked by the remote API); delete_remote_trigger
    removes the instance and does report failure.
    """

    def __init__(self):
        # Use WEBHOOK to match existing DB enum (no migration needed)
        super().__init__("composio", TriggerType.WEBHOOK)
        self._api_base = os.getenv("COMPOSIO_API_BASE", "https://api.composio.dev")
        self._api_key = os.getenv("COMPOSIO_API_KEY", "")

    def _headers(self) -> Dict[str, str]:
        return {"x-api-key": self._api_key, "Content-Type": "application/json"}

    def _instance_url(self, trigger_id: str) -> str:
        # Management endpoint for a single Composio trigger instance.
        return f"{self._api_base}/api/v3/trigger_instances/manage/{trigger_id}"

    async def _set_instance_status(self, trigger_id: str, enable: bool) -> None:
        """Best-effort PATCH to enable/disable a remote trigger instance.

        Composio has used both {"status": ...} and {"enabled": ...} payload
        shapes, so each candidate is tried until one succeeds. All errors are
        swallowed deliberately.
        """
        url = self._instance_url(trigger_id)
        payload_candidates: List[Dict[str, Any]] = [
            {"status": "enabled" if enable else "disabled"},
            {"enabled": enable},
        ]
        async with httpx.AsyncClient(timeout=10) as client:
            for body in payload_candidates:
                try:
                    resp = await client.patch(url, headers=self._headers(), json=body)
                    if resp.status_code in (200, 204):
                        return
                except Exception:
                    continue

    async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Validate Composio trigger config; raises ValueError on bad input."""
        composio_trigger_id = config.get("composio_trigger_id")
        if not composio_trigger_id or not isinstance(composio_trigger_id, str):
            raise ValueError("composio_trigger_id is required and must be a string")
        execution_type = config.get("execution_type", "agent")
        if execution_type not in ["agent", "workflow"]:
            raise ValueError("execution_type must be either 'agent' or 'workflow'")
        if execution_type == "workflow" and not config.get("workflow_id"):
            raise ValueError("workflow_id is required for workflow execution")
        return config

    async def setup_trigger(self, trigger: Trigger) -> bool:
        # Re-enable the Composio trigger instance if present (best effort).
        try:
            trigger_id = trigger.config.get("composio_trigger_id")
            if trigger_id and self._api_key:
                await self._set_instance_status(trigger_id, enable=True)
        except Exception:
            pass  # never fail local setup on remote errors
        return True

    async def teardown_trigger(self, trigger: Trigger) -> bool:
        # Disable the Composio trigger instance so it stops sending webhooks.
        try:
            trigger_id = trigger.config.get("composio_trigger_id")
            if trigger_id and self._api_key:
                await self._set_instance_status(trigger_id, enable=False)
        except Exception:
            pass  # never fail local teardown on remote errors
        return True

    async def delete_remote_trigger(self, trigger: Trigger) -> bool:
        """Permanently remove the remote Composio trigger instance.

        Returns True when there is nothing to delete (no id / no API key) or
        the API confirms deletion; False on any API failure — unlike
        enable/disable, callers need to know deletion did not happen.
        """
        try:
            trigger_id = trigger.config.get("composio_trigger_id")
            if not trigger_id:
                return True
            if not self._api_key:
                return True
            url = self._instance_url(trigger_id)
            async with httpx.AsyncClient(timeout=10) as client:
                try:
                    resp = await client.delete(url, headers=self._headers())
                    return resp.status_code in (200, 204)
                except Exception:
                    return False
        except Exception:
            return False

    async def process_event(self, trigger: Trigger, event: TriggerEvent) -> TriggerResult:
        """Route a Composio webhook event to agent or workflow execution."""
        try:
            raw = event.raw_data or {}
            trigger_slug = raw.get("triggerSlug") or trigger.config.get("trigger_slug")
            # Guard against a non-dict "payload" value (e.g. explicit null in
            # the webhook body), which previously raised into the broad except.
            payload = raw.get("payload")
            if not isinstance(payload, dict):
                payload = {}
            provider_event_id = raw.get("eventId") or payload.get("id") or raw.get("id")
            connected_account_id = None
            metadata = raw.get("metadata") or {}
            if isinstance(metadata, dict):
                connected = metadata.get("connectedAccount") or {}
                if isinstance(connected, dict):
                    connected_account_id = connected.get("id")
            execution_variables = {
                "provider": "composio",
                "trigger_slug": trigger_slug,
                "composio_trigger_id": raw.get("id") or trigger.config.get("composio_trigger_id"),
                "provider_event_id": provider_event_id,
                "connected_account_id": connected_account_id,
                "received_at": datetime.now(timezone.utc).isoformat(),
            }
            route = trigger.config.get("execution_type", "agent")
            if route == "workflow":
                workflow_id = trigger.config.get("workflow_id")
                workflow_input = trigger.config.get("workflow_input", {})
                return TriggerResult(
                    success=True,
                    should_execute_workflow=True,
                    workflow_id=workflow_id,
                    workflow_input=workflow_input,
                    execution_variables=execution_variables,
                )
            else:
                # Agent routing
                agent_prompt = trigger.config.get("agent_prompt")
                if not agent_prompt:
                    # Minimal default prompt
                    agent_prompt = f"Process Composio event {trigger_slug or ''}: {json.dumps(raw.get('payload', raw))[:800]}"
                return TriggerResult(
                    success=True,
                    should_execute_agent=True,
                    agent_prompt=agent_prompt,
                    execution_variables=execution_variables,
                )
        except Exception as e:
            return TriggerResult(success=False, error_message=f"Error processing Composio event: {str(e)}")
def get_provider_service(db_connection: DBConnection) -> ProviderService:
    """Factory: build a ProviderService bound to the given DB connection."""
    return ProviderService(db_connection)