import os from typing import Optional, List, Dict, Any from composio_client import Composio from utils.logger import logger from pydantic import BaseModel from services.supabase import DBConnection from .client import ComposioClient, get_composio_client from .toolkit_service import ToolkitService, ToolkitInfo from .auth_config_service import AuthConfigService, AuthConfig from .connected_account_service import ConnectedAccountService, ConnectedAccount from .mcp_server_service import MCPServerService, MCPServer, MCPUrlResponse from .composio_profile_service import ComposioProfileService class ComposioIntegrationResult(BaseModel): toolkit: ToolkitInfo auth_config: AuthConfig connected_account: ConnectedAccount mcp_server: MCPServer mcp_url_response: MCPUrlResponse final_mcp_url: str profile_id: Optional[str] = None class ComposioIntegrationService: def __init__(self, api_key: Optional[str] = None, db_connection: Optional[DBConnection] = None): self.api_key = api_key self.toolkit_service = ToolkitService(api_key) self.auth_config_service = AuthConfigService(api_key) self.connected_account_service = ConnectedAccountService(api_key) self.mcp_server_service = MCPServerService(api_key) self.profile_service = ComposioProfileService(db_connection) if db_connection else None async def integrate_toolkit( self, toolkit_slug: str, account_id: str, user_id: str, profile_name: Optional[str] = None, display_name: Optional[str] = None, mcp_server_name: Optional[str] = None, save_as_profile: bool = True, initiation_fields: Optional[Dict[str, str]] = None ) -> ComposioIntegrationResult: try: logger.info(f"Starting Composio integration for toolkit: {toolkit_slug}") logger.info(f"Initiation fields: {initiation_fields}") toolkit = await self.toolkit_service.get_toolkit_by_slug(toolkit_slug) if not toolkit: raise ValueError(f"Toolkit '{toolkit_slug}' not found") logger.info(f"Step 1 complete: Verified toolkit {toolkit_slug}") auth_config = await self.auth_config_service.create_auth_config( toolkit_slug, initiation_fields=initiation_fields ) logger.info(f"Step 2 complete: Created auth config {auth_config.id}") connected_account = await self.connected_account_service.create_connected_account( auth_config_id=auth_config.id, user_id=user_id, initiation_fields=initiation_fields ) logger.info(f"Step 3 complete: Connected account {connected_account.id}") mcp_server = await self.mcp_server_service.create_mcp_server( auth_config_ids=[auth_config.id], name=mcp_server_name, toolkit_name=toolkit.name ) logger.info(f"Step 4 complete: Created MCP server {mcp_server.id}") mcp_url_response = await self.mcp_server_service.generate_mcp_url( mcp_server_id=mcp_server.id, connected_account_ids=[connected_account.id], user_ids=[user_id] ) logger.info(f"Step 5 complete: Generated MCP URLs") final_mcp_url = mcp_url_response.user_ids_url[0] if mcp_url_response.user_ids_url else mcp_url_response.mcp_url profile_id = None if save_as_profile and self.profile_service: profile_name = profile_name or f"{toolkit.name} Integration" display_name = display_name or f"{toolkit.name} via Composio" composio_profile = await self.profile_service.create_profile( account_id=account_id, profile_name=profile_name, toolkit_slug=toolkit_slug, toolkit_name=toolkit.name, mcp_url=final_mcp_url, # Pass the complete MCP URL redirect_url=connected_account.redirect_url, user_id=user_id, is_default=False ) profile_id = composio_profile.profile_id logger.info(f"Step 6 complete: Saved Composio credential profile {profile_id}") result = ComposioIntegrationResult( toolkit=toolkit, auth_config=auth_config, connected_account=connected_account, mcp_server=mcp_server, mcp_url_response=mcp_url_response, final_mcp_url=final_mcp_url, profile_id=profile_id ) logger.info(f"Successfully completed Composio integration for {toolkit_slug}") logger.info(f"Final MCP URL: {final_mcp_url}") return result except Exception as e: logger.error(f"Failed to integrate toolkit {toolkit_slug}: {e}", exc_info=True) raise async def list_available_toolkits(self, limit: int = 100, cursor: Optional[str] = None, category: Optional[str] = None) -> Dict[str, Any]: return await self.toolkit_service.list_toolkits(limit=limit, cursor=cursor, category=category) async def search_toolkits(self, query: str, category: Optional[str] = None, limit: int = 100, cursor: Optional[str] = None) -> Dict[str, Any]: return await self.toolkit_service.search_toolkits(query, category=category, limit=limit, cursor=cursor) async def get_integration_status(self, connected_account_id: str) -> Dict[str, Any]: return await self.connected_account_service.get_auth_status(connected_account_id) def get_integration_service(api_key: Optional[str] = None, db_connection: Optional[DBConnection] = None) -> ComposioIntegrationService: return ComposioIntegrationService(api_key, db_connection)