# suna/backend/composio_integration/composio_trigger_service.py
# File-viewer metadata from the original listing: 218 lines, 9.5 KiB, Python

import json
import os
import time
from datetime import datetime
from typing import Dict, Any, List, Optional

import httpx

from utils.logger import logger
from .toolkit_service import ToolkitService
class ComposioTriggerService:
    """List Composio trigger types over HTTP, grouped by toolkit.

    Responses are cached in process memory for a short TTL; the
    apps-with-triggers listing is additionally cached in Redis on a
    best-effort basis (Redis failures never fail a request).

    Configuration is read from the environment at construction time:
    ``COMPOSIO_API_BASE`` (defaults to ``https://backend.composio.dev``)
    and ``COMPOSIO_API_KEY``.
    """

    # Redis key used to cache the apps-with-triggers response.
    _APPS_CACHE_KEY = "composio:apps-with-triggers:v1"
    # Flat (non-nested) payload keys that may carry the toolkit identifier.
    _FLAT_TOOLKIT_KEYS = ("toolkit_slug", "toolkitSlug", "toolkit_name", "toolkitName")
    # Page size requested from the trigger-types endpoint.
    _PAGE_LIMIT = 1000

    def __init__(self):
        self.api_base = os.getenv("COMPOSIO_API_BASE", "https://backend.composio.dev").rstrip("/")
        self.api_key = os.getenv("COMPOSIO_API_KEY")
        # Cache settings (TTLs are in seconds; "ts" holds a _now() reading).
        self._apps_cache: Dict[str, Any] = {"ts": 0, "data": None}
        self._apps_ttl = 60
        self._triggers_cache: Dict[str, Dict[str, Any]] = {}
        self._triggers_ttl = 60

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _now() -> float:
        """Return the clock reading (seconds) used for in-memory TTL checks.

        A monotonic clock is used because only elapsed time matters here and
        it is unaffected by wall-clock adjustments or timezone handling.
        """
        return time.monotonic()

    def _require_api_key(self) -> None:
        """Raise ``ValueError`` when no Composio API key is configured."""
        if not self.api_key:
            raise ValueError("COMPOSIO_API_KEY not configured")

    @staticmethod
    def _as_dict(item: Any) -> Optional[Dict[str, Any]]:
        """Return *item* as a dict (or its instance ``__dict__``), else ``None``."""
        if isinstance(item, dict):
            return item
        attrs = getattr(item, "__dict__", None)
        return attrs if isinstance(attrs, dict) else None

    @staticmethod
    def _page_items(data: Any) -> List[Any]:
        """Extract the list of items from one response page.

        Accepts either ``{"items": [...]}`` or a bare list; anything else
        yields an empty list.
        """
        if isinstance(data, dict):
            data = data.get("items")
        return data if isinstance(data, list) else []

    @staticmethod
    def _next_cursor(data: Any) -> Any:
        """Return the pagination cursor of a response page, or ``None``."""
        if not isinstance(data, dict):
            return None
        return data.get("next_cursor") or data.get("nextCursor") or None

    @classmethod
    def _collect_toolkits(cls, items: List[Any]) -> Dict[str, Dict[str, Any]]:
        """Build ``{lowercased slug: {"slug", "name", "logo"}}`` from trigger payloads.

        The nested ``toolkit`` object is preferred because it can carry a
        logo; the flat keys are only consulted when no nested object exists.
        """
        toolkits: Dict[str, Dict[str, Any]] = {}
        for raw in items:
            entry = cls._as_dict(raw)
            if entry is None:
                continue

            nested = entry.get("toolkit")
            if isinstance(nested, dict):
                slug = (nested.get("slug") or nested.get("name") or "").strip()
                if not slug:
                    continue
                logo = nested.get("logo")
                known = toolkits.get(slug.lower())
                if not known:
                    toolkits[slug.lower()] = {
                        "slug": slug,
                        "name": (nested.get("name") or slug).strip(),
                        "logo": logo,
                    }
                elif not known.get("logo") and logo:
                    # Upgrade the logo if an earlier trigger lacked one.
                    known["logo"] = logo
                continue

            # Fallback to flat keys: the first non-blank string wins.
            for flat_key in cls._FLAT_TOOLKIT_KEYS:
                value = entry.get(flat_key)
                if isinstance(value, str) and value.strip():
                    slug = value.strip()
                    toolkits.setdefault(
                        slug.lower(),
                        {"slug": slug, "name": slug.capitalize(), "logo": None},
                    )
                    break
        return toolkits

    @classmethod
    def _matches_toolkit(cls, entry: Dict[str, Any], toolkit_slug: str) -> bool:
        """Return True when a trigger payload belongs to *toolkit_slug*.

        Comparison is case-insensitive and ignores surrounding whitespace,
        mirroring how ``_collect_toolkits`` normalises slugs.
        """
        wanted = toolkit_slug.strip().lower()
        nested = entry.get("toolkit")
        if isinstance(nested, dict):
            candidate = nested.get("slug") or nested.get("name") or ""
            if isinstance(candidate, str) and candidate.strip().lower() == wanted:
                return True
        for flat_key in cls._FLAT_TOOLKIT_KEYS:
            value = entry.get(flat_key)
            if isinstance(value, str) and value.strip().lower() == wanted:
                return True
        return False

    @staticmethod
    def _format_trigger(entry: Dict[str, Any], tk_info: Dict[str, Any]) -> Dict[str, Any]:
        """Project a raw trigger payload onto the response shape."""
        return {
            "slug": entry.get("slug"),
            "name": entry.get("name"),
            "description": entry.get("description"),
            "type": entry.get("type") or entry.get("delivery_type") or "webhook",
            "instructions": entry.get("instructions") or "",
            "toolkit": tk_info,
            "config": entry.get("config") or {},
            "payload": entry.get("payload") or {},
        }

    # ------------------------------------------------------------------
    # I/O helpers
    # ------------------------------------------------------------------

    async def _fetch_trigger_types(self, client_http: Any, params: Dict[str, Any]) -> List[Any]:
        """GET every page of ``/api/v3/triggers_types`` for *params*.

        Follows ``next_cursor`` / ``nextCursor`` until the API stops
        returning one. Raises whatever ``raise_for_status`` raises on a
        non-success response.
        """
        url = f"{self.api_base}/api/v3/triggers_types"
        headers = {"x-api-key": self.api_key}
        query = dict(params)  # never mutate the caller's dict
        collected: List[Any] = []
        while True:
            resp = await client_http.get(url, headers=headers, params=query)
            resp.raise_for_status()
            data = resp.json()
            collected.extend(self._page_items(data))
            cursor = self._next_cursor(data)
            # Stop on the last page, and also if the API hands back the cursor
            # we just used (otherwise this loop would never terminate).
            if not cursor or cursor == query.get("cursor"):
                break
            query["cursor"] = cursor
        return collected

    async def _load_toolkits_by_slug(self) -> Dict[str, Any]:
        """Return toolkits from ``ToolkitService`` keyed by lowercased slug."""
        tk_resp = await ToolkitService().list_toolkits(limit=500)
        tk_items = tk_resp.get("items", []) or []
        # Entries without a usable slug cannot be looked up, so skip them.
        return {t.slug.lower(): t for t in tk_items if getattr(t, "slug", None)}

    async def _read_apps_from_redis(self) -> Optional[Dict[str, Any]]:
        """Best-effort read of the cached apps response from Redis."""
        try:
            from services import redis as redis_service
            redis_client = await redis_service.get_client()
            if not redis_client:
                return None
            cached_json = await redis_client.get(self._APPS_CACHE_KEY)
            return json.loads(cached_json) if cached_json else None
        except Exception as exc:
            # Redis is an optimisation only; fall through to the HTTP path.
            logger.debug(f"[Composio HTTP] Redis cache read failed: {exc}")
            return None

    async def _write_apps_to_redis(self, response: Dict[str, Any]) -> None:
        """Best-effort write of the apps response to Redis with the apps TTL."""
        try:
            from services import redis as redis_service
            redis_client = await redis_service.get_client()
            if redis_client:
                await redis_client.set(self._APPS_CACHE_KEY, json.dumps(response), ex=self._apps_ttl)
        except Exception as exc:
            logger.debug(f"[Composio HTTP] Redis cache write failed: {exc}")

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def list_apps_with_triggers(self) -> Dict[str, Any]:
        """Return toolkits that have at least one available trigger, with logo, slug, name.

        Returns:
            ``{"success": True, "items": [...], "total": n}`` where items are
            sorted by lowercased slug.

        Raises:
            ValueError: if ``COMPOSIO_API_KEY`` is not configured.
        """
        self._require_api_key()

        # In-memory cache first, then Redis.
        now_ts = self._now()
        cached = self._apps_cache.get("data")
        if cached and (now_ts - self._apps_cache.get("ts", 0) < self._apps_ttl):
            return cached

        shared = await self._read_apps_from_redis()
        if shared:
            self._apps_cache["data"] = shared
            self._apps_cache["ts"] = now_ts
            return shared

        # HTTP-only: list trigger types and derive toolkits.
        async with httpx.AsyncClient(timeout=20) as client_http:
            items = await self._fetch_trigger_types(client_http, {"limit": self._PAGE_LIMIT})

        # Build the toolkit map directly from the triggers payload so logos
        # present there are preserved.
        toolkits_map = self._collect_toolkits(items)

        # Fallback enrichment with ToolkitService only for missing logos.
        missing = [key for key, info in toolkits_map.items() if not info.get("logo")]
        if missing:
            tk_by_slug = await self._load_toolkits_by_slug()
            for key in missing:
                logo = getattr(tk_by_slug.get(key), "logo", None)
                if logo:
                    toolkits_map[key]["logo"] = logo

        result_items = sorted(toolkits_map.values(), key=lambda info: info["slug"].lower())
        response = {"success": True, "items": result_items, "total": len(result_items)}

        self._apps_cache["data"] = response
        self._apps_cache["ts"] = now_ts
        await self._write_apps_to_redis(response)
        return response

    async def list_triggers_for_app(self, toolkit_slug: str) -> Dict[str, Any]:
        """Return full trigger definitions for a given toolkit (slug), including config/payload and toolkit logo.

        Args:
            toolkit_slug: Toolkit identifier; matched case-insensitively.

        Returns:
            ``{"success": True, "items": [...], "toolkit": {...}, "total": n}``.

        Raises:
            ValueError: if ``COMPOSIO_API_KEY`` is not configured.
        """
        self._require_api_key()

        # Per-toolkit cache
        cache_key = toolkit_slug.lower()
        now_ts = self._now()
        cache_entry = self._triggers_cache.get(cache_key)
        if cache_entry and (now_ts - cache_entry.get("ts", 0) < self._triggers_ttl):
            return cache_entry["data"]

        # HTTP-only: try the server-side toolkit filter first, then fetch
        # everything and filter client-side.
        async with httpx.AsyncClient(timeout=20) as client_http:
            items = await self._fetch_trigger_types(
                client_http, {"limit": self._PAGE_LIMIT, "toolkits": toolkit_slug}
            )
            if not items:
                logger.info("[Composio HTTP] toolkit filter returned 0, fetching all and filtering", toolkit=toolkit_slug)
                items = await self._fetch_trigger_types(client_http, {"limit": self._PAGE_LIMIT})

        # Prepare toolkit info
        tk = (await self._load_toolkits_by_slug()).get(cache_key)
        tk_info = {
            "slug": toolkit_slug,
            "name": (tk.name if tk else toolkit_slug),
            "logo": (tk.logo if tk else None),
        }

        # Always filter client-side: the fallback returns every toolkit, and
        # the filtered query is not trusted to have honoured the filter.
        result_items = []
        for raw in items:
            entry = self._as_dict(raw)
            if entry is not None and self._matches_toolkit(entry, toolkit_slug):
                result_items.append(self._format_trigger(entry, tk_info))

        response = {"success": True, "items": result_items, "toolkit": tk_info, "total": len(result_items)}
        self._triggers_cache[cache_key] = {"data": response, "ts": now_ts}
        return response