import json from typing import Optional, Dict, Any, List from agentpress.tool import ToolResult, openapi_schema, usage_example from agentpress.thread_manager import ThreadManager from .base_tool import AgentBuilderBaseTool from utils.logger import logger from utils.config import config, EnvMode from datetime import datetime from services.supabase import DBConnection from triggers import get_trigger_service import os import httpx from composio_integration.composio_profile_service import ComposioProfileService from composio_integration.composio_trigger_service import ComposioTriggerService class TriggerTool(AgentBuilderBaseTool): def __init__(self, thread_manager: ThreadManager, db_connection, agent_id: str): super().__init__(thread_manager, db_connection, agent_id) # ===== SCHEDULED TRIGGERS ===== @openapi_schema({ "type": "function", "function": { "name": "create_scheduled_trigger", "description": "Create a scheduled trigger for the agent to execute workflows or direct agent runs using cron expressions. This allows the agent to run automatically at specified times.", "parameters": { "type": "object", "properties": { "name": { "type": "string", "description": "Name of the scheduled trigger. Should be descriptive of when/why it runs." }, "description": { "type": "string", "description": "Description of what this trigger does and when it runs." }, "cron_expression": { "type": "string", "description": "Cron expression defining when to run (e.g., '0 9 * * *' for daily at 9am, '*/30 * * * *' for every 30 minutes)" }, "execution_type": { "type": "string", "enum": ["workflow", "agent"], "description": "Whether to execute a workflow or run the agent directly", "default": "agent" }, "workflow_id": { "type": "string", "description": "ID of the workflow to execute (required if execution_type is 'workflow')" }, "workflow_input": { "type": "object", "description": "Input data to pass to the workflow (optional, only for workflow execution)", "additionalProperties": True }, "agent_prompt": { "type": "string", "description": "Prompt to send to the agent when triggered (required if execution_type is 'agent')" } }, "required": ["name", "cron_expression", "execution_type"] } } }) @usage_example(''' Daily Report Generation Generates daily reports every morning at 9 AM 0 9 * * * workflow workflow-123 {"report_type": "daily", "include_charts": true} ''') async def create_scheduled_trigger( self, name: str, cron_expression: str, execution_type: str = "agent", description: Optional[str] = None, workflow_id: Optional[str] = None, workflow_input: Optional[Dict[str, Any]] = None, agent_prompt: Optional[str] = None ) -> ToolResult: try: if execution_type not in ["workflow", "agent"]: return self.fail_response("execution_type must be either 'workflow' or 'agent'") if execution_type == "workflow" and not workflow_id: return self.fail_response("workflow_id is required when execution_type is 'workflow'") if execution_type == "agent" and not agent_prompt: return self.fail_response("agent_prompt is required when execution_type is 'agent'") if execution_type == "workflow": client = await self.db.client workflow_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).eq('agent_id', self.agent_id).execute() if not workflow_result.data: return self.fail_response(f"Workflow {workflow_id} not found or doesn't belong to this agent") workflow = workflow_result.data[0] if workflow['status'] != 'active': return self.fail_response(f"Workflow '{workflow['name']}' is not active. Please activate it first.") trigger_config = { "cron_expression": cron_expression, "execution_type": execution_type, "provider_id": "schedule" } if execution_type == "workflow": trigger_config["workflow_id"] = workflow_id if workflow_input: trigger_config["workflow_input"] = workflow_input else: trigger_config["agent_prompt"] = agent_prompt trigger_svc = get_trigger_service(self.db) try: trigger = await trigger_svc.create_trigger( agent_id=self.agent_id, provider_id="schedule", name=name, config=trigger_config, description=description ) result_message = f"Scheduled trigger '{name}' created successfully!\n\n" result_message += f"**Schedule**: {cron_expression}\n" result_message += f"**Type**: {execution_type.capitalize()} execution\n" if execution_type == "workflow": result_message += f"**Workflow**: {workflow['name']}\n" if workflow_input: result_message += f"**Input Data**: {json.dumps(workflow_input, indent=2)}\n" else: result_message += f"**Prompt**: {agent_prompt}\n" result_message += f"\nThe trigger is now active and will run according to the schedule." return self.success_response({ "message": result_message, "trigger": { "id": trigger.trigger_id, "name": trigger.name, "description": trigger.description, "cron_expression": cron_expression, "execution_type": execution_type, "is_active": trigger.is_active, "created_at": trigger.created_at.isoformat() } }) except ValueError as ve: return self.fail_response(f"Validation error: {str(ve)}") except Exception as e: logger.error(f"Error creating trigger through manager: {str(e)}") return self.fail_response(f"Failed to create trigger: {str(e)}") except Exception as e: logger.error(f"Error creating scheduled trigger: {str(e)}") return self.fail_response(f"Error creating scheduled trigger: {str(e)}") @openapi_schema({ "type": "function", "function": { "name": "get_scheduled_triggers", "description": "Get all scheduled triggers for the current agent. Shows when the agent will run automatically.", "parameters": { "type": "object", "properties": {}, "required": [] } } }) @usage_example(''' ''') async def get_scheduled_triggers(self) -> ToolResult: try: from triggers import TriggerType trigger_svc = get_trigger_service(self.db) triggers = await trigger_svc.get_agent_triggers(self.agent_id) schedule_triggers = [t for t in triggers if t.trigger_type == TriggerType.SCHEDULE] if not schedule_triggers: return self.success_response({ "message": "No scheduled triggers found for this agent.", "triggers": [] }) client = await self.db.client workflows = {} for trigger in schedule_triggers: if trigger.config.get("execution_type") == "workflow" and trigger.config.get("workflow_id"): workflow_id = trigger.config["workflow_id"] if workflow_id not in workflows: workflow_result = await client.table('agent_workflows').select('name').eq('id', workflow_id).execute() if workflow_result.data: workflows[workflow_id] = workflow_result.data[0]['name'] formatted_triggers = [] for trigger in schedule_triggers: formatted = { "id": trigger.trigger_id, "name": trigger.name, "description": trigger.description, "cron_expression": trigger.config.get("cron_expression"), "execution_type": trigger.config.get("execution_type", "agent"), "is_active": trigger.is_active, "created_at": trigger.created_at.isoformat() } if trigger.config.get("execution_type") == "workflow": workflow_id = trigger.config.get("workflow_id") formatted["workflow_name"] = workflows.get(workflow_id, "Unknown Workflow") formatted["workflow_input"] = trigger.config.get("workflow_input") else: formatted["agent_prompt"] = trigger.config.get("agent_prompt") formatted_triggers.append(formatted) return self.success_response({ "message": f"Found {len(formatted_triggers)} scheduled trigger(s)", "triggers": formatted_triggers }) except Exception as e: logger.error(f"Error getting scheduled triggers: {str(e)}") return self.fail_response(f"Error getting scheduled triggers: {str(e)}") @openapi_schema({ "type": "function", "function": { "name": "delete_scheduled_trigger", "description": "Delete a scheduled trigger. The agent will no longer run automatically at the scheduled time.", "parameters": { "type": "object", "properties": { "trigger_id": { "type": "string", "description": "ID of the trigger to delete" } }, "required": ["trigger_id"] } } }) @usage_example(''' trigger-123 ''') async def delete_scheduled_trigger(self, trigger_id: str) -> ToolResult: try: trigger_svc = get_trigger_service(self.db) trigger_config = await trigger_svc.get_trigger(trigger_id) if not trigger_config: return self.fail_response("Trigger not found") if trigger_config.agent_id != self.agent_id: return self.fail_response("This trigger doesn't belong to the current agent") success = await trigger_svc.delete_trigger(trigger_id) if success: return self.success_response({ "message": f"Scheduled trigger '{trigger_config.name}' deleted successfully", "trigger_id": trigger_id }) else: return self.fail_response("Failed to delete trigger") except Exception as e: logger.error(f"Error deleting scheduled trigger: {str(e)}") return self.fail_response(f"Error deleting scheduled trigger: {str(e)}") @openapi_schema({ "type": "function", "function": { "name": "toggle_scheduled_trigger", "description": "Enable or disable a scheduled trigger. Disabled triggers won't run until re-enabled.", "parameters": { "type": "object", "properties": { "trigger_id": { "type": "string", "description": "ID of the trigger to toggle" }, "is_active": { "type": "boolean", "description": "Whether to enable (true) or disable (false) the trigger" } }, "required": ["trigger_id", "is_active"] } } }) @usage_example(''' trigger-123 false ''') async def toggle_scheduled_trigger(self, trigger_id: str, is_active: bool) -> ToolResult: try: trigger_svc = get_trigger_service(self.db) trigger_config = await trigger_svc.get_trigger(trigger_id) if not trigger_config: return self.fail_response("Trigger not found") if trigger_config.agent_id != self.agent_id: return self.fail_response("This trigger doesn't belong to the current agent") updated_config = await trigger_svc.update_trigger( trigger_id=trigger_id, is_active=is_active ) if updated_config: status = "enabled" if is_active else "disabled" return self.success_response({ "message": f"Scheduled trigger '{updated_config.name}' has been {status}", "trigger": { "id": updated_config.trigger_id, "name": updated_config.name, "is_active": updated_config.is_active } }) else: return self.fail_response("Failed to update trigger") except Exception as e: logger.error(f"Error toggling scheduled trigger: {str(e)}") return self.fail_response(f"Error toggling scheduled trigger: {str(e)}") # ===== EVENT-BASED TRIGGERS (Non-Production Only) ===== # Event trigger methods - only available in non-production environments if config.ENV_MODE != EnvMode.PRODUCTION: @openapi_schema({ "type": "function", "function": { "name": "list_event_trigger_apps", "description": "List apps (toolkits) that have available event-based triggers via Composio. Returns slug, name, and logo.", "parameters": { "type": "object", "properties": {}, "required": [] } } }) @usage_example(''' ''') async def list_event_trigger_apps(self) -> ToolResult: try: trigger_service = ComposioTriggerService() response = await trigger_service.list_apps_with_triggers() # Return exact same format as API return self.success_response({ "message": f"Found {response['total']} apps with triggers", "items": response["items"], "total": response["total"] }) except Exception as e: logger.error(f"Error listing event trigger apps: {e}") return self.fail_response(f"Error listing apps: {str(e)}") TriggerTool.list_event_trigger_apps = list_event_trigger_apps @openapi_schema({ "type": "function", "function": { "name": "list_app_event_triggers", "description": "List available triggers for a given app/toolkit slug. Includes slug, name, description, type, instructions, config, and payload schema.", "parameters": { "type": "object", "properties": { "toolkit_slug": { "type": "string", "description": "Toolkit slug, e.g. 'gmail'" } }, "required": ["toolkit_slug"] } } }) @usage_example(''' gmail ''') async def list_app_event_triggers(self, toolkit_slug: str) -> ToolResult: try: trigger_service = ComposioTriggerService() response = await trigger_service.list_triggers_for_app(toolkit_slug) # Return exact same format as API return self.success_response({ "message": f"Found {response['total']} triggers for {toolkit_slug}", "items": response["items"], "toolkit": response["toolkit"], "total": response["total"] }) except Exception as e: logger.error(f"Error listing triggers for app {toolkit_slug}: {e}") return self.fail_response(f"Error listing triggers: {str(e)}") TriggerTool.list_app_event_triggers = list_app_event_triggers @openapi_schema({ "type": "function", "function": { "name": "list_event_profiles", "description": "List connected Composio profiles for a toolkit. Use this to get profile_id and connected_account_id before creating a trigger.", "parameters": { "type": "object", "properties": { "toolkit_slug": { "type": "string", "description": "Toolkit slug, e.g. 'gmail'" } }, "required": ["toolkit_slug"] } } }) @usage_example(''' gmail ''') async def list_event_profiles(self, toolkit_slug: str) -> ToolResult: try: client = await self.db.client agent_rows = await client.table('agents').select('account_id').eq('agent_id', self.agent_id).execute() if not agent_rows.data: return self.fail_response("Agent not found") account_id = agent_rows.data[0]['account_id'] profile_service = ComposioProfileService(self.db) profiles = await profile_service.get_profiles(account_id, toolkit_slug) items = [] for p in profiles: items.append({ "profile_id": p.profile_id, "display_name": p.display_name, "is_connected": p.is_connected, "connected_account_id": getattr(p, 'connected_account_id', None) }) return self.success_response({ "message": f"Found {len(items)} profile(s) for {toolkit_slug}", "items": items, "total": len(items) }) except Exception as e: logger.error(f"Error listing event profiles: {e}") return self.fail_response(f"Error listing profiles: {str(e)}") TriggerTool.list_event_profiles = list_event_profiles @openapi_schema({ "type": "function", "function": { "name": "create_event_trigger", "description": "Create a Composio event-based trigger for this agent. First list apps and triggers, then pass the chosen trigger slug, profile_id, and trigger_config. Optionally route to a workflow.", "parameters": { "type": "object", "properties": { "slug": {"type": "string", "description": "Trigger type slug, e.g. 'GMAIL_NEW_GMAIL_MESSAGE'"}, "profile_id": {"type": "string", "description": "Composio profile_id to use (must be connected)"}, "trigger_config": {"type": "object", "description": "Trigger configuration object per trigger schema", "additionalProperties": True}, "route": {"type": "string", "enum": ["agent", "workflow"], "default": "agent", "description": "Execute agent directly or run a workflow"}, "name": {"type": "string", "description": "Optional friendly name for the trigger"}, "agent_prompt": {"type": "string", "description": "Prompt to pass to the agent when route is 'agent'"}, "workflow_id": {"type": "string", "description": "Workflow ID when route is 'workflow'"}, "workflow_input": {"type": "object", "description": "Workflow input variables when route is 'workflow'", "additionalProperties": True}, "connected_account_id": {"type": "string", "description": "Connected account id; if omitted we try to derive from profile"} }, "required": ["slug", "profile_id"] } } }) @usage_example(''' gmail gmail GMAIL_NEW_GMAIL_MESSAGE profile_123 {"interval": 1, "userId": "me", "labelIds": "INBOX"} agent Read this ''') async def create_event_trigger( self, slug: str, profile_id: str, trigger_config: Optional[Dict[str, Any]] = None, route: str = "agent", name: Optional[str] = None, agent_prompt: Optional[str] = None, workflow_id: Optional[str] = None, workflow_input: Optional[Dict[str, Any]] = None, connected_account_id: Optional[str] = None ) -> ToolResult: try: if route not in ("agent", "workflow"): return self.fail_response("route must be either 'agent' or 'workflow'") if route == "workflow" and not workflow_id: return self.fail_response("workflow_id is required when route is 'workflow'") if route == "agent" and not agent_prompt: return self.fail_response("agent_prompt is required when route is 'agent'") # Resolve composio user id and connected account id from profile profile_service = ComposioProfileService(self.db) profile_cfg = await profile_service.get_profile_config(profile_id) composio_user_id = profile_cfg.get("user_id") if not composio_user_id: return self.fail_response("Composio profile is missing user_id") if not connected_account_id: connected_account_id = profile_cfg.get("connected_account_id") api_base = os.getenv("COMPOSIO_API_BASE", "https://backend.composio.dev").rstrip("/") api_key = os.getenv("COMPOSIO_API_KEY") if not api_key: return self.fail_response("COMPOSIO_API_KEY not configured") headers = {"x-api-key": api_key, "Content-Type": "application/json"} # Coerce config types per trigger schema coerced_config = dict(trigger_config or {}) try: type_url = f"{api_base}/api/v3/triggers_types/{slug}" async with httpx.AsyncClient(timeout=10) as http_client: tr = await http_client.get(type_url, headers=headers) if tr.status_code == 200: tdata = tr.json() schema = tdata.get("config") or {} props = schema.get("properties") or {} for key, prop in props.items(): if key not in coerced_config: continue val = coerced_config[key] ptype = prop.get("type") if isinstance(prop, dict) else None try: if ptype == "array": if isinstance(val, str): coerced_config[key] = [val] elif ptype == "integer": if isinstance(val, str) and val.isdigit(): coerced_config[key] = int(val) elif ptype == "number": if isinstance(val, str): coerced_config[key] = float(val) elif ptype == "boolean": if isinstance(val, str): coerced_config[key] = val.lower() in ("true", "1", "yes") elif ptype == "string": if isinstance(val, (list, tuple)): coerced_config[key] = ",".join(str(x) for x in val) elif not isinstance(val, str): coerced_config[key] = str(val) except Exception: pass except Exception: pass # Upsert trigger instance with webhook base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000").rstrip("/") secret = os.getenv("COMPOSIO_WEBHOOK_SECRET", "") webhook_headers: Dict[str, str] = {"X-Composio-Secret": secret} if secret else {} vercel_bypass = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "") if vercel_bypass: webhook_headers["X-Vercel-Protection-Bypass"] = vercel_bypass body: Dict[str, Any] = { "user_id": composio_user_id, "userId": composio_user_id, "trigger_config": coerced_config, "triggerConfig": coerced_config, "webhook": { "url": f"{base_url}/api/composio/webhook", "headers": webhook_headers, "method": "POST", }, } if connected_account_id: body["connectedAccountId"] = connected_account_id body["connected_account_id"] = connected_account_id body["connectedAccountIds"] = [connected_account_id] body["connected_account_ids"] = [connected_account_id] upsert_url = f"{api_base}/api/v3/trigger_instances/{slug}/upsert" async with httpx.AsyncClient(timeout=20) as http_client: resp = await http_client.post(upsert_url, headers=headers, json=body) try: resp.raise_for_status() except httpx.HTTPStatusError: ct = resp.headers.get("content-type", "") detail = resp.json() if "application/json" in ct else resp.text return self.fail_response(f"Composio upsert error: {detail}") created = resp.json() def _extract_id(obj: Dict[str, Any]) -> Optional[str]: if not isinstance(obj, dict): return None cand = ( obj.get("id") or obj.get("trigger_id") or obj.get("triggerId") or obj.get("nano_id") or obj.get("nanoId") or obj.get("triggerNanoId") ) if cand: return cand for k in ("trigger", "trigger_instance", "triggerInstance", "data", "result"): nested = obj.get(k) if isinstance(nested, dict): nid = _extract_id(nested) if nid: return nid if isinstance(nested, list) and nested: nid = _extract_id(nested[0] if isinstance(nested[0], dict) else {}) if nid: return nid return None composio_trigger_id = _extract_id(created) if isinstance(created, dict) else None if not composio_trigger_id: # fallback to list active try: params_lookup: Dict[str, Any] = {"limit": 50, "slug": slug, "userId": composio_user_id} if connected_account_id: params_lookup["connectedAccountId"] = connected_account_id list_url = f"{api_base}/api/v3/trigger_instances/active" async with httpx.AsyncClient(timeout=15) as http_client: lr = await http_client.get(list_url, headers=headers, params=params_lookup) if lr.status_code == 200: ldata = lr.json() items = ldata.get("items") if isinstance(ldata, dict) else (ldata if isinstance(ldata, list) else []) if items: composio_trigger_id = _extract_id(items[0] if isinstance(items[0], dict) else {}) except Exception: pass if not composio_trigger_id: return self.fail_response("Failed to get Composio trigger id from response") # Build Suna trigger and save suna_config: Dict[str, Any] = { "composio_trigger_id": composio_trigger_id, "trigger_slug": slug, "execution_type": route, "profile_id": profile_id, } if route == "agent": if agent_prompt: suna_config["agent_prompt"] = agent_prompt else: suna_config["workflow_id"] = workflow_id if workflow_input: suna_config["workflow_input"] = workflow_input trigger_svc = get_trigger_service(self.db) trigger = await trigger_svc.create_trigger( agent_id=self.agent_id, provider_id="composio", name=name or slug, config=suna_config, description=f"Composio event: {slug}" ) message = f"Event trigger '{trigger.name}' created successfully.\n" message += f"Route: {route}. " if route == "workflow": message += f"Workflow: {workflow_id}." else: message += "Agent execution configured." return self.success_response({ "message": message, "trigger": { "id": trigger.trigger_id, "agent_id": trigger.agent_id, "provider": "composio", "slug": slug, "config": trigger.config, "is_active": trigger.is_active, "created_at": trigger.created_at.isoformat() } }) except Exception as e: logger.error(f"Error creating event trigger: {e}", exc_info=True) return self.fail_response(f"Error creating event trigger: {str(e)}") TriggerTool.create_event_trigger = create_event_trigger