fix: trigger enable

This commit is contained in:
Vukasin 2025-08-19 10:58:56 +02:00
parent b23ca6e8a7
commit 1db63f2389
2 changed files with 51 additions and 52 deletions

View File

@ -769,36 +769,6 @@ async def create_composio_trigger(req: CreateComposioTriggerRequest, current_use
except Exception:
pass
# If still missing, fetch from list_active
if not composio_trigger_id:
try:
params_lookup = {
"limit": 50,
"slug": req.slug,
"userId": composio_user_id,
}
if req.connected_account_id:
params_lookup["connectedAccountId"] = req.connected_account_id
list_url = f"{COMPOSIO_API_BASE}/api/v3/trigger_instances/active"
async with httpx.AsyncClient(timeout=15) as http_client:
lr = await http_client.get(list_url, headers=headers, params=params_lookup)
if lr.status_code == 200:
ldata = lr.json()
items = ldata.get("items") if isinstance(ldata, dict) else (ldata if isinstance(ldata, list) else [])
if items:
composio_trigger_id = _extract_id(items[0] if isinstance(items[0], dict) else getattr(items[0], "__dict__", {}))
try:
logger.debug(
"Composio list_active fallback",
slug=req.slug,
matched=len(items) if isinstance(items, list) else 0,
extracted_id=composio_trigger_id,
)
except Exception:
pass
except Exception:
pass
if not composio_trigger_id:
raise HTTPException(status_code=500, detail="Failed to get Composio trigger id from response")
@ -858,6 +828,17 @@ async def create_composio_trigger(req: CreateComposioTriggerRequest, current_use
async def composio_webhook(request: Request):
"""Shared Composio webhook endpoint. Verifies secret, matches triggers, and enqueues execution."""
try:
# Read raw body first (can only be done once)
try:
body = await request.body()
body_str = body.decode('utf-8') if body else ""
logger.info("Composio webhook raw body", body=body_str, body_length=len(body) if body else 0)
except Exception as e:
logger.info("Composio webhook body read failed", error=str(e))
body_str = ""
# Minimal request diagnostics (no secrets)
try:
client_ip = request.client.host if request.client else None
@ -865,26 +846,28 @@ async def composio_webhook(request: Request):
has_auth = bool(request.headers.get("authorization"))
has_x_secret = bool(request.headers.get("x-composio-secret") or request.headers.get("X-Composio-Secret"))
has_x_trigger = bool(request.headers.get("x-trigger-secret") or request.headers.get("X-Trigger-Secret"))
# Peek payload meta safely
payload_preview = {}
# Parse payload for logging
payload_preview = {"keys": []}
try:
_p = await request.json()
payload_preview = {
"keys": list(_p.keys()) if isinstance(_p, dict) else [],
"id": _p.get("id") if isinstance(_p, dict) else None,
"triggerSlug": _p.get("triggerSlug") if isinstance(_p, dict) else None,
}
if body_str:
_p = json.loads(body_str)
payload_preview = {
"keys": list(_p.keys()) if isinstance(_p, dict) else [],
"id": _p.get("id") if isinstance(_p, dict) else None,
"triggerSlug": _p.get("triggerSlug") if isinstance(_p, dict) else None,
}
except Exception:
payload_preview = {"keys": []}
logger.debug(
"Composio webhook incoming",
client_ip=client_ip,
header_names=header_names,
has_authorization=has_auth,
has_x_composio_secret=has_x_secret,
has_x_trigger_secret=has_x_trigger,
payload_meta=payload_preview,
)
logger.debug(
"Composio webhook incoming",
client_ip=client_ip,
header_names=header_names,
has_authorization=has_auth,
has_x_composio_secret=has_x_secret,
has_x_trigger_secret=has_x_trigger,
payload_meta=payload_preview,
)
except Exception:
pass
@ -896,8 +879,9 @@ async def composio_webhook(request: Request):
# Use robust verifier (tries ASCII/HEX/B64 keys and id.ts.body/ts.body)
await verify_composio(request, "COMPOSIO_WEBHOOK_SECRET")
# Parse payload for processing
try:
payload = await request.json()
payload = json.loads(body_str) if body_str else {}
except Exception:
payload = {}

View File

@ -84,9 +84,11 @@ class TriggerService:
updated_at=now
)
setup_success = await provider_service.setup_trigger(trigger)
if not setup_success:
raise ValueError(f"Failed to setup trigger with provider: {provider_id}")
# Skip setup_trigger for Composio since triggers are already enabled when created
if provider_id != "composio":
setup_success = await provider_service.setup_trigger(trigger)
if not setup_success:
raise ValueError(f"Failed to setup trigger with provider: {provider_id}")
await self._save_trigger(trigger)
@ -283,12 +285,25 @@ class TriggerService:
async def _log_trigger_event(self, event: TriggerEvent, result: TriggerResult) -> None:
client = await self._db.client
# Ensure raw_data is JSON serializable
try:
if isinstance(event.raw_data, bytes):
event_data = event.raw_data.decode('utf-8', errors='replace')
elif isinstance(event.raw_data, str):
event_data = event.raw_data
else:
event_data = str(event.raw_data)
except Exception as e:
logger.warning(f"Failed to serialize raw_data: {e}")
event_data = str(event.raw_data) if event.raw_data else "{}"
await client.table('trigger_event_logs').insert({
'log_id': str(uuid.uuid4()),
'trigger_id': event.trigger_id,
'agent_id': event.agent_id,
'trigger_type': event.trigger_type.value,
'event_data': event.raw_data,
'event_data': event_data,
'success': result.success,
'should_execute_agent': result.should_execute_agent,
'should_execute_workflow': result.should_execute_workflow,