This commit is contained in:
Vukasin 2025-08-13 01:03:52 +02:00
parent ffd4232f9b
commit 09a369866e
2 changed files with 341 additions and 21 deletions

View File

@ -12,6 +12,8 @@ import hmac
import httpx
import asyncio
import json
import hashlib
import base64
from .composio_service import (
get_integration_service,
@ -695,6 +697,8 @@ class CreateComposioTriggerRequest(BaseModel):
agent_prompt: Optional[str] = None
workflow_id: Optional[str] = None
workflow_input: Optional[Dict[str, Any]] = None
connected_account_id: Optional[str] = None
webhook_url: Optional[str] = None
## REMOVED: /triggers/type/{slug}
@ -716,21 +720,175 @@ async def create_composio_trigger(req: CreateComposioTriggerRequest, current_use
if not composio_user_id:
raise HTTPException(status_code=400, detail="Composio profile is missing user_id")
client = ComposioClient.get_client()
# Create composio trigger remotely
created = client.triggers.create(
slug=req.slug,
user_id=composio_user_id,
trigger_config=req.trigger_config,
)
# Create Composio trigger via HTTP v3 API (robust against SDK changes)
api_key = os.getenv("COMPOSIO_API_KEY")
if not api_key:
raise HTTPException(status_code=500, detail="COMPOSIO_API_KEY not configured")
# Extract composio trigger id
if hasattr(created, "id"):
composio_trigger_id = getattr(created, "id")
elif isinstance(created, dict):
composio_trigger_id = created.get("id") or created.get("trigger_id")
else:
composio_trigger_id = getattr(created, "trigger_id", None)
url = f"{COMPOSIO_API_BASE}/api/v3/trigger_instances/{req.slug}/upsert"
headers = {"x-api-key": api_key, "Content-Type": "application/json"}
# Provide webhook details so Composio can deliver events
base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000")
secret = os.getenv("COMPOSIO_WEBHOOK_SECRET", "")
webhook_headers = {"X-Composio-Secret": secret} if secret else {}
# Include vercel bypass header if present (staging)
vercel_bypass = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "")
if vercel_bypass:
webhook_headers["X-Vercel-Protection-Bypass"] = vercel_bypass
# Fetch trigger type schema to coerce config types minimally
coerced_config = dict(req.trigger_config or {})
try:
type_url = f"{COMPOSIO_API_BASE}/api/v3/triggers_types/{req.slug}"
async with httpx.AsyncClient(timeout=10) as http_client:
tr = await http_client.get(type_url, headers=headers)
if tr.status_code == 200:
tdata = tr.json()
schema = tdata.get("config") or {}
props = schema.get("properties") or {}
for key, prop in props.items():
if key not in coerced_config:
continue
val = coerced_config[key]
ptype = prop.get("type") if isinstance(prop, dict) else None
try:
if ptype == "array":
if isinstance(val, str):
coerced_config[key] = [val]
elif ptype == "integer":
if isinstance(val, str) and val.isdigit():
coerced_config[key] = int(val)
elif ptype == "number":
if isinstance(val, str):
coerced_config[key] = float(val)
elif ptype == "boolean":
if isinstance(val, str):
coerced_config[key] = val.lower() in ("true", "1", "yes")
elif ptype == "string":
if isinstance(val, (list, tuple)):
# join list into comma-separated string
coerced_config[key] = ",".join(str(x) for x in val)
elif not isinstance(val, str):
coerced_config[key] = str(val)
except Exception:
pass
except Exception:
pass
body = {
# tolerant casing
"user_id": composio_user_id,
"userId": composio_user_id,
"trigger_config": coerced_config,
"triggerConfig": coerced_config,
# webhook config
"webhook": {
"url": req.webhook_url or f"{base_url}/api/composio/webhook",
"headers": webhook_headers,
"method": "POST",
},
}
if req.connected_account_id:
# Tolerate multiple API shapes
body["connectedAccountId"] = req.connected_account_id
body["connected_account_id"] = req.connected_account_id
body["connectedAccountIds"] = [req.connected_account_id]
body["connected_account_ids"] = [req.connected_account_id]
async with httpx.AsyncClient(timeout=20) as http_client:
resp = await http_client.post(url, headers=headers, json=body)
try:
resp.raise_for_status()
except httpx.HTTPStatusError:
# Bubble up API error body for quick debugging
ct = resp.headers.get("content-type", "")
if "application/json" in ct:
detail = resp.json()
else:
detail = resp.text
logger.error(f"Composio upsert error: {detail}")
raise HTTPException(status_code=400, detail=detail)
created = resp.json()
# Minimal debug log of response shape (no secrets)
try:
top_keys = list(created.keys()) if isinstance(created, dict) else None
logger.info(
"Composio upsert ok",
slug=req.slug,
status_code=resp.status_code,
top_keys=top_keys,
)
except Exception:
pass
# Extract composio trigger id from various possible shapes
composio_trigger_id = None
def _extract_id(obj: Dict[str, Any]) -> Optional[str]:
if not isinstance(obj, dict):
return None
cand = (
obj.get("id")
or obj.get("trigger_id")
or obj.get("triggerId")
or obj.get("nano_id")
or obj.get("nanoId")
or obj.get("triggerNanoId")
)
if cand:
return cand
# Nested shapes
for k in ("trigger", "trigger_instance", "triggerInstance", "data", "result"):
nested = obj.get(k)
if isinstance(nested, dict):
nid = _extract_id(nested)
if nid:
return nid
if isinstance(nested, list) and nested:
nid = _extract_id(nested[0])
if nid:
return nid
return None
if isinstance(created, dict):
composio_trigger_id = _extract_id(created)
try:
logger.info(
"Composio extracted trigger id",
slug=req.slug,
extracted_id=composio_trigger_id,
)
except Exception:
pass
# If still missing, fetch from list_active
if not composio_trigger_id:
try:
params_lookup = {
"limit": 50,
"slug": req.slug,
"userId": composio_user_id,
}
if req.connected_account_id:
params_lookup["connectedAccountId"] = req.connected_account_id
list_url = f"{COMPOSIO_API_BASE}/api/v3/trigger_instances/active"
async with httpx.AsyncClient(timeout=15) as http_client:
lr = await http_client.get(list_url, headers=headers, params=params_lookup)
if lr.status_code == 200:
ldata = lr.json()
items = ldata.get("items") if isinstance(ldata, dict) else (ldata if isinstance(ldata, list) else [])
if items:
composio_trigger_id = _extract_id(items[0] if isinstance(items[0], dict) else getattr(items[0], "__dict__", {}))
try:
logger.info(
"Composio list_active fallback",
slug=req.slug,
matched=len(items) if isinstance(items, list) else 0,
extracted_id=composio_trigger_id,
)
except Exception:
pass
except Exception:
pass
if not composio_trigger_id:
raise HTTPException(status_code=500, detail="Failed to get Composio trigger id from response")
@ -786,6 +944,36 @@ async def create_composio_trigger(req: CreateComposioTriggerRequest, current_use
async def composio_webhook(request: Request):
"""Shared Composio webhook endpoint. Verifies secret, matches triggers, and enqueues execution."""
try:
# Minimal request diagnostics (no secrets)
try:
client_ip = request.client.host if request.client else None
header_names = list(request.headers.keys())
has_auth = bool(request.headers.get("authorization"))
has_x_secret = bool(request.headers.get("x-composio-secret") or request.headers.get("X-Composio-Secret"))
has_x_trigger = bool(request.headers.get("x-trigger-secret") or request.headers.get("X-Trigger-Secret"))
# Peek payload meta safely
payload_preview = {}
try:
_p = await request.json()
payload_preview = {
"keys": list(_p.keys()) if isinstance(_p, dict) else [],
"id": _p.get("id") if isinstance(_p, dict) else None,
"triggerSlug": _p.get("triggerSlug") if isinstance(_p, dict) else None,
}
except Exception:
payload_preview = {"keys": []}
logger.info(
"Composio webhook incoming",
client_ip=client_ip,
header_names=header_names,
has_authorization=has_auth,
has_x_composio_secret=has_x_secret,
has_x_trigger_secret=has_x_trigger,
payload_meta=payload_preview,
)
except Exception:
pass
secret = os.getenv("COMPOSIO_WEBHOOK_SECRET")
if not secret:
logger.error("COMPOSIO_WEBHOOK_SECRET is not configured")
@ -798,18 +986,143 @@ async def composio_webhook(request: Request):
or ""
)
if not hmac.compare_digest(incoming_secret, secret):
logger.warning("Invalid Composio webhook secret")
raise HTTPException(status_code=401, detail="Unauthorized")
# Default: verify signature when present (no env toggles)
signature_hdr = request.headers.get("webhook-signature")
timestamp_hdr = request.headers.get("webhook-timestamp")
if signature_hdr:
try:
raw_body = await request.body()
# Raw bytes only; avoid re-encoding differences
provided_raw = signature_hdr.strip()
provided = provided_raw
# Parse common formats: "sha256=..." or "t=..., v1=..."
if provided.startswith("sha256="):
provided = provided.split("=", 1)[1]
elif "," in provided:
parts = {}
for seg in provided.split(","):
if "=" in seg:
k, v = seg.split("=", 1)
parts[k.strip()] = v.strip()
# Stripe-like format t=<ts>, v1=<sig>
provided = parts.get("v1", provided)
if not timestamp_hdr:
timestamp_hdr = parts.get("t", timestamp_hdr)
elif provided.lower().startswith("v1,") and len(provided) > 3:
# Format "v1,<base64>"
provided = provided.split(",", 1)[1]
# Build candidate HMACs across key interpretations and message shapes
candidates = []
key_variants = []
# raw ascii
try:
key_variants.append(("ascii", secret.encode()))
except Exception:
pass
# base64-decoded secret
try:
key_variants.append(("b64", base64.b64decode(secret, validate=False)))
except Exception:
pass
# hex-decoded secret
try:
key_variants.append(("hex", bytes.fromhex(secret)))
except Exception:
pass
msg_variants = []
if timestamp_hdr:
ts_bytes = timestamp_hdr.encode()
msg_variants.extend([
("ts.dot", ts_bytes + b"." + raw_body),
("ts.nl", ts_bytes + b"\n" + raw_body),
("ts.cat", ts_bytes + raw_body),
])
msg_variants.append(("body", raw_body))
for key_name, key_bytes in key_variants:
for msg_name, msg_bytes in msg_variants:
dig = hmac.new(key_bytes, msg_bytes, hashlib.sha256).digest()
candidates.append({
"hex": dig.hex(),
"b64": base64.b64encode(dig).decode(),
"raw": dig,
"key": key_name,
"msg": msg_name,
})
# Normalize provided signature to bytes candidates (base64 or hex)
provided_hex = None
provided_b64 = None
provided_bytes = None
# Try hex
try:
provided_bytes = bytes.fromhex(provided)
provided_hex = provided.lower()
except Exception:
provided_hex = None
# Try base64
if provided_bytes is None:
try:
provided_bytes = base64.b64decode(provided, validate=False)
provided_b64 = provided
except Exception:
provided_b64 = None
ok = False
chosen = None
for c in candidates:
if provided_hex and hmac.compare_digest(provided_hex, c["hex"]):
ok = True; chosen = c; break
if provided_b64 and hmac.compare_digest(provided_b64, c["b64"]):
ok = True; chosen = c; break
if provided_bytes is not None and hmac.compare_digest(provided_bytes, c["raw"]):
ok = True; chosen = c; break
# Debug log of signature mismatch details
try:
logger.info(
"Composio signature debug",
provided_raw=provided_raw,
timestamp=timestamp_hdr,
body_len=len(raw_body),
cand_hex=[c["hex"] for c in candidates[:6]],
cand_b64=[c["b64"] for c in candidates[:6]],
chosen_key=chosen.get("key") if chosen else None,
chosen_msg=chosen.get("msg") if chosen else None,
)
except Exception:
pass
if not ok:
logger.warning("Invalid Composio webhook signature")
raise HTTPException(status_code=401, detail="Unauthorized")
except HTTPException:
raise
except Exception:
logger.warning("Failed to verify Composio webhook signature; denying")
raise HTTPException(status_code=401, detail="Unauthorized")
else:
logger.warning("Invalid Composio webhook secret")
raise HTTPException(status_code=401, detail="Unauthorized")
try:
payload = await request.json()
except Exception:
payload = {}
composio_trigger_id = payload.get("id") # Trigger instance nano id
composio_trigger_id = payload.get("id") # Trigger instance nano id (not always present)
provider_event_id = (
payload.get("eventId") or payload.get("payload", {}).get("id") or payload.get("id")
)
# Try to derive trigger slug from various shapes
trigger_slug = (
payload.get("triggerSlug")
or payload.get("type")
or (payload.get("data", {}) or {}).get("triggerSlug")
)
client = await db.client
@ -817,9 +1130,9 @@ async def composio_webhook(request: Request):
logger.warning("Composio webhook missing trigger id in payload.id")
return JSONResponse(status_code=400, content={"success": False, "error": "Missing composio trigger id"})
# Fetch all active EVENT triggers and filter by provider 'composio' and composio_trigger_id in config (in Python)
# Fetch all active WEBHOOK triggers and filter by provider 'composio'
try:
res = await client.table("agent_triggers").select("*").eq("trigger_type", "event").eq("is_active", True).execute()
res = await client.table("agent_triggers").select("*").eq("trigger_type", "webhook").eq("is_active", True).execute()
rows = res.data or []
except Exception as e:
logger.error(f"Error fetching agent_triggers: {e}")
@ -830,7 +1143,13 @@ async def composio_webhook(request: Request):
cfg = row.get("config") or {}
if not isinstance(cfg, dict):
continue
if cfg.get("provider_id") == "composio" and cfg.get("composio_trigger_id") == composio_trigger_id:
if cfg.get("provider_id") != "composio":
continue
# Prefer instance-id match when available, else fall back to slug match
if composio_trigger_id and cfg.get("composio_trigger_id") == composio_trigger_id:
matched.append(row)
continue
if (not composio_trigger_id) and trigger_slug and cfg.get("trigger_slug") == trigger_slug:
matched.append(row)
if not matched:

View File

@ -426,7 +426,8 @@ class ProviderService:
class ComposioEventProvider(TriggerProvider):
def __init__(self):
super().__init__("composio", TriggerType.EVENT)
# Use WEBHOOK to match existing DB enum (no migration needed)
super().__init__("composio", TriggerType.WEBHOOK)
async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
composio_trigger_id = config.get("composio_trigger_id")