suna/backend/mcp_service/template_manager.py

746 lines
32 KiB
Python

"""
Agent Template Manager
This module handles:
1. Creating agent templates from existing agents (stripping credentials)
2. Installing templates as agent instances
3. Managing template lifecycle and marketplace operations
4. Converting between legacy agents and new secure architecture
"""
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timezone
from utils.logger import logger
from services.supabase import DBConnection
from .credential_manager import credential_manager, MCPRequirement, MCPCredential
db = DBConnection()
@dataclass
class AgentTemplate:
"""Represents an agent template"""
template_id: str
creator_id: str
name: str
description: Optional[str]
system_prompt: str
mcp_requirements: List[MCPRequirement]
agentpress_tools: Dict[str, Any]
tags: List[str]
is_public: bool
marketplace_published_at: Optional[datetime]
download_count: int
created_at: datetime
updated_at: datetime
avatar: Optional[str]
avatar_color: Optional[str]
metadata: Optional[Dict[str, Any]] = None
@dataclass
class AgentInstance:
"""Represents an agent instance"""
instance_id: str
template_id: Optional[str]
account_id: str
name: str
description: Optional[str]
credential_mappings: Dict[str, str]
custom_system_prompt: Optional[str]
is_active: bool
is_default: bool
created_at: datetime
updated_at: datetime
avatar: Optional[str]
avatar_color: Optional[str]
class TemplateManager:
"""Manages agent templates and instances"""
async def create_template_from_agent(
self,
agent_id: str,
creator_id: str,
make_public: bool = False,
tags: Optional[List[str]] = None
) -> str:
"""
Create a secure template from an existing agent
This extracts the agent configuration and creates a template with
MCP requirements (without credentials) that can be safely shared
"""
logger.info(f"Creating template from agent {agent_id} for user {creator_id}")
try:
client = await db.client
# Get the agent
agent_result = await client.table('agents').select('*').eq('agent_id', agent_id).execute()
if not agent_result.data:
raise ValueError("Agent not found")
agent = agent_result.data[0]
# Verify ownership
if agent['account_id'] != creator_id:
raise ValueError("Access denied - you can only create templates from your own agents")
# Extract MCP requirements from agent configuration
mcp_requirements = []
# Process configured_mcps (regular MCP servers)
for mcp in agent.get('configured_mcps', []):
if isinstance(mcp, dict) and 'qualifiedName' in mcp:
# Extract required config keys from the config
config_keys = list(mcp.get('config', {}).keys())
requirement = {
'qualified_name': mcp['qualifiedName'],
'display_name': mcp.get('name', mcp['qualifiedName']),
'enabled_tools': mcp.get('enabledTools', []),
'required_config': config_keys
}
mcp_requirements.append(requirement)
# Process custom_mcps (custom MCP servers)
for custom_mcp in agent.get('custom_mcps', []):
if isinstance(custom_mcp, dict) and 'name' in custom_mcp:
# Extract required config keys from the config
config_keys = list(custom_mcp.get('config', {}).keys())
requirement = {
'qualified_name': custom_mcp['name'].lower().replace(' ', '_'),
'display_name': custom_mcp['name'],
'enabled_tools': custom_mcp.get('enabledTools', []),
'required_config': config_keys,
'custom_type': custom_mcp.get('type', 'http') # Default to http
}
mcp_requirements.append(requirement)
kortix_team_account_ids = [
'xxxxxxxx',
]
is_kortix_team = creator_id in kortix_team_account_ids
# Create the template
template_data = {
'creator_id': creator_id,
'name': agent['name'],
'description': agent.get('description'),
'system_prompt': agent['system_prompt'],
'mcp_requirements': mcp_requirements,
'agentpress_tools': agent.get('agentpress_tools', {}),
'tags': tags or [],
'is_public': make_public,
'is_kortix_team': is_kortix_team,
'avatar': agent.get('avatar'),
'avatar_color': agent.get('avatar_color'),
'metadata': {
'source_agent_id': agent_id,
'source_version_id': agent.get('current_version_id'),
'source_version_name': agent.get('current_version', {}).get('version_name', 'v1.0')
}
}
if make_public:
template_data['marketplace_published_at'] = datetime.now(timezone.utc).isoformat()
result = await client.table('agent_templates').insert(template_data).execute()
if not result.data:
raise ValueError("Failed to create template")
template_id = result.data[0]['template_id']
logger.info(f"Successfully created template {template_id} from agent {agent_id} with is_kortix_team={is_kortix_team}")
return template_id
except Exception as e:
logger.error(f"Error creating template from agent {agent_id}: {str(e)}")
raise
async def get_template(self, template_id: str) -> Optional[AgentTemplate]:
"""Get an agent template by ID"""
try:
client = await db.client
result = await client.table('agent_templates').select('*')\
.eq('template_id', template_id).execute()
if not result.data:
return None
template_data = result.data[0]
# Convert mcp_requirements to MCPRequirement objects
mcp_requirements = []
for req_data in template_data.get('mcp_requirements', []):
mcp_requirements.append(MCPRequirement(
qualified_name=req_data.get('qualified_name') or req_data.get('qualifiedName'),
display_name=req_data.get('display_name') or req_data.get('name'),
enabled_tools=req_data.get('enabled_tools') or req_data.get('enabledTools', []),
required_config=req_data.get('required_config') or req_data.get('requiredConfig', []),
custom_type=req_data.get('custom_type')
))
return AgentTemplate(
template_id=template_data['template_id'],
creator_id=template_data['creator_id'],
name=template_data['name'],
description=template_data.get('description'),
system_prompt=template_data['system_prompt'],
mcp_requirements=mcp_requirements,
agentpress_tools=template_data.get('agentpress_tools', {}),
tags=template_data.get('tags', []),
is_public=template_data.get('is_public', False),
marketplace_published_at=template_data.get('marketplace_published_at'),
download_count=template_data.get('download_count', 0),
created_at=template_data['created_at'],
updated_at=template_data['updated_at'],
avatar=template_data.get('avatar'),
avatar_color=template_data.get('avatar_color'),
metadata=template_data.get('metadata', {})
)
except Exception as e:
logger.error(f"Error getting template {template_id}: {str(e)}")
return None
async def install_template(
self,
template_id: str,
account_id: str,
instance_name: Optional[str] = None,
custom_system_prompt: Optional[str] = None,
profile_mappings: Optional[Dict[str, str]] = None,
custom_mcp_configs: Optional[Dict[str, Dict[str, Any]]] = None
) -> Dict[str, Any]:
"""
Install a template as an agent instance for a user
Args:
template_id: ID of the template to install
account_id: ID of the user installing the template
instance_name: Optional custom name for the instance
custom_system_prompt: Optional custom system prompt override
profile_mappings: Optional dict mapping qualified_name to profile_id
custom_mcp_configs: Optional dict mapping qualified_name to config for custom MCPs
Returns:
Dictionary with installation result and any missing credentials
"""
logger.info(f"Installing template {template_id} for user {account_id}")
try:
# Get the template
template = await self.get_template(template_id)
if not template:
raise ValueError("Template not found")
# Check if template is accessible
if not template.is_public:
# Check if user owns the template
if template.creator_id != account_id:
raise ValueError("Access denied to private template")
# Debug: Log template requirements
logger.info(f"Template MCP requirements: {[(req.qualified_name, req.display_name, getattr(req, 'custom_type', None)) for req in template.mcp_requirements]}")
# Separate custom and regular MCP requirements
custom_requirements = [req for req in template.mcp_requirements if getattr(req, 'custom_type', None)]
regular_requirements = [req for req in template.mcp_requirements if not getattr(req, 'custom_type', None)]
# If no profile mappings provided, try to use default profiles
if not profile_mappings and regular_requirements:
profile_mappings = {}
for req in regular_requirements:
# Get default profile for this MCP service
default_profile = await credential_manager.get_default_credential_profile(
account_id, req.qualified_name
)
if default_profile:
profile_mappings[req.qualified_name] = default_profile.profile_id
# Check for missing profile mappings for regular requirements
missing_profile_mappings = []
if regular_requirements:
provided_mappings = profile_mappings or {}
for req in regular_requirements:
if req.qualified_name not in provided_mappings:
missing_profile_mappings.append({
'qualified_name': req.qualified_name,
'display_name': req.display_name,
'required_config': req.required_config
})
# Check for missing custom MCP configs
missing_custom_configs = []
if custom_requirements:
provided_custom_configs = custom_mcp_configs or {}
for req in custom_requirements:
if req.qualified_name not in provided_custom_configs:
missing_custom_configs.append({
'qualified_name': req.qualified_name,
'display_name': req.display_name,
'custom_type': req.custom_type,
'required_config': req.required_config
})
# If we have any missing profile mappings or configs, return them
if missing_profile_mappings or missing_custom_configs:
return {
'status': 'configs_required',
'missing_regular_credentials': missing_profile_mappings,
'missing_custom_configs': missing_custom_configs,
'template': {
'template_id': template.template_id,
'name': template.name,
'description': template.description
}
}
# Create regular agent with secure credentials
client = await db.client
# Build configured_mcps and custom_mcps with user's credential profiles
configured_mcps = []
custom_mcps = []
for req in template.mcp_requirements:
logger.info(f"Processing requirement: {req.qualified_name}, custom_type: {getattr(req, 'custom_type', None)}")
if hasattr(req, 'custom_type') and req.custom_type:
# For custom MCP servers, use the provided config from installation
if custom_mcp_configs and req.qualified_name in custom_mcp_configs:
provided_config = custom_mcp_configs[req.qualified_name]
custom_mcp_config = {
'name': req.display_name,
'type': req.custom_type,
'config': provided_config,
'enabledTools': req.enabled_tools
}
custom_mcps.append(custom_mcp_config)
logger.info(f"Added custom MCP with provided config: {custom_mcp_config}")
else:
logger.warning(f"No custom config provided for {req.qualified_name}")
continue
else:
# For regular MCP servers, use the selected credential profile
if profile_mappings and req.qualified_name in profile_mappings:
profile_id = profile_mappings[req.qualified_name]
# Validate profile_id is not empty
if not profile_id or profile_id.strip() == '':
logger.error(f"Empty profile_id provided for {req.qualified_name}")
raise ValueError(f"Invalid credential profile selected for {req.display_name}")
# Get the credential profile
profile = await credential_manager.get_credential_by_profile(
account_id, profile_id
)
if not profile:
logger.error(f"Credential profile {profile_id} not found for {req.qualified_name}")
raise ValueError(f"Credential profile not found for {req.display_name}. Please select a valid profile or create a new one.")
# Validate profile is active
if not profile.is_active:
logger.error(f"Credential profile {profile_id} is inactive for {req.qualified_name}")
raise ValueError(f"Selected credential profile for {req.display_name} is inactive. Please select an active profile.")
mcp_config = {
'name': req.display_name,
'qualifiedName': req.qualified_name,
'config': profile.config,
'enabledTools': req.enabled_tools,
'selectedProfileId': profile_id
}
configured_mcps.append(mcp_config)
logger.info(f"Added regular MCP with profile: {mcp_config}")
else:
logger.error(f"No profile mapping provided for {req.qualified_name}")
raise ValueError(f"Missing credential profile for {req.display_name}. Please select a credential profile.")
logger.info(f"Final configured_mcps: {configured_mcps}")
logger.info(f"Final custom_mcps: {custom_mcps}")
agent_data = {
'account_id': account_id,
'name': instance_name or f"{template.name} (from marketplace)",
'description': template.description,
'system_prompt': custom_system_prompt or template.system_prompt,
'configured_mcps': configured_mcps,
'custom_mcps': custom_mcps,
'agentpress_tools': template.agentpress_tools,
'is_default': False,
'avatar': template.avatar,
'avatar_color': template.avatar_color
}
result = await client.table('agents').insert(agent_data).execute()
if not result.data:
raise ValueError("Failed to create agent")
instance_id = result.data[0]['agent_id']
try:
initial_version_data = {
"agent_id": instance_id,
"version_number": 1,
"version_name": "v1",
"system_prompt": agent_data['system_prompt'],
"configured_mcps": agent_data['configured_mcps'],
"custom_mcps": agent_data['custom_mcps'],
"agentpress_tools": agent_data['agentpress_tools'],
"is_active": True,
"created_by": account_id
}
version_result = await client.table('agent_versions').insert(initial_version_data).execute()
if version_result.data:
version_id = version_result.data[0]['version_id']
await client.table('agents').update({
'current_version_id': version_id,
'version_count': 1
}).eq('agent_id', instance_id).execute()
logger.info(f"Created initial version v1 for installed agent {instance_id}")
else:
logger.warning(f"Failed to create initial version for agent {instance_id}")
except Exception as e:
logger.warning(f"Failed to create initial version for agent {instance_id}: {e}")
await client.table('agent_templates')\
.update({'download_count': template.download_count + 1})\
.eq('template_id', template_id).execute()
logger.info(f"Successfully installed template {template_id} as instance {instance_id}")
return {
'status': 'installed',
'instance_id': instance_id,
'name': agent_data['name']
}
except Exception as e:
logger.error(f"Error installing template {template_id}: {str(e)}")
raise
async def get_agent_instance(self, instance_id: str) -> Optional[AgentInstance]:
"""Get an agent instance by ID"""
try:
client = await db.client
result = await client.table('agent_instances').select('*')\
.eq('instance_id', instance_id).execute()
if not result.data:
return None
instance_data = result.data[0]
return AgentInstance(
instance_id=instance_data['instance_id'],
template_id=instance_data.get('template_id'),
account_id=instance_data['account_id'],
name=instance_data['name'],
description=instance_data.get('description'),
credential_mappings=instance_data.get('credential_mappings', {}),
custom_system_prompt=instance_data.get('custom_system_prompt'),
is_active=instance_data.get('is_active', True),
is_default=instance_data.get('is_default', False),
created_at=instance_data['created_at'],
updated_at=instance_data['updated_at'],
avatar=instance_data.get('avatar'),
avatar_color=instance_data.get('avatar_color')
)
except Exception as e:
logger.error(f"Error getting agent instance {instance_id}: {str(e)}")
return None
async def build_runtime_agent_config(self, instance_id: str) -> Dict[str, Any]:
"""
Build a complete agent configuration for runtime use by combining
template data with user credentials
Args:
instance_id: ID of the agent instance
Returns:
Complete agent configuration with populated MCP configs
"""
logger.info(f"Building runtime config for agent instance {instance_id}")
try:
# Get the agent instance
instance = await self.get_agent_instance(instance_id)
if not instance:
raise ValueError("Agent instance not found")
# If this is a legacy agent (no template), handle differently
if not instance.template_id:
return await self._build_legacy_agent_config(instance_id)
# Get the template
template = await self.get_template(instance.template_id)
if not template:
raise ValueError("Template not found")
# Build configured_mcps and custom_mcps with user's credentials
configured_mcps = []
custom_mcps = []
for req in template.mcp_requirements:
credential_id = instance.credential_mappings.get(req.qualified_name)
if not credential_id:
logger.warning(f"No credential mapping for {req.qualified_name}")
continue
# Get the credential
credential = await credential_manager.get_credential(
instance.account_id, req.qualified_name
)
if not credential:
logger.warning(f"Credential not found for {req.qualified_name}")
continue
# Check if this is a custom MCP server
if req.custom_type:
# Build custom MCP config
custom_mcp_config = {
'name': req.display_name,
'type': req.custom_type,
'config': credential.config,
'enabledTools': req.enabled_tools
}
custom_mcps.append(custom_mcp_config)
else:
# Build regular MCP config
mcp_config = {
'name': req.display_name,
'qualifiedName': req.qualified_name,
'config': credential.config,
'enabledTools': req.enabled_tools
}
configured_mcps.append(mcp_config)
# Build complete agent config
agent_config = {
'agent_id': instance.instance_id,
'account_id': instance.account_id,
'name': instance.name,
'description': instance.description,
'system_prompt': instance.custom_system_prompt or template.system_prompt,
'configured_mcps': configured_mcps,
'custom_mcps': custom_mcps,
'agentpress_tools': template.agentpress_tools,
'is_default': instance.is_default,
'avatar': instance.avatar,
'avatar_color': instance.avatar_color,
'created_at': instance.created_at,
'updated_at': instance.updated_at
}
return agent_config
except Exception as e:
logger.error(f"Error building runtime config for instance {instance_id}: {str(e)}")
raise
async def _build_legacy_agent_config(self, instance_id: str) -> Dict[str, Any]:
"""Build config for legacy agents (backward compatibility)"""
try:
client = await db.client
# For legacy agents, instance_id should match agent_id in agents table
result = await client.table('agents').select('*').eq('agent_id', instance_id).execute()
if not result.data:
raise ValueError("Legacy agent not found")
agent = result.data[0]
return agent
except Exception as e:
logger.error(f"Error building legacy agent config: {str(e)}")
raise
async def publish_template(self, template_id: str, creator_id: str, tags: Optional[List[str]] = None) -> bool:
"""Publish a template to the marketplace"""
try:
client = await db.client
# Verify ownership
template = await self.get_template(template_id)
if not template or template.creator_id != creator_id:
raise ValueError("Template not found or access denied")
# Check if this is a Kortix team account
kortix_team_account_ids = [
'bc14b70b-2edf-473c-95be-5f5b109d6553', # Your Kortix team account ID
# Add more Kortix team account IDs here as needed
]
is_kortix_team = template.creator_id in kortix_team_account_ids
# Update template
update_data = {
'is_public': True,
'marketplace_published_at': datetime.now(timezone.utc).isoformat(),
'is_kortix_team': is_kortix_team # Set based on account
}
if tags:
update_data['tags'] = tags
result = await client.table('agent_templates')\
.update(update_data)\
.eq('template_id', template_id)\
.execute()
logger.info(f"Published template {template_id} with is_kortix_team={is_kortix_team}")
return len(result.data) > 0
except Exception as e:
logger.error(f"Error publishing template {template_id}: {str(e)}")
return False
async def unpublish_template(self, template_id: str, creator_id: str) -> bool:
"""Unpublish a template from the marketplace"""
try:
client = await db.client
# Verify ownership
template = await self.get_template(template_id)
if not template or template.creator_id != creator_id:
raise ValueError("Template not found or access denied")
# Update template to make it private
update_data = {
'is_public': False,
'marketplace_published_at': None
}
result = await client.table('agent_templates')\
.update(update_data)\
.eq('template_id', template_id)\
.execute()
return len(result.data) > 0
except Exception as e:
logger.error(f"Error unpublishing template {template_id}: {str(e)}")
return False
async def get_marketplace_templates(
self,
limit: int = 50,
offset: int = 0,
search: Optional[str] = None,
tags: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""Get public templates from marketplace"""
try:
client = await db.client
query = client.table('agent_templates')\
.select('*')\
.eq('is_public', True)\
.order('is_kortix_team', desc=True)\
.order('marketplace_published_at', desc=True)\
.range(offset, offset + limit - 1)
if search:
query = query.or_(f'name.ilike.%{search}%,description.ilike.%{search}%')
if tags:
query = query.overlaps('tags', tags)
result = await query.execute()
templates = []
for template_data in result.data:
# Use the database field for is_kortix_team
is_kortix_team = template_data.get('is_kortix_team', False)
templates.append({
'template_id': template_data['template_id'],
'name': template_data['name'],
'description': template_data.get('description'),
'mcp_requirements': template_data.get('mcp_requirements', []),
'agentpress_tools': template_data.get('agentpress_tools', {}),
'tags': template_data.get('tags', []),
'is_public': template_data.get('is_public', True),
'download_count': template_data.get('download_count', 0),
'marketplace_published_at': template_data.get('marketplace_published_at'),
'created_at': template_data['created_at'],
'creator_name': 'Kortix Team' if is_kortix_team else 'Community',
'avatar': template_data.get('avatar'),
'avatar_color': template_data.get('avatar_color'),
'is_kortix_team': is_kortix_team
})
# Templates are already sorted by database query (Kortix team first, then by date)
return templates
except Exception as e:
logger.error(f"Error getting marketplace templates: {str(e)}")
return []
async def get_user_templates(
self,
creator_id: str,
limit: int = 50,
offset: int = 0
) -> List[Dict[str, Any]]:
"""Get all templates created by a specific user"""
try:
client = await db.client
query = client.table('agent_templates')\
.select('*')\
.eq('creator_id', creator_id)\
.order('created_at', desc=True)\
.range(offset, offset + limit - 1)
result = await query.execute()
templates = []
for template_data in result.data:
templates.append({
'template_id': template_data['template_id'],
'name': template_data['name'],
'description': template_data.get('description'),
'mcp_requirements': template_data.get('mcp_requirements', []),
'agentpress_tools': template_data.get('agentpress_tools', {}),
'tags': template_data.get('tags', []),
'is_public': template_data.get('is_public', False),
'download_count': template_data.get('download_count', 0),
'marketplace_published_at': template_data.get('marketplace_published_at'),
'created_at': template_data['created_at'],
'creator_name': 'You',
'avatar': template_data.get('avatar'),
'avatar_color': template_data.get('avatar_color')
})
return templates
except Exception as e:
logger.error(f"Error getting user templates: {str(e)}")
return []
# Global template manager instance
template_manager = TemplateManager()