suna/backend/pipedream/client.py

449 lines
19 KiB
Python

import os
import asyncio
from typing import Dict, Any, Optional, List
from dataclasses import dataclass
import httpx
from utils.logger import logger
import time
import random
try:
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
except ImportError:
logger.warning("MCP not available - MCP discovery will be disabled")
ClientSession = None
streamablehttp_client = None
@dataclass
class PipedreamConfig:
project_id: str
environment: str
client_id: str
client_secret: str
class PipedreamClient:
def __init__(self, config: Optional[PipedreamConfig] = None):
self.config = config or self._load_config_from_env()
self.base_url = "https://api.pipedream.com/v1"
self.access_token: Optional[str] = None
self.rate_limit_token: Optional[str] = None
self.session: Optional[httpx.AsyncClient] = None
def _load_config_from_env(self) -> PipedreamConfig:
project_id = os.getenv("PIPEDREAM_PROJECT_ID")
environment = os.getenv("PIPEDREAM_X_PD_ENVIRONMENT", "development")
client_id = os.getenv("PIPEDREAM_CLIENT_ID")
client_secret = os.getenv("PIPEDREAM_CLIENT_SECRET")
if not all([project_id, client_id, client_secret]):
missing = []
if not project_id:
missing.append("PIPEDREAM_PROJECT_ID")
if not client_id:
missing.append("PIPEDREAM_CLIENT_ID")
if not client_secret:
missing.append("PIPEDREAM_CLIENT_SECRET")
raise ValueError(
f"Missing required environment variables: {', '.join(missing)}. "
"Please set these variables to use the Pipedream client."
)
return PipedreamConfig(
project_id=project_id,
environment=environment,
client_id=client_id,
client_secret=client_secret
)
async def _get_session(self) -> httpx.AsyncClient:
if self.session is None or self.session.is_closed:
self.session = httpx.AsyncClient(
timeout=httpx.Timeout(30.0),
headers={"User-Agent": "Suna-Pipedream-Client/1.0"}
)
return self.session
async def _obtain_rate_limit_token(self, window_size_seconds: int = 5, requests_per_window: int = 100000) -> str:
"""Obtain a rate limit token from Pipedream to bypass rate limits"""
if self.rate_limit_token:
logger.debug(f"Using existing rate limit token: {self.rate_limit_token[:20]}...")
return self.rate_limit_token
access_token = await self._obtain_access_token()
logger.info(f"Obtaining Pipedream rate limit token (window: {window_size_seconds}s, requests: {requests_per_window})")
try:
# Make this request without retry logic to avoid circular dependency
session = await self._get_session()
url = f"{self.base_url}/connect/rate_limits"
payload = {
"window_size_seconds": window_size_seconds,
"requests_per_window": requests_per_window
}
logger.debug(f"Making POST request to {url} with payload: {payload}")
response = await session.post(
url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}"
},
json=payload
)
logger.debug(f"Rate limit token request response: {response.status_code}")
if response.status_code != 200:
logger.error(f"Failed to obtain rate limit token: {response.status_code} - {response.text}")
response.raise_for_status()
data = response.json()
logger.debug(f"Rate limit token response data: {data}")
self.rate_limit_token = data.get("token")
if not self.rate_limit_token:
raise ValueError("No rate limit token received from Pipedream")
logger.info(f"Successfully obtained Pipedream rate limit token: {self.rate_limit_token[:20]}...")
return self.rate_limit_token
except Exception as e:
logger.error(f"Error obtaining rate limit token: {str(e)}")
raise
async def _make_request_with_retry(self, method: str, url: str, headers: dict, max_retries: int = 3, **kwargs) -> httpx.Response:
session = await self._get_session()
for attempt in range(max_retries + 1):
try:
# Always include rate limit token if available
request_headers = headers.copy()
if self.rate_limit_token:
request_headers["x-pd-rate-limit"] = self.rate_limit_token
logger.debug(f"Making {method} request to {url} with rate limit token: {self.rate_limit_token[:20]}...")
else:
logger.debug(f"Making {method} request to {url} without rate limit token")
response = await session.request(method, url, headers=request_headers, **kwargs)
if response.status_code == 429:
if attempt < max_retries:
retry_after = response.headers.get('retry-after')
if retry_after:
wait_time = int(retry_after)
else:
wait_time = (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"Rate limited (429) even with token, retrying in {wait_time:.2f} seconds (attempt {attempt + 1}/{max_retries + 1})")
await asyncio.sleep(wait_time)
continue
else:
logger.error(f"Rate limit exceeded after {max_retries} retries for {method} {url}")
response.raise_for_status()
logger.debug(f"Successfully completed {method} request to {url} (status: {response.status_code})")
return response
except httpx.HTTPStatusError as e:
if e.response.status_code == 429 and attempt < max_retries:
continue
logger.error(f"HTTP error for {method} {url}: {e.response.status_code} - {e.response.text}")
raise
except Exception as e:
if attempt < max_retries:
wait_time = (2 ** attempt) + random.uniform(0, 1)
logger.warning(f"Request failed, retrying in {wait_time:.2f} seconds (attempt {attempt + 1}/{max_retries + 1}): {str(e)}")
await asyncio.sleep(wait_time)
continue
logger.error(f"Request failed after {max_retries} retries for {method} {url}: {str(e)}")
raise
raise Exception(f"Max retries ({max_retries}) exceeded for {method} {url}")
async def _obtain_access_token(self) -> str:
if self.access_token:
return self.access_token
logger.info("Obtaining Pipedream access token via OAuth")
try:
# Make this request without retry logic to avoid issues
session = await self._get_session()
response = await session.post(
f"{self.base_url}/oauth/token",
headers={"Content-Type": "application/json"},
json={
"grant_type": "client_credentials",
"client_id": self.config.client_id,
"client_secret": self.config.client_secret
}
)
response.raise_for_status()
data = response.json()
self.access_token = data.get("access_token")
if not self.access_token:
raise ValueError("No access token received from Pipedream OAuth")
logger.info("Successfully obtained Pipedream access token")
return self.access_token
except Exception as e:
logger.error(f"Error obtaining access token: {str(e)}")
raise
async def refresh_rate_limit_token(self, window_size_seconds: int = 10, requests_per_window: int = 100000) -> str:
"""Manually refresh the rate limit token with custom parameters"""
self.rate_limit_token = None # Clear existing token
return await self._obtain_rate_limit_token(window_size_seconds, requests_per_window)
def clear_rate_limit_token(self) -> None:
"""Clear the stored rate limit token to force obtaining a new one"""
self.rate_limit_token = None
logger.info("Rate limit token cleared")
async def _ensure_rate_limit_token(self) -> None:
"""Ensure we have a rate limit token before making requests"""
if not self.rate_limit_token:
try:
await self._obtain_rate_limit_token()
except Exception as e:
logger.warning(f"Failed to obtain rate limit token, proceeding without it: {str(e)}")
else:
logger.debug(f"Using existing rate limit token: {self.rate_limit_token[:20]}...")
async def create_connection_token(self, external_user_id: str, app: Optional[str] = None) -> Dict[str, Any]:
access_token = await self._obtain_access_token()
await self._ensure_rate_limit_token()
logger.info(f"Creating connection token for user: {external_user_id}, app: {app}")
try:
payload = {
"external_user_id": external_user_id
}
if app:
payload["app"] = app
response = await self._make_request_with_retry(
"POST",
f"{self.base_url}/connect/{self.config.project_id}/tokens",
headers={
"Content-Type": "application/json",
"X-PD-Environment": self.config.environment,
"Authorization": f"Bearer {access_token}"
},
json=payload
)
data = response.json()
if app and "connect_link_url" in data:
link = data["connect_link_url"]
if "app=" not in link:
separator = "&" if "?" in link else "?"
data["connect_link_url"] = f"{link}{separator}app={app}"
logger.info(f"Successfully created connection token for user: {external_user_id}")
logger.info(f"Connection token: {data}")
return data
except Exception as e:
logger.error(f"Error creating connection token: {str(e)}")
raise
async def get_connections(self, external_user_id: str) -> List[Dict[str, Any]]:
access_token = await self._obtain_access_token()
await self._ensure_rate_limit_token()
logger.info(f"Getting connections for user: {external_user_id}")
try:
url = f"{self.base_url}/connect/{self.config.project_id}/accounts?external_id={external_user_id}"
headers = {
"X-PD-Environment": self.config.environment,
"Authorization": f"Bearer {access_token}"
}
response = await self._make_request_with_retry(
"GET",
url,
headers=headers
)
data = response.json()
accounts = data.get("data", [])
apps = [account.get("app") for account in accounts if account.get("app")]
logger.info(f"Successfully retrieved {len(apps)} apps for user: {external_user_id}")
return apps
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
logger.info(f"User {external_user_id} not found in Pipedream, returning empty connections list")
return []
logger.error(f"HTTP error getting connections: {e.response.status_code} - {e.response.text}")
raise
except Exception as e:
logger.error(f"Error getting connections: {str(e)}")
raise
async def discover_mcp_servers(self, external_user_id: str, app_slug: Optional[str] = None, oauth_app_id: Optional[str] = None) -> List[Dict[str, Any]]:
if not ClientSession or not streamablehttp_client:
logger.error("MCP not available - cannot discover MCP servers")
return []
await self._ensure_rate_limit_token()
logger.info(f"Discovering MCP servers for user: {external_user_id}, app: {app_slug}")
user_apps = await self.get_connections(external_user_id)
if not user_apps:
logger.info(f"No connected apps found for user: {external_user_id}")
return []
if app_slug:
user_apps = [app for app in user_apps if app.get('name_slug') == app_slug]
mcp_servers = []
for app in user_apps:
app_slug_current = app.get('name_slug')
app_name = app.get('name')
if not app_slug_current:
continue
mcp_config = {
'app_slug': app_slug_current,
'app_name': app_name,
'external_user_id': external_user_id,
'oauth_app_id': oauth_app_id,
'server_url': 'https://remote.mcp.pipedream.net',
'project_id': self.config.project_id,
'environment': self.config.environment,
}
try:
tools = await self._test_mcp_connection(mcp_config)
mcp_config['available_tools'] = tools
mcp_config['status'] = 'connected'
mcp_servers.append(mcp_config)
logger.info(f"Successfully discovered MCP server for {app_name} ({app_slug_current}) with {len(tools)} tools")
except Exception as e:
logger.warning(f"Failed to connect to MCP server for {app_name} ({app_slug_current}): {str(e)}")
mcp_config['status'] = 'error'
mcp_config['error'] = str(e)
mcp_servers.append(mcp_config)
logger.info(f"Discovered {len(mcp_servers)} MCP servers for user: {external_user_id}")
return mcp_servers
async def _test_mcp_connection(self, mcp_config: Dict[str, Any]) -> List[Dict[str, Any]]:
if not ClientSession or not streamablehttp_client:
raise Exception("MCP not available")
access_token = await self._obtain_access_token()
server_url = mcp_config['server_url']
headers = {
"Authorization": f"Bearer {access_token}",
"x-pd-project-id": mcp_config['project_id'],
"x-pd-environment": mcp_config['environment'],
"x-pd-external-user-id": mcp_config['external_user_id'],
"x-pd-app-slug": mcp_config['app_slug'],
}
if self.rate_limit_token:
headers["x-pd-rate-limit"] = self.rate_limit_token
if mcp_config.get('oauth_app_id'):
headers["x-pd-oauth-app-id"] = mcp_config['oauth_app_id']
async with asyncio.timeout(15):
async with streamablehttp_client(server_url, headers=headers) as (read_stream, write_stream, _):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()
tools_result = await session.list_tools()
tools = tools_result.tools if hasattr(tools_result, 'tools') else tools_result
tools_info = []
for tool in tools:
tool_info = {
"name": tool.name,
"description": tool.description,
"inputSchema": tool.inputSchema
}
tools_info.append(tool_info)
return tools_info
async def create_mcp_connection(self, external_user_id: str, app_slug: str, oauth_app_id: Optional[str] = None) -> Dict[str, Any]:
if not ClientSession or not streamablehttp_client:
raise Exception("MCP not available")
await self._ensure_rate_limit_token()
logger.info(f"Creating MCP connection for user: {external_user_id}, app: {app_slug}")
user_apps = await self.get_connections(external_user_id)
connected_app = None
for app in user_apps:
if app.get('name_slug') == app_slug:
connected_app = app
break
if not connected_app:
raise Exception(f"User {external_user_id} does not have {app_slug} connected")
mcp_config = {
'app_slug': app_slug,
'app_name': connected_app.get('name'),
'external_user_id': external_user_id,
'oauth_app_id': oauth_app_id,
'server_url': 'https://remote.mcp.pipedream.net',
'project_id': self.config.project_id,
'environment': self.config.environment,
}
try:
tools = await self._test_mcp_connection(mcp_config)
mcp_config['available_tools'] = tools
mcp_config['status'] = 'connected'
logger.info(f"Successfully created MCP connection for {app_slug} with {len(tools)} tools")
except Exception as e:
logger.error(f"Failed to create MCP connection for {app_slug}: {str(e)}")
mcp_config['status'] = 'error'
mcp_config['error'] = str(e)
raise
return mcp_config
async def close(self):
if self.session and not self.session.is_closed:
await self.session.aclose()
def __repr__(self) -> str:
return f"PipedreamClient(project_id={self.config.project_id}, environment={self.config.environment})"
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
_pipedream_client: Optional[PipedreamClient] = None
def get_pipedream_client() -> PipedreamClient:
global _pipedream_client
if _pipedream_client is None:
_pipedream_client = PipedreamClient()
return _pipedream_client
def initialize_pipedream_client(config: Optional[PipedreamConfig] = None) -> PipedreamClient:
global _pipedream_client
_pipedream_client = PipedreamClient(config)
return _pipedream_client