2025-07-12 04:42:23 +08:00
import json
from typing import Optional , Dict , Any , List
2025-07-31 04:12:11 +08:00
from agentpress . tool import ToolResult , openapi_schema , usage_example
2025-07-12 04:42:23 +08:00
from agentpress . thread_manager import ThreadManager
from . base_tool import AgentBuilderBaseTool
from utils . logger import logger
2025-08-17 05:09:43 +08:00
from utils . config import config , EnvMode
2025-07-12 04:42:23 +08:00
from datetime import datetime
from services . supabase import DBConnection
2025-07-28 20:35:59 +08:00
from triggers import get_trigger_service
2025-08-15 16:22:37 +08:00
import os
import httpx
from composio_integration . composio_profile_service import ComposioProfileService
from composio_integration . composio_trigger_service import ComposioTriggerService
2025-07-12 04:42:23 +08:00
class TriggerTool(AgentBuilderBaseTool):
    """Agent-builder tool for managing the scheduled triggers of one agent.

    Every method acts on the agent identified by ``self.agent_id`` and goes
    through the service returned by ``get_trigger_service(self.db)``. Results
    are always reported through ``self.success_response`` /
    ``self.fail_response``; exceptions are logged and converted to failures.
    """

    def __init__(self, thread_manager: ThreadManager, db_connection, agent_id: str):
        """Pass all three arguments straight through to the base tool."""
        super().__init__(thread_manager, db_connection, agent_id)

    # ===== SCHEDULED TRIGGERS =====

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "create_scheduled_trigger",
            "description": "Create a scheduled trigger for the agent to execute workflows or direct agent runs using cron expressions. This allows the agent to run automatically at specified times.",
            "parameters": {
                "type": "object",
                "properties": {
                    "name": {
                        "type": "string",
                        "description": "Name of the scheduled trigger. Should be descriptive of when/why it runs."
                    },
                    "description": {
                        "type": "string",
                        "description": "Description of what this trigger does and when it runs."
                    },
                    "cron_expression": {
                        "type": "string",
                        "description": "Cron expression defining when to run (e.g., '0 9 * * *' for daily at 9am, '*/30 * * * *' for every 30 minutes)"
                    },
                    "execution_type": {
                        "type": "string",
                        "enum": ["workflow", "agent"],
                        "description": "Whether to execute a workflow or run the agent directly",
                        "default": "agent"
                    },
                    "workflow_id": {
                        "type": "string",
                        "description": "ID of the workflow to execute (required if execution_type is 'workflow')"
                    },
                    "workflow_input": {
                        "type": "object",
                        "description": "Input data to pass to the workflow (optional, only for workflow execution)",
                        "additionalProperties": True
                    },
                    "agent_prompt": {
                        "type": "string",
                        "description": "Prompt to send to the agent when triggered (required if execution_type is 'agent')"
                    }
                },
                "required": ["name", "cron_expression", "execution_type"]
            }
        }
    })
    @usage_example('''
        <function_calls>
        <invoke name="create_scheduled_trigger">
        <parameter name="name">Daily Report Generation</parameter>
        <parameter name="description">Generates daily reports every morning at 9 AM</parameter>
        <parameter name="cron_expression">0 9 * * *</parameter>
        <parameter name="execution_type">workflow</parameter>
        <parameter name="workflow_id">workflow-123</parameter>
        <parameter name="workflow_input">{"report_type": "daily", "include_charts": true}</parameter>
        </invoke>
        </function_calls>
        ''')
    async def create_scheduled_trigger(
        self,
        name: str,
        cron_expression: str,
        execution_type: str = "agent",
        description: Optional[str] = None,
        workflow_id: Optional[str] = None,
        workflow_input: Optional[Dict[str, Any]] = None,
        agent_prompt: Optional[str] = None
    ) -> ToolResult:
        """Create a cron-driven trigger for this agent.

        Args:
            name: Name given to the new trigger.
            cron_expression: Schedule, stored in the trigger config as-is.
            execution_type: ``"workflow"`` or ``"agent"``; anything else fails.
            description: Optional description forwarded to the trigger service.
            workflow_id: Required for workflow execution. The workflow must
                exist in ``agent_workflows`` for this agent and have status
                ``'active'``.
            workflow_input: Optional input stored with a workflow trigger.
            agent_prompt: Required for agent execution.

        Returns:
            A success result with a summary message and the created trigger's
            fields, or a failure result describing what went wrong.
        """
        try:
            if execution_type not in ("workflow", "agent"):
                return self.fail_response("execution_type must be either 'workflow' or 'agent'")

            # Past the check above the type is one of exactly two values.
            runs_workflow = execution_type == "workflow"
            if runs_workflow and not workflow_id:
                return self.fail_response("workflow_id is required when execution_type is 'workflow'")
            if not runs_workflow and not agent_prompt:
                return self.fail_response("agent_prompt is required when execution_type is 'agent'")

            workflow_row = None
            if runs_workflow:
                # The workflow must belong to this agent and be active.
                db_client = await self.db.client
                lookup = await (
                    db_client.table('agent_workflows')
                    .select('*')
                    .eq('id', workflow_id)
                    .eq('agent_id', self.agent_id)
                    .execute()
                )
                if not lookup.data:
                    return self.fail_response(f"Workflow {workflow_id} not found or doesn't belong to this agent")
                workflow_row = lookup.data[0]
                if workflow_row['status'] != 'active':
                    return self.fail_response(f"Workflow '{workflow_row['name']}' is not active. Please activate it first.")

            schedule_config: Dict[str, Any] = {
                "cron_expression": cron_expression,
                "execution_type": execution_type,
                "provider_id": "schedule",
            }
            if not runs_workflow:
                schedule_config["agent_prompt"] = agent_prompt
            else:
                schedule_config["workflow_id"] = workflow_id
                if workflow_input:
                    schedule_config["workflow_input"] = workflow_input

            service = get_trigger_service(self.db)

            try:
                created = await service.create_trigger(
                    agent_id=self.agent_id,
                    provider_id="schedule",
                    name=name,
                    config=schedule_config,
                    description=description,
                )

                # Assemble the human-readable summary piece by piece.
                summary_parts = [
                    f"Scheduled trigger '{name}' created successfully!\n\n",
                    f"**Schedule**: {cron_expression}\n",
                    f"**Type**: {execution_type.capitalize()} execution\n",
                ]
                if runs_workflow:
                    summary_parts.append(f"**Workflow**: {workflow_row['name']}\n")
                    if workflow_input:
                        summary_parts.append(f"**Input Data**: {json.dumps(workflow_input, indent=2)}\n")
                else:
                    summary_parts.append(f"**Prompt**: {agent_prompt}\n")
                summary_parts.append("\nThe trigger is now active and will run according to the schedule.")
                summary = "".join(summary_parts)

                trigger_payload = {
                    "id": created.trigger_id,
                    "name": created.name,
                    "description": created.description,
                    "cron_expression": cron_expression,
                    "execution_type": execution_type,
                    "is_active": created.is_active,
                    "created_at": created.created_at.isoformat(),
                }
                return self.success_response({
                    "message": summary,
                    "trigger": trigger_payload,
                })
            except ValueError as validation_err:
                return self.fail_response(f"Validation error: {validation_err!s}")
            except Exception as create_err:
                logger.error(f"Error creating trigger through manager: {create_err!s}")
                return self.fail_response(f"Failed to create trigger: {create_err!s}")

        except Exception as exc:
            logger.error(f"Error creating scheduled trigger: {exc!s}")
            return self.fail_response(f"Error creating scheduled trigger: {exc!s}")

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "get_scheduled_triggers",
            "description": "Get all scheduled triggers for the current agent. Shows when the agent will run automatically.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    })
    @usage_example('''
        <function_calls>
        <invoke name="get_scheduled_triggers">
        </invoke>
        </function_calls>
        ''')
    async def get_scheduled_triggers(self) -> ToolResult:
        """List this agent's triggers whose type is ``TriggerType.SCHEDULE``.

        Workflow-executing triggers also carry the workflow's name (looked up
        in ``agent_workflows``; ``"Unknown Workflow"`` when the lookup found
        nothing) and the stored workflow input. Other triggers carry their
        agent prompt.

        Returns:
            A success result with a count message and the formatted triggers
            (an empty list when there are none), or a failure result on error.
        """
        try:
            from triggers import TriggerType

            service = get_trigger_service(self.db)
            all_triggers = await service.get_agent_triggers(self.agent_id)

            scheduled = [
                candidate for candidate in all_triggers
                if candidate.trigger_type == TriggerType.SCHEDULE
            ]
            if not scheduled:
                return self.success_response({
                    "message": "No scheduled triggers found for this agent.",
                    "triggers": []
                })

            # First pass: resolve each referenced workflow id to its name.
            # Only successful lookups are remembered.
            db_client = await self.db.client
            workflow_names: Dict[Any, Any] = {}
            for item in scheduled:
                cfg = item.config
                if cfg.get("execution_type") != "workflow" or not cfg.get("workflow_id"):
                    continue
                wf_id = cfg["workflow_id"]
                if wf_id in workflow_names:
                    continue
                lookup = await db_client.table('agent_workflows').select('name').eq('id', wf_id).execute()
                if lookup.data:
                    workflow_names[wf_id] = lookup.data[0]['name']

            # Second pass: shape every trigger for the response.
            listing = []
            for item in scheduled:
                cfg = item.config
                entry = {
                    "id": item.trigger_id,
                    "name": item.name,
                    "description": item.description,
                    "cron_expression": cfg.get("cron_expression"),
                    "execution_type": cfg.get("execution_type", "agent"),
                    "is_active": item.is_active,
                    "created_at": item.created_at.isoformat(),
                }
                if cfg.get("execution_type") == "workflow":
                    entry["workflow_name"] = workflow_names.get(cfg.get("workflow_id"), "Unknown Workflow")
                    entry["workflow_input"] = cfg.get("workflow_input")
                else:
                    entry["agent_prompt"] = cfg.get("agent_prompt")
                listing.append(entry)

            return self.success_response({
                "message": f"Found {len(listing)} scheduled trigger(s)",
                "triggers": listing
            })

        except Exception as exc:
            logger.error(f"Error getting scheduled triggers: {exc!s}")
            return self.fail_response(f"Error getting scheduled triggers: {exc!s}")

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "delete_scheduled_trigger",
            "description": "Delete a scheduled trigger. The agent will no longer run automatically at the scheduled time.",
            "parameters": {
                "type": "object",
                "properties": {
                    "trigger_id": {
                        "type": "string",
                        "description": "ID of the trigger to delete"
                    }
                },
                "required": ["trigger_id"]
            }
        }
    })
    @usage_example('''
        <function_calls>
        <invoke name="delete_scheduled_trigger">
        <parameter name="trigger_id">trigger-123</parameter>
        </invoke>
        </function_calls>
        ''')
    async def delete_scheduled_trigger(self, trigger_id: str) -> ToolResult:
        """Delete a trigger after confirming it belongs to this agent.

        Args:
            trigger_id: ID of the trigger to remove.

        Returns:
            A success result naming the deleted trigger, or a failure result
            when the trigger is missing, owned by another agent, or the
            service reports that deletion did not succeed.
        """
        try:
            service = get_trigger_service(self.db)
            existing = await service.get_trigger(trigger_id)

            if not existing:
                return self.fail_response("Trigger not found")
            if existing.agent_id != self.agent_id:
                return self.fail_response("This trigger doesn't belong to the current agent")

            deleted = await service.delete_trigger(trigger_id)
            if not deleted:
                return self.fail_response("Failed to delete trigger")

            return self.success_response({
                "message": f"Scheduled trigger '{existing.name}' deleted successfully",
                "trigger_id": trigger_id
            })

        except Exception as exc:
            logger.error(f"Error deleting scheduled trigger: {exc!s}")
            return self.fail_response(f"Error deleting scheduled trigger: {exc!s}")

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "toggle_scheduled_trigger",
            "description": "Enable or disable a scheduled trigger. Disabled triggers won't run until re-enabled.",
            "parameters": {
                "type": "object",
                "properties": {
                    "trigger_id": {
                        "type": "string",
                        "description": "ID of the trigger to toggle"
                    },
                    "is_active": {
                        "type": "boolean",
                        "description": "Whether to enable (true) or disable (false) the trigger"
                    }
                },
                "required": ["trigger_id", "is_active"]
            }
        }
    })
    @usage_example('''
        <function_calls>
        <invoke name="toggle_scheduled_trigger">
        <parameter name="trigger_id">trigger-123</parameter>
        <parameter name="is_active">false</parameter>
        </invoke>
        </function_calls>
        ''')
    async def toggle_scheduled_trigger(self, trigger_id: str, is_active: bool) -> ToolResult:
        """Set a trigger's active flag after confirming it belongs to this agent.

        Args:
            trigger_id: ID of the trigger to update.
            is_active: ``True`` to enable the trigger, ``False`` to disable it.

        Returns:
            A success result with the trigger's id, name and new active state,
            or a failure result when the trigger is missing, owned by another
            agent, or the service returns nothing for the update.
        """
        try:
            service = get_trigger_service(self.db)
            existing = await service.get_trigger(trigger_id)

            if not existing:
                return self.fail_response("Trigger not found")
            if existing.agent_id != self.agent_id:
                return self.fail_response("This trigger doesn't belong to the current agent")

            updated = await service.update_trigger(trigger_id=trigger_id, is_active=is_active)
            if not updated:
                return self.fail_response("Failed to update trigger")

            if is_active:
                state_word = "enabled"
            else:
                state_word = "disabled"

            return self.success_response({
                "message": f"Scheduled trigger '{updated.name}' has been {state_word}",
                "trigger": {
                    "id": updated.trigger_id,
                    "name": updated.name,
                    "is_active": updated.is_active
                }
            })

        except Exception as exc:
            logger.error(f"Error toggling scheduled trigger: {exc!s}")
            return self.fail_response(f"Error toggling scheduled trigger: {exc!s}")
2025-08-15 16:22:37 +08:00
2025-08-17 05:09:43 +08:00
# ===== EVENT-BASED TRIGGERS (Non-Production Only) =====
# Event trigger methods - only available in non-production environments
if config.ENV_MODE != EnvMode.PRODUCTION:

    def _coerce_config_value(value: Any, declared_type: Optional[str]) -> Any:
        """Return ``value`` converted towards ``declared_type`` where a rule applies.

        Only the conversions below are attempted; any other combination returns
        ``value`` unchanged. Conversion errors (e.g. ``float("abc")``) propagate
        to the caller.
        """
        if declared_type == "array":
            if isinstance(value, str):
                return [value]
        elif declared_type == "integer":
            if isinstance(value, str) and value.isdigit():
                return int(value)
        elif declared_type == "number":
            if isinstance(value, str):
                return float(value)
        elif declared_type == "boolean":
            if isinstance(value, str):
                return value.lower() in ("true", "1", "yes")
        elif declared_type == "string":
            if isinstance(value, (list, tuple)):
                return ",".join(str(x) for x in value)
            if not isinstance(value, str):
                return str(value)
        return value

    def _extract_composio_trigger_id(obj: Any) -> Optional[str]:
        """Search a response object for a trigger id.

        Checks the known id keys on ``obj`` first, then recurses into the known
        wrapper keys (a dict, or the first element of a non-empty list).
        Returns ``None`` when ``obj`` is not a dict or nothing is found.
        """
        if not isinstance(obj, dict):
            return None
        direct = (
            obj.get("id")
            or obj.get("trigger_id")
            or obj.get("triggerId")
            or obj.get("nano_id")
            or obj.get("nanoId")
            or obj.get("triggerNanoId")
        )
        if direct:
            return direct
        for wrapper_key in ("trigger", "trigger_instance", "triggerInstance", "data", "result"):
            nested = obj.get(wrapper_key)
            if isinstance(nested, dict):
                found = _extract_composio_trigger_id(nested)
                if found:
                    return found
            if isinstance(nested, list) and nested:
                found = _extract_composio_trigger_id(nested[0])
                if found:
                    return found
        return None

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "list_event_trigger_apps",
            "description": "List apps (toolkits) that have available event-based triggers via Composio. Returns slug, name, and logo.",
            "parameters": {
                "type": "object",
                "properties": {},
                "required": []
            }
        }
    })
    @usage_example('''
    <function_calls>
    <invoke name="list_event_trigger_apps"></invoke>
    </function_calls>
    ''')
    async def list_event_trigger_apps(self) -> ToolResult:
        """List apps that expose triggers, as reported by ``ComposioTriggerService``.

        Returns:
            A success result carrying the service's ``items`` and ``total``
            plus a summary message, or a failure result on any error.
        """
        try:
            trigger_service = ComposioTriggerService()
            response = await trigger_service.list_apps_with_triggers()

            # Return exact same format as API
            return self.success_response({
                "message": f"Found {response['total']} apps with triggers",
                "items": response["items"],
                "total": response["total"]
            })
        except Exception as e:
            logger.error(f"Error listing event trigger apps: {e}")
            return self.fail_response(f"Error listing apps: {str(e)}")

    TriggerTool.list_event_trigger_apps = list_event_trigger_apps

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "list_app_event_triggers",
            "description": "List available triggers for a given app/toolkit slug. Includes slug, name, description, type, instructions, config, and payload schema.",
            "parameters": {
                "type": "object",
                "properties": {
                    "toolkit_slug": {
                        "type": "string",
                        "description": "Toolkit slug, e.g. 'gmail'"
                    }
                },
                "required": ["toolkit_slug"]
            }
        }
    })
    @usage_example('''
    <function_calls>
    <invoke name="list_app_event_triggers">
    <parameter name="toolkit_slug">gmail</parameter>
    </invoke>
    </function_calls>
    ''')
    async def list_app_event_triggers(self, toolkit_slug: str) -> ToolResult:
        """List the triggers ``ComposioTriggerService`` reports for one toolkit.

        Args:
            toolkit_slug: Toolkit slug passed to the service.

        Returns:
            A success result carrying the service's ``items``, ``toolkit`` and
            ``total`` plus a summary message, or a failure result on any error.
        """
        try:
            trigger_service = ComposioTriggerService()
            response = await trigger_service.list_triggers_for_app(toolkit_slug)

            # Return exact same format as API
            return self.success_response({
                "message": f"Found {response['total']} triggers for {toolkit_slug}",
                "items": response["items"],
                "toolkit": response["toolkit"],
                "total": response["total"]
            })
        except Exception as e:
            logger.error(f"Error listing triggers for app {toolkit_slug}: {e}")
            return self.fail_response(f"Error listing triggers: {str(e)}")

    TriggerTool.list_app_event_triggers = list_app_event_triggers

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "list_event_profiles",
            "description": "List connected Composio profiles for a toolkit. Use this to get profile_id and connected_account_id before creating a trigger.",
            "parameters": {
                "type": "object",
                "properties": {
                    "toolkit_slug": {
                        "type": "string",
                        "description": "Toolkit slug, e.g. 'gmail'"
                    }
                },
                "required": ["toolkit_slug"]
            }
        }
    })
    @usage_example('''
    <function_calls>
    <invoke name="list_event_profiles">
    <parameter name="toolkit_slug">gmail</parameter>
    </invoke>
    </function_calls>
    ''')
    async def list_event_profiles(self, toolkit_slug: str) -> ToolResult:
        """List the Composio profiles of this agent's account for one toolkit.

        The account is resolved from the ``agents`` table row whose
        ``agent_id`` matches ``self.agent_id``.

        Args:
            toolkit_slug: Toolkit slug passed to ``ComposioProfileService``.

        Returns:
            A success result with one entry per profile (``profile_id``,
            ``display_name``, ``is_connected``, ``connected_account_id``), or a
            failure result when the agent row is missing or an error occurs.
        """
        try:
            client = await self.db.client
            agent_rows = await client.table('agents').select('account_id').eq('agent_id', self.agent_id).execute()
            if not agent_rows.data:
                return self.fail_response("Agent not found")
            account_id = agent_rows.data[0]['account_id']

            profile_service = ComposioProfileService(self.db)
            profiles = await profile_service.get_profiles(account_id, toolkit_slug)

            items = [
                {
                    "profile_id": p.profile_id,
                    "display_name": p.display_name,
                    "is_connected": p.is_connected,
                    # Not every profile object is guaranteed to expose this attribute.
                    "connected_account_id": getattr(p, 'connected_account_id', None)
                }
                for p in profiles
            ]

            return self.success_response({
                "message": f"Found {len(items)} profile(s) for {toolkit_slug}",
                "items": items,
                "total": len(items)
            })
        except Exception as e:
            logger.error(f"Error listing event profiles: {e}")
            return self.fail_response(f"Error listing profiles: {str(e)}")

    TriggerTool.list_event_profiles = list_event_profiles

    @openapi_schema({
        "type": "function",
        "function": {
            "name": "create_event_trigger",
            "description": "Create a Composio event-based trigger for this agent. First list apps and triggers, then pass the chosen trigger slug, profile_id, and trigger_config. Optionally route to a workflow.",
            "parameters": {
                "type": "object",
                "properties": {
                    "slug": {"type": "string", "description": "Trigger type slug, e.g. 'GMAIL_NEW_GMAIL_MESSAGE'"},
                    "profile_id": {"type": "string", "description": "Composio profile_id to use (must be connected)"},
                    "trigger_config": {"type": "object", "description": "Trigger configuration object per trigger schema", "additionalProperties": True},
                    "route": {"type": "string", "enum": ["agent", "workflow"], "default": "agent", "description": "Execute agent directly or run a workflow"},
                    "name": {"type": "string", "description": "Optional friendly name for the trigger"},
                    "agent_prompt": {"type": "string", "description": "Prompt to pass to the agent when route is 'agent'"},
                    "workflow_id": {"type": "string", "description": "Workflow ID when route is 'workflow'"},
                    "workflow_input": {"type": "object", "description": "Workflow input variables when route is 'workflow'", "additionalProperties": True},
                    "connected_account_id": {"type": "string", "description": "Connected account id; if omitted we try to derive from profile"}
                },
                "required": ["slug", "profile_id"]
            }
        }
    })
    @usage_example('''
    <function_calls>
    <invoke name="list_event_trigger_apps"></invoke>
    <invoke name="list_app_event_triggers"><parameter name="toolkit_slug">gmail</parameter></invoke>
    <invoke name="list_event_profiles"><parameter name="toolkit_slug">gmail</parameter></invoke>
    <invoke name="create_event_trigger">
    <parameter name="slug">GMAIL_NEW_GMAIL_MESSAGE</parameter>
    <parameter name="profile_id">profile_123</parameter>
    <parameter name="trigger_config">{"interval": 1, "userId": "me", "labelIds": "INBOX"}</parameter>
    <parameter name="route">agent</parameter>
    <parameter name="agent_prompt">Read this</parameter>
    </invoke>
    </function_calls>
    ''')
    async def create_event_trigger(
        self,
        slug: str,
        profile_id: str,
        trigger_config: Optional[Dict[str, Any]] = None,
        route: str = "agent",
        name: Optional[str] = None,
        agent_prompt: Optional[str] = None,
        workflow_id: Optional[str] = None,
        workflow_input: Optional[Dict[str, Any]] = None,
        connected_account_id: Optional[str] = None
    ) -> ToolResult:
        """Register a Composio trigger instance and save a matching trigger for this agent.

        Steps, in order:
          1. Validate ``route`` and its companion argument.
          2. Read ``user_id`` (and, if not supplied, ``connected_account_id``)
             from the profile config.
          3. Best-effort: fetch the trigger type's config schema and coerce the
             supplied config values to the declared types.
          4. POST the upsert request, including the webhook URL and headers.
          5. Find the Composio trigger id in the response, falling back to the
             active-instances listing.
          6. Store the trigger through the trigger service with provider
             ``"composio"``.

        Environment variables read: ``COMPOSIO_API_BASE``, ``COMPOSIO_API_KEY``
        (required), ``WEBHOOK_BASE_URL``, ``COMPOSIO_WEBHOOK_SECRET``,
        ``VERCEL_PROTECTION_BYPASS_KEY``.

        Args:
            slug: Trigger type slug, used in the Composio URLs.
            profile_id: Profile whose config supplies the Composio user id.
            trigger_config: Trigger configuration; a copy is coerced and sent.
            route: ``"agent"`` or ``"workflow"``.
            name: Trigger name; ``slug`` is used when omitted.
            agent_prompt: Required when ``route`` is ``"agent"``.
            workflow_id: Required when ``route`` is ``"workflow"``.
            workflow_input: Optional input stored with a workflow route.
            connected_account_id: Overrides the value from the profile config.

        Returns:
            A success result describing the stored trigger, or a failure result
            for validation errors, upsert errors, a missing trigger id, or any
            unexpected exception.
        """
        try:
            if route not in ("agent", "workflow"):
                return self.fail_response("route must be either 'agent' or 'workflow'")
            if route == "workflow" and not workflow_id:
                return self.fail_response("workflow_id is required when route is 'workflow'")
            if route == "agent" and not agent_prompt:
                return self.fail_response("agent_prompt is required when route is 'agent'")

            # Resolve composio user id and connected account id from profile
            profile_service = ComposioProfileService(self.db)
            profile_cfg = await profile_service.get_profile_config(profile_id)
            composio_user_id = profile_cfg.get("user_id")
            if not composio_user_id:
                return self.fail_response("Composio profile is missing user_id")
            if not connected_account_id:
                connected_account_id = profile_cfg.get("connected_account_id")

            api_base = os.getenv("COMPOSIO_API_BASE", "https://backend.composio.dev").rstrip("/")
            api_key = os.getenv("COMPOSIO_API_KEY")
            if not api_key:
                return self.fail_response("COMPOSIO_API_KEY not configured")
            headers = {"x-api-key": api_key, "Content-Type": "application/json"}

            # Coerce config types per trigger schema. This is best-effort:
            # failures are logged and the config is sent with whatever
            # coercions succeeded up to that point.
            coerced_config = dict(trigger_config or {})
            try:
                type_url = f"{api_base}/api/v3/triggers_types/{slug}"
                async with httpx.AsyncClient(timeout=10) as http_client:
                    tr = await http_client.get(type_url, headers=headers)
                    if tr.status_code == 200:
                        tdata = tr.json()
                        schema = tdata.get("config") or {}
                        props = schema.get("properties") or {}
                        for key, prop in props.items():
                            if key not in coerced_config:
                                continue
                            val = coerced_config[key]
                            ptype = prop.get("type") if isinstance(prop, dict) else None
                            try:
                                coerced_config[key] = _coerce_config_value(val, ptype)
                            except Exception as coerce_err:
                                # Leave this value as supplied and keep going.
                                logger.warning(
                                    f"Could not coerce trigger config key '{key}' to {ptype}: {coerce_err}"
                                )
            except Exception as schema_err:
                logger.warning(
                    f"Could not apply trigger type schema for {slug}; skipping remaining config coercion: {schema_err}"
                )

            # Upsert trigger instance with webhook
            base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000").rstrip("/")
            secret = os.getenv("COMPOSIO_WEBHOOK_SECRET", "")
            webhook_headers: Dict[str, str] = {"X-Composio-Secret": secret} if secret else {}
            vercel_bypass = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "")
            if vercel_bypass:
                webhook_headers["X-Vercel-Protection-Bypass"] = vercel_bypass

            # Each field is sent under both snake_case and camelCase keys.
            body: Dict[str, Any] = {
                "user_id": composio_user_id,
                "userId": composio_user_id,
                "trigger_config": coerced_config,
                "triggerConfig": coerced_config,
                "webhook": {
                    "url": f"{base_url}/api/composio/webhook",
                    "headers": webhook_headers,
                    "method": "POST",
                },
            }
            if connected_account_id:
                body["connectedAccountId"] = connected_account_id
                body["connected_account_id"] = connected_account_id
                body["connectedAccountIds"] = [connected_account_id]
                body["connected_account_ids"] = [connected_account_id]

            upsert_url = f"{api_base}/api/v3/trigger_instances/{slug}/upsert"
            async with httpx.AsyncClient(timeout=20) as http_client:
                resp = await http_client.post(upsert_url, headers=headers, json=body)
                try:
                    resp.raise_for_status()
                except httpx.HTTPStatusError:
                    ct = resp.headers.get("content-type", "")
                    detail = resp.json() if "application/json" in ct else resp.text
                    return self.fail_response(f"Composio upsert error: {detail}")
                created = resp.json()

            composio_trigger_id = _extract_composio_trigger_id(created) if isinstance(created, dict) else None

            if not composio_trigger_id:
                # fallback to list active: take the id of the first listed instance
                try:
                    params_lookup: Dict[str, Any] = {"limit": 50, "slug": slug, "userId": composio_user_id}
                    if connected_account_id:
                        params_lookup["connectedAccountId"] = connected_account_id
                    list_url = f"{api_base}/api/v3/trigger_instances/active"
                    async with httpx.AsyncClient(timeout=15) as http_client:
                        lr = await http_client.get(list_url, headers=headers, params=params_lookup)
                        if lr.status_code == 200:
                            ldata = lr.json()
                            items = ldata.get("items") if isinstance(ldata, dict) else (ldata if isinstance(ldata, list) else [])
                            if items:
                                composio_trigger_id = _extract_composio_trigger_id(items[0])
                except Exception as lookup_err:
                    # The missing id is reported just below.
                    logger.warning(
                        f"Fallback lookup of active Composio triggers failed for {slug}: {lookup_err}"
                    )

            if not composio_trigger_id:
                return self.fail_response("Failed to get Composio trigger id from response")

            # Build Suna trigger and save
            suna_config: Dict[str, Any] = {
                "composio_trigger_id": composio_trigger_id,
                "trigger_slug": slug,
                "execution_type": route,
                "profile_id": profile_id,
            }
            if route == "agent":
                if agent_prompt:
                    suna_config["agent_prompt"] = agent_prompt
            else:
                suna_config["workflow_id"] = workflow_id
                if workflow_input:
                    suna_config["workflow_input"] = workflow_input

            trigger_svc = get_trigger_service(self.db)
            trigger = await trigger_svc.create_trigger(
                agent_id=self.agent_id,
                provider_id="composio",
                name=name or slug,
                config=suna_config,
                description=f"Composio event: {slug}"
            )

            message = f"Event trigger '{trigger.name}' created successfully.\n"
            message += f"Route: {route}. "
            if route == "workflow":
                message += f"Workflow: {workflow_id}. "
            else:
                message += "Agent execution configured. "

            return self.success_response({
                "message": message,
                "trigger": {
                    "id": trigger.trigger_id,
                    "agent_id": trigger.agent_id,
                    "provider": "composio",
                    "slug": slug,
                    "config": trigger.config,
                    "is_active": trigger.is_active,
                    "created_at": trigger.created_at.isoformat()
                }
            })
        except Exception as e:
            logger.error(f"Error creating event trigger: {e}", exc_info=True)
            return self.fail_response(f"Error creating event trigger: {str(e)}")

    TriggerTool.create_event_trigger = create_event_trigger