sync playbooks with config

This commit is contained in:
Saumya 2025-08-15 01:26:31 +05:30
parent ea045a4090
commit a3d4c02314
8 changed files with 290 additions and 51 deletions

View File

@ -2147,6 +2147,9 @@ async def update_agent(
if current_version_data is None:
logger.info(f"Agent {agent_id} has no version data, creating initial version")
try:
workflows_result = await client.table('agent_workflows').select('*').eq('agent_id', agent_id).execute()
workflows = workflows_result.data if workflows_result.data else []
initial_version_data = {
"agent_id": agent_id,
"version_number": 1,
@ -2165,7 +2168,8 @@ async def update_agent(
configured_mcps=initial_version_data["configured_mcps"],
custom_mcps=initial_version_data["custom_mcps"],
avatar=None,
avatar_color=None
avatar_color=None,
workflows=workflows
)
initial_version_data["config"] = initial_config

View File

@ -14,6 +14,7 @@ def extract_agent_config(agent_data: Dict[str, Any], version_data: Optional[Dict
logger.info(f"Using active version data for agent {agent_id} (version: {version_data.get('version_name', 'unknown')})")
model = None
workflows = []
if version_data.get('config'):
config = version_data['config'].copy()
system_prompt = config.get('system_prompt', '')
@ -22,12 +23,14 @@ def extract_agent_config(agent_data: Dict[str, Any], version_data: Optional[Dict
configured_mcps = tools.get('mcp', [])
custom_mcps = tools.get('custom_mcp', [])
agentpress_tools = tools.get('agentpress', {})
workflows = config.get('workflows', [])
else:
system_prompt = version_data.get('system_prompt', '')
model = version_data.get('model')
configured_mcps = version_data.get('configured_mcps', [])
custom_mcps = version_data.get('custom_mcps', [])
agentpress_tools = version_data.get('agentpress_tools', {})
workflows = []
if is_suna_default:
from agent.suna.config import SunaConfig
@ -47,10 +50,9 @@ def extract_agent_config(agent_data: Dict[str, Any], version_data: Optional[Dict
'configured_mcps': configured_mcps,
'custom_mcps': custom_mcps,
'agentpress_tools': _extract_agentpress_tools_for_run(agentpress_tools),
# Deprecated fields retained for compatibility
'workflows': workflows,
'avatar': agent_data.get('avatar'),
'avatar_color': agent_data.get('avatar_color'),
# New field
'profile_image_url': agent_data.get('profile_image_url'),
'is_suna_default': is_suna_default,
'centrally_managed': centrally_managed,
@ -85,6 +87,7 @@ def extract_agent_config(agent_data: Dict[str, Any], version_data: Optional[Dict
config['configured_mcps'] = tools.get('mcp', [])
config['custom_mcps'] = tools.get('custom_mcp', [])
config['agentpress_tools'] = _extract_agentpress_tools_for_run(tools.get('agentpress', {}))
config['workflows'] = config.get('workflows', [])
# Legacy and new fields
config['avatar'] = agent_data.get('avatar')
@ -110,6 +113,7 @@ def extract_agent_config(agent_data: Dict[str, Any], version_data: Optional[Dict
'configured_mcps': [],
'custom_mcps': [],
'agentpress_tools': {},
'workflows': [],
'avatar': agent_data.get('avatar'),
'avatar_color': agent_data.get('avatar_color'),
'profile_image_url': agent_data.get('profile_image_url'),
@ -128,7 +132,8 @@ def build_unified_config(
custom_mcps: Optional[List[Dict[str, Any]]] = None,
avatar: Optional[str] = None,
avatar_color: Optional[str] = None,
suna_metadata: Optional[Dict[str, Any]] = None
suna_metadata: Optional[Dict[str, Any]] = None,
workflows: Optional[List[Dict[str, Any]]] = None
) -> Dict[str, Any]:
simplified_tools = {}
for tool_name, tool_config in agentpress_tools.items():
@ -144,6 +149,7 @@ def build_unified_config(
'mcp': configured_mcps or [],
'custom_mcp': custom_mcps or []
},
'workflows': workflows or [],
'metadata': {
'avatar': avatar,
'avatar_color': avatar_color

View File

@ -11,6 +11,36 @@ class WorkflowTool(AgentBuilderBaseTool):
def __init__(self, thread_manager: ThreadManager, db_connection, agent_id: str):
super().__init__(thread_manager, db_connection, agent_id)
async def _sync_workflows_to_version_config(self) -> None:
try:
client = await self.db.client
agent_result = await client.table('agents').select('current_version_id').eq('agent_id', self.agent_id).single().execute()
if not agent_result.data or not agent_result.data.get('current_version_id'):
logger.warning(f"No current version found for agent {self.agent_id}")
return
current_version_id = agent_result.data['current_version_id']
workflows_result = await client.table('agent_workflows').select('*').eq('agent_id', self.agent_id).execute()
workflows = workflows_result.data if workflows_result.data else []
version_result = await client.table('agent_versions').select('config').eq('version_id', current_version_id).single().execute()
if not version_result.data:
logger.warning(f"Version {current_version_id} not found")
return
config = version_result.data.get('config', {})
config['workflows'] = workflows
await client.table('agent_versions').update({'config': config}).eq('version_id', current_version_id).execute()
logger.info(f"Synced {len(workflows)} workflows to version config for agent {self.agent_id}")
except Exception as e:
logger.error(f"Failed to sync workflows to version config: {e}")
async def _get_available_tools_for_agent(self) -> List[str]:
try:
client = await self.db.client
@ -214,6 +244,8 @@ class WorkflowTool(AgentBuilderBaseTool):
if not result.data:
return self.fail_response("Failed to create workflow")
await self._sync_workflows_to_version_config()
workflow = result.data[0]
return self.success_response({
"message": f"Workflow '{name}' created successfully",
@ -415,6 +447,8 @@ class WorkflowTool(AgentBuilderBaseTool):
if not result.data:
return self.fail_response("Failed to update workflow")
await self._sync_workflows_to_version_config()
workflow = result.data[0]
return self.success_response({
@ -471,6 +505,8 @@ class WorkflowTool(AgentBuilderBaseTool):
result = await client.table('agent_workflows').delete().eq('id', workflow_id).execute()
await self._sync_workflows_to_version_config()
return self.success_response({
"message": f"Workflow '{workflow_name}' deleted successfully",
"workflow_id": workflow_id
@ -523,6 +559,8 @@ class WorkflowTool(AgentBuilderBaseTool):
if not result.data:
return self.fail_response("Failed to update workflow status")
await self._sync_workflows_to_version_config()
action = "activated" if active else "deactivated"
return self.success_response({
"message": f"Workflow '{workflow_name}' {action} successfully",
@ -548,7 +586,6 @@ class WorkflowTool(AgentBuilderBaseTool):
'order': step.get('order', 0)
}
# Preserve identifiers to avoid breaking frontends/editors
if 'id' in step and step.get('id'):
step_dict['id'] = step['id']
if 'parentConditionalId' in step and step.get('parentConditionalId'):
@ -560,5 +597,3 @@ class WorkflowTool(AgentBuilderBaseTool):
result.append(step_dict)
return result
# Removed separate create_playbook in favor of playbook-style create_workflow

View File

@ -197,39 +197,31 @@ class VersionService:
configured_mcps: List[Dict[str, Any]],
custom_mcps: List[Dict[str, Any]],
agentpress_tools: Dict[str, Any],
model: Optional[str] = None, # Move model parameter after required params
model: Optional[str] = None,
version_name: Optional[str] = None,
change_description: Optional[str] = None
) -> AgentVersion:
logger.info(f"Creating version for agent {agent_id}")
client = await self.db.client
is_owner, _ = await self._verify_agent_access(agent_id, user_id)
if not is_owner:
raise UnauthorizedError("You don't have permission to create versions for this agent")
has_access = await has_edit_access_to_agent(agent_id, user_id)
if not has_access:
raise Exception("Unauthorized to create version for this agent")
client = await self._get_client()
current_result = await client.table('agents').select('current_version_id, version_count').eq('agent_id', agent_id).single().execute()
agent_result = await client.table('agents').select('*').eq(
'agent_id', agent_id
).execute()
if not current_result.data:
raise Exception("Agent not found")
if not agent_result.data:
raise AgentNotFoundError(f"Agent {agent_id} not found")
previous_version_id = current_result.data.get('current_version_id')
version_number = (current_result.data.get('version_count') or 0) + 1
version_number = await self._get_next_version_number(agent_id)
version_name = version_name or f"v{version_number}"
if not version_name:
version_name = f"v{version_number}"
current_active_result = await client.table('agent_versions').select('*').eq(
'agent_id', agent_id
).eq('is_active', True).execute()
previous_version_id = None
if current_active_result.data:
previous_version_id = current_active_result.data[0]['version_id']
await client.table('agent_versions').update({
'is_active': False,
'updated_at': datetime.now(timezone.utc).isoformat()
}).eq('version_id', previous_version_id).execute()
workflows_result = await client.table('agent_workflows').select('*').eq('agent_id', agent_id).execute()
workflows = workflows_result.data if workflows_result.data else []
normalized_custom_mcps = self._normalize_custom_mcps(custom_mcps)
@ -239,7 +231,7 @@ class VersionService:
version_number=version_number,
version_name=version_name,
system_prompt=system_prompt,
model=model, # Store the model field
model=model,
configured_mcps=configured_mcps,
custom_mcps=normalized_custom_mcps,
agentpress_tools=agentpress_tools,
@ -264,12 +256,13 @@ class VersionService:
'previous_version_id': version.previous_version_id,
'config': {
'system_prompt': version.system_prompt,
'model': version.model, # Include model in config
'model': version.model,
'tools': {
'agentpress': version.agentpress_tools,
'mcp': version.configured_mcps,
'custom_mcp': normalized_custom_mcps
}
},
'workflows': workflows
}
}

View File

@ -0,0 +1,80 @@
BEGIN;
CREATE OR REPLACE FUNCTION update_version_config_with_workflows(p_version_id UUID)
RETURNS VOID
LANGUAGE plpgsql
AS $$
DECLARE
v_agent_id UUID;
v_config JSONB;
v_workflows JSONB;
BEGIN
SELECT agent_id, config INTO v_agent_id, v_config
FROM agent_versions
WHERE version_id = p_version_id;
IF v_config IS NULL THEN
RETURN;
END IF;
SELECT COALESCE(
jsonb_agg(
jsonb_build_object(
'id', id,
'name', name,
'description', description,
'status', status,
'trigger_phrase', trigger_phrase,
'is_default', is_default,
'steps', steps,
'created_at', created_at,
'updated_at', updated_at
) ORDER BY created_at DESC
),
'[]'::jsonb
) INTO v_workflows
FROM agent_workflows
WHERE agent_id = v_agent_id;
v_config = jsonb_set(v_config, '{workflows}', v_workflows);
UPDATE agent_versions
SET config = v_config
WHERE version_id = p_version_id;
END;
$$;
DO $$
DECLARE
v_version RECORD;
v_count INTEGER := 0;
v_total INTEGER;
BEGIN
SELECT COUNT(*) INTO v_total FROM agent_versions WHERE config IS NOT NULL;
RAISE NOTICE 'Starting to update % version configs with workflows', v_total;
FOR v_version IN
SELECT version_id
FROM agent_versions
WHERE config IS NOT NULL
AND (config->>'workflows') IS NULL
LOOP
PERFORM update_version_config_with_workflows(v_version.version_id);
v_count := v_count + 1;
IF v_count % 100 = 0 THEN
RAISE NOTICE 'Processed % of % versions', v_count, v_total;
END IF;
END LOOP;
RAISE NOTICE 'Completed updating % version configs with workflows', v_count;
END;
$$;
DROP FUNCTION IF EXISTS update_version_config_with_workflows(UUID);
COMMENT ON COLUMN agent_versions.config IS 'Unified configuration including system_prompt, tools, workflows, and metadata';
COMMIT;

View File

@ -105,11 +105,9 @@ class InstallationService:
request.custom_system_prompt or template.system_prompt
)
await self._restore_workflows(agent_id, template.config)
await self._increment_download_count(template.template_id)
from utils.cache import Cache
await Cache.invalidate(f"agent_count_limit:{request.account_id}")
agent_name = request.instance_name or f"{template.name} (from marketplace)"
logger.info(f"Successfully installed template {template.template_id} as agent {agent_id}")
@ -371,6 +369,86 @@ class InstallationService:
except Exception as e:
logger.warning(f"Failed to create initial version for agent {agent_id}: {e}")
async def _restore_workflows(self, agent_id: str, template_config: Dict[str, Any]) -> None:
workflows = template_config.get('workflows', [])
if not workflows:
logger.info(f"No workflows to restore for agent {agent_id}")
return
client = await self._db.client
restored_count = 0
for workflow in workflows:
try:
steps = workflow.get('steps', [])
if steps:
steps = self._regenerate_step_ids(steps)
workflow_data = {
'id': str(uuid4()),
'agent_id': agent_id,
'name': workflow.get('name', 'Untitled Workflow'),
'description': workflow.get('description'),
'status': workflow.get('status', 'draft'),
'trigger_phrase': workflow.get('trigger_phrase'),
'is_default': workflow.get('is_default', False),
'steps': steps,
'created_at': datetime.now(timezone.utc).isoformat(),
'updated_at': datetime.now(timezone.utc).isoformat()
}
result = await client.table('agent_workflows').insert(workflow_data).execute()
if result.data:
restored_count += 1
logger.info(f"Restored workflow '{workflow_data['name']}' for agent {agent_id}")
else:
logger.warning(f"Failed to insert workflow '{workflow_data['name']}' for agent {agent_id}")
except Exception as e:
logger.error(f"Failed to restore workflow '{workflow.get('name', 'Unknown')}' for agent {agent_id}: {e}")
logger.info(f"Successfully restored {restored_count}/{len(workflows)} workflows for agent {agent_id}")
def _regenerate_step_ids(self, steps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
if not steps:
return []
new_steps = []
id_mapping = {}
for step in steps:
if not isinstance(step, dict):
continue
old_id = step.get('id')
if old_id:
if old_id not in id_mapping:
id_mapping[old_id] = f"step-{str(uuid4())[:8]}"
new_id = id_mapping[old_id]
else:
new_id = f"step-{str(uuid4())[:8]}"
new_step = {
'id': new_id,
'name': step.get('name', ''),
'description': step.get('description', ''),
'type': step.get('type', 'instruction'),
'config': step.get('config', {}),
'order': step.get('order', 0)
}
if 'conditions' in step:
new_step['conditions'] = step['conditions']
if 'children' in step and isinstance(step['children'], list):
new_step['children'] = self._regenerate_step_ids(step['children'])
else:
new_step['children'] = []
new_steps.append(new_step)
return new_steps
async def _increment_download_count(self, template_id: str) -> None:
client = await self._db.client
try:

View File

@ -63,6 +63,10 @@ class AgentTemplate:
def agentpress_tools(self) -> Dict[str, Any]:
return self.config.get('tools', {}).get('agentpress', {})
@property
def workflows(self) -> List[Dict[str, Any]]:
return self.config.get('workflows', [])
@property
def mcp_requirements(self) -> List[MCPRequirementValue]:
requirements = []
@ -396,6 +400,20 @@ class TemplateService:
else:
sanitized_agentpress[tool_name] = False
workflows = config.get('workflows', [])
sanitized_workflows = []
for workflow in workflows:
if isinstance(workflow, dict):
sanitized_workflow = {
'name': workflow.get('name'),
'description': workflow.get('description'),
'status': workflow.get('status', 'draft'),
'trigger_phrase': workflow.get('trigger_phrase'),
'is_default': workflow.get('is_default', False),
'steps': workflow.get('steps', [])
}
sanitized_workflows.append(sanitized_workflow)
sanitized = {
'system_prompt': config.get('system_prompt', ''),
'model': config.get('model'),
@ -404,6 +422,7 @@ class TemplateService:
'mcp': config.get('tools', {}).get('mcp', []),
'custom_mcp': []
},
'workflows': sanitized_workflows, # Use sanitized workflows
'metadata': {
'avatar': config.get('metadata', {}).get('avatar'),
'avatar_color': config.get('metadata', {}).get('avatar_color')

View File

@ -123,20 +123,15 @@ class WorkflowExecuteRequest(BaseModel):
input_data: Optional[Dict[str, Any]] = None
# Rebuild models to handle forward references
WorkflowStepRequest.model_rebuild()
# ===== INITIALIZATION =====
def initialize(database: DBConnection):
"""Initialize the triggers module with database connection"""
global db
db = database
async def verify_agent_access(agent_id: str, user_id: str):
"""Verify user has access to the agent"""
client = await db.client
result = await client.table('agents').select('agent_id').eq('agent_id', agent_id).eq('account_id', user_id).execute()
@ -144,11 +139,39 @@ async def verify_agent_access(agent_id: str, user_id: str):
raise HTTPException(status_code=404, detail="Agent not found or access denied")
# ===== PROVIDER ENDPOINTS =====
async def sync_workflows_to_version_config(agent_id: str):
try:
client = await db.client
agent_result = await client.table('agents').select('current_version_id').eq('agent_id', agent_id).single().execute()
if not agent_result.data or not agent_result.data.get('current_version_id'):
logger.warning(f"No current version found for agent {agent_id}")
return
current_version_id = agent_result.data['current_version_id']
workflows_result = await client.table('agent_workflows').select('*').eq('agent_id', agent_id).execute()
workflows = workflows_result.data if workflows_result.data else []
version_result = await client.table('agent_versions').select('config').eq('version_id', current_version_id).single().execute()
if not version_result.data:
logger.warning(f"Version {current_version_id} not found")
return
config = version_result.data.get('config', {})
config['workflows'] = workflows
await client.table('agent_versions').update({'config': config}).eq('version_id', current_version_id).execute()
logger.info(f"Synced {len(workflows)} workflows to version config for agent {agent_id}")
except Exception as e:
logger.error(f"Failed to sync workflows to version config: {e}")
@router.get("/providers")
async def get_providers():
"""Get available trigger providers"""
if not await is_enabled("agent_triggers"):
raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
@ -603,6 +626,9 @@ async def create_agent_workflow(
'steps': steps_json
}).execute()
# Sync workflows to version config after creation
await sync_workflows_to_version_config(agent_id)
return result.data[0]
except Exception as e:
@ -645,6 +671,9 @@ async def update_agent_workflow(
if update_data:
await client.table('agent_workflows').update(update_data).eq('id', workflow_id).execute()
# Sync workflows to version config after update
await sync_workflows_to_version_config(agent_id)
# Return updated workflow
updated_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).execute()
return updated_result.data[0]
@ -656,17 +685,18 @@ async def delete_agent_workflow(
workflow_id: str,
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Delete a workflow"""
await verify_agent_access(agent_id, user_id)
client = await db.client
# Verify workflow exists
workflow_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).eq('agent_id', agent_id).execute()
if not workflow_result.data:
raise HTTPException(status_code=404, detail="Workflow not found")
await client.table('agent_workflows').delete().eq('id', workflow_id).execute()
await sync_workflows_to_version_config(agent_id)
return {"message": "Workflow deleted successfully"}
@ -677,12 +707,10 @@ async def execute_agent_workflow(
execution_data: WorkflowExecuteRequest,
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Manually execute a workflow"""
await verify_agent_access(agent_id, user_id)
client = await db.client
# Get workflow
workflow_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).eq('agent_id', agent_id).execute()
if not workflow_result.data:
raise HTTPException(status_code=404, detail="Workflow not found")
@ -691,7 +719,6 @@ async def execute_agent_workflow(
if workflow['status'] != 'active':
raise HTTPException(status_code=400, detail="Workflow is not active")
# Get agent info
agent_result = await client.table('agents').select('account_id, name').eq('agent_id', agent_id).execute()
if not agent_result.data:
raise HTTPException(status_code=404, detail="Agent not found")
@ -737,7 +764,6 @@ async def execute_agent_workflow(
raw_data=execution_data.input_data or {}
)
# Execute workflow
execution_service = get_execution_service(db)
execution_result = await execution_service.execute_trigger_result(
agent_id=agent_id,
@ -764,6 +790,4 @@ async def execute_agent_workflow(
)
# ===== INCLUDE WORKFLOWS ROUTER =====
router.include_router(workflows_router)