suna/backend/pipedream/app_service.py

426 lines
15 KiB
Python
Raw Normal View History

2025-07-30 22:22:31 +08:00
from typing import List, Optional, Dict, Any
2025-07-30 20:27:26 +08:00
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
import os
import re
import httpx
import json
import asyncio
2025-07-30 22:22:31 +08:00
from utils.logger import logger
2025-07-30 20:27:26 +08:00
class AppSlug:
    """Validated app slug such as ``"google_sheets"``.

    Raises:
        ValueError: if *value* is not a non-empty string, or contains
            anything other than lowercase letters, digits, ``-`` and ``_``.
    """

    # Used with ``fullmatch`` so the entire string must match. A pattern with
    # ``^...$`` and ``re.match`` would let a trailing newline through.
    _PATTERN = re.compile(r"[a-z0-9_-]+")

    def __init__(self, value: str):
        if not value or not isinstance(value, str):
            raise ValueError("AppSlug must be a non-empty string")
        if not self._PATTERN.fullmatch(value):
            raise ValueError("AppSlug must contain only lowercase letters, numbers, hyphens, and underscores")
        self.value = value
2025-07-30 20:27:26 +08:00
class SearchQuery:
    """Optional free-text search term; ``None`` means "no query"."""

    def __init__(self, value: Optional[str] = None):
        self.value = value

    def is_empty(self) -> bool:
        """Return True when the term is None, empty, or only whitespace."""
        return not (self.value and self.value.strip())
class Category:
    """Category name used to filter app searches.

    Raises:
        ValueError: if *value* is falsy or not a string.
    """

    def __init__(self, value: str):
        # Truthiness is checked before the type, matching the original order.
        if not (value and isinstance(value, str)):
            raise ValueError("Category must be a non-empty string")
        self.value = value
2025-07-30 20:27:26 +08:00
class PaginationCursor:
    """Wrapper around a pagination cursor string; ``None`` means no cursor."""

    def __init__(self, value: Optional[str] = None):
        self.value = value

    def has_more(self) -> bool:
        """Return False only when the cursor value is ``None``."""
        return not (self.value is None)
# Domain Entities
class AuthType(Enum):
    """Authentication scheme of an app; unrecognised strings map to CUSTOM."""

    OAUTH = "oauth"
    API_KEY = "api_key"
    BASIC = "basic"
    NONE = "none"
    KEYS = "keys"
    CUSTOM = "custom"

    @classmethod
    def _missing_(cls, value):
        # Non-string lookups defer to Enum's default hook, which returns None
        # and so makes AuthType(value) raise ValueError.
        if not isinstance(value, str):
            return super()._missing_(value)
        # Any string not listed above is treated as a custom scheme.
        return cls.CUSTOM
@dataclass
class App:
    """Domain model for a single app returned by the apps API."""

    name: str
    slug: str
    description: str
    category: str
    logo_url: Optional[str] = None
    auth_type: AuthType = AuthType.OAUTH
    is_verified: bool = False
    url: Optional[str] = None
    # default_factory gives every instance its own list.
    tags: List[str] = field(default_factory=list)
    # A positive weight marks the app as featured (see is_featured).
    featured_weight: int = 0

    def is_featured(self) -> bool:
        """Return True when ``featured_weight`` is strictly positive."""
        return 0 < self.featured_weight
2025-07-30 22:22:31 +08:00
class AppServiceError(Exception):
    """Root of this module's error hierarchy; also raised for failed HTTP requests."""


class AppNotFoundError(AppServiceError):
    """App-not-found error. Nothing in this module raises it; presumably intended for callers."""


class InvalidAppSlugError(AppServiceError):
    """Invalid-slug error. Note that AppSlug itself raises ValueError, not this."""


class AuthenticationError(AppServiceError):
    """Raised when credentials are missing or an access token cannot be obtained."""


class RateLimitError(AppServiceError):
    """Raised when the API answers with HTTP 429."""
2025-07-30 20:27:26 +08:00
2025-07-30 22:22:31 +08:00
class AppService:
    """Async client for the Pipedream ``/apps`` REST API.

    Obtains OAuth access tokens through the client-credentials grant, issues
    authenticated GET requests, maps API payloads onto :class:`App`, and
    caches per-slug lookups in Redis.
    """

    # Slugs probed by ``_get_popular``, in the order they are fetched.
    _POPULAR_SLUGS = (
        "slack", "microsoft_teams", "discord", "zoom", "telegram_bot_api",
        "gmail", "microsoft_outlook", "google_calendar", "microsoft_exchange", "calendly",
        "google_drive", "microsoft_onedrive", "dropbox", "google_docs", "google_sheets",
        "notion", "asana", "monday", "trello", "linear", "jira", "clickup",
        "salesforce", "hubspot", "pipedrive", "zendesk", "freshdesk", "intercom",
        "github", "gitlab", "bitbucket", "docker", "jenkins", "vercel", "netlify",
        "supabase", "firebase", "mongodb", "postgresql", "mysql", "redis", "airtable",
        "openai", "anthropic", "hugging_face", "replicate",
        "google_analytics", "facebook", "instagram", "twitter", "linkedin", "mailchimp",
        "stripe", "paypal", "quickbooks", "xero", "square",
        "aws", "google_cloud", "microsoft_azure", "digitalocean", "heroku",
        "shopify", "woocommerce", "magento", "bigcommerce",
    )

    # Redis TTL for cached slug lookups: 21600 s = 6 hours.
    _APP_CACHE_TTL_SECONDS = 21600

    # Number of slugs fetched concurrently per batch in ``_get_popular``.
    _POPULAR_BATCH_SIZE = 20

    def __init__(self):
        self.base_url = "https://api.pipedream.com/v1"
        self.session: Optional[httpx.AsyncClient] = None
        self.access_token: Optional[str] = None
        # Timezone-aware UTC instant at which ``access_token`` expires.
        self.token_expires_at: Optional[datetime] = None
        # Bounds the number of concurrent API lookups made by ``_get_by_slug``.
        self._semaphore = asyncio.Semaphore(10)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        from datetime import timezone

        return datetime.now(timezone.utc)

    async def _get_session(self) -> httpx.AsyncClient:
        """Return the shared HTTP client, creating it if it is missing or closed."""
        if self.session is None or self.session.is_closed:
            self.session = httpx.AsyncClient(
                timeout=httpx.Timeout(30.0),
                headers={"User-Agent": "Suna-Pipedream-Client/1.0"},
            )
        return self.session

    async def _ensure_access_token(self) -> str:
        """Return the cached token unless it expires within 5 minutes; otherwise refresh it."""
        if self.access_token and self.token_expires_at:
            if self._utcnow() < self.token_expires_at - timedelta(minutes=5):
                return self.access_token
        return await self._fetch_fresh_token()

    async def _fetch_fresh_token(self) -> str:
        """Request a new access token using the client-credentials grant.

        Reads ``PIPEDREAM_PROJECT_ID``, ``PIPEDREAM_CLIENT_ID`` and
        ``PIPEDREAM_CLIENT_SECRET`` from the environment, and stores the token
        and its expiry on the instance.

        Raises:
            AuthenticationError: if env vars are missing, the request fails,
                or the response has no usable ``access_token``.
            RateLimitError: if the token endpoint answers HTTP 429.
        """
        project_id = os.getenv("PIPEDREAM_PROJECT_ID")
        client_id = os.getenv("PIPEDREAM_CLIENT_ID")
        client_secret = os.getenv("PIPEDREAM_CLIENT_SECRET")
        if not all([project_id, client_id, client_secret]):
            raise AuthenticationError("Missing required environment variables")

        session = await self._get_session()
        try:
            response = await session.post(
                f"{self.base_url}/oauth/token",
                data={
                    "grant_type": "client_credentials",
                    "client_id": client_id,
                    "client_secret": client_secret,
                },
            )
            response.raise_for_status()
            data = response.json()
            access_token = data["access_token"]
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                raise RateLimitError() from e
            raise AuthenticationError(f"Failed to obtain access token: {e}") from e
        except httpx.RequestError as e:
            # Network-level failure (connect error, timeout, ...).
            raise AuthenticationError(f"Failed to obtain access token: {e}") from e
        except (KeyError, TypeError, ValueError) as e:
            # Non-JSON body, or JSON lacking an ``access_token`` field.
            raise AuthenticationError(f"Malformed token response: {e}") from e

        expires_in = data.get("expires_in")
        if expires_in is None:
            expires_in = 3600
        self.access_token = access_token
        self.token_expires_at = self._utcnow() + timedelta(seconds=expires_in)
        return access_token

    async def _make_request(
        self,
        url: str,
        headers: Optional[Dict[str, str]] = None,
        params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Send an authenticated GET to *url* and return the decoded JSON body.

        *headers* are merged over the default Authorization/Content-Type
        headers.

        Raises:
            RateLimitError: on HTTP 429.
            AppServiceError: on any other HTTP error status, transport
                failure, or a body that is not valid JSON.
        """
        session = await self._get_session()
        access_token = await self._ensure_access_token()
        request_headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
        if headers:
            request_headers.update(headers)
        try:
            response = await session.get(url, headers=request_headers, params=params)
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                raise RateLimitError() from e
            raise AppServiceError(f"HTTP request failed: {e}") from e
        except httpx.RequestError as e:
            raise AppServiceError(f"HTTP request failed: {e}") from e
        except ValueError as e:
            raise AppServiceError(f"Invalid JSON response: {e}") from e

    async def _search(self, query: SearchQuery, category: Optional[Category] = None,
                      page: int = 1, limit: int = 20,
                      cursor: Optional[PaginationCursor] = None) -> Dict[str, Any]:
        """Query ``/apps`` and return a result dict; never raises.

        ``page`` is accepted for interface compatibility but unused, because
        pagination is cursor-based: ``cursor`` is sent as the ``after`` param.

        Returns:
            ``{"success", "apps", "page_info", "total_count"}``. On failure,
            ``success`` is False, ``error`` carries the message, and the
            remaining fields are empty.
        """
        url = f"{self.base_url}/apps"
        params: Dict[str, Any] = {}
        if limit and limit > 0:
            # Ask the API for at most ``limit`` results in this page.
            params["limit"] = limit
        if not query.is_empty():
            params["q"] = query.value
        if category:
            params["category"] = category.value
        if cursor and cursor.value:
            params["after"] = cursor.value
        try:
            data = await self._make_request(url, params=params)
            apps: List[App] = []
            for app_data in data.get("data") or []:
                try:
                    apps.append(self._map_to_domain(app_data))
                except Exception as e:
                    # Skip malformed entries rather than failing the whole page.
                    logger.warning(f"Error mapping app data: {str(e)}")
            # Copy so the response payload is not mutated in place.
            page_info = dict(data.get("page_info") or {})
            page_info["has_more"] = bool(page_info.get("end_cursor"))
            logger.info(f"Found {len(apps)} apps from search")
            return {
                "success": True,
                "apps": apps,
                "page_info": page_info,
                "total_count": page_info.get("total_count", 0),
            }
        except Exception as e:
            logger.error(f"Error searching apps: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "apps": [],
                "page_info": {},
                "total_count": 0,
            }

    async def _get_by_slug(self, app_slug: str) -> Optional[App]:
        """Return the app whose ``name_slug`` equals *app_slug*, or None.

        Redis is tried first. On a miss, the API is searched with the slug
        and only an exact ``name_slug`` match is accepted and then cached.
        Every error is logged and turned into a None result.
        """
        cache_key = f"pipedream:app:{app_slug}"
        try:
            from services import redis
            redis_client = await redis.get_client()
            cached_data = await redis_client.get(cache_key)
            if cached_data:
                logger.debug(f"Found cached app for slug: {app_slug}")
                return self._map_cached_app_to_domain(json.loads(cached_data))
        except Exception as e:
            # Cache problems are non-fatal; fall through to the API.
            logger.warning(f"Redis cache error for app {app_slug}: {e}")

        try:
            async with self._semaphore:
                data = await self._make_request(
                    f"{self.base_url}/apps",
                    params={"q": app_slug, "pageSize": 20},
                )
            candidates = data.get("data") or []
            # The search is fuzzy, so keep only an exact slug match.
            exact_match = next(
                (item for item in candidates if item.get("name_slug") == app_slug),
                None,
            )
            if not exact_match:
                return None
            app = self._map_to_domain(exact_match)
        except Exception as e:
            logger.error(f"Error getting app by slug: {str(e)}")
            return None

        await self._cache_app(app_slug, cache_key, app)
        return app

    async def _cache_app(self, app_slug: str, cache_key: str, app: App) -> None:
        """Write *app* to Redis under *cache_key*; failures are only logged."""
        try:
            from services import redis
            redis_client = await redis.get_client()
            payload = json.dumps(self._map_domain_app_to_cache(app))
            await redis_client.setex(cache_key, self._APP_CACHE_TTL_SECONDS, payload)
            logger.debug(f"Cached app: {app_slug}")
        except Exception as e:
            logger.warning(f"Failed to cache app {app_slug}: {e}")

    async def _get_popular(self, category: Optional[str] = None, limit: int = 100) -> List[App]:
        """Return up to *limit* apps from ``_POPULAR_SLUGS``, in list order.

        When *category* is given, only apps whose ``category`` equals it are
        kept. In that case the whole slug list is scanned until *limit*
        matches are found.
        """
        if limit <= 0:
            return []
        # Without a filter every fetched app is kept, so the first ``limit``
        # slugs suffice. A filter may discard apps, so all slugs are candidates.
        candidates = self._POPULAR_SLUGS if category else self._POPULAR_SLUGS[:limit]

        async def fetch_app(slug: str) -> Optional[App]:
            try:
                app = await self._get_by_slug(slug)
            except Exception as e:
                logger.warning(f"Error fetching popular app {slug}: {e}")
                return None
            if app and (not category or app.category == category):
                return app
            return None

        apps: List[App] = []
        batch_size = self._POPULAR_BATCH_SIZE
        for start in range(0, len(candidates), batch_size):
            if start > 0:
                # Small pause between batches to spread out API load.
                await asyncio.sleep(0.1)
            batch = candidates[start:start + batch_size]
            results = await asyncio.gather(*(fetch_app(slug) for slug in batch), return_exceptions=True)
            for result in results:
                if isinstance(result, App):
                    apps.append(result)
                    if len(apps) >= limit:
                        return apps
        return apps

    async def _get_by_category(self, category: str, limit: int = 20) -> List[App]:
        """Return up to *limit* apps in *category* (an empty list on failure)."""
        result = await self._search(SearchQuery(None), Category(category), limit=limit)
        return result.get("apps", [])

    def _map_to_domain(self, app_data: Dict[str, Any]) -> App:
        """Build an :class:`App` from an API (or cache) payload dict.

        Missing fields fall back to defaults. A null ``tags`` or
        ``featured_weight`` becomes ``[]`` or ``0`` so ``App.is_featured``
        keeps working.
        """
        auth_type_raw = app_data.get("auth_type", "oauth")
        try:
            auth_type = AuthType(auth_type_raw)
        except ValueError:
            # Only non-string values get here; AuthType maps unknown strings to CUSTOM.
            logger.warning(f"Unknown auth type '{auth_type_raw}', using CUSTOM")
            auth_type = AuthType.CUSTOM
        return App(
            name=app_data.get("name", "Unknown"),
            slug=app_data.get("name_slug", ""),
            description=app_data.get("description", ""),
            category=app_data.get("category", "Other"),
            logo_url=app_data.get("img_src"),
            auth_type=auth_type,
            is_verified=app_data.get("verified", False),
            url=app_data.get("url"),
            tags=list(app_data.get("tags") or []),
            featured_weight=app_data.get("featured_weight") or 0,
        )

    def _map_domain_app_to_cache(self, app: App) -> Dict[str, Any]:
        """Serialize *app* to a JSON-able dict using the API's field names."""
        return {
            "name": app.name,
            "name_slug": app.slug,
            "description": app.description,
            "category": app.category,
            "img_src": app.logo_url,
            "auth_type": app.auth_type.value,
            "verified": app.is_verified,
            "url": app.url,
            "tags": app.tags,
            "featured_weight": app.featured_weight,
        }

    def _map_cached_app_to_domain(self, app_data: Dict[str, Any]) -> App:
        """Rebuild an :class:`App` from a cache entry.

        Cache entries use the same keys as API payloads (see
        ``_map_domain_app_to_cache``), so this delegates to ``_map_to_domain``.
        """
        return self._map_to_domain(app_data)

    async def search_apps(
        self,
        query: Optional[str] = None,
        category: Optional[str] = None,
        page: int = 1,
        limit: int = 20,
        cursor: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Search apps by free text and/or category; see ``_search`` for the result shape."""
        search_query = SearchQuery(query)
        category_vo = Category(category) if category else None
        cursor_vo = PaginationCursor(cursor) if cursor else None
        logger.info(f"Searching apps: query='{query}', category='{category}', page={page}")
        result = await self._search(search_query, category_vo, page, limit, cursor_vo)
        logger.info(f"Found {len(result.get('apps', []))} apps")
        return result

    async def get_app_by_slug(self, app_slug: str) -> Optional[App]:
        """Return the app with exactly this slug, or None if not found or on error."""
        logger.info(f"Getting app by slug: {app_slug}")
        app = await self._get_by_slug(app_slug)
        if app:
            logger.info(f"Found app: {app.name}")
        else:
            logger.info(f"App not found: {app_slug}")
        return app

    async def get_popular_apps(self, category: Optional[str] = None, limit: int = 10) -> List[App]:
        """Return up to *limit* popular apps, optionally filtered by exact category."""
        logger.info(f"Getting popular apps: category='{category}', limit={limit}")
        apps = await self._get_popular(category, limit)
        logger.info(f"Found {len(apps)} popular apps")
        return apps

    async def close(self):
        """Close the shared HTTP client if it is open."""
        if self.session and not self.session.is_closed:
            await self.session.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
# Lazily-created module-level AppService instance.
_app_service = None


def get_app_service() -> AppService:
    """Return the shared AppService, constructing it on the first call."""
    global _app_service
    if _app_service is not None:
        return _app_service
    _app_service = AppService()
    return _app_service
# Alternative names for the exception classes defined in this module.
# Presumably kept so callers that import the ``*Exception`` names keep working.
PipedreamException = HttpClientException = AppServiceError
AuthenticationException = AuthenticationError
RateLimitException = RateLimitError