import json from typing import Optional from agentpress.tool import ToolResult, openapi_schema, xml_schema from agentpress.thread_manager import ThreadManager from .base_tool import AgentBuilderBaseTool from pipedream.facade import PipedreamManager from pipedream.domain.value_objects import ExternalUserId, AppSlug from utils.logger import logger class MCPSearchTool(AgentBuilderBaseTool): def __init__(self, thread_manager: ThreadManager, db_connection, agent_id: str): super().__init__(thread_manager, db_connection, agent_id) self.pipedream_manager = PipedreamManager() @openapi_schema({ "type": "function", "function": { "name": "search_mcp_servers", "description": "Search for Pipedream MCP servers based on user requirements. Use this when the user wants to add MCP tools to their agent.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Search query for finding relevant Pipedream apps (e.g., 'linear', 'github', 'database', 'search')" }, "category": { "type": "string", "description": "Optional category filter for Pipedream apps" }, "limit": { "type": "integer", "description": "Maximum number of apps to return (default: 10)", "default": 10 } }, "required": ["query"] } } }) @xml_schema( tag_name="search-mcp-servers", mappings=[ {"param_name": "query", "node_type": "attribute", "path": "."}, {"param_name": "category", "node_type": "attribute", "path": "."}, {"param_name": "limit", "node_type": "attribute", "path": "."} ], example=''' linear 5 ''' ) async def search_mcp_servers( self, query: str, category: Optional[str] = None, limit: int = 10 ) -> ToolResult: try: search_result = await self.pipedream_manager.search_apps( query=query, category=category, page=1, limit=limit ) apps = search_result.get("apps", []) formatted_apps = [] for app in apps: if hasattr(app, '__dict__'): formatted_apps.append({ "name": app.name, "app_slug": app.app_slug.value if hasattr(app.app_slug, 'value') else str(app.app_slug), "description": app.description, "category": app.categories[0] if app.categories else "Other", "logo_url": getattr(app, 'logo_url', ''), "auth_type": app.auth_type.value if app.auth_type else '', "is_verified": getattr(app, 'is_verified', False), "url": getattr(app, 'url', ''), "tags": getattr(app, 'tags', []) }) else: formatted_apps.append({ "name": app.get("name", "Unknown"), "app_slug": app.get("app_slug", ""), "description": app.get("description", "No description available"), "category": app.get("category", "Other"), "logo_url": app.get("logo_url", ""), "auth_type": app.get("auth_type", ""), "is_verified": app.get("is_verified", False), "url": app.get("url", ""), "tags": app.get("tags", []) }) if not formatted_apps: return ToolResult( success=False, output=json.dumps([], ensure_ascii=False) ) return ToolResult( success=True, output=json.dumps(formatted_apps, ensure_ascii=False) ) except Exception as e: return self.fail_response(f"Error searching Pipedream apps: {str(e)}") @openapi_schema({ "type": "function", "function": { "name": "get_app_details", "description": "Get detailed information about a specific Pipedream app, including available tools and authentication requirements.", "parameters": { "type": "object", "properties": { "app_slug": { "type": "string", "description": "The app slug to get details for (e.g., 'github', 'linear', 'slack')" } }, "required": ["app_slug"] } } }) @xml_schema( tag_name="get-app-details", mappings=[ {"param_name": "app_slug", "node_type": "attribute", "path": "."} ], example=''' github ''' ) async def get_app_details(self, app_slug: str) -> ToolResult: try: app_data = await self.pipedream_manager.get_app_by_slug(app_slug) if not app_data: return self.fail_response(f"Could not find app details for '{app_slug}'") if hasattr(app_data, '__dict__'): app_data = { "name": app_data.name, "app_slug": app_data.app_slug.value, "description": app_data.description, "category": app_data.categories[0] if app_data.categories else "Other", "logo_url": getattr(app_data, 'logo_url', ''), "auth_type": app_data.auth_type.value if app_data.auth_type else '', "is_verified": getattr(app_data, 'is_verified', False), "url": getattr(app_data, 'url', ''), "tags": getattr(app_data, 'tags', []), "pricing": getattr(app_data, 'pricing', ''), "setup_instructions": getattr(app_data, 'setup_instructions', ''), "available_actions": getattr(app_data, 'available_actions', []), "available_triggers": getattr(app_data, 'available_triggers', []) } formatted_app = { "name": app_data.get("name", "Unknown"), "app_slug": app_data.get("app_slug", app_slug), "description": app_data.get("description", "No description available"), "category": app_data.get("category", "Other"), "logo_url": app_data.get("logo_url", ""), "auth_type": app_data.get("auth_type", ""), "is_verified": app_data.get("is_verified", False), "url": app_data.get("url", ""), "tags": app_data.get("tags", []), "pricing": app_data.get("pricing", ""), "setup_instructions": app_data.get("setup_instructions", ""), "available_actions": app_data.get("available_actions", []), "available_triggers": app_data.get("available_triggers", []) } return self.success_response({ "message": f"Retrieved details for {formatted_app['name']}", "app": formatted_app }) except Exception as e: return self.fail_response(f"Error getting app details: {str(e)}") @openapi_schema({ "type": "function", "function": { "name": "discover_user_mcp_servers", "description": "Discover available MCP servers for a specific user and app combination. Use this to see what MCP tools are available for a connected profile.", "parameters": { "type": "object", "properties": { "user_id": { "type": "string", "description": "The external user ID from the credential profile" }, "app_slug": { "type": "string", "description": "The app slug to discover MCP servers for" } }, "required": ["user_id", "app_slug"] } } }) @xml_schema( tag_name="discover-user-mcp-servers", mappings=[ {"param_name": "user_id", "node_type": "attribute", "path": "."}, {"param_name": "app_slug", "node_type": "attribute", "path": "."} ], example=''' user_123456 github ''' ) async def discover_user_mcp_servers(self, user_id: str, app_slug: str) -> ToolResult: try: servers = await self.pipedream_manager.discover_mcp_servers( external_user_id=user_id, app_slug=app_slug ) formatted_servers = [] for server in servers: if hasattr(server, '__dict__'): formatted_servers.append({ "server_id": getattr(server, 'server_id', ''), "name": getattr(server, 'name', 'Unknown'), "app_slug": getattr(server, 'app_slug', app_slug), "status": getattr(server, 'status', 'unknown'), "available_tools": getattr(server, 'available_tools', []), "last_ping": getattr(server, 'last_ping', ''), "created_at": getattr(server, 'created_at', '') }) else: formatted_servers.append({ "server_id": server.get("server_id", ""), "name": server.get("name", "Unknown"), "app_slug": server.get("app_slug", app_slug), "status": server.get("status", "unknown"), "available_tools": server.get("available_tools", []), "last_ping": server.get("last_ping", ""), "created_at": server.get("created_at", "") }) connected_servers = [s for s in formatted_servers if s["status"] == "connected"] total_tools = sum(len(s["available_tools"]) for s in connected_servers) return self.success_response({ "message": f"Found {len(formatted_servers)} MCP servers for {app_slug} (user: {user_id}), {len(connected_servers)} connected with {total_tools} total tools available", "servers": formatted_servers, "connected_count": len(connected_servers), "total_tools": total_tools }) except Exception as e: return self.fail_response(f"Error discovering MCP servers: {str(e)}")