suna/backend/triggers/api.py

485 lines
17 KiB
Python
Raw Normal View History

2025-07-01 02:03:46 +08:00
from fastapi import APIRouter, HTTPException, Depends, Request, Body, Query
2025-06-30 18:57:34 +08:00
from fastapi.responses import JSONResponse
from typing import List, Optional, Dict, Any
from pydantic import BaseModel
import os
from datetime import datetime
2025-07-14 19:49:18 +08:00
from .support.factory import TriggerModuleFactory
from .support.exceptions import TriggerError, ConfigurationError, ProviderError
from .services.trigger_service import TriggerService
from .services.execution_service import TriggerExecutionService
from .services.provider_service import ProviderService
2025-06-30 18:57:34 +08:00
from services.supabase import DBConnection
from utils.auth_utils import get_current_user_id_from_jwt
from utils.logger import logger
from flags.flags import is_enabled
from utils.config import config, EnvMode
2025-06-30 18:57:34 +08:00
2025-07-08 11:57:32 +08:00
router = APIRouter(prefix="/triggers", tags=["triggers"])
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
trigger_service: Optional[TriggerService] = None
execution_service: Optional[TriggerExecutionService] = None
provider_service: Optional[ProviderService] = None
2025-06-30 18:57:34 +08:00
db = None
class TriggerCreateRequest(BaseModel):
provider_id: str
name: str
config: Dict[str, Any]
2025-07-14 19:49:18 +08:00
description: Optional[str] = None
2025-06-30 18:57:34 +08:00
class TriggerUpdateRequest(BaseModel):
2025-07-14 19:49:18 +08:00
config: Optional[Dict[str, Any]] = None
2025-06-30 18:57:34 +08:00
name: Optional[str] = None
description: Optional[str] = None
is_active: Optional[bool] = None
2025-07-14 19:49:18 +08:00
2025-06-30 18:57:34 +08:00
class TriggerResponse(BaseModel):
trigger_id: str
agent_id: str
trigger_type: str
provider_id: str
name: str
description: Optional[str]
is_active: bool
2025-07-14 19:49:18 +08:00
webhook_url: Optional[str]
2025-06-30 18:57:34 +08:00
created_at: str
updated_at: str
2025-07-14 19:49:18 +08:00
2025-06-30 18:57:34 +08:00
class ProviderResponse(BaseModel):
provider_id: str
name: str
description: str
trigger_type: str
webhook_enabled: bool
2025-07-14 19:49:18 +08:00
setup_required: bool
2025-06-30 18:57:34 +08:00
config_schema: Dict[str, Any]
2025-07-14 19:49:18 +08:00
def initialize(database: DBConnection):
global db, trigger_service, execution_service, provider_service
db = database
async def get_services() -> tuple[TriggerService, TriggerExecutionService, ProviderService]:
global trigger_service, execution_service, provider_service
if trigger_service is None or execution_service is None or provider_service is None:
if db is None:
raise HTTPException(status_code=500, detail="Database not initialized")
trigger_service, execution_service, provider_service = await TriggerModuleFactory.create_trigger_module(db)
return trigger_service, execution_service, provider_service
async def verify_agent_access(agent_id: str, user_id: str):
client = await db.client
result = await client.table('agents').select('agent_id').eq('agent_id', agent_id).eq('created_by', user_id).execute()
if not result.data:
raise HTTPException(status_code=404, detail="Agent not found or access denied")
2025-06-30 18:57:34 +08:00
@router.get("/providers", response_model=List[ProviderResponse])
2025-07-14 19:49:18 +08:00
async def get_providers():
2025-06-30 18:57:34 +08:00
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
2025-07-14 19:49:18 +08:00
try:
_, _, provider_svc = await get_services()
providers = await provider_svc.get_available_providers()
return [
ProviderResponse(
provider_id=provider.provider_id,
name=provider.name,
description=provider.description,
trigger_type=provider.trigger_type.value,
webhook_enabled=provider.webhook_enabled,
setup_required=provider.setup_required,
config_schema=provider.config_schema
)
for provider in providers
]
except Exception as e:
logger.error(f"Error getting providers: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/providers/{provider_id}/schema")
async def get_provider_schema(provider_id: str):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
try:
_, _, provider_svc = await get_services()
schema = await provider_svc.get_provider_config_schema(provider_id)
if not schema:
raise HTTPException(status_code=404, detail="Provider not found")
return {"schema": schema}
except Exception as e:
logger.error(f"Error getting provider schema: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-06-30 18:57:34 +08:00
@router.get("/agents/{agent_id}/triggers", response_model=List[TriggerResponse])
async def get_agent_triggers(
agent_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
await verify_agent_access(agent_id, user_id)
2025-07-14 19:49:18 +08:00
try:
trigger_svc, _, provider_svc = await get_services()
triggers = await trigger_svc.get_agent_triggers(agent_id)
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
responses = []
for trigger in triggers:
provider_def = await provider_svc.get_provider_definition(trigger.provider_id)
webhook_url = None
if provider_def and provider_def.webhook_enabled:
webhook_url = f"{base_url}/api/triggers/{trigger.trigger_id}/webhook"
responses.append(TriggerResponse(
trigger_id=trigger.trigger_id,
agent_id=trigger.agent_id,
trigger_type=trigger.trigger_type.value,
provider_id=trigger.provider_id,
name=trigger.config.name,
description=trigger.config.description,
is_active=trigger.is_active,
webhook_url=webhook_url,
created_at=trigger.metadata.created_at.isoformat(),
updated_at=trigger.metadata.updated_at.isoformat()
))
return responses
except Exception as e:
logger.error(f"Error getting agent triggers: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-06-30 18:57:34 +08:00
@router.post("/agents/{agent_id}/triggers", response_model=TriggerResponse)
async def create_agent_trigger(
agent_id: str,
request: TriggerCreateRequest,
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
await verify_agent_access(agent_id, user_id)
try:
2025-07-14 19:49:18 +08:00
trigger_svc, _, provider_svc = await get_services()
trigger = await trigger_svc.create_trigger(
2025-06-30 18:57:34 +08:00
agent_id=agent_id,
provider_id=request.provider_id,
name=request.name,
config=request.config,
description=request.description
)
2025-07-14 19:49:18 +08:00
provider_def = await provider_svc.get_provider_definition(request.provider_id)
2025-06-30 18:57:34 +08:00
webhook_url = None
2025-07-14 19:49:18 +08:00
if provider_def and provider_def.webhook_enabled:
2025-06-30 18:57:34 +08:00
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
2025-07-14 19:49:18 +08:00
webhook_url = f"{base_url}/api/triggers/{trigger.trigger_id}/webhook"
2025-06-30 18:57:34 +08:00
return TriggerResponse(
2025-07-14 19:49:18 +08:00
trigger_id=trigger.trigger_id,
agent_id=trigger.agent_id,
trigger_type=trigger.trigger_type.value,
provider_id=trigger.provider_id,
name=trigger.config.name,
description=trigger.config.description,
is_active=trigger.is_active,
2025-06-30 18:57:34 +08:00
webhook_url=webhook_url,
2025-07-14 19:49:18 +08:00
created_at=trigger.metadata.created_at.isoformat(),
updated_at=trigger.metadata.updated_at.isoformat()
2025-06-30 18:57:34 +08:00
)
2025-07-14 19:49:18 +08:00
except (ValueError, ConfigurationError, ProviderError) as e:
2025-06-30 18:57:34 +08:00
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error creating trigger: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-07-14 19:49:18 +08:00
2025-06-30 18:57:34 +08:00
@router.get("/{trigger_id}", response_model=TriggerResponse)
async def get_trigger(
trigger_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
2025-07-14 19:49:18 +08:00
try:
trigger_svc, _, provider_svc = await get_services()
trigger = await trigger_svc.get_trigger(trigger_id)
if not trigger:
raise HTTPException(status_code=404, detail="Trigger not found")
await verify_agent_access(trigger.agent_id, user_id)
provider_def = await provider_svc.get_provider_definition(trigger.provider_id)
webhook_url = None
if provider_def and provider_def.webhook_enabled:
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
webhook_url = f"{base_url}/api/triggers/{trigger_id}/webhook"
return TriggerResponse(
trigger_id=trigger.trigger_id,
agent_id=trigger.agent_id,
trigger_type=trigger.trigger_type.value,
provider_id=trigger.provider_id,
name=trigger.config.name,
description=trigger.config.description,
is_active=trigger.is_active,
webhook_url=webhook_url,
created_at=trigger.metadata.created_at.isoformat(),
updated_at=trigger.metadata.updated_at.isoformat()
)
except Exception as e:
logger.error(f"Error getting trigger: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-06-30 18:57:34 +08:00
@router.put("/{trigger_id}", response_model=TriggerResponse)
async def update_trigger(
trigger_id: str,
request: TriggerUpdateRequest,
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
try:
2025-07-14 19:49:18 +08:00
trigger_svc, _, provider_svc = await get_services()
trigger = await trigger_svc.get_trigger(trigger_id)
if not trigger:
raise HTTPException(status_code=404, detail="Trigger not found")
await verify_agent_access(trigger.agent_id, user_id)
updated_trigger = await trigger_svc.update_trigger(
2025-06-30 18:57:34 +08:00
trigger_id=trigger_id,
config=request.config,
name=request.name,
description=request.description,
is_active=request.is_active
)
2025-07-14 19:49:18 +08:00
provider_def = await provider_svc.get_provider_definition(updated_trigger.provider_id)
2025-06-30 18:57:34 +08:00
webhook_url = None
2025-07-14 19:49:18 +08:00
if provider_def and provider_def.webhook_enabled:
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
webhook_url = f"{base_url}/api/triggers/{trigger_id}/webhook"
2025-06-30 18:57:34 +08:00
return TriggerResponse(
2025-07-14 19:49:18 +08:00
trigger_id=updated_trigger.trigger_id,
agent_id=updated_trigger.agent_id,
trigger_type=updated_trigger.trigger_type.value,
provider_id=updated_trigger.provider_id,
name=updated_trigger.config.name,
description=updated_trigger.config.description,
is_active=updated_trigger.is_active,
2025-06-30 18:57:34 +08:00
webhook_url=webhook_url,
2025-07-14 19:49:18 +08:00
created_at=updated_trigger.metadata.created_at.isoformat(),
updated_at=updated_trigger.metadata.updated_at.isoformat()
2025-06-30 18:57:34 +08:00
)
2025-07-14 19:49:18 +08:00
except (ValueError, ConfigurationError, ProviderError) as e:
2025-06-30 18:57:34 +08:00
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error updating trigger: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-07-14 19:49:18 +08:00
2025-06-30 18:57:34 +08:00
@router.delete("/{trigger_id}")
async def delete_trigger(
trigger_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
2025-07-14 19:49:18 +08:00
try:
trigger_svc, _, _ = await get_services()
trigger = await trigger_svc.get_trigger(trigger_id)
if not trigger:
raise HTTPException(status_code=404, detail="Trigger not found")
await verify_agent_access(trigger.agent_id, user_id)
success = await trigger_svc.delete_trigger(trigger_id)
if not success:
raise HTTPException(status_code=404, detail="Trigger not found")
2025-06-30 18:57:34 +08:00
return {"message": "Trigger deleted successfully"}
2025-07-14 19:49:18 +08:00
except Exception as e:
logger.error(f"Error deleting trigger: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
@router.post("/{trigger_id}/webhook")
async def trigger_webhook(
trigger_id: str,
request: Request
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
2025-07-01 16:05:55 +08:00
try:
2025-07-14 19:49:18 +08:00
trigger_svc, execution_svc, _ = await get_services()
2025-07-01 16:05:55 +08:00
try:
2025-07-14 19:49:18 +08:00
raw_data = await request.json()
except:
raw_data = {}
result = await trigger_svc.process_trigger_event(trigger_id, raw_data)
if not result.success:
2025-07-01 16:05:55 +08:00
return JSONResponse(
status_code=400,
2025-07-14 19:49:18 +08:00
content={"success": False, "error": result.error_message}
2025-07-01 16:05:55 +08:00
)
2025-07-14 19:49:18 +08:00
if result.should_execute_agent or result.should_execute_workflow:
trigger = await trigger_svc.get_trigger(trigger_id)
if trigger:
from .domain.entities import TriggerEvent
event = TriggerEvent(
2025-07-01 16:05:55 +08:00
trigger_id=trigger_id,
2025-07-14 19:49:18 +08:00
agent_id=trigger.agent_id,
trigger_type=trigger.trigger_type,
raw_data=raw_data
2025-07-01 16:05:55 +08:00
)
2025-07-14 19:49:18 +08:00
execution_result = await execution_svc.execute_trigger_result(
agent_id=trigger.agent_id,
2025-07-01 16:05:55 +08:00
trigger_result=result,
2025-07-14 19:49:18 +08:00
trigger_event=event
2025-07-01 16:05:55 +08:00
)
return JSONResponse(content={
2025-07-14 19:49:18 +08:00
"success": True,
"message": "Trigger processed successfully",
"execution": execution_result
2025-07-01 16:05:55 +08:00
})
2025-07-14 19:49:18 +08:00
return JSONResponse(content={
"success": True,
"message": "Trigger processed successfully"
})
2025-07-01 16:05:55 +08:00
except Exception as e:
2025-07-14 19:49:18 +08:00
logger.error(f"Error processing webhook trigger: {e}")
2025-07-01 16:05:55 +08:00
return JSONResponse(
status_code=500,
2025-07-14 19:49:18 +08:00
content={"success": False, "error": "Internal server error"}
2025-07-01 16:05:55 +08:00
)
2025-07-12 04:42:23 +08:00
2025-07-14 19:49:18 +08:00
@router.get("/{trigger_id}/logs")
async def get_trigger_logs(
trigger_id: str,
limit: int = Query(100, ge=1, le=1000),
offset: int = Query(0, ge=0),
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
try:
trigger_svc, _, _ = await get_services()
2025-07-01 16:05:55 +08:00
2025-07-14 19:49:18 +08:00
trigger = await trigger_svc.get_trigger(trigger_id)
if not trigger:
raise HTTPException(status_code=404, detail="Trigger not found")
2025-07-01 16:05:55 +08:00
2025-07-14 19:49:18 +08:00
await verify_agent_access(trigger.agent_id, user_id)
2025-07-01 16:05:55 +08:00
2025-07-14 19:49:18 +08:00
logs = await trigger_svc.get_trigger_logs(trigger_id, limit, offset)
2025-07-01 16:05:55 +08:00
2025-07-14 19:49:18 +08:00
return {"logs": logs}
2025-07-01 16:05:55 +08:00
except Exception as e:
2025-07-14 19:49:18 +08:00
logger.error(f"Error getting trigger logs: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-07-01 16:05:55 +08:00
2025-07-14 19:49:18 +08:00
@router.get("/{trigger_id}/stats")
async def get_trigger_stats(
2025-06-30 18:57:34 +08:00
trigger_id: str,
2025-07-14 19:49:18 +08:00
hours: int = Query(24, ge=1, le=168),
user_id: str = Depends(get_current_user_id_from_jwt)
2025-06-30 18:57:34 +08:00
):
2025-07-14 19:49:18 +08:00
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
2025-06-30 18:57:34 +08:00
try:
2025-07-14 19:49:18 +08:00
trigger_svc, _, _ = await get_services()
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
trigger = await trigger_svc.get_trigger(trigger_id)
if not trigger:
raise HTTPException(status_code=404, detail="Trigger not found")
await verify_agent_access(trigger.agent_id, user_id)
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
stats = await trigger_svc.get_trigger_stats(trigger_id, hours)
return stats
2025-06-30 18:57:34 +08:00
except Exception as e:
2025-07-14 19:49:18 +08:00
logger.error(f"Error getting trigger stats: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
@router.get("/{trigger_id}/health")
async def health_check_trigger(
2025-06-30 18:57:34 +08:00
trigger_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
2025-07-14 19:49:18 +08:00
try:
trigger_svc, _, _ = await get_services()
trigger = await trigger_svc.get_trigger(trigger_id)
if not trigger:
raise HTTPException(status_code=404, detail="Trigger not found")
2025-06-30 18:57:34 +08:00
2025-07-14 19:49:18 +08:00
await verify_agent_access(trigger.agent_id, user_id)
is_healthy = await trigger_svc.health_check_trigger(trigger_id)
return {"healthy": is_healthy}
except Exception as e:
logger.error(f"Error checking trigger health: {e}")
raise HTTPException(status_code=500, detail="Internal server error")