suna/backend/agent/tools/agent_builder_tools/trigger_tool.py

381 lines
17 KiB
Python

import json
from typing import Optional, Dict, Any, List
from agentpress.tool import ToolResult, openapi_schema, xml_schema
from agentpress.thread_manager import ThreadManager
from .base_tool import AgentBuilderBaseTool
from utils.logger import logger
from datetime import datetime
from services.supabase import DBConnection
from triggers.core import TriggerManager
class TriggerTool(AgentBuilderBaseTool):
def __init__(self, thread_manager: ThreadManager, db_connection, agent_id: str):
super().__init__(thread_manager, db_connection, agent_id)
@openapi_schema({
"type": "function",
"function": {
"name": "create_scheduled_trigger",
"description": "Create a scheduled trigger for the agent to execute workflows or direct agent runs using cron expressions. This allows the agent to run automatically at specified times.",
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Name of the scheduled trigger. Should be descriptive of when/why it runs."
},
"description": {
"type": "string",
"description": "Description of what this trigger does and when it runs."
},
"cron_expression": {
"type": "string",
"description": "Cron expression defining when to run (e.g., '0 9 * * *' for daily at 9am, '*/30 * * * *' for every 30 minutes)"
},
"execution_type": {
"type": "string",
"enum": ["workflow", "agent"],
"description": "Whether to execute a workflow or run the agent directly",
"default": "agent"
},
"workflow_id": {
"type": "string",
"description": "ID of the workflow to execute (required if execution_type is 'workflow')"
},
"workflow_input": {
"type": "object",
"description": "Input data to pass to the workflow (optional, only for workflow execution)",
"additionalProperties": True
},
"agent_prompt": {
"type": "string",
"description": "Prompt to send to the agent when triggered (required if execution_type is 'agent')"
}
},
"required": ["name", "cron_expression", "execution_type"]
}
}
})
@xml_schema(
tag_name="create-scheduled-trigger",
mappings=[
{"param_name": "name", "node_type": "attribute", "path": ".", "required": True},
{"param_name": "description", "node_type": "element", "path": "description", "required": False},
{"param_name": "cron_expression", "node_type": "attribute", "path": ".", "required": True},
{"param_name": "execution_type", "node_type": "attribute", "path": ".", "required": True},
{"param_name": "workflow_id", "node_type": "element", "path": "workflow_id", "required": False},
{"param_name": "workflow_input", "node_type": "element", "path": "workflow_input", "required": False},
{"param_name": "agent_prompt", "node_type": "element", "path": "agent_prompt", "required": False}
],
example='''
<function_calls>
<invoke name="create_scheduled_trigger">
<parameter name="name">Daily Report Generation</parameter>
<parameter name="description">Generates daily reports every morning at 9 AM</parameter>
<parameter name="cron_expression">0 9 * * *</parameter>
<parameter name="execution_type">workflow</parameter>
<parameter name="workflow_id">workflow-123</parameter>
<parameter name="workflow_input">{"report_type": "daily", "include_charts": true}</parameter>
</invoke>
</function_calls>
'''
)
async def create_scheduled_trigger(
self,
name: str,
cron_expression: str,
execution_type: str = "agent",
description: Optional[str] = None,
workflow_id: Optional[str] = None,
workflow_input: Optional[Dict[str, Any]] = None,
agent_prompt: Optional[str] = None
) -> ToolResult:
try:
if execution_type not in ["workflow", "agent"]:
return self.fail_response("execution_type must be either 'workflow' or 'agent'")
if execution_type == "workflow" and not workflow_id:
return self.fail_response("workflow_id is required when execution_type is 'workflow'")
if execution_type == "agent" and not agent_prompt:
return self.fail_response("agent_prompt is required when execution_type is 'agent'")
if execution_type == "workflow":
client = await self.db.client
workflow_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).eq('agent_id', self.agent_id).execute()
if not workflow_result.data:
return self.fail_response(f"Workflow {workflow_id} not found or doesn't belong to this agent")
workflow = workflow_result.data[0]
if workflow['status'] != 'active':
return self.fail_response(f"Workflow '{workflow['name']}' is not active. Please activate it first.")
trigger_config = {
"cron_expression": cron_expression,
"execution_type": execution_type,
"provider_id": "schedule"
}
if execution_type == "workflow":
trigger_config["workflow_id"] = workflow_id
if workflow_input:
trigger_config["workflow_input"] = workflow_input
else:
trigger_config["agent_prompt"] = agent_prompt
trigger_db = DBConnection()
trigger_manager = TriggerManager(trigger_db)
await trigger_manager.load_provider_definitions()
try:
trigger_config_obj = await trigger_manager.create_trigger(
agent_id=self.agent_id,
provider_id="schedule",
name=name,
config=trigger_config,
description=description
)
result_message = f"Scheduled trigger '{name}' created successfully!\n\n"
result_message += f"**Schedule**: {cron_expression}\n"
result_message += f"**Type**: {execution_type.capitalize()} execution\n"
if execution_type == "workflow":
result_message += f"**Workflow**: {workflow['name']}\n"
if workflow_input:
result_message += f"**Input Data**: {json.dumps(workflow_input, indent=2)}\n"
else:
result_message += f"**Prompt**: {agent_prompt}\n"
result_message += f"\nThe trigger is now active and will run according to the schedule."
return self.success_response({
"message": result_message,
"trigger": {
"id": trigger_config_obj.trigger_id,
"name": trigger_config_obj.name,
"description": trigger_config_obj.description,
"cron_expression": cron_expression,
"execution_type": execution_type,
"is_active": trigger_config_obj.is_active,
"created_at": trigger_config_obj.created_at.isoformat()
}
})
except ValueError as ve:
return self.fail_response(f"Validation error: {str(ve)}")
except Exception as e:
logger.error(f"Error creating trigger through manager: {str(e)}")
return self.fail_response(f"Failed to create trigger: {str(e)}")
except Exception as e:
logger.error(f"Error creating scheduled trigger: {str(e)}")
return self.fail_response(f"Error creating scheduled trigger: {str(e)}")
@openapi_schema({
"type": "function",
"function": {
"name": "get_scheduled_triggers",
"description": "Get all scheduled triggers for the current agent. Shows when the agent will run automatically.",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
})
@xml_schema(
tag_name="get-scheduled-triggers",
mappings=[],
example='''
<function_calls>
<invoke name="get_scheduled_triggers">
</invoke>
</function_calls>
'''
)
async def get_scheduled_triggers(self) -> ToolResult:
try:
from triggers.core import TriggerType
trigger_db = DBConnection()
trigger_manager = TriggerManager(trigger_db)
triggers = await trigger_manager.get_agent_triggers(self.agent_id)
schedule_triggers = [t for t in triggers if t.trigger_type == TriggerType.SCHEDULE]
if not schedule_triggers:
return self.success_response({
"message": "No scheduled triggers found for this agent.",
"triggers": []
})
client = await self.db.client
workflows = {}
for trigger in schedule_triggers:
if trigger.config.get("execution_type") == "workflow" and trigger.config.get("workflow_id"):
workflow_id = trigger.config["workflow_id"]
if workflow_id not in workflows:
workflow_result = await client.table('agent_workflows').select('name').eq('id', workflow_id).execute()
if workflow_result.data:
workflows[workflow_id] = workflow_result.data[0]['name']
formatted_triggers = []
for trigger in schedule_triggers:
formatted = {
"id": trigger.trigger_id,
"name": trigger.name,
"description": trigger.description,
"cron_expression": trigger.config.get("cron_expression"),
"execution_type": trigger.config.get("execution_type", "agent"),
"is_active": trigger.is_active,
"created_at": trigger.created_at.isoformat()
}
if trigger.config.get("execution_type") == "workflow":
workflow_id = trigger.config.get("workflow_id")
formatted["workflow_name"] = workflows.get(workflow_id, "Unknown Workflow")
formatted["workflow_input"] = trigger.config.get("workflow_input")
else:
formatted["agent_prompt"] = trigger.config.get("agent_prompt")
formatted_triggers.append(formatted)
return self.success_response({
"message": f"Found {len(formatted_triggers)} scheduled trigger(s)",
"triggers": formatted_triggers
})
except Exception as e:
logger.error(f"Error getting scheduled triggers: {str(e)}")
return self.fail_response(f"Error getting scheduled triggers: {str(e)}")
@openapi_schema({
"type": "function",
"function": {
"name": "delete_scheduled_trigger",
"description": "Delete a scheduled trigger. The agent will no longer run automatically at the scheduled time.",
"parameters": {
"type": "object",
"properties": {
"trigger_id": {
"type": "string",
"description": "ID of the trigger to delete"
}
},
"required": ["trigger_id"]
}
}
})
@xml_schema(
tag_name="delete-scheduled-trigger",
mappings=[
{"param_name": "trigger_id", "node_type": "attribute", "path": ".", "required": True}
],
example='''
<function_calls>
<invoke name="delete_scheduled_trigger">
<parameter name="trigger_id">trigger-123</parameter>
</invoke>
</function_calls>
'''
)
async def delete_scheduled_trigger(self, trigger_id: str) -> ToolResult:
try:
trigger_db = DBConnection()
trigger_manager = TriggerManager(trigger_db)
trigger_config = await trigger_manager.get_trigger(trigger_id)
if not trigger_config:
return self.fail_response("Trigger not found")
if trigger_config.agent_id != self.agent_id:
return self.fail_response("This trigger doesn't belong to the current agent")
success = await trigger_manager.delete_trigger(trigger_id)
if success:
return self.success_response({
"message": f"Scheduled trigger '{trigger_config.name}' deleted successfully",
"trigger_id": trigger_id
})
else:
return self.fail_response("Failed to delete trigger")
except Exception as e:
logger.error(f"Error deleting scheduled trigger: {str(e)}")
return self.fail_response(f"Error deleting scheduled trigger: {str(e)}")
@openapi_schema({
"type": "function",
"function": {
"name": "toggle_scheduled_trigger",
"description": "Enable or disable a scheduled trigger. Disabled triggers won't run until re-enabled.",
"parameters": {
"type": "object",
"properties": {
"trigger_id": {
"type": "string",
"description": "ID of the trigger to toggle"
},
"is_active": {
"type": "boolean",
"description": "Whether to enable (true) or disable (false) the trigger"
}
},
"required": ["trigger_id", "is_active"]
}
}
})
@xml_schema(
tag_name="toggle-scheduled-trigger",
mappings=[
{"param_name": "trigger_id", "node_type": "attribute", "path": ".", "required": True},
{"param_name": "is_active", "node_type": "attribute", "path": ".", "required": True}
],
example='''
<function_calls>
<invoke name="toggle_scheduled_trigger">
<parameter name="trigger_id">trigger-123</parameter>
<parameter name="is_active">false</parameter>
</invoke>
</function_calls>
'''
)
async def toggle_scheduled_trigger(self, trigger_id: str, is_active: bool) -> ToolResult:
try:
trigger_db = DBConnection()
trigger_manager = TriggerManager(trigger_db)
trigger_config = await trigger_manager.get_trigger(trigger_id)
if not trigger_config:
return self.fail_response("Trigger not found")
if trigger_config.agent_id != self.agent_id:
return self.fail_response("This trigger doesn't belong to the current agent")
updated_config = await trigger_manager.update_trigger(
trigger_id=trigger_id,
is_active=is_active
)
if updated_config:
status = "enabled" if is_active else "disabled"
return self.success_response({
"message": f"Scheduled trigger '{updated_config.name}' has been {status}",
"trigger": {
"id": updated_config.trigger_id,
"name": updated_config.name,
"is_active": updated_config.is_active
}
})
else:
return self.fail_response("Failed to update trigger")
except Exception as e:
logger.error(f"Error toggling scheduled trigger: {str(e)}")
return self.fail_response(f"Error toggling scheduled trigger: {str(e)}")