suna/backend/mcp_service/template_manager.py

801 lines
36 KiB
Python

"""
Agent Template Manager
This module handles:
1. Creating agent templates from existing agents (stripping credentials)
2. Installing templates as agent instances
3. Managing template lifecycle and marketplace operations
4. Converting between legacy agents and new secure architecture
"""
import json
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timezone
from utils.logger import logger
from services.supabase import DBConnection
from .credential_manager import credential_manager, MCPRequirement, MCPCredential
db = DBConnection()
@dataclass
class AgentTemplate:
"""Represents an agent template"""
template_id: str
creator_id: str
name: str
description: Optional[str]
system_prompt: str
mcp_requirements: List[MCPRequirement]
agentpress_tools: Dict[str, Any]
tags: List[str]
is_public: bool
marketplace_published_at: Optional[datetime]
download_count: int
created_at: datetime
updated_at: datetime
avatar: Optional[str]
avatar_color: Optional[str]
metadata: Optional[Dict[str, Any]] = None
@dataclass
class AgentInstance:
"""Represents an agent instance"""
instance_id: str
template_id: Optional[str]
account_id: str
name: str
description: Optional[str]
credential_mappings: Dict[str, str]
custom_system_prompt: Optional[str]
is_active: bool
is_default: bool
created_at: datetime
updated_at: datetime
avatar: Optional[str]
avatar_color: Optional[str]
class TemplateManager:
"""Manages agent templates and instances"""
async def create_template_from_agent(
self,
agent_id: str,
creator_id: str,
make_public: bool = False,
tags: Optional[List[str]] = None
) -> str:
"""
Create a secure template from an existing agent
This extracts the agent configuration and creates a template with
MCP requirements (without credentials) that can be safely shared
"""
logger.info(f"Creating template from agent {agent_id} for user {creator_id}")
try:
client = await db.client
# 🔥 FIX: Get the agent WITH current version data
agent_result = await client.table('agents').select('*, agent_versions!current_version_id(*)').eq('agent_id', agent_id).execute()
if not agent_result.data:
raise ValueError("Agent not found")
agent = agent_result.data[0]
# Verify ownership
if agent['account_id'] != creator_id:
raise ValueError("Access denied - you can only create templates from your own agents")
# 🔥 FIX: Get configuration from current version, not legacy columns
version_data = agent.get('agent_versions')
if version_data and version_data.get('config'):
# Use version config (new structure)
version_config = version_data['config']
system_prompt = version_config.get('system_prompt', '')
agentpress_tools = version_config.get('tools', {}).get('agentpress', {})
configured_mcps = version_config.get('tools', {}).get('mcp', [])
custom_mcps = version_config.get('tools', {}).get('custom_mcp', [])
logger.info(f"Using VERSION config for template creation from agent {agent_id}")
else:
# Fallback to legacy columns if no version data
system_prompt = agent.get('system_prompt', '')
agentpress_tools = agent.get('agentpress_tools', {})
configured_mcps = agent.get('configured_mcps', [])
custom_mcps = agent.get('custom_mcps', [])
logger.info(f"Using LEGACY config for template creation from agent {agent_id}")
# Extract MCP requirements from agent configuration
mcp_requirements = []
# Process configured_mcps (regular MCP servers)
for mcp in configured_mcps:
if isinstance(mcp, dict) and 'qualifiedName' in mcp:
# Extract required config keys from the config
config_keys = list(mcp.get('config', {}).keys())
requirement = {
'qualified_name': mcp['qualifiedName'],
'display_name': mcp.get('name', mcp['qualifiedName']),
'enabled_tools': mcp.get('enabledTools', []),
'required_config': config_keys
}
mcp_requirements.append(requirement)
# Process custom_mcps (custom MCP servers)
for custom_mcp in custom_mcps:
if isinstance(custom_mcp, dict) and 'name' in custom_mcp:
# Extract required config keys from the config
config_keys = list(custom_mcp.get('config', {}).keys())
# 🔥 FIX: Handle Pipedream MCPs with correct qualified name format
custom_type = custom_mcp.get('customType', custom_mcp.get('type', 'http'))
if custom_type == 'pipedream':
# For Pipedream MCPs, extract app_slug and use pipedream:{app_slug} format
app_slug = custom_mcp.get('config', {}).get('app_slug')
if not app_slug and 'headers' in custom_mcp.get('config', {}):
app_slug = custom_mcp['config']['headers'].get('x-pd-app-slug')
if app_slug:
qualified_name = f"pipedream:{app_slug}"
logger.info(f"Using Pipedream qualified name: {qualified_name} for app_slug: {app_slug}")
else:
# Fallback if no app_slug found
qualified_name = f"pipedream:{custom_mcp['name'].lower().replace(' ', '_')}"
logger.warning(f"No app_slug found for Pipedream MCP {custom_mcp['name']}, using fallback: {qualified_name}")
else:
# For other custom MCPs, use the original logic
qualified_name = custom_mcp['name'].lower().replace(' ', '_')
requirement = {
'qualified_name': qualified_name,
'display_name': custom_mcp['name'],
'enabled_tools': custom_mcp.get('enabledTools', []),
'required_config': config_keys,
'custom_type': custom_type
}
mcp_requirements.append(requirement)
kortix_team_account_ids = [
'xxxxxxxx',
]
is_kortix_team = creator_id in kortix_team_account_ids
# Create the template using version data
template_data = {
'creator_id': creator_id,
'name': agent['name'],
'description': agent.get('description'),
'system_prompt': system_prompt, # 🔥 From version
'mcp_requirements': mcp_requirements,
'agentpress_tools': agentpress_tools, # 🔥 From version
'tags': tags or [],
'is_public': make_public,
'is_kortix_team': is_kortix_team,
'avatar': agent.get('avatar'),
'avatar_color': agent.get('avatar_color'),
'metadata': {
'source_agent_id': agent_id,
'source_version_id': agent.get('current_version_id'),
'source_version_name': version_data.get('version_name', 'v1') if version_data else 'v1'
}
}
if make_public:
template_data['marketplace_published_at'] = datetime.now(timezone.utc).isoformat()
result = await client.table('agent_templates').insert(template_data).execute()
if not result.data:
raise ValueError("Failed to create template")
template_id = result.data[0]['template_id']
logger.info(f"Successfully created template {template_id} from agent {agent_id} version {version_data.get('version_name', 'v1') if version_data else 'legacy'} with is_kortix_team={is_kortix_team}")
return template_id
except Exception as e:
logger.error(f"Error creating template from agent {agent_id}: {str(e)}")
raise
async def get_template(self, template_id: str) -> Optional[AgentTemplate]:
"""Get an agent template by ID"""
try:
client = await db.client
result = await client.table('agent_templates').select('*')\
.eq('template_id', template_id).execute()
if not result.data:
return None
template_data = result.data[0]
# Convert mcp_requirements to MCPRequirement objects
mcp_requirements = []
for req_data in template_data.get('mcp_requirements', []):
mcp_requirements.append(MCPRequirement(
qualified_name=req_data.get('qualified_name') or req_data.get('qualifiedName'),
display_name=req_data.get('display_name') or req_data.get('name'),
enabled_tools=req_data.get('enabled_tools') or req_data.get('enabledTools', []),
required_config=req_data.get('required_config') or req_data.get('requiredConfig', []),
custom_type=req_data.get('custom_type')
))
return AgentTemplate(
template_id=template_data['template_id'],
creator_id=template_data['creator_id'],
name=template_data['name'],
description=template_data.get('description'),
system_prompt=template_data['system_prompt'],
mcp_requirements=mcp_requirements,
agentpress_tools=template_data.get('agentpress_tools', {}),
tags=template_data.get('tags', []),
is_public=template_data.get('is_public', False),
marketplace_published_at=template_data.get('marketplace_published_at'),
download_count=template_data.get('download_count', 0),
created_at=template_data['created_at'],
updated_at=template_data['updated_at'],
avatar=template_data.get('avatar'),
avatar_color=template_data.get('avatar_color'),
metadata=template_data.get('metadata', {})
)
except Exception as e:
logger.error(f"Error getting template {template_id}: {str(e)}")
return None
async def install_template(
self,
template_id: str,
account_id: str,
instance_name: Optional[str] = None,
custom_system_prompt: Optional[str] = None,
profile_mappings: Optional[Dict[str, str]] = None,
custom_mcp_configs: Optional[Dict[str, Dict[str, Any]]] = None
) -> Dict[str, Any]:
logger.info(f"Installing template {template_id} for user {account_id}")
try:
template = await self.get_template(template_id)
if not template:
raise ValueError("Template not found")
if not template.is_public:
if template.creator_id != account_id:
raise ValueError("Access denied to private template")
custom_requirements = [req for req in template.mcp_requirements if getattr(req, 'custom_type', None)]
regular_requirements = [req for req in template.mcp_requirements if not getattr(req, 'custom_type', None)]
if not profile_mappings and regular_requirements:
profile_mappings = {}
for req in regular_requirements:
default_profile = await credential_manager.get_default_credential_profile(
account_id, req.qualified_name
)
if default_profile:
profile_mappings[req.qualified_name] = default_profile.profile_id
logger.info(f"✅ Mapped {req.qualified_name} -> {default_profile.profile_id} ({default_profile.display_name})")
else:
logger.warning(f"❌ No profile found for {req.qualified_name} after robust lookup")
missing_profile_mappings = []
if regular_requirements:
provided_mappings = profile_mappings or {}
for req in regular_requirements:
if req.qualified_name not in provided_mappings:
missing_profile_mappings.append({
'qualified_name': req.qualified_name,
'display_name': req.display_name,
'required_config': req.required_config
})
missing_custom_configs = []
if custom_requirements:
provided_custom_configs = custom_mcp_configs or {}
for req in custom_requirements:
if req.qualified_name not in provided_custom_configs:
missing_custom_configs.append({
'qualified_name': req.qualified_name,
'display_name': req.display_name,
'custom_type': req.custom_type,
'required_config': req.required_config
})
if missing_profile_mappings or missing_custom_configs:
return {
'status': 'configs_required',
'missing_regular_credentials': missing_profile_mappings,
'missing_custom_configs': missing_custom_configs,
'template': {
'template_id': template.template_id,
'name': template.name,
'description': template.description
}
}
client = await db.client
configured_mcps = []
custom_mcps = []
for req in template.mcp_requirements:
logger.info(f"Processing requirement: {req.qualified_name}, custom_type: {getattr(req, 'custom_type', None)}")
if hasattr(req, 'custom_type') and req.custom_type:
if custom_mcp_configs and req.qualified_name in custom_mcp_configs:
provided_config = custom_mcp_configs[req.qualified_name]
if req.custom_type == 'pipedream':
profile_id = provided_config.get('profile_id')
if profile_id:
logger.info(f"Looking up Pipedream profile {profile_id} for {req.qualified_name}")
try:
from pipedream.profiles import get_profile_manager
profile_manager = get_profile_manager(db)
profile = await profile_manager.get_profile(account_id, profile_id)
if profile:
actual_config = {
'app_slug': profile.app_slug,
'profile_id': profile_id,
'url': 'https://remote.mcp.pipedream.net',
'headers': {
'x-pd-app-slug': profile.app_slug,
}
}
logger.info(f"Found Pipedream profile: {profile.app_name} ({profile.profile_name})")
provided_config = actual_config
else:
logger.error(f"Pipedream profile {profile_id} not found for {req.qualified_name}")
raise ValueError(f"Pipedream profile not found for {req.display_name}")
except Exception as e:
logger.error(f"Error looking up Pipedream profile {profile_id}: {e}")
raise ValueError(f"Failed to lookup Pipedream profile for {req.display_name}")
custom_mcp_config = {
'name': req.display_name,
'type': req.custom_type,
'customType': req.custom_type,
'config': provided_config,
'enabledTools': req.enabled_tools
}
custom_mcps.append(custom_mcp_config)
logger.info(f"Added custom MCP with config: {custom_mcp_config}")
else:
logger.warning(f"No custom config provided for {req.qualified_name}")
continue
else:
if profile_mappings and req.qualified_name in profile_mappings:
profile_id = profile_mappings[req.qualified_name]
if not profile_id or profile_id.strip() == '':
logger.error(f"Empty profile_id provided for {req.qualified_name}")
raise ValueError(f"Invalid credential profile selected for {req.display_name}")
profile = await credential_manager.get_credential_by_profile(
account_id, profile_id
)
if not profile:
logger.error(f"Credential profile {profile_id} not found for {req.qualified_name}")
raise ValueError(f"Credential profile not found for {req.display_name}. Please select a valid profile or create a new one.")
if not profile.is_active:
logger.error(f"Credential profile {profile_id} is inactive for {req.qualified_name}")
raise ValueError(f"Selected credential profile for {req.display_name} is inactive. Please select an active profile.")
mcp_config = {
'name': req.display_name,
'qualifiedName': profile.mcp_qualified_name, # Use profile's qualified name!
'config': profile.config,
'enabledTools': req.enabled_tools,
'selectedProfileId': profile_id
}
configured_mcps.append(mcp_config)
logger.info(f"Added regular MCP with profile: {mcp_config}")
logger.info(f"Used qualified name from profile: {profile.mcp_qualified_name} (template had: {req.qualified_name})")
else:
logger.error(f"No profile mapping provided for {req.qualified_name}")
raise ValueError(f"Missing credential profile for {req.display_name}. Please select a credential profile.")
logger.info(f"Final configured_mcps: {configured_mcps}")
logger.info(f"Final custom_mcps: {custom_mcps}")
# 🔥 FIX: Build unified config structure like in agent creation
from agent.config_helper import build_unified_config
system_prompt = custom_system_prompt or template.system_prompt
unified_config = build_unified_config(
system_prompt=system_prompt,
agentpress_tools=template.agentpress_tools,
configured_mcps=configured_mcps,
custom_mcps=custom_mcps,
avatar=template.avatar,
avatar_color=template.avatar_color
)
agent_data = {
'account_id': account_id,
'name': instance_name or f"{template.name} (from marketplace)",
'description': template.description,
'config': unified_config,
# Keep legacy columns for backward compatibility
'system_prompt': system_prompt,
'configured_mcps': configured_mcps,
'custom_mcps': custom_mcps,
'agentpress_tools': template.agentpress_tools,
'is_default': False,
'avatar': template.avatar,
'avatar_color': template.avatar_color,
'version_count': 1
}
result = await client.table('agents').insert(agent_data).execute()
if not result.data:
raise ValueError("Failed to create agent")
instance_id = result.data[0]['agent_id']
try:
initial_version_data = {
"agent_id": instance_id,
"version_number": 1,
"version_name": "v1",
"config": unified_config,
# Keep legacy columns for backward compatibility
"system_prompt": system_prompt,
"configured_mcps": configured_mcps,
"custom_mcps": custom_mcps,
"agentpress_tools": template.agentpress_tools,
"is_active": True,
"created_by": account_id
}
version_result = await client.table('agent_versions').insert(initial_version_data).execute()
if version_result.data:
version_id = version_result.data[0]['version_id']
await client.table('agents').update({
'current_version_id': version_id
}).eq('agent_id', instance_id).execute()
logger.info(f"Created initial version v1 for installed agent {instance_id}")
else:
logger.warning(f"Failed to create initial version for agent {instance_id}")
except Exception as e:
logger.warning(f"Failed to create initial version for agent {instance_id}: {e}")
await client.table('agent_templates')\
.update({'download_count': template.download_count + 1})\
.eq('template_id', template_id).execute()
logger.info(f"Successfully installed template {template_id} as instance {instance_id}")
return {
'status': 'installed',
'instance_id': instance_id,
'name': agent_data['name']
}
except Exception as e:
logger.error(f"Error installing template {template_id}: {str(e)}")
raise
async def get_agent_instance(self, instance_id: str) -> Optional[AgentInstance]:
"""Get an agent instance by ID"""
try:
client = await db.client
result = await client.table('agent_instances').select('*')\
.eq('instance_id', instance_id).execute()
if not result.data:
return None
instance_data = result.data[0]
return AgentInstance(
instance_id=instance_data['instance_id'],
template_id=instance_data.get('template_id'),
account_id=instance_data['account_id'],
name=instance_data['name'],
description=instance_data.get('description'),
credential_mappings=instance_data.get('credential_mappings', {}),
custom_system_prompt=instance_data.get('custom_system_prompt'),
is_active=instance_data.get('is_active', True),
is_default=instance_data.get('is_default', False),
created_at=instance_data['created_at'],
updated_at=instance_data['updated_at'],
avatar=instance_data.get('avatar'),
avatar_color=instance_data.get('avatar_color')
)
except Exception as e:
logger.error(f"Error getting agent instance {instance_id}: {str(e)}")
return None
async def build_runtime_agent_config(self, instance_id: str) -> Dict[str, Any]:
"""
Build a complete agent configuration for runtime use by combining
template data with user credentials
Args:
instance_id: ID of the agent instance
Returns:
Complete agent configuration with populated MCP configs
"""
logger.info(f"Building runtime config for agent instance {instance_id}")
try:
# Get the agent instance
instance = await self.get_agent_instance(instance_id)
if not instance:
raise ValueError("Agent instance not found")
# If this is a legacy agent (no template), handle differently
if not instance.template_id:
return await self._build_legacy_agent_config(instance_id)
# Get the template
template = await self.get_template(instance.template_id)
if not template:
raise ValueError("Template not found")
# Build configured_mcps and custom_mcps with user's credentials
configured_mcps = []
custom_mcps = []
for req in template.mcp_requirements:
credential_id = instance.credential_mappings.get(req.qualified_name)
if not credential_id:
logger.warning(f"No credential mapping for {req.qualified_name}")
continue
# Get the credential
credential = await credential_manager.get_credential(
instance.account_id, req.qualified_name
)
if not credential:
logger.warning(f"Credential not found for {req.qualified_name}")
continue
# Check if this is a custom MCP server
if req.custom_type:
# Build custom MCP config
custom_mcp_config = {
'name': req.display_name,
'type': req.custom_type,
'config': credential.config,
'enabledTools': req.enabled_tools
}
custom_mcps.append(custom_mcp_config)
else:
mcp_config = {
'name': req.display_name,
'qualifiedName': credential.mcp_qualified_name, # Use credential's qualified name!
'config': credential.config,
'enabledTools': req.enabled_tools
}
configured_mcps.append(mcp_config)
logger.info(f"Runtime config - Used qualified name from credential: {credential.mcp_qualified_name} (template had: {req.qualified_name})")
# Build complete agent config
agent_config = {
'agent_id': instance.instance_id,
'account_id': instance.account_id,
'name': instance.name,
'description': instance.description,
'system_prompt': instance.custom_system_prompt or template.system_prompt,
'configured_mcps': configured_mcps,
'custom_mcps': custom_mcps,
'agentpress_tools': template.agentpress_tools,
'is_default': instance.is_default,
'avatar': instance.avatar,
'avatar_color': instance.avatar_color,
'created_at': instance.created_at,
'updated_at': instance.updated_at
}
return agent_config
except Exception as e:
logger.error(f"Error building runtime config for instance {instance_id}: {str(e)}")
raise
async def _build_legacy_agent_config(self, instance_id: str) -> Dict[str, Any]:
"""Build config for legacy agents (backward compatibility)"""
try:
client = await db.client
# For legacy agents, instance_id should match agent_id in agents table
result = await client.table('agents').select('*').eq('agent_id', instance_id).execute()
if not result.data:
raise ValueError("Legacy agent not found")
agent = result.data[0]
return agent
except Exception as e:
logger.error(f"Error building legacy agent config: {str(e)}")
raise
async def publish_template(self, template_id: str, creator_id: str, tags: Optional[List[str]] = None) -> bool:
"""Publish a template to the marketplace"""
try:
client = await db.client
# Verify ownership
template = await self.get_template(template_id)
if not template or template.creator_id != creator_id:
raise ValueError("Template not found or access denied")
# Check if this is a Kortix team account
kortix_team_account_ids = [
'bc14b70b-2edf-473c-95be-5f5b109d6553', # Your Kortix team account ID
# Add more Kortix team account IDs here as needed
]
is_kortix_team = template.creator_id in kortix_team_account_ids
# Update template
update_data = {
'is_public': True,
'marketplace_published_at': datetime.now(timezone.utc).isoformat(),
'is_kortix_team': is_kortix_team # Set based on account
}
if tags:
update_data['tags'] = tags
result = await client.table('agent_templates')\
.update(update_data)\
.eq('template_id', template_id)\
.execute()
logger.info(f"Published template {template_id} with is_kortix_team={is_kortix_team}")
return len(result.data) > 0
except Exception as e:
logger.error(f"Error publishing template {template_id}: {str(e)}")
return False
async def unpublish_template(self, template_id: str, creator_id: str) -> bool:
"""Unpublish a template from the marketplace"""
try:
client = await db.client
# Verify ownership
template = await self.get_template(template_id)
if not template or template.creator_id != creator_id:
raise ValueError("Template not found or access denied")
# Update template to make it private
update_data = {
'is_public': False,
'marketplace_published_at': None
}
result = await client.table('agent_templates')\
.update(update_data)\
.eq('template_id', template_id)\
.execute()
return len(result.data) > 0
except Exception as e:
logger.error(f"Error unpublishing template {template_id}: {str(e)}")
return False
async def get_marketplace_templates(
self,
limit: int = 50,
offset: int = 0,
search: Optional[str] = None,
tags: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""Get public templates from marketplace"""
try:
client = await db.client
query = client.table('agent_templates')\
.select('*')\
.eq('is_public', True)\
.order('is_kortix_team', desc=True)\
.order('marketplace_published_at', desc=True)\
.range(offset, offset + limit - 1)
if search:
query = query.or_(f'name.ilike.%{search}%,description.ilike.%{search}%')
if tags:
query = query.overlaps('tags', tags)
result = await query.execute()
templates = []
for template_data in result.data:
# Use the database field for is_kortix_team
is_kortix_team = template_data.get('is_kortix_team', False)
templates.append({
'template_id': template_data['template_id'],
'name': template_data['name'],
'description': template_data.get('description'),
'mcp_requirements': template_data.get('mcp_requirements', []),
'agentpress_tools': template_data.get('agentpress_tools', {}),
'tags': template_data.get('tags', []),
'is_public': template_data.get('is_public', True),
'download_count': template_data.get('download_count', 0),
'marketplace_published_at': template_data.get('marketplace_published_at'),
'created_at': template_data['created_at'],
'creator_name': 'Kortix Team' if is_kortix_team else 'Community',
'avatar': template_data.get('avatar'),
'avatar_color': template_data.get('avatar_color'),
'is_kortix_team': is_kortix_team
})
# Templates are already sorted by database query (Kortix team first, then by date)
return templates
except Exception as e:
logger.error(f"Error getting marketplace templates: {str(e)}")
return []
async def get_user_templates(
self,
creator_id: str,
limit: int = 50,
offset: int = 0
) -> List[Dict[str, Any]]:
"""Get all templates created by a specific user"""
try:
client = await db.client
query = client.table('agent_templates')\
.select('*')\
.eq('creator_id', creator_id)\
.order('created_at', desc=True)\
.range(offset, offset + limit - 1)
result = await query.execute()
templates = []
for template_data in result.data:
templates.append({
'template_id': template_data['template_id'],
'name': template_data['name'],
'description': template_data.get('description'),
'mcp_requirements': template_data.get('mcp_requirements', []),
'agentpress_tools': template_data.get('agentpress_tools', {}),
'tags': template_data.get('tags', []),
'is_public': template_data.get('is_public', False),
'download_count': template_data.get('download_count', 0),
'marketplace_published_at': template_data.get('marketplace_published_at'),
'created_at': template_data['created_at'],
'creator_name': 'You',
'avatar': template_data.get('avatar'),
'avatar_color': template_data.get('avatar_color')
})
return templates
except Exception as e:
logger.error(f"Error getting user templates: {str(e)}")
return []
# Global template manager instance
template_manager = TemplateManager()