From a6058b94ad6397b4d2da317f1b42658a71612e25 Mon Sep 17 00:00:00 2001 From: Saumya Date: Wed, 30 Jul 2025 19:52:31 +0530 Subject: [PATCH] refactor pipedream codebase --- backend/pipedream/__init__.py | 4 +- backend/pipedream/api.py | 8 +- backend/pipedream/app_service.py | 208 +++++++++++++--------------- backend/pipedream/mcp_service.py | 227 +++++++++++++++++++------------ 4 files changed, 240 insertions(+), 207 deletions(-) diff --git a/backend/pipedream/__init__.py b/backend/pipedream/__init__.py index 3816d6d6..6af7632d 100644 --- a/backend/pipedream/__init__.py +++ b/backend/pipedream/__init__.py @@ -3,7 +3,7 @@ from services.supabase import DBConnection from .profile_service import ProfileService, Profile from .connection_service import ConnectionService, Connection, AuthType -from .app_service import AppService, App +from .app_service import get_app_service, App from .mcp_service import MCPService, MCPServer, MCPTool, ConnectionStatus from .connection_token_service import ConnectionTokenService @@ -11,7 +11,7 @@ db = DBConnection() profile_service = ProfileService() connection_service = ConnectionService() -app_service = AppService(logger=logger) +app_service = get_app_service() mcp_service = MCPService() connection_token_service = ConnectionTokenService() diff --git a/backend/pipedream/api.py b/backend/pipedream/api.py index 30997488..ff64f9dc 100644 --- a/backend/pipedream/api.py +++ b/backend/pipedream/api.py @@ -8,7 +8,7 @@ from utils.logger import logger from utils.auth_utils import get_current_user_id_from_jwt from .profile_service import ProfileService, Profile, ProfileServiceError, ProfileNotFoundError, ProfileAlreadyExistsError, InvalidConfigError, EncryptionError from .connection_service import ConnectionService -from .app_service import AppService +from .app_service import get_app_service from .mcp_service import MCPService, ConnectionStatus, MCPConnectionError, MCPServiceError from .connection_token_service import ConnectionTokenService @@ -19,7 +19,7 @@ router = APIRouter(prefix="/pipedream", tags=["pipedream"]) profile_service: Optional[ProfileService] = None connection_service: Optional[ConnectionService] = None -app_service: Optional[AppService] = None +app_service = None mcp_service: Optional[MCPService] = None connection_token_service: Optional[ConnectionTokenService] = None @@ -401,7 +401,7 @@ async def get_pipedream_apps( apps_data.append({ "name": app.name, - "name_slug": app.slug.value, + "name_slug": app.slug, "description": app.description, "category": app.category, "categories": categories, @@ -447,7 +447,7 @@ async def get_popular_pipedream_apps(): apps_data.append({ "name": app.name, - "name_slug": app.slug.value, + "name_slug": app.slug, "description": app.description, "category": app.category, "categories": categories, diff --git a/backend/pipedream/app_service.py b/backend/pipedream/app_service.py index 4bc5f330..b8a3e7a2 100644 --- a/backend/pipedream/app_service.py +++ b/backend/pipedream/app_service.py @@ -1,39 +1,39 @@ -from typing import List, Optional, Dict, Any, Protocol +from typing import List, Optional, Dict, Any from dataclasses import dataclass, field from datetime import datetime, timedelta from enum import Enum import os -import logging import re import httpx import json import asyncio +from utils.logger import logger -@dataclass(frozen=True) class AppSlug: - value: str - def __post_init__(self): - if not self.value or not isinstance(self.value, str): + def __init__(self, value: str): + if not value or not isinstance(value, str): raise ValueError("AppSlug must be a non-empty string") - if not re.match(r'^[a-z0-9_-]+$', self.value): + if not re.match(r'^[a-z0-9_-]+$', value): raise ValueError("AppSlug must contain only lowercase letters, numbers, hyphens, and underscores") + self.value = value -@dataclass(frozen=True) class SearchQuery: - value: Optional[str] = None + def __init__(self, value: Optional[str] = None): + self.value = value + def is_empty(self) -> bool: return not self.value or not self.value.strip() -@dataclass(frozen=True) class Category: - value: str - def __post_init__(self): - if not self.value or not isinstance(self.value, str): + def __init__(self, value: str): + if not value or not isinstance(value, str): raise ValueError("Category must be a non-empty string") + self.value = value -@dataclass(frozen=True) class PaginationCursor: - value: Optional[str] = None + def __init__(self, value: Optional[str] = None): + self.value = value + def has_more(self) -> bool: return self.value is not None @@ -55,7 +55,7 @@ class AuthType(Enum): @dataclass class App: name: str - slug: AppSlug + slug: str description: str category: str logo_url: Optional[str] = None @@ -68,42 +68,29 @@ class App: def is_featured(self) -> bool: return self.featured_weight > 0 -class PipedreamException(Exception): - def __init__(self, message: str, error_code: str = None): - super().__init__(message) - self.error_code = error_code - self.message = message +class AppServiceError(Exception): + pass -class HttpClientException(PipedreamException): - def __init__(self, url: str, status_code: int, reason: str): - super().__init__(f"HTTP request to {url} failed with status {status_code}: {reason}", "HTTP_CLIENT_ERROR") - self.url = url - self.status_code = status_code - self.reason = reason +class AppNotFoundError(AppServiceError): + pass -class AuthenticationException(PipedreamException): - def __init__(self, reason: str): - super().__init__(f"Authentication failed: {reason}", "AUTHENTICATION_ERROR") - self.reason = reason +class InvalidAppSlugError(AppServiceError): + pass -class RateLimitException(PipedreamException): - def __init__(self, retry_after: int = None): - super().__init__("Rate limit exceeded", "RATE_LIMIT_EXCEEDED") - self.retry_after = retry_after +class AuthenticationError(AppServiceError): + pass -class Logger(Protocol): - def info(self, message: str) -> None: ... - def warning(self, message: str) -> None: ... - def error(self, message: str) -> None: ... - def debug(self, message: str) -> None: ... +class RateLimitError(AppServiceError): + pass -class HttpClient: +class AppService: def __init__(self): self.base_url = "https://api.pipedream.com/v1" self.session: Optional[httpx.AsyncClient] = None self.access_token: Optional[str] = None self.token_expires_at: Optional[datetime] = None - + self._semaphore = asyncio.Semaphore(10) + async def _get_session(self) -> httpx.AsyncClient: if self.session is None or self.session.is_closed: self.session = httpx.AsyncClient( @@ -124,7 +111,7 @@ class HttpClient: client_secret = os.getenv("PIPEDREAM_CLIENT_SECRET") if not all([project_id, client_id, client_secret]): - raise AuthenticationException("Missing required environment variables") + raise AuthenticationError("Missing required environment variables") session = await self._get_session() @@ -148,10 +135,10 @@ class HttpClient: except httpx.HTTPStatusError as e: if e.response.status_code == 429: - raise RateLimitException() - raise AuthenticationException(f"Failed to obtain access token: {e}") + raise RateLimitError() + raise AuthenticationError(f"Failed to obtain access token: {e}") - async def get(self, url: str, headers: Dict[str, str] = None, params: Dict[str, Any] = None) -> Dict[str, Any]: + async def _make_request(self, url: str, headers: Dict[str, str] = None, params: Dict[str, Any] = None) -> Dict[str, Any]: session = await self._get_session() access_token = await self._ensure_access_token() @@ -169,22 +156,12 @@ class HttpClient: return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 429: - raise RateLimitException() - raise HttpClientException(url, e.response.status_code, str(e)) - - async def close(self) -> None: - if self.session and not self.session.is_closed: - await self.session.aclose() + raise RateLimitError() + raise AppServiceError(f"HTTP request failed: {e}") -class AppRepository: - def __init__(self, http_client: HttpClient, logger: Logger): - self._http_client = http_client - self._logger = logger - self._semaphore = asyncio.Semaphore(10) - - async def search(self, query: SearchQuery, category: Optional[Category] = None, + async def _search(self, query: SearchQuery, category: Optional[Category] = None, page: int = 1, limit: int = 20, cursor: Optional[PaginationCursor] = None) -> Dict[str, Any]: - url = f"{self._http_client.base_url}/apps" + url = f"{self.base_url}/apps" params = {} if not query.is_empty(): @@ -195,20 +172,20 @@ class AppRepository: params["after"] = cursor.value try: - data = await self._http_client.get(url, params=params) + data = await self._make_request(url, params=params) apps = [] for app_data in data.get("data", []): try: app = self._map_to_domain(app_data) apps.append(app) except Exception as e: - self._logger.warning(f"Error mapping app data: {str(e)}") + logger.warning(f"Error mapping app data: {str(e)}") continue page_info = data.get("page_info", {}) page_info["has_more"] = bool(page_info.get("end_cursor")) - self._logger.info(f"Found {len(apps)} apps from search") + logger.info(f"Found {len(apps)} apps from search") return { "success": True, @@ -218,7 +195,7 @@ class AppRepository: } except Exception as e: - self._logger.error(f"Error searching apps: {str(e)}") + logger.error(f"Error searching apps: {str(e)}") return { "success": False, "error": str(e), @@ -227,29 +204,29 @@ class AppRepository: "total_count": 0 } - async def get_by_slug(self, app_slug: AppSlug) -> Optional[App]: - cache_key = f"pipedream:app:{app_slug.value}" + async def _get_by_slug(self, app_slug: str) -> Optional[App]: + cache_key = f"pipedream:app:{app_slug}" try: from services import redis redis_client = await redis.get_client() cached_data = await redis_client.get(cache_key) if cached_data: - self._logger.debug(f"Found cached app for slug: {app_slug.value}") + logger.debug(f"Found cached app for slug: {app_slug}") cached_app_data = json.loads(cached_data) return self._map_cached_app_to_domain(cached_app_data) except Exception as e: - self._logger.warning(f"Redis cache error for app {app_slug.value}: {e}") + logger.warning(f"Redis cache error for app {app_slug}: {e}") async with self._semaphore: - url = f"{self._http_client.base_url}/apps" - params = {"q": app_slug.value, "pageSize": 20} + url = f"{self.base_url}/apps" + params = {"q": app_slug, "pageSize": 20} try: - data = await self._http_client.get(url, params=params) + data = await self._make_request(url, params=params) apps = data.get("data", []) - exact_match = next((app for app in apps if app.get("name_slug") == app_slug.value), None) + exact_match = next((app for app in apps if app.get("name_slug") == app_slug), None) if exact_match: app = self._map_to_domain(exact_match) @@ -259,19 +236,19 @@ class AppRepository: redis_client = await redis.get_client() app_data = self._map_domain_app_to_cache(app) await redis_client.setex(cache_key, 21600, json.dumps(app_data)) - self._logger.debug(f"Cached app: {app_slug.value}") + logger.debug(f"Cached app: {app_slug}") except Exception as e: - self._logger.warning(f"Failed to cache app {app_slug.value}: {e}") + logger.warning(f"Failed to cache app {app_slug}: {e}") return app return None except Exception as e: - self._logger.error(f"Error getting app by slug: {str(e)}") + logger.error(f"Error getting app by slug: {str(e)}") return None - async def get_popular(self, category: Optional[Category] = None, limit: int = 100) -> List[App]: + async def _get_popular(self, category: Optional[str] = None, limit: int = 100) -> List[App]: popular_slugs = [ "slack", "microsoft_teams", "discord", "zoom", "telegram_bot_api", "gmail", "microsoft_outlook", "google_calendar", "microsoft_exchange", "calendly", @@ -293,12 +270,12 @@ class AppRepository: async def fetch_app(slug: str): try: - app = await self.get_by_slug(AppSlug(slug)) - if app and (not category or app.category == category.value): + app = await self._get_by_slug(slug) + if app and (not category or app.category == category): return app return None except Exception as e: - self._logger.warning(f"Error fetching popular app {slug}: {e}") + logger.warning(f"Error fetching popular app {slug}: {e}") return None for i in range(0, len(target_slugs), batch_size): @@ -322,9 +299,10 @@ class AppRepository: return apps - async def get_by_category(self, category: Category, limit: int = 20) -> List[App]: + async def _get_by_category(self, category: str, limit: int = 20) -> List[App]: query = SearchQuery(None) - result = await self.search(query, category, limit=limit) + category_obj = Category(category) + result = await self._search(query, category_obj, limit=limit) return result.get("apps", []) def _map_to_domain(self, app_data: Dict[str, Any]) -> App: @@ -332,12 +310,12 @@ class AppRepository: auth_type_str = app_data.get("auth_type", "oauth") auth_type = AuthType(auth_type_str) except ValueError: - self._logger.warning(f"Unknown auth type '{auth_type_str}', using CUSTOM") + logger.warning(f"Unknown auth type '{auth_type_str}', using CUSTOM") auth_type = AuthType.CUSTOM return App( name=app_data.get("name", "Unknown"), - slug=AppSlug(app_data.get("name_slug", "")), + slug=app_data.get("name_slug", ""), description=app_data.get("description", ""), category=app_data.get("category", "Other"), logo_url=app_data.get("img_src"), @@ -351,7 +329,7 @@ class AppRepository: def _map_domain_app_to_cache(self, app: App) -> Dict[str, Any]: return { "name": app.name, - "name_slug": app.slug.value, + "name_slug": app.slug, "description": app.description, "category": app.category, "img_src": app.logo_url, @@ -367,12 +345,12 @@ class AppRepository: auth_type_str = app_data.get("auth_type", "oauth") auth_type = AuthType(auth_type_str) except ValueError: - self._logger.warning(f"Unknown auth type '{auth_type_str}', using CUSTOM") + logger.warning(f"Unknown auth type '{auth_type_str}', using CUSTOM") auth_type = AuthType.CUSTOM return App( name=app_data.get("name", "Unknown"), - slug=AppSlug(app_data.get("name_slug", "")), + slug=app_data.get("name_slug", ""), description=app_data.get("description", ""), category=app_data.get("category", "Other"), logo_url=app_data.get("img_src"), @@ -383,12 +361,6 @@ class AppRepository: featured_weight=app_data.get("featured_weight", 0) ) -class AppService: - def __init__(self, logger: Optional[Logger] = None): - self._logger = logger or logging.getLogger(__name__) - self._http_client = HttpClient() - self._app_repo = AppRepository(self._http_client, self._logger) - async def search_apps( self, query: Optional[str] = None, @@ -401,52 +373,62 @@ class AppService: category_vo = Category(category) if category else None cursor_vo = PaginationCursor(cursor) if cursor else None - self._logger.info(f"Searching apps: query='{query}', category='{category}', page={page}") + logger.info(f"Searching apps: query='{query}', category='{category}', page={page}") - result = await self._app_repo.search(search_query, category_vo, page, limit, cursor_vo) + result = await self._search(search_query, category_vo, page, limit, cursor_vo) - self._logger.info(f"Found {len(result.get('apps', []))} apps") + logger.info(f"Found {len(result.get('apps', []))} apps") return result async def get_app_by_slug(self, app_slug: str) -> Optional[App]: - app_slug_vo = AppSlug(app_slug) + logger.info(f"Getting app by slug: {app_slug}") - self._logger.info(f"Getting app by slug: {app_slug}") - - app = await self._app_repo.get_by_slug(app_slug_vo) + app = await self._get_by_slug(app_slug) if app: - self._logger.info(f"Found app: {app.name}") + logger.info(f"Found app: {app.name}") else: - self._logger.info(f"App not found: {app_slug}") + logger.info(f"App not found: {app_slug}") return app async def get_popular_apps(self, category: Optional[str] = None, limit: int = 10) -> List[App]: - category_vo = Category(category) if category else None + logger.info(f"Getting popular apps: category='{category}', limit={limit}") - self._logger.info(f"Getting popular apps: category='{category}', limit={limit}") + apps = await self._get_popular(category, limit) - apps = await self._app_repo.get_popular(category_vo, limit) - - self._logger.info(f"Found {len(apps)} popular apps") + logger.info(f"Found {len(apps)} popular apps") return apps async def get_apps_by_category(self, category: str, limit: int = 20) -> List[App]: - category_vo = Category(category) + logger.info(f"Getting apps by category: {category}, limit={limit}") - self._logger.info(f"Getting apps by category: {category}, limit={limit}") + apps = await self._get_by_category(category, limit) - apps = await self._app_repo.get_by_category(category_vo, limit) - - self._logger.info(f"Found {len(apps)} apps in category {category}") + logger.info(f"Found {len(apps)} apps in category {category}") return apps async def close(self): - await self._http_client.close() + if self.session and not self.session.is_closed: + await self.session.aclose() async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() \ No newline at end of file + await self.close() + + +_app_service = None + +def get_app_service() -> AppService: + global _app_service + if _app_service is None: + _app_service = AppService() + return _app_service + + +PipedreamException = AppServiceError +HttpClientException = AppServiceError +AuthenticationException = AuthenticationError +RateLimitException = RateLimitError \ No newline at end of file diff --git a/backend/pipedream/mcp_service.py b/backend/pipedream/mcp_service.py index b069d3ba..b73b3733 100644 --- a/backend/pipedream/mcp_service.py +++ b/backend/pipedream/mcp_service.py @@ -1,3 +1,4 @@ +import asyncio import json import os import re @@ -9,6 +10,14 @@ from enum import Enum import httpx from utils.logger import logger +try: + from mcp import ClientSession + from mcp.client.streamable_http import streamablehttp_client + MCP_AVAILABLE = True +except ImportError: + MCP_AVAILABLE = False + logger.warning("MCP client libraries not available") + class ConnectionStatus(Enum): CONNECTED = "connected" @@ -43,6 +52,9 @@ class MCPServer: def get_tool_count(self) -> int: return len(self.available_tools) + def add_tool(self, tool: MCPTool): + self.available_tools.append(tool) + class MCPServiceError(Exception): pass @@ -158,41 +170,67 @@ class MCPService: raise RateLimitError("Rate limit exceeded") raise MCPServiceError(f"HTTP request failed: {e}") - async def _fetch_server_tools(self, external_user_id: str, app_slug: str) -> List[MCPTool]: - project_id = os.getenv("PIPEDREAM_PROJECT_ID") - environment = os.getenv("PIPEDREAM_X_PD_ENVIRONMENT", "development") - - if not project_id: - return [] - - url = f"{self.base_url}/connect/{project_id}/tools" - params = { - "app": app_slug, - "external_id": external_user_id - } - headers = {"X-PD-Environment": environment} - + async def test_connection(self, server: MCPServer) -> MCPServer: + if not MCP_AVAILABLE: + logger.warning(f"MCP client not available for testing {server.app_name}") + server.status = ConnectionStatus.ERROR + server.error_message = "MCP client libraries not available" + return server + try: - data = await self._make_request(url, headers=headers, params=params) - tools_data = data.get("data", []) - - tools = [] - for tool_data in tools_data: - if tool_data.get("name") or tool_data.get("key"): - tool = MCPTool( - name=tool_data.get("name") or tool_data.get("key", ""), - description=tool_data.get("description", f"Tool from {app_slug}"), - input_schema=tool_data.get("inputSchema") or tool_data.get("props", {}) - ) - tools.append(tool) - - return tools - + access_token = await self._ensure_access_token() except Exception as e: - logger.error(f"Error fetching tools for {app_slug}: {str(e)}") - return [] + logger.error(f"Failed to get access token for MCP connection: {str(e)}") + server.status = ConnectionStatus.ERROR + server.error_message = f"Authentication failed: {str(e)}" + return server + + headers = { + "Authorization": f"Bearer {access_token}", + "x-pd-project-id": server.project_id, + "x-pd-environment": server.environment, + "x-pd-external-user-id": server.external_user_id, + "x-pd-app-slug": server.app_slug, + } + + logger.info(f"Testing MCP connection for {server.app_name} at {server.server_url}") + + try: + async with asyncio.timeout(15): + async with streamablehttp_client(server.server_url, headers=headers) as (read_stream, write_stream, _): + async with ClientSession(read_stream, write_stream) as session: + await session.initialize() + tools_result = await session.list_tools() + tools = tools_result.tools if hasattr(tools_result, 'tools') else tools_result + + for tool in tools: + mcp_tool = MCPTool( + name=tool.name, + description=tool.description, + input_schema=tool.inputSchema + ) + server.add_tool(mcp_tool) + + server.status = ConnectionStatus.CONNECTED + logger.info(f"Successfully tested MCP server for {server.app_name} with {server.get_tool_count()} tools") + return server + + except asyncio.TimeoutError: + logger.error(f"Timeout testing MCP connection for {server.app_name}") + server.status = ConnectionStatus.ERROR + server.error_message = "Connection timeout" + except Exception as e: + logger.error(f"Failed to test MCP connection for {server.app_name}: {str(e)}") + server.status = ConnectionStatus.ERROR + server.error_message = str(e) + + return server async def discover_servers_for_user(self, external_user_id: ExternalUserId, app_slug: Optional[AppSlug] = None) -> List[MCPServer]: + if not MCP_AVAILABLE: + logger.warning("MCP client libraries not available - returning empty server list") + return [] + project_id = os.getenv("PIPEDREAM_PROJECT_ID") environment = os.getenv("PIPEDREAM_X_PD_ENVIRONMENT", "development") @@ -218,57 +256,50 @@ class MCPService: if app_slug: user_apps = [app for app in user_apps if app.get("name_slug") == app_slug.value] + logger.info(f"Filtered to {len(user_apps)} apps for app_slug: {app_slug.value}") - servers = [] + mcp_servers = [] for app in user_apps: - try: - server = MCPServer( - app_slug=app.get("name_slug", ""), - app_name=app.get("name", "Unknown"), - server_url="https://remote.mcp.pipedream.net", - project_id=project_id, - environment=environment, - external_user_id=external_user_id.value, - status=ConnectionStatus.CONNECTED - ) - - try: - logger.info(f"Attempting to fetch tools for {app.get('name_slug')}...") - tools = await self._fetch_server_tools(external_user_id.value, server.app_slug) - server.available_tools = tools - - logger.info(f"Successfully fetched {len(tools)} tools for app: {app.get('name_slug')}") - - except Exception as e: - logger.error(f"Error fetching tools for {app.get('name_slug')}: {str(e)}") - server.available_tools = [] - - servers.append(server) - - except Exception as e: - logger.error(f"Error creating server for app {app.get('name_slug', 'unknown')}: {str(e)}") + app_slug_current = app.get('name_slug') + app_name = app.get('name') + + if not app_slug_current: + logger.warning(f"App missing name_slug: {app}") continue + + logger.info(f"Creating MCP server for app: {app_name} ({app_slug_current})") + + server = MCPServer( + app_slug=app_slug_current, + app_name=app_name, + server_url='https://remote.mcp.pipedream.net', + project_id=project_id, + environment=environment, + external_user_id=external_user_id.value, + status=ConnectionStatus.DISCONNECTED + ) + + try: + tested_server = await self.test_connection(server) + mcp_servers.append(tested_server) + logger.info(f"Successfully tested MCP server for {app_name}: {tested_server.status.value}") + except Exception as e: + logger.warning(f"Failed to test MCP server for {app_name}: {str(e)}") + server.status = ConnectionStatus.ERROR + server.error_message = str(e) + mcp_servers.append(server) - logger.info(f"Successfully discovered {len(servers)} MCP servers") - return servers + logger.info(f"Discovered {len(mcp_servers)} MCP servers for user: {external_user_id.value}") + return mcp_servers except Exception as e: - logger.error(f"Error discovering servers for user {external_user_id.value}: {str(e)}") + logger.error(f"Error discovering MCP servers: {str(e)}") return [] - async def test_server_connection(self, server: MCPServer) -> MCPServer: - logger.info(f"Testing MCP server connection: {server.app_name}") - - server.status = ConnectionStatus.CONNECTED - - if server.is_connected(): - logger.info(f"MCP server {server.app_name} connected successfully with {server.get_tool_count()} tools") - else: - logger.warning(f"MCP server {server.app_name} connection failed: {server.error_message}") - - return server - async def create_connection(self, external_user_id: ExternalUserId, app_slug: AppSlug, oauth_app_id: Optional[str] = None) -> MCPServer: + if not MCP_AVAILABLE: + raise MCPServerNotAvailableError("MCP client not available") + project_id = os.getenv("PIPEDREAM_PROJECT_ID") environment = os.getenv("PIPEDREAM_X_PD_ENVIRONMENT", "development") @@ -277,23 +308,43 @@ class MCPService: logger.info(f"Creating MCP connection for user: {external_user_id.value}, app: {app_slug.value}") - server = MCPServer( - app_slug=app_slug.value, - app_name=app_slug.value.replace('_', ' ').title(), - server_url="https://remote.mcp.pipedream.net", - project_id=project_id, - environment=environment, - external_user_id=external_user_id.value, - oauth_app_id=oauth_app_id, - status=ConnectionStatus.CONNECTED - ) + url = f"{self.base_url}/connect/{project_id}/accounts" + params = {"external_id": external_user_id.value} + headers = {"X-PD-Environment": environment} - if server.is_connected(): - logger.info(f"Successfully created MCP connection for {app_slug.value} with {server.get_tool_count()} tools") - else: - logger.error(f"Failed to create MCP connection for {app_slug.value}: {server.error_message}") + try: + data = await self._make_request(url, headers=headers, params=params) - return server + accounts = data.get("data", []) + user_apps = [account.get("app") for account in accounts if account.get("app")] + + connected_app = None + for app in user_apps: + if app.get('name_slug') == app_slug.value: + connected_app = app + break + + if not connected_app: + raise MCPConnectionError(f"User {external_user_id.value} does not have {app_slug.value} connected") + + server = MCPServer( + app_slug=app_slug.value, + app_name=connected_app.get('name'), + server_url='https://remote.mcp.pipedream.net', + project_id=project_id, + environment=environment, + external_user_id=external_user_id.value, + oauth_app_id=oauth_app_id, + status=ConnectionStatus.DISCONNECTED + ) + + tested_server = await self.test_connection(server) + logger.info(f"Successfully created MCP connection for {app_slug.value}") + return tested_server + + except Exception as e: + logger.error(f"Failed to create MCP connection for {app_slug.value}: {str(e)}") + raise MCPConnectionError(str(e)) async def close(self): if self.session and not self.session.is_closed: