suna/backend/composio_integration/composio_profile_service.py

305 lines
12 KiB
Python

import json
import hashlib
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional, Dict, Any
from uuid import uuid4
from cryptography.fernet import Fernet
import os
from services.supabase import DBConnection
from utils.logger import logger
@dataclass
class ComposioProfile:
profile_id: str
account_id: str
mcp_qualified_name: str
profile_name: str
display_name: str
encrypted_config: str
config_hash: str
toolkit_slug: str
toolkit_name: str
mcp_url: str # The complete MCP URL
redirect_url: Optional[str] = None
is_active: bool = True
is_default: bool = False
is_connected: bool = False
created_at: datetime = None
updated_at: datetime = None
class ComposioProfileService:
def __init__(self, db_connection: Optional[DBConnection] = None):
self.db = db_connection or DBConnection()
def _get_encryption_key(self) -> bytes:
key = os.getenv("ENCRYPTION_KEY")
if not key:
raise ValueError("ENCRYPTION_KEY environment variable is required")
return key.encode()
def _encrypt_config(self, config_json: str) -> str:
fernet = Fernet(self._get_encryption_key())
return fernet.encrypt(config_json.encode()).decode()
def _decrypt_config(self, encrypted_config: str) -> Dict[str, Any]:
fernet = Fernet(self._get_encryption_key())
decrypted = fernet.decrypt(encrypted_config.encode()).decode()
return json.loads(decrypted)
def _generate_config_hash(self, config_json: str) -> str:
return hashlib.sha256(config_json.encode()).hexdigest()
def _build_config(
self,
toolkit_slug: str,
toolkit_name: str,
mcp_url: str, # The complete MCP URL
redirect_url: Optional[str] = None,
user_id: str = "default"
) -> Dict[str, Any]:
return {
"type": "composio",
"toolkit_slug": toolkit_slug,
"toolkit_name": toolkit_name,
"mcp_url": mcp_url, # Store the complete URL
"redirect_url": redirect_url,
"user_id": user_id,
"created_at": datetime.now(timezone.utc).isoformat()
}
async def _generate_unique_profile_name(self, base_name: str, account_id: str, mcp_qualified_name: str, client) -> str:
"""Generate a unique profile name by appending a number if needed"""
original_name = base_name
counter = 1
current_name = base_name
while True:
# Check if this name exists
existing = await client.table('user_mcp_credential_profiles').select('profile_id').eq(
'account_id', account_id
).eq('mcp_qualified_name', mcp_qualified_name).eq('profile_name', current_name).execute()
if not existing.data:
return current_name
# Generate next name
counter += 1
current_name = f"{original_name} ({counter})"
async def create_profile(
self,
account_id: str,
profile_name: str,
toolkit_slug: str,
toolkit_name: str,
mcp_url: str, # The complete MCP URL from Composio
redirect_url: Optional[str] = None,
user_id: str = "default",
is_default: bool = False
) -> ComposioProfile:
try:
logger.info(f"Creating Composio profile for user: {account_id}, toolkit: {toolkit_slug}")
logger.info(f"MCP URL to store: {mcp_url}")
config = self._build_config(
toolkit_slug, toolkit_name, mcp_url, redirect_url, user_id
)
config_json = json.dumps(config, sort_keys=True)
encrypted_config = self._encrypt_config(config_json)
config_hash = self._generate_config_hash(config_json)
mcp_qualified_name = f"composio.{toolkit_slug}"
profile_id = str(uuid4())
now = datetime.now(timezone.utc)
client = await self.db.client
# Generate unique profile name if needed
unique_profile_name = await self._generate_unique_profile_name(
profile_name, account_id, mcp_qualified_name, client
)
if unique_profile_name != profile_name:
logger.info(f"Generated unique profile name: {unique_profile_name} (original: {profile_name})")
# Set other profiles as non-default if this is default
if is_default:
await client.table('user_mcp_credential_profiles').update({
'is_default': False
}).eq('account_id', account_id).eq('mcp_qualified_name', mcp_qualified_name).execute()
# Insert new profile
result = await client.table('user_mcp_credential_profiles').insert({
'profile_id': profile_id,
'account_id': account_id,
'mcp_qualified_name': mcp_qualified_name,
'profile_name': unique_profile_name,
'display_name': unique_profile_name,
'encrypted_config': encrypted_config,
'config_hash': config_hash,
'is_active': True,
'is_default': is_default,
'created_at': now.isoformat(),
'updated_at': now.isoformat()
}).execute()
if not result.data:
raise Exception("Failed to create profile in database")
logger.info(f"Successfully created Composio profile: {profile_id}")
return ComposioProfile(
profile_id=profile_id,
account_id=account_id,
mcp_qualified_name=mcp_qualified_name,
profile_name=unique_profile_name,
display_name=unique_profile_name,
encrypted_config=encrypted_config,
config_hash=config_hash,
toolkit_slug=toolkit_slug,
toolkit_name=toolkit_name,
mcp_url=mcp_url,
redirect_url=redirect_url,
is_active=True,
is_default=is_default,
is_connected=bool(redirect_url), # Connected if we have OAuth flow
created_at=now,
updated_at=now
)
except Exception as e:
logger.error(f"Failed to create Composio profile: {e}", exc_info=True)
raise
async def get_mcp_config_for_agent(self, profile_id: str) -> Dict[str, Any]:
"""Get the MCP configuration object for adding to an agent's custom_mcps"""
try:
client = await self.db.client
# Get the profile
result = await client.table('user_mcp_credential_profiles').select('*').eq(
'profile_id', profile_id
).execute()
if not result.data:
raise ValueError(f"Profile {profile_id} not found")
profile_data = result.data[0]
# Decrypt the config to get toolkit info
config = self._decrypt_config(profile_data['encrypted_config'])
if config.get('type') != 'composio':
raise ValueError(f"Profile {profile_id} is not a Composio profile")
# Return the MCP configuration for the agent - with ONLY profile_id
return {
"name": config['toolkit_name'], # Just the toolkit name, cleaner
"type": "composio",
"config": {
"profile_id": profile_id # Only store profile_id reference
},
"enabledTools": [] # Will be populated by tool selection
}
except Exception as e:
logger.error(f"Failed to get MCP config for profile {profile_id}: {e}", exc_info=True)
raise
async def get_mcp_url_for_runtime(self, profile_id: str) -> str:
"""Get the actual MCP URL for runtime connection (used internally by MCP service)"""
try:
client = await self.db.client
# Get the profile
result = await client.table('user_mcp_credential_profiles').select('*').eq(
'profile_id', profile_id
).execute()
if not result.data:
raise ValueError(f"Profile {profile_id} not found")
profile_data = result.data[0]
# Decrypt the config to get the MCP URL
config = self._decrypt_config(profile_data['encrypted_config'])
if config.get('type') != 'composio':
raise ValueError(f"Profile {profile_id} is not a Composio profile")
mcp_url = config.get('mcp_url')
if not mcp_url:
raise ValueError(f"Profile {profile_id} has no MCP URL")
logger.info(f"Retrieved MCP URL for profile {profile_id}")
return mcp_url
except Exception as e:
logger.error(f"Failed to get MCP URL for profile {profile_id}: {e}", exc_info=True)
raise
async def get_profile_config(self, profile_id: str) -> Dict[str, Any]:
"""Get decrypted config for a profile"""
try:
client = await self.db.client
result = await client.table('user_mcp_credential_profiles').select('encrypted_config').eq(
'profile_id', profile_id
).execute()
if not result.data:
raise ValueError(f"Profile {profile_id} not found")
return self._decrypt_config(result.data[0]['encrypted_config'])
except Exception as e:
logger.error(f"Failed to get config for profile {profile_id}: {e}", exc_info=True)
raise
async def get_profiles(self, account_id: str, toolkit_slug: Optional[str] = None) -> List[ComposioProfile]:
try:
client = await self.db.client
query = client.table('user_mcp_credential_profiles').select('*').eq('account_id', account_id)
if toolkit_slug:
query = query.eq('mcp_qualified_name', f"composio.{toolkit_slug}")
else:
query = query.like('mcp_qualified_name', 'composio.%')
result = await query.execute()
profiles = []
for row in result.data:
# Decrypt and parse config
config = self._decrypt_config(row['encrypted_config'])
profile = ComposioProfile(
profile_id=row['profile_id'],
account_id=row['account_id'],
mcp_qualified_name=row['mcp_qualified_name'],
profile_name=row['profile_name'],
display_name=row['display_name'],
encrypted_config=row['encrypted_config'],
config_hash=row['config_hash'],
toolkit_slug=config.get('toolkit_slug', ''),
toolkit_name=config.get('toolkit_name', ''),
mcp_url=config.get('mcp_url', ''),
redirect_url=config.get('redirect_url'),
is_active=row.get('is_active', True),
is_default=row.get('is_default', False),
is_connected=bool(config.get('redirect_url')),
created_at=datetime.fromisoformat(row['created_at'].replace('Z', '+00:00')) if row.get('created_at') else None,
updated_at=datetime.fromisoformat(row['updated_at'].replace('Z', '+00:00')) if row.get('updated_at') else None
)
profiles.append(profile)
return profiles
except Exception as e:
logger.error(f"Failed to get Composio profiles: {e}", exc_info=True)
raise