mirror of https://github.com/kortix-ai/suna.git
134 lines
6.0 KiB
Python
134 lines
6.0 KiB
Python
import os
|
|
from typing import Optional, List, Dict, Any
|
|
from composio_client import Composio
|
|
from utils.logger import logger
|
|
from pydantic import BaseModel
|
|
from services.supabase import DBConnection
|
|
|
|
from .client import ComposioClient, get_composio_client
|
|
from .toolkit_service import ToolkitService, ToolkitInfo
|
|
from .auth_config_service import AuthConfigService, AuthConfig
|
|
from .connected_account_service import ConnectedAccountService, ConnectedAccount
|
|
from .mcp_server_service import MCPServerService, MCPServer, MCPUrlResponse
|
|
from .composio_profile_service import ComposioProfileService
|
|
|
|
|
|
class ComposioIntegrationResult(BaseModel):
|
|
toolkit: ToolkitInfo
|
|
auth_config: AuthConfig
|
|
connected_account: ConnectedAccount
|
|
mcp_server: MCPServer
|
|
mcp_url_response: MCPUrlResponse
|
|
final_mcp_url: str
|
|
profile_id: Optional[str] = None
|
|
|
|
|
|
class ComposioIntegrationService:
|
|
def __init__(self, api_key: Optional[str] = None, db_connection: Optional[DBConnection] = None):
|
|
self.api_key = api_key
|
|
self.toolkit_service = ToolkitService(api_key)
|
|
self.auth_config_service = AuthConfigService(api_key)
|
|
self.connected_account_service = ConnectedAccountService(api_key)
|
|
self.mcp_server_service = MCPServerService(api_key)
|
|
self.profile_service = ComposioProfileService(db_connection) if db_connection else None
|
|
|
|
async def integrate_toolkit(
|
|
self,
|
|
toolkit_slug: str,
|
|
account_id: str,
|
|
user_id: str,
|
|
profile_name: Optional[str] = None,
|
|
display_name: Optional[str] = None,
|
|
mcp_server_name: Optional[str] = None,
|
|
save_as_profile: bool = True,
|
|
initiation_fields: Optional[Dict[str, str]] = None
|
|
) -> ComposioIntegrationResult:
|
|
try:
|
|
logger.info(f"Starting Composio integration for toolkit: {toolkit_slug}")
|
|
logger.info(f"Initiation fields: {initiation_fields}")
|
|
|
|
toolkit = await self.toolkit_service.get_toolkit_by_slug(toolkit_slug)
|
|
if not toolkit:
|
|
raise ValueError(f"Toolkit '{toolkit_slug}' not found")
|
|
|
|
logger.info(f"Step 1 complete: Verified toolkit {toolkit_slug}")
|
|
|
|
auth_config = await self.auth_config_service.create_auth_config(
|
|
toolkit_slug,
|
|
initiation_fields=initiation_fields
|
|
)
|
|
logger.info(f"Step 2 complete: Created auth config {auth_config.id}")
|
|
|
|
connected_account = await self.connected_account_service.create_connected_account(
|
|
auth_config_id=auth_config.id,
|
|
user_id=user_id,
|
|
initiation_fields=initiation_fields
|
|
)
|
|
logger.info(f"Step 3 complete: Connected account {connected_account.id}")
|
|
|
|
mcp_server = await self.mcp_server_service.create_mcp_server(
|
|
auth_config_ids=[auth_config.id],
|
|
name=mcp_server_name,
|
|
toolkit_name=toolkit.name
|
|
)
|
|
logger.info(f"Step 4 complete: Created MCP server {mcp_server.id}")
|
|
|
|
mcp_url_response = await self.mcp_server_service.generate_mcp_url(
|
|
mcp_server_id=mcp_server.id,
|
|
connected_account_ids=[connected_account.id],
|
|
user_ids=[user_id]
|
|
)
|
|
logger.info(f"Step 5 complete: Generated MCP URLs")
|
|
|
|
final_mcp_url = mcp_url_response.user_ids_url[0] if mcp_url_response.user_ids_url else mcp_url_response.mcp_url
|
|
|
|
profile_id = None
|
|
if save_as_profile and self.profile_service:
|
|
profile_name = profile_name or f"{toolkit.name} Integration"
|
|
display_name = display_name or f"{toolkit.name} via Composio"
|
|
|
|
composio_profile = await self.profile_service.create_profile(
|
|
account_id=account_id,
|
|
profile_name=profile_name,
|
|
toolkit_slug=toolkit_slug,
|
|
toolkit_name=toolkit.name,
|
|
mcp_url=final_mcp_url, # Pass the complete MCP URL
|
|
redirect_url=connected_account.redirect_url,
|
|
user_id=user_id,
|
|
is_default=False,
|
|
connected_account_id=connected_account.id
|
|
)
|
|
profile_id = composio_profile.profile_id
|
|
logger.info(f"Step 6 complete: Saved Composio credential profile {profile_id}")
|
|
|
|
result = ComposioIntegrationResult(
|
|
toolkit=toolkit,
|
|
auth_config=auth_config,
|
|
connected_account=connected_account,
|
|
mcp_server=mcp_server,
|
|
mcp_url_response=mcp_url_response,
|
|
final_mcp_url=final_mcp_url,
|
|
profile_id=profile_id
|
|
)
|
|
|
|
logger.info(f"Successfully completed Composio integration for {toolkit_slug}")
|
|
logger.info(f"Final MCP URL: {final_mcp_url}")
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to integrate toolkit {toolkit_slug}: {e}", exc_info=True)
|
|
raise
|
|
|
|
async def list_available_toolkits(self, limit: int = 100, cursor: Optional[str] = None, category: Optional[str] = None) -> Dict[str, Any]:
|
|
return await self.toolkit_service.list_toolkits(limit=limit, cursor=cursor, category=category)
|
|
|
|
async def search_toolkits(self, query: str, category: Optional[str] = None, limit: int = 100, cursor: Optional[str] = None) -> Dict[str, Any]:
|
|
return await self.toolkit_service.search_toolkits(query, category=category, limit=limit, cursor=cursor)
|
|
|
|
async def get_integration_status(self, connected_account_id: str) -> Dict[str, Any]:
|
|
return await self.connected_account_service.get_auth_status(connected_account_id)
|
|
|
|
|
|
def get_integration_service(api_key: Optional[str] = None, db_connection: Optional[DBConnection] = None) -> ComposioIntegrationService:
|
|
return ComposioIntegrationService(api_key, db_connection) |