fix: composio handle multiple triggers on our side

This commit is contained in:
Vukasin 2025-08-21 22:45:57 +02:00
parent cd3ebb5e78
commit e0ad5cf2cd
3 changed files with 121 additions and 86 deletions

View File

@ -692,20 +692,10 @@ async def create_composio_trigger(req: CreateComposioTriggerRequest, current_use
body = {
"user_id": composio_user_id,
"userId": composio_user_id,
"trigger_config": coerced_config,
"triggerConfig": coerced_config,
"webhook": {
"url": req.webhook_url or f"{base_url}/api/composio/webhook",
"headers": webhook_headers,
"method": "POST",
},
}
if req.connected_account_id:
body["connectedAccountId"] = req.connected_account_id
body["connected_account_id"] = req.connected_account_id
body["connectedAccountIds"] = [req.connected_account_id]
body["connected_account_ids"] = [req.connected_account_id]
async with httpx.AsyncClient(timeout=20) as http_client:
resp = await http_client.post(url, headers=headers, json=body)
@ -838,6 +828,8 @@ async def composio_webhook(request: Request):
logger.info("Composio webhook body read failed", error=str(e))
body_str = ""
# Get webhook ID early for logging
wid = request.headers.get("webhook-id", "")
# Minimal request diagnostics (no secrets)
try:
@ -859,15 +851,6 @@ async def composio_webhook(request: Request):
}
except Exception:
payload_preview = {"keys": []}
logger.debug(
"Composio webhook incoming",
client_ip=client_ip,
header_names=header_names,
has_authorization=has_auth,
has_x_composio_secret=has_x_secret,
has_x_trigger_secret=has_x_trigger,
payload_meta=payload_preview,
)
except Exception:
pass
@ -882,10 +865,10 @@ async def composio_webhook(request: Request):
# Parse payload for processing
try:
payload = json.loads(body_str) if body_str else {}
except Exception:
except Exception as parse_error:
logger.error(f"Failed to parse webhook payload: {parse_error}", payload_raw=body_str)
payload = {}
wid = request.headers.get("webhook-id", "")
# Look for trigger_nano_id in data.trigger_nano_id (the actual Composio trigger instance ID)
composio_trigger_id = (
(payload.get("data", {}) or {}).get("trigger_nano_id")
@ -906,11 +889,11 @@ async def composio_webhook(request: Request):
# Basic parsed-field logging (no secrets)
try:
logger.debug(
logger.info(
"Composio parsed fields",
webhook_id=wid,
trigger_slug=trigger_slug,
payload_id=composio_trigger_id,
composio_trigger_id=composio_trigger_id,
provider_event_id=provider_event_id,
payload_keys=list(payload.keys()) if isinstance(payload, dict) else [],
)
@ -933,14 +916,6 @@ async def composio_webhook(request: Request):
rows = []
matched = []
try:
logger.debug(
"Composio matching begin",
have_id=bool(composio_trigger_id),
payload_id=composio_trigger_id,
)
except Exception:
pass
for row in rows:
cfg = row.get("config") or {}
@ -948,43 +923,31 @@ async def composio_webhook(request: Request):
continue
prov = cfg.get("provider_id") or row.get("provider_id")
if prov != "composio":
try:
logger.debug("Composio skip non-provider", trigger_id=row.get("trigger_id"), provider_id=prov)
except Exception:
pass
logger.debug("Composio skip non-provider", trigger_id=row.get("trigger_id"), provider_id=prov)
continue
# ONLY match by exact composio_trigger_id - no slug fallback
cfg_tid = cfg.get("composio_trigger_id")
if composio_trigger_id and cfg_tid == composio_trigger_id:
logger.debug(
logger.info(
"Composio EXACT ID MATCH",
trigger_id=row.get("trigger_id"),
cfg_id=cfg_tid,
payload_id=composio_trigger_id
cfg_composio_trigger_id=cfg_tid,
payload_composio_trigger_id=composio_trigger_id,
is_active=row.get("is_active")
)
matched.append(row)
continue
else:
logger.debug(
logger.info(
"Composio ID mismatch",
trigger_id=row.get("trigger_id"),
cfg_id=cfg_tid,
payload_id=composio_trigger_id,
match_found=False
cfg_composio_trigger_id=cfg_tid,
payload_composio_trigger_id=composio_trigger_id,
match_found=False,
is_active=row.get("is_active")
)
try:
logger.debug(
"Composio matching result",
total=len(rows),
matched=len(matched),
have_id=bool(composio_trigger_id),
payload_id=composio_trigger_id,
)
except Exception:
pass
if not matched:
logger.error(
f"No exact ID match found for Composio trigger {composio_trigger_id}",

View File

@ -285,11 +285,13 @@ class ProviderService:
self._db = db_connection
self._providers: Dict[str, TriggerProvider] = {}
self._initialize_providers()
def _initialize_providers(self):
self._providers["schedule"] = ScheduleProvider()
self._providers["webhook"] = WebhookProvider()
self._providers["composio"] = ComposioEventProvider()
composio_provider = ComposioEventProvider()
composio_provider.set_db(self._db)
self._providers["composio"] = composio_provider
async def get_available_providers(self) -> List[Dict[str, Any]]:
providers = []
@ -446,6 +448,45 @@ class ComposioEventProvider(TriggerProvider):
super().__init__("composio", TriggerType.WEBHOOK)
self._api_base = os.getenv("COMPOSIO_API_BASE", "https://backend.composio.dev")
self._api_key = os.getenv("COMPOSIO_API_KEY", "")
self._db: Optional[DBConnection] = None
def set_db(self, db: DBConnection):
"""Set database connection for provider"""
self._db = db
async def _count_triggers_with_composio_id(self, composio_trigger_id: str, exclude_trigger_id: Optional[str] = None) -> int:
"""Count how many triggers use the same composio_trigger_id (excluding specified trigger)"""
if not self._db:
return 0
client = await self._db.client
# Use PostgreSQL JSON operator for exact match
query = client.table('agent_triggers').select('trigger_id', count='exact').eq('trigger_type', 'webhook').eq('config->>composio_trigger_id', composio_trigger_id)
if exclude_trigger_id:
query = query.neq('trigger_id', exclude_trigger_id)
result = await query.execute()
count = result.count or 0
return count
async def _count_active_triggers_with_composio_id(self, composio_trigger_id: str, exclude_trigger_id: Optional[str] = None) -> int:
"""Count how many ACTIVE triggers use the same composio_trigger_id (excluding specified trigger)"""
if not self._db:
return 0
client = await self._db.client
# Use PostgreSQL JSON operator for exact match
query = client.table('agent_triggers').select('trigger_id', count='exact').eq('trigger_type', 'webhook').eq('is_active', True).eq('config->>composio_trigger_id', composio_trigger_id)
if exclude_trigger_id:
query = query.neq('trigger_id', exclude_trigger_id)
result = await query.execute()
count = result.count or 0
return count
def _headers(self) -> Dict[str, str]:
return {"x-api-key": self._api_key, "Content-Type": "application/json"}
@ -482,14 +523,23 @@ class ComposioEventProvider(TriggerProvider):
return config
async def setup_trigger(self, trigger: Trigger) -> bool:
# Re-enable the Composio trigger instance if present
# Enable in Composio only if this will be the first active trigger with this composio_trigger_id
try:
trigger_id = trigger.config.get("composio_trigger_id")
if not trigger_id:
composio_trigger_id = trigger.config.get("composio_trigger_id")
if not composio_trigger_id or not self._api_key:
return True
if not self._api_key:
# Check if other ACTIVE triggers are using this composio_trigger_id
other_active_count = await self._count_active_triggers_with_composio_id(composio_trigger_id, trigger.trigger_id)
logger.debug(f"Setup trigger {trigger.trigger_id}: other_active_count={other_active_count} for composio_id={composio_trigger_id}")
if other_active_count > 0:
# Other active triggers exist, don't touch Composio - just mark our trigger as active locally
logger.debug(f"Skipping Composio enable - {other_active_count} other active triggers exist")
return True
# Use canonical payload first per Composio API; include tolerant fallbacks
# We're the first/only active trigger, enable in Composio
logger.debug(f"Enabling trigger in Composio - first active trigger for {composio_trigger_id}")
payload_candidates: List[Dict[str, Any]] = [
{"status": "enable"},
{"status": "enabled"},
@ -497,11 +547,12 @@ class ComposioEventProvider(TriggerProvider):
]
async with httpx.AsyncClient(timeout=10) as client:
for api_base in self._api_bases():
url = f"{api_base}/api/v3/trigger_instances/manage/{trigger_id}"
url = f"{api_base}/api/v3/trigger_instances/manage/{composio_trigger_id}"
for body in payload_candidates:
try:
resp = await client.patch(url, headers=self._headers(), json=body)
if resp.status_code in (200, 204):
logger.debug(f"Successfully enabled trigger in Composio: {composio_trigger_id}")
return True
except Exception:
continue
@ -510,14 +561,23 @@ class ComposioEventProvider(TriggerProvider):
return True
async def teardown_trigger(self, trigger: Trigger) -> bool:
# Disable the Composio trigger instance so it stops sending webhooks
# Disable in Composio only if this was the last active trigger with this composio_trigger_id
try:
trigger_id = trigger.config.get("composio_trigger_id")
if not trigger_id:
composio_trigger_id = trigger.config.get("composio_trigger_id")
if not composio_trigger_id or not self._api_key:
logger.info(f"TEARDOWN: Skipping - no composio_id or api_key")
return True
if not self._api_key:
# Check if other ACTIVE triggers are using this composio_trigger_id
other_active_count = await self._count_active_triggers_with_composio_id(composio_trigger_id, trigger.trigger_id)
if other_active_count > 0:
# Other active triggers exist, don't touch Composio - just mark our trigger as inactive locally
logger.info(f"TEARDOWN: Skipping Composio disable - {other_active_count} other active triggers exist")
return True
# Use canonical payload first per Composio API; include tolerant fallbacks
# We're the last active trigger, disable in Composio
payload_candidates: List[Dict[str, Any]] = [
{"status": "disable"},
{"status": "disabled"},
@ -525,29 +585,38 @@ class ComposioEventProvider(TriggerProvider):
]
async with httpx.AsyncClient(timeout=10) as client:
for api_base in self._api_bases():
url = f"{api_base}/api/v3/trigger_instances/manage/{trigger_id}"
url = f"{api_base}/api/v3/trigger_instances/manage/{composio_trigger_id}"
for body in payload_candidates:
try:
resp = await client.patch(url, headers=self._headers(), json=body)
if resp.status_code in (200, 204):
return True
except Exception:
except Exception as e:
logger.warning(f"TEARDOWN: Failed to disable with body {body}: {e}")
continue
logger.warning(f"TEARDOWN: Failed to disable trigger in Composio: {composio_trigger_id}")
return True
except Exception:
except Exception as e:
logger.error(f"TEARDOWN: Exception in teardown_trigger: {e}")
return True
async def delete_remote_trigger(self, trigger: Trigger) -> bool:
# Permanently remove the remote Composio trigger instance
# Only permanently remove the remote Composio trigger if this is the last trigger using it
try:
trigger_id = trigger.config.get("composio_trigger_id")
if not trigger_id:
composio_trigger_id = trigger.config.get("composio_trigger_id")
if not composio_trigger_id or not self._api_key:
return True
if not self._api_key:
# Check if other triggers are using this composio_trigger_id
other_count = await self._count_triggers_with_composio_id(composio_trigger_id, trigger.trigger_id)
if other_count > 0:
# Other triggers exist, don't delete from Composio - just remove our local trigger
return True
# We're the last trigger, permanently delete from Composio
async with httpx.AsyncClient(timeout=10) as client:
for api_base in self._api_bases():
url = f"{api_base}/api/v3/trigger_instances/manage/{trigger_id}"
url = f"{api_base}/api/v3/trigger_instances/manage/{composio_trigger_id}"
try:
resp = await client.delete(url, headers=self._headers())
if resp.status_code in (200, 204):

View File

@ -145,6 +145,10 @@ class TriggerService:
config_changed = config is not None
activation_toggled = (is_active is not None) and (previous_is_active != trigger.is_active)
# UPDATE DATABASE FIRST so provider methods see correct state
await self._update_trigger(trigger)
if config_changed or activation_toggled:
from .provider_service import get_provider_service
provider_service = get_provider_service(self._db)
@ -156,8 +160,8 @@ class TriggerService:
setup_success = await provider_service.setup_trigger(trigger)
if not setup_success:
raise ValueError(f"Failed to update trigger setup: {trigger_id}")
else:
# Only activation toggled; call the minimal required action
elif activation_toggled:
# Only activation toggled; call the appropriate action
if trigger.is_active:
setup_success = await provider_service.setup_trigger(trigger)
if not setup_success:
@ -165,8 +169,6 @@ class TriggerService:
else:
await provider_service.teardown_trigger(trigger)
await self._update_trigger(trigger)
logger.debug(f"Updated trigger {trigger_id}")
return trigger
@ -175,9 +177,17 @@ class TriggerService:
if not trigger:
return False
# DELETE FROM DATABASE FIRST so provider methods see correct state
client = await self._db.client
result = await client.table('agent_triggers').delete().eq('trigger_id', trigger_id).execute()
success = len(result.data) > 0
if not success:
return False
from .provider_service import get_provider_service
provider_service = get_provider_service(self._db)
# First disable remotely so webhooks stop quickly
# Now disable remotely so webhooks stop quickly
try:
await provider_service.teardown_trigger(trigger)
except Exception:
@ -188,13 +198,6 @@ class TriggerService:
except Exception:
pass
client = await self._db.client
result = await client.table('agent_triggers').delete().eq('trigger_id', trigger_id).execute()
success = len(result.data) > 0
if success:
logger.debug(f"Deleted trigger {trigger_id}")
return success
async def process_trigger_event(self, trigger_id: str, raw_data: Dict[str, Any]) -> TriggerResult: