suna/backend/triggers/api.py

745 lines
29 KiB
Python

from fastapi import APIRouter, HTTPException, Depends, Request, Body, Query
from fastapi.responses import JSONResponse
from typing import List, Optional, Dict, Any
from pydantic import BaseModel
import uuid
import os
from datetime import datetime
from .core import TriggerManager, TriggerConfig, ProviderDefinition, TriggerType
from .registry import trigger_registry
from services.supabase import DBConnection
from utils.auth_utils import get_current_user_id_from_jwt
from utils.logger import logger
from flags.flags import is_enabled
from .integration import AgentTriggerExecutor
from utils.config import config, EnvMode
router = APIRouter(prefix="/api/triggers", tags=["triggers"])
trigger_manager: Optional[TriggerManager] = None
db = None
def initialize(database: DBConnection):
"""Initialize the triggers API with database connection."""
global db, trigger_manager
db = database
trigger_manager = TriggerManager(db)
class TriggerCreateRequest(BaseModel):
"""Request model for creating a trigger."""
provider_id: str
name: str
description: Optional[str] = None
config: Dict[str, Any]
class TriggerUpdateRequest(BaseModel):
"""Request model for updating a trigger."""
name: Optional[str] = None
description: Optional[str] = None
config: Optional[Dict[str, Any]] = None
is_active: Optional[bool] = None
class TriggerResponse(BaseModel):
"""Response model for trigger data."""
trigger_id: str
agent_id: str
trigger_type: str
provider_id: str
name: str
description: Optional[str]
is_active: bool
webhook_url: Optional[str] = None
created_at: str
updated_at: str
class ProviderResponse(BaseModel):
"""Response model for provider information."""
provider_id: str
name: str
description: str
trigger_type: str
webhook_enabled: bool
config_schema: Dict[str, Any]
async def get_trigger_manager() -> TriggerManager:
"""Get the trigger manager instance."""
if not trigger_manager:
raise HTTPException(status_code=500, detail="Trigger system not initialized")
return trigger_manager
@router.get("/providers", response_model=List[ProviderResponse])
async def get_available_providers(
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
manager = await get_trigger_manager()
await manager.load_provider_definitions()
providers = await manager.get_available_providers()
return [
ProviderResponse(
provider_id=provider.provider_id,
name=provider.name,
description=provider.description,
trigger_type=provider.trigger_type,
webhook_enabled=provider.webhook_enabled,
config_schema=provider.config_schema
)
for provider in providers
]
@router.get("/agents/{agent_id}/triggers", response_model=List[TriggerResponse])
async def get_agent_triggers(
agent_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Get all triggers for an agent."""
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
await verify_agent_access(agent_id, user_id)
manager = await get_trigger_manager()
triggers = await manager.get_agent_triggers(agent_id)
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
responses = []
for trigger in triggers:
# Ensure trigger_type is properly handled (could be enum or string)
trigger_type_str = trigger.trigger_type.value if hasattr(trigger.trigger_type, 'value') else str(trigger.trigger_type)
provider_id = trigger.config.get("provider_id", trigger_type_str)
provider = await manager.get_or_create_provider(provider_id)
webhook_url = None
if provider and provider.provider_definition and provider.provider_definition.webhook_enabled:
webhook_url = provider.get_webhook_url(trigger.trigger_id, base_url)
responses.append(TriggerResponse(
trigger_id=trigger.trigger_id,
agent_id=trigger.agent_id,
trigger_type=trigger_type_str,
provider_id=provider_id,
name=trigger.name,
description=trigger.description,
is_active=trigger.is_active,
webhook_url=webhook_url,
created_at=trigger.created_at.isoformat(),
updated_at=trigger.updated_at.isoformat()
))
return responses
@router.post("/agents/{agent_id}/triggers", response_model=TriggerResponse)
async def create_agent_trigger(
agent_id: str,
request: TriggerCreateRequest,
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Create a new trigger for an agent."""
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
await verify_agent_access(agent_id, user_id)
manager = await get_trigger_manager()
await manager.load_provider_definitions()
try:
trigger_config = await manager.create_trigger(
agent_id=agent_id,
provider_id=request.provider_id,
name=request.name,
config=request.config,
description=request.description
)
# Get webhook URL if applicable
provider = await manager.get_or_create_provider(request.provider_id)
webhook_url = None
if provider and provider.provider_definition and provider.provider_definition.webhook_enabled:
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
webhook_url = provider.get_webhook_url(trigger_config.trigger_id, base_url)
trigger_type_str = trigger_config.trigger_type.value if hasattr(trigger_config.trigger_type, 'value') else str(trigger_config.trigger_type)
return TriggerResponse(
trigger_id=trigger_config.trigger_id,
agent_id=trigger_config.agent_id,
trigger_type=trigger_type_str,
provider_id=request.provider_id,
name=trigger_config.name,
description=trigger_config.description,
is_active=trigger_config.is_active,
webhook_url=webhook_url,
created_at=trigger_config.created_at.isoformat(),
updated_at=trigger_config.updated_at.isoformat()
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error creating trigger: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/{trigger_id}", response_model=TriggerResponse)
async def get_trigger(
trigger_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Get a specific trigger."""
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
manager = await get_trigger_manager()
trigger_config = await manager.get_trigger(trigger_id)
if not trigger_config:
raise HTTPException(status_code=404, detail="Trigger not found")
# Verify agent ownership
await verify_agent_access(trigger_config.agent_id, user_id)
# Ensure trigger_type is properly handled (could be enum or string)
trigger_type_str = trigger_config.trigger_type.value if hasattr(trigger_config.trigger_type, 'value') else str(trigger_config.trigger_type)
provider_id = trigger_config.config.get("provider_id", trigger_type_str)
provider = await manager.get_or_create_provider(provider_id)
webhook_url = None
if provider and provider.provider_definition and provider.provider_definition.webhook_enabled:
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
webhook_url = provider.get_webhook_url(trigger_id, base_url)
return TriggerResponse(
trigger_id=trigger_config.trigger_id,
agent_id=trigger_config.agent_id,
trigger_type=trigger_type_str,
provider_id=provider_id,
name=trigger_config.name,
description=trigger_config.description,
is_active=trigger_config.is_active,
webhook_url=webhook_url,
created_at=trigger_config.created_at.isoformat(),
updated_at=trigger_config.updated_at.isoformat()
)
@router.put("/{trigger_id}", response_model=TriggerResponse)
async def update_trigger(
trigger_id: str,
request: TriggerUpdateRequest,
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
manager = await get_trigger_manager()
trigger_config = await manager.get_trigger(trigger_id)
if not trigger_config:
raise HTTPException(status_code=404, detail="Trigger not found")
await verify_agent_access(trigger_config.agent_id, user_id)
try:
updated_config = await manager.update_trigger(
trigger_id=trigger_id,
config=request.config,
name=request.name,
description=request.description,
is_active=request.is_active
)
trigger_type_str = updated_config.trigger_type.value if hasattr(updated_config.trigger_type, 'value') else str(updated_config.trigger_type)
provider_id = updated_config.config.get("provider_id", trigger_type_str)
provider = await manager.get_or_create_provider(provider_id)
webhook_url = None
if provider and provider.provider_definition and provider.provider_definition.webhook_enabled:
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:3000")
webhook_url = provider.get_webhook_url(trigger_id, base_url)
return TriggerResponse(
trigger_id=updated_config.trigger_id,
agent_id=updated_config.agent_id,
trigger_type=trigger_type_str,
provider_id=provider_id,
name=updated_config.name,
description=updated_config.description,
is_active=updated_config.is_active,
webhook_url=webhook_url,
created_at=updated_config.created_at.isoformat(),
updated_at=updated_config.updated_at.isoformat()
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error updating trigger: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.delete("/{trigger_id}")
async def delete_trigger(
trigger_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Delete a trigger."""
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
manager = await get_trigger_manager()
trigger_config = await manager.get_trigger(trigger_id)
if not trigger_config:
raise HTTPException(status_code=404, detail="Trigger not found")
# Verify agent ownership
await verify_agent_access(trigger_config.agent_id, user_id)
success = await manager.delete_trigger(trigger_id)
if success:
return {"message": "Trigger deleted successfully"}
else:
raise HTTPException(status_code=500, detail="Failed to delete trigger")
@router.post("/slack/webhook")
async def universal_slack_webhook(request: Request):
"""
Universal Slack webhook endpoint that handles all Slack events.
This endpoint:
1. Receives Slack events from a single URL (required by Slack)
2. Extracts team_id from the event data
3. Finds all active Slack triggers for that team
4. Processes the event for each matching trigger
"""
try:
logger.info("Universal Slack webhook received event")
body = await request.body()
headers = dict(request.headers)
logger.debug(f"Webhook body: {body[:500]}...")
logger.debug(f"Webhook headers: {headers}")
try:
if body:
data = await request.json()
else:
data = {}
except Exception as e:
logger.warning(f"Failed to parse JSON body: {e}")
data = {
"raw_body": body.decode('utf-8', errors='ignore'),
"content_type": headers.get('content-type', '')
}
if data.get('type') == 'url_verification':
logger.info("Handling Slack URL verification challenge")
return JSONResponse(content={"challenge": data.get('challenge')})
team_id = data.get('team_id')
if not team_id:
logger.warning("No team_id found in Slack event")
return JSONResponse(content={"message": "No team_id found in event"})
logger.info(f"Processing Slack event for team_id: {team_id}")
manager = await get_trigger_manager()
client = await db.client
result = await client.table('agent_triggers')\
.select('*')\
.eq('trigger_type', 'slack')\
.eq('is_active', True)\
.execute()
matching_triggers = []
for trigger_data in result.data:
trigger_config = trigger_data.get('config', {})
if trigger_config.get('team_id') == team_id:
matching_triggers.append(trigger_data)
if not matching_triggers:
logger.info(f"No active Slack triggers found for team_id: {team_id}")
return JSONResponse(content={"message": "No active triggers found for this workspace"})
logger.info(f"Found {len(matching_triggers)} matching triggers for team_id: {team_id}")
results = []
data["headers"] = headers
for trigger_data in matching_triggers:
trigger_id = trigger_data['trigger_id']
try:
logger.info(f"Processing event for trigger {trigger_id}")
result = await manager.process_trigger_event(trigger_id, data)
if result.success and result.should_execute_agent:
executor = AgentTriggerExecutor(db)
trigger_config = await manager.get_trigger(trigger_id)
if trigger_config:
from .core import TriggerEvent, TriggerType
trigger_type = trigger_config.trigger_type
if isinstance(trigger_type, str):
trigger_type = TriggerType(trigger_type)
trigger_event = TriggerEvent(
trigger_id=trigger_id,
agent_id=trigger_config.agent_id,
trigger_type=trigger_type,
raw_data=data
)
execution_result = await executor.execute_triggered_agent(
agent_id=trigger_config.agent_id,
trigger_result=result,
trigger_event=trigger_event
)
results.append({
"trigger_id": trigger_id,
"agent_id": trigger_config.agent_id,
"executed": True,
"thread_id": execution_result.get("thread_id"),
"agent_run_id": execution_result.get("agent_run_id")
})
logger.info(f"Agent execution started for trigger {trigger_id}")
else:
results.append({
"trigger_id": trigger_id,
"executed": False,
"reason": result.error_message or "Event did not meet trigger criteria"
})
except Exception as e:
logger.error(f"Error processing trigger {trigger_id}: {e}")
results.append({
"trigger_id": trigger_id,
"executed": False,
"error": str(e)
})
return JSONResponse(content={
"message": "Slack webhook processed",
"team_id": team_id,
"processed_triggers": len(results),
"results": results
})
except Exception as e:
logger.error(f"Error in universal Slack webhook: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return JSONResponse(
status_code=500,
content={"error": "Internal server error"}
)
@router.post("/qstash/webhook")
async def handle_qstash_webhook(request: Request):
try:
logger.info("QStash webhook received")
body = await request.body()
headers = dict(request.headers)
logger.debug(f"QStash webhook body: {body[:500]}...")
logger.debug(f"QStash webhook headers: {headers}")
try:
if body:
data = await request.json()
else:
data = {}
except Exception as e:
logger.warning(f"Failed to parse JSON body: {e}")
data = {
"raw_body": body.decode('utf-8', errors='ignore'),
"content_type": headers.get('content-type', '')
}
trigger_id = data.get('trigger_id')
if not trigger_id:
logger.error("No trigger_id in QStash webhook payload")
return JSONResponse(
status_code=400,
content={"error": "trigger_id is required"}
)
data["headers"] = headers
data["qstash_message_id"] = headers.get('upstash-message-id')
data["qstash_schedule_id"] = headers.get('upstash-schedule-id')
logger.info(f"Processing QStash trigger event for {trigger_id}")
manager = await get_trigger_manager()
result = await manager.process_trigger_event(trigger_id, data)
logger.info(f"QStash trigger processing result: success={result.success}, should_execute={result.should_execute_agent}, error={result.error_message}")
if result.success and result.should_execute_agent:
executor = AgentTriggerExecutor(db)
trigger_config = await manager.get_trigger(trigger_id)
if trigger_config:
from .core import TriggerEvent, TriggerType
trigger_type = trigger_config.trigger_type
if isinstance(trigger_type, str):
trigger_type = TriggerType(trigger_type)
trigger_event = TriggerEvent(
trigger_id=trigger_id,
agent_id=trigger_config.agent_id,
trigger_type=trigger_type,
raw_data=data
)
execution_result = await executor.execute_triggered_agent(
agent_id=trigger_config.agent_id,
trigger_result=result,
trigger_event=trigger_event
)
logger.info(f"QStash agent execution result: {execution_result}")
return JSONResponse(content={
"message": "QStash webhook processed and agent execution started",
"trigger_id": trigger_id,
"agent_id": trigger_config.agent_id,
"thread_id": execution_result.get("thread_id"),
"agent_run_id": execution_result.get("agent_run_id")
})
if result.response_data:
return JSONResponse(content=result.response_data)
elif result.success:
return {"message": "QStash webhook processed successfully"}
else:
logger.warning(f"QStash webhook processing failed for {trigger_id}: {result.error_message}")
return JSONResponse(
status_code=400,
content={"error": result.error_message}
)
except Exception as e:
logger.error(f"Error processing QStash webhook: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return JSONResponse(
status_code=500,
content={"error": "Internal server error"}
)
@router.post("/schedule/webhook")
async def handle_schedule_webhook(request: Request):
try:
logger.info("Schedule webhook received from Pipedream")
body = await request.body()
headers = dict(request.headers)
logger.debug(f"Schedule webhook body: {body[:500]}...")
logger.debug(f"Schedule webhook headers: {headers}")
try:
if body:
data = await request.json()
else:
data = {}
except Exception as e:
logger.warning(f"Failed to parse JSON body: {e}")
data = {
"raw_body": body.decode('utf-8', errors='ignore'),
"content_type": headers.get('content-type', '')
}
trigger_id = data.get('trigger_id')
agent_id = data.get('agent_id')
if not trigger_id:
logger.error("No trigger_id in schedule webhook payload")
return JSONResponse(
status_code=400,
content={"error": "trigger_id is required"}
)
logger.info(f"Processing scheduled trigger event for {trigger_id}")
manager = await get_trigger_manager()
trigger_config = await manager.get_trigger(trigger_id)
if trigger_config:
data['trigger_config'] = trigger_config.config
result = await manager.process_trigger_event(trigger_id, data)
logger.info(f"Schedule trigger processing result: success={result.success}, should_execute={result.should_execute_agent}, error={result.error_message}")
if result.success and result.should_execute_agent:
executor = AgentTriggerExecutor(db)
if trigger_config:
from .core import TriggerEvent, TriggerType
trigger_type = trigger_config.trigger_type
if isinstance(trigger_type, str):
trigger_type = TriggerType(trigger_type)
trigger_event = TriggerEvent(
trigger_id=trigger_id,
agent_id=trigger_config.agent_id,
trigger_type=trigger_type,
raw_data=data
)
execution_result = await executor.execute_triggered_agent(
agent_id=trigger_config.agent_id,
trigger_result=result,
trigger_event=trigger_event
)
logger.info(f"Scheduled agent execution result: {execution_result}")
return JSONResponse(content={
"message": "Schedule webhook processed and agent execution started",
"trigger_id": trigger_id,
"agent_id": trigger_config.agent_id,
"thread_id": execution_result.get("thread_id"),
"agent_run_id": execution_result.get("agent_run_id")
})
if result.response_data:
return JSONResponse(content=result.response_data)
elif result.success:
return {"message": "Schedule webhook processed successfully"}
else:
logger.warning(f"Schedule webhook processing failed for {trigger_id}: {result.error_message}")
return JSONResponse(
status_code=400,
content={"error": result.error_message}
)
except Exception as e:
logger.error(f"Error processing schedule webhook: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return JSONResponse(
status_code=500,
content={"error": "Internal server error"}
)
@router.post("/{trigger_id}/webhook")
async def handle_webhook(
trigger_id: str,
request: Request
):
try:
logger.info(f"Webhook received for trigger {trigger_id}")
body = await request.body()
headers = dict(request.headers)
logger.debug(f"Webhook body: {body[:500]}...")
logger.debug(f"Webhook headers: {headers}")
try:
if body:
data = await request.json()
else:
data = {}
except Exception as e:
logger.warning(f"Failed to parse JSON body: {e}")
data = {
"raw_body": body.decode('utf-8', errors='ignore'),
"content_type": headers.get('content-type', '')
}
data["headers"] = headers
logger.info(f"Processing trigger event for {trigger_id}")
manager = await get_trigger_manager()
result = await manager.process_trigger_event(trigger_id, data)
logger.info(f"Trigger processing result: success={result.success}, should_execute={result.should_execute_agent}, error={result.error_message}")
if result.success and result.should_execute_agent:
executor = AgentTriggerExecutor(db)
trigger_config = await manager.get_trigger(trigger_id)
if trigger_config:
from .core import TriggerEvent, TriggerType
trigger_type = trigger_config.trigger_type
if isinstance(trigger_type, str):
trigger_type = TriggerType(trigger_type)
trigger_event = TriggerEvent(
trigger_id=trigger_id,
agent_id=trigger_config.agent_id,
trigger_type=trigger_type,
raw_data=data
)
execution_result = await executor.execute_triggered_agent(
agent_id=trigger_config.agent_id,
trigger_result=result,
trigger_event=trigger_event
)
logger.info(f"Agent execution result: {execution_result}")
return JSONResponse(content={
"message": "Webhook processed and agent execution started",
"trigger_id": trigger_id,
"agent_id": trigger_config.agent_id,
"thread_id": execution_result.get("thread_id"),
"agent_run_id": execution_result.get("agent_run_id")
})
if result.response_data:
return JSONResponse(content=result.response_data)
elif result.success:
return {"message": "Webhook processed successfully"}
else:
logger.warning(f"Webhook processing failed for {trigger_id}: {result.error_message}")
return JSONResponse(
status_code=400,
content={"error": result.error_message}
)
except Exception as e:
logger.error(f"Error handling webhook for trigger {trigger_id}: {e}")
import traceback
logger.error(f"Traceback: {traceback.format_exc()}")
return JSONResponse(
status_code=500,
content={"error": "Internal server error"}
)
@router.get("/triggers/{trigger_id}/health")
async def check_trigger_health(
trigger_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Check the health of a trigger."""
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
manager = await get_trigger_manager()
trigger_config = await manager.get_trigger(trigger_id)
if not trigger_config:
raise HTTPException(status_code=404, detail="Trigger not found")
await verify_agent_access(trigger_config.agent_id, user_id)
health_results = await manager.health_check_triggers()
is_healthy = health_results.get(trigger_id, False)
return {
"trigger_id": trigger_id,
"healthy": is_healthy,
"checked_at": datetime.utcnow().isoformat()
}
async def verify_agent_access(agent_id: str, user_id: str):
"""Verify that the user has access to the agent."""
client = await db.client
result = await client.table('agents').select('account_id').eq('agent_id', agent_id).execute()
if not result.data:
raise HTTPException(status_code=404, detail="Agent not found")
agent = result.data[0]
if agent['account_id'] != user_id:
raise HTTPException(status_code=403, detail="Access denied")