diff --git a/backend/agent/run.py b/backend/agent/run.py index e97ef729..9f14f8c6 100644 --- a/backend/agent/run.py +++ b/backend/agent/run.py @@ -407,7 +407,8 @@ async def run_agent( # Set max_tokens based on model max_tokens = None if "sonnet" in model_name.lower(): - max_tokens = 64000 + # Claude 3.5 Sonnet has a limit of 8192 tokens + max_tokens = 8192 elif "gpt-4" in model_name.lower(): max_tokens = 4096 diff --git a/backend/api.py b/backend/api.py index 1c93058d..6d97c5f1 100644 --- a/backend/api.py +++ b/backend/api.py @@ -52,6 +52,8 @@ async def lifespan(app: FastAPI): sandbox_api.initialize(db) + workflows_api.initialize(db) + # Initialize Redis connection from services import redis try: @@ -145,6 +147,9 @@ app.include_router(transcription_api.router, prefix="/api") app.include_router(email_api.router, prefix="/api") +from workflows import api as workflows_api +app.include_router(workflows_api.router, prefix="/api") + @app.get("/api/health") async def health_check(): """Health check endpoint to verify API is working.""" diff --git a/backend/supabase/migrations/20250115000003_workflow_flows.sql b/backend/supabase/migrations/20250115000003_workflow_flows.sql new file mode 100644 index 00000000..0d05b326 --- /dev/null +++ b/backend/supabase/migrations/20250115000003_workflow_flows.sql @@ -0,0 +1,42 @@ +-- Add workflow_flows table for storing visual flow representations +-- This table stores the visual flow data (nodes and edges) separately from the workflow definition + +CREATE TABLE workflow_flows ( + workflow_id UUID PRIMARY KEY REFERENCES workflows(id) ON DELETE CASCADE, + nodes JSONB NOT NULL DEFAULT '[]', + edges JSONB NOT NULL DEFAULT '[]', + metadata JSONB NOT NULL DEFAULT '{}', + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW() +); + +-- Enable RLS +ALTER TABLE workflow_flows ENABLE ROW LEVEL SECURITY; + +-- RLS policies +CREATE POLICY "Users can view flows for their workflows" ON workflow_flows + FOR SELECT USING ( + EXISTS ( + SELECT 1 FROM workflows + WHERE workflows.id = workflow_flows.workflow_id + AND basejump.has_role_on_account(workflows.account_id) = true + ) + ); + +CREATE POLICY "Users can manage flows for their workflows" ON workflow_flows + FOR ALL USING ( + EXISTS ( + SELECT 1 FROM workflows + WHERE workflows.id = workflow_flows.workflow_id + AND basejump.has_role_on_account(workflows.account_id) = true + ) + ); + +-- Create trigger for updated_at +CREATE TRIGGER update_workflow_flows_updated_at BEFORE UPDATE ON workflow_flows + FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); + +-- Grant permissions +GRANT ALL PRIVILEGES ON TABLE workflow_flows TO authenticated, service_role; + +-- Add comment +COMMENT ON TABLE workflow_flows IS 'Stores visual flow representations (nodes and edges) for workflows'; \ No newline at end of file diff --git a/backend/workflows/__init__.py b/backend/workflows/__init__.py new file mode 100644 index 00000000..86335211 --- /dev/null +++ b/backend/workflows/__init__.py @@ -0,0 +1 @@ +# Workflows module for AgentPress workflow execution \ No newline at end of file diff --git a/backend/workflows/api.py b/backend/workflows/api.py new file mode 100644 index 00000000..ee011927 --- /dev/null +++ b/backend/workflows/api.py @@ -0,0 +1,600 @@ +""" +Workflow API - REST endpoints for workflow management and execution. +""" + +from fastapi import APIRouter, HTTPException, Depends, Header +from fastapi.responses import StreamingResponse +from typing import List, Optional, Dict, Any +import uuid +import json +import asyncio +from datetime import datetime, timezone + +from .models import ( + WorkflowDefinition, WorkflowCreateRequest, WorkflowUpdateRequest, + WorkflowExecuteRequest, WorkflowConvertRequest, WorkflowValidateRequest, + WorkflowValidateResponse, WorkflowFlow, WorkflowExecution +) +from .converter import WorkflowConverter, validate_workflow_flow +from .executor import WorkflowExecutor +from services.supabase import DBConnection +from utils.logger import logger +from utils.auth_utils import get_current_user_id_from_jwt + +router = APIRouter() + +# Global instances +db = DBConnection() +workflow_converter = WorkflowConverter() +workflow_executor = WorkflowExecutor(db) + +def initialize(database: DBConnection): + """Initialize the workflow API with database connection.""" + global db, workflow_executor + db = database + workflow_executor = WorkflowExecutor(db) + +def _map_db_to_workflow_definition(data: dict) -> WorkflowDefinition: + """Helper function to map database record to WorkflowDefinition.""" + definition = data.get('definition', {}) + return WorkflowDefinition( + id=data['id'], + name=data['name'], + description=data.get('description'), + steps=definition.get('steps', []), + entry_point=definition.get('entry_point', ''), + triggers=definition.get('triggers', []), + state=data.get('status', 'draft').upper(), + created_at=datetime.fromisoformat(data['created_at']) if data.get('created_at') else None, + updated_at=datetime.fromisoformat(data['updated_at']) if data.get('updated_at') else None, + created_by=data.get('created_by'), + project_id=data['project_id'], + agent_id=definition.get('agent_id'), + is_template=False, # Templates are in a separate table + max_execution_time=definition.get('max_execution_time', 3600), + max_retries=definition.get('max_retries', 3) + ) + +@router.get("/workflows", response_model=List[WorkflowDefinition]) +async def list_workflows( + user_id: str = Depends(get_current_user_id_from_jwt), + x_project_id: Optional[str] = Header(None) +): + """List all workflows for the current user.""" + try: + client = await db.client + + query = client.table('workflows').select('*').eq('account_id', user_id) + + if x_project_id: + query = query.eq('project_id', x_project_id) + + result = await query.execute() + + workflows = [] + for data in result.data: + workflow = _map_db_to_workflow_definition(data) + workflows.append(workflow) + + return workflows + + except Exception as e: + logger.error(f"Error listing workflows: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/workflows", response_model=WorkflowDefinition) +async def create_workflow( + request: WorkflowCreateRequest, + user_id: str = Depends(get_current_user_id_from_jwt) +): + """Create a new workflow.""" + try: + client = await db.client + + workflow_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc) + + workflow_data = { + 'id': workflow_id, + 'name': request.name, + 'description': request.description, + 'project_id': request.project_id, + 'account_id': user_id, + 'created_by': user_id, + 'status': 'draft', + 'version': 1, + 'definition': { + 'steps': [], + 'entry_point': '', + 'triggers': [{'type': 'MANUAL', 'config': {}}], + 'agent_id': request.agent_id, + 'max_execution_time': request.max_execution_time, + 'max_retries': request.max_retries + } + } + + result = await client.table('workflows').insert(workflow_data).execute() + + if not result.data: + raise HTTPException(status_code=500, detail="Failed to create workflow") + + data = result.data[0] + return _map_db_to_workflow_definition(data) + + except Exception as e: + logger.error(f"Error creating workflow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/workflows/{workflow_id}", response_model=WorkflowDefinition) +async def get_workflow( + workflow_id: str, + user_id: str = Depends(get_current_user_id_from_jwt) +): + """Get a specific workflow.""" + try: + client = await db.client + + result = await client.table('workflows').select('*').eq('id', workflow_id).eq('created_by', user_id).execute() + + if not result.data: + raise HTTPException(status_code=404, detail="Workflow not found") + + data = result.data[0] + return _map_db_to_workflow_definition(data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.put("/workflows/{workflow_id}", response_model=WorkflowDefinition) +async def update_workflow( + workflow_id: str, + request: WorkflowUpdateRequest, + user_id: str = Depends(get_current_user_id_from_jwt) +): + """Update a workflow.""" + try: + client = await db.client + + # Check if workflow exists and belongs to user + existing = await client.table('workflows').select('*').eq('id', workflow_id).eq('created_by', user_id).execute() + if not existing.data: + raise HTTPException(status_code=404, detail="Workflow not found") + + # Get current definition + current_definition = existing.data[0].get('definition', {}) + + # Prepare update data + update_data = {} + + if request.name is not None: + update_data['name'] = request.name + if request.description is not None: + update_data['description'] = request.description + if request.state is not None: + update_data['status'] = request.state.lower() + + # Update definition fields + definition_updated = False + if request.agent_id is not None: + current_definition['agent_id'] = request.agent_id + definition_updated = True + if request.max_execution_time is not None: + current_definition['max_execution_time'] = request.max_execution_time + definition_updated = True + if request.max_retries is not None: + current_definition['max_retries'] = request.max_retries + definition_updated = True + + if definition_updated: + update_data['definition'] = current_definition + + result = await client.table('workflows').update(update_data).eq('id', workflow_id).execute() + + if not result.data: + raise HTTPException(status_code=500, detail="Failed to update workflow") + + data = result.data[0] + return _map_db_to_workflow_definition(data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating workflow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.delete("/workflows/{workflow_id}") +async def delete_workflow( + workflow_id: str, + user_id: str = Depends(get_current_user_id_from_jwt) +): + """Delete a workflow.""" + try: + client = await db.client + + # Check if workflow exists and belongs to user + existing = await client.table('workflows').select('id').eq('id', workflow_id).eq('created_by', user_id).execute() + if not existing.data: + raise HTTPException(status_code=404, detail="Workflow not found") + + await client.table('workflows').delete().eq('id', workflow_id).execute() + + return {"message": "Workflow deleted successfully"} + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting workflow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/workflows/{workflow_id}/execute") +async def execute_workflow( + workflow_id: str, + request: WorkflowExecuteRequest, + user_id: str = Depends(get_current_user_id_from_jwt) +): + """Execute a workflow and return execution info.""" + try: + client = await db.client + + # Get workflow + result = await client.table('workflows').select('*').eq('id', workflow_id).eq('created_by', user_id).execute() + if not result.data: + raise HTTPException(status_code=404, detail="Workflow not found") + + data = result.data[0] + workflow = _map_db_to_workflow_definition(data) + + # Check if workflow is active (allow DRAFT for testing) + if workflow.state not in ['ACTIVE', 'DRAFT']: + raise HTTPException(status_code=400, detail="Workflow must be active or draft to execute") + + # Create execution record + execution_id = str(uuid.uuid4()) + execution_data = { + "id": execution_id, + "workflow_id": workflow_id, + "workflow_version": workflow.version if hasattr(workflow, 'version') else 1, + "workflow_name": workflow.name, + "execution_context": request.variables or {}, + "project_id": workflow.project_id, + "account_id": user_id, + "triggered_by": "MANUAL", + "status": "pending", + "started_at": datetime.now(timezone.utc).isoformat() + } + + await client.table('workflow_executions').insert(execution_data).execute() + + # Start execution in background + asyncio.create_task( + _execute_workflow_background(workflow, request.variables, execution_id) + ) + + return { + "execution_id": execution_id, + "status": "pending", + "message": "Workflow execution started" + } + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error executing workflow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +async def _execute_workflow_background( + workflow: WorkflowDefinition, + variables: Optional[Dict[str, Any]], + execution_id: str +): + """Execute workflow in background.""" + try: + client = await db.client + + # Update status to running + await client.table('workflow_executions').update({ + "status": "running", + "started_at": datetime.now(timezone.utc).isoformat() + }).eq('id', execution_id).execute() + + # Execute workflow + async for update in workflow_executor.execute_workflow( + workflow=workflow, + variables=variables, + project_id=workflow.project_id + ): + # Log updates but don't stream them for now + logger.info(f"Workflow {workflow.id} update: {update.get('type', 'unknown')}") + + # Mark as completed + await client.table('workflow_executions').update({ + "status": "completed", + "completed_at": datetime.now(timezone.utc).isoformat() + }).eq('id', execution_id).execute() + + except Exception as e: + logger.error(f"Background workflow execution failed: {e}") + client = await db.client + await client.table('workflow_executions').update({ + "status": "failed", + "completed_at": datetime.now(timezone.utc).isoformat(), + "error": str(e) + }).eq('id', execution_id).execute() + +@router.get("/workflows/{workflow_id}/flow", response_model=WorkflowFlow) +async def get_workflow_flow( + workflow_id: str, + user_id: str = Depends(get_current_user_id_from_jwt) +): + """Get the visual flow representation of a workflow.""" + try: + client = await db.client + + # Get workflow flow data + result = await client.table('workflow_flows').select('*').eq('workflow_id', workflow_id).execute() + + if result.data: + # Return stored flow data + data = result.data[0] + return WorkflowFlow( + nodes=data.get('nodes', []), + edges=data.get('edges', []), + metadata=data.get('metadata', {}) + ) + + # If no flow data exists, get the workflow and generate a basic flow + workflow_result = await client.table('workflows').select('*').eq('id', workflow_id).eq('created_by', user_id).execute() + + if not workflow_result.data: + raise HTTPException(status_code=404, detail="Workflow not found") + + workflow_data = workflow_result.data[0] + + # Generate a basic flow from the workflow metadata + metadata = { + "name": workflow_data.get('name', 'Untitled Workflow'), + "description": workflow_data.get('description', '') + } + + # For now, return empty flow with proper metadata + # In the future, we could implement reverse conversion from workflow definition to visual flow + return WorkflowFlow( + nodes=[], + edges=[], + metadata=metadata + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting workflow flow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.put("/workflows/{workflow_id}/flow", response_model=WorkflowDefinition) +async def update_workflow_flow( + workflow_id: str, + flow: WorkflowFlow, + user_id: str = Depends(get_current_user_id_from_jwt) +): + """Update the visual flow of a workflow and convert it to executable definition.""" + try: + client = await db.client + + # Check if workflow exists and belongs to user + existing = await client.table('workflows').select('*').eq('id', workflow_id).eq('created_by', user_id).execute() + if not existing.data: + raise HTTPException(status_code=404, detail="Workflow not found") + + # Store the flow data (convert Pydantic models to dicts) + flow_data = { + 'workflow_id': workflow_id, + 'nodes': [node.dict() for node in flow.nodes], + 'edges': [edge.dict() for edge in flow.edges], + 'metadata': flow.metadata, + 'updated_at': datetime.now(timezone.utc).isoformat() + } + + # Upsert flow data + await client.table('workflow_flows').upsert(flow_data).execute() + + # Convert flow to workflow definition + workflow_def = workflow_converter.convert_flow_to_workflow( + nodes=[node.dict() for node in flow.nodes], + edges=[edge.dict() for edge in flow.edges], + metadata={ + **flow.metadata, + 'project_id': existing.data[0]['project_id'], + 'agent_id': existing.data[0].get('definition', {}).get('agent_id') + } + ) + + # Update workflow with converted definition + current_definition = existing.data[0].get('definition', {}) + current_definition.update({ + 'steps': [step.dict() for step in workflow_def.steps], + 'entry_point': workflow_def.entry_point, + 'triggers': [trigger.dict() for trigger in workflow_def.triggers], + }) + + update_data = { + 'definition': current_definition + } + + # Update metadata if provided + if flow.metadata.get('name'): + update_data['name'] = flow.metadata['name'] + if flow.metadata.get('description'): + update_data['description'] = flow.metadata['description'] + + result = await client.table('workflows').update(update_data).eq('id', workflow_id).execute() + + if not result.data: + raise HTTPException(status_code=500, detail="Failed to update workflow") + + data = result.data[0] + return _map_db_to_workflow_definition(data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating workflow flow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/workflows/builder/convert", response_model=WorkflowDefinition) +async def convert_flow_to_workflow( + request: WorkflowConvertRequest, + user_id: str = Depends(get_current_user_id_from_jwt), + x_project_id: Optional[str] = Header(None) +): + """Convert a visual flow to a workflow definition without saving.""" + try: + if not x_project_id: + raise HTTPException(status_code=400, detail="Project ID is required") + + # Convert flow to workflow definition + workflow_def = workflow_converter.convert_flow_to_workflow( + nodes=[node.dict() for node in request.nodes], + edges=[edge.dict() for edge in request.edges], + metadata={ + **request.metadata, + 'project_id': x_project_id + } + ) + + return workflow_def + + except Exception as e: + logger.error(f"Error converting flow to workflow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/workflows/builder/validate", response_model=WorkflowValidateResponse) +async def validate_workflow_flow_endpoint(request: WorkflowValidateRequest): + """Validate a workflow flow for errors.""" + try: + valid, errors = validate_workflow_flow([node.dict() for node in request.nodes], [edge.dict() for edge in request.edges]) + return WorkflowValidateResponse(valid=valid, errors=errors) + + except Exception as e: + logger.error(f"Error validating workflow flow: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/workflows/builder/nodes") +async def get_builder_nodes(): + """Get available node types for the workflow builder.""" + try: + # Return the available node types that can be used in workflows + nodes = [ + { + "id": "agentNode", + "name": "AI Agent", + "description": "Intelligent agent that can execute tasks", + "category": "agent", + "icon": "Bot", + "inputs": ["tools", "input", "data-input"], + "outputs": ["output", "data-output", "action-output"] + }, + { + "id": "toolConnectionNode", + "name": "Tool Connection", + "description": "Connects tools to agents", + "category": "tool", + "icon": "Wrench", + "inputs": [], + "outputs": ["tool-connection"] + } + ] + + return nodes + + except Exception as e: + logger.error(f"Error getting builder nodes: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.get("/workflows/templates") +async def get_workflow_templates(): + """Get available workflow templates.""" + try: + client = await db.client + + result = await client.table('workflows').select('*').eq('is_template', True).execute() + + templates = [] + for data in result.data: + template = { + "id": data['id'], + "name": data['name'], + "description": data.get('description'), + "category": "general", # Could be extracted from metadata + "preview_image": None, # Could be added later + "created_at": data.get('created_at') + } + templates.append(template) + + return templates + + except Exception as e: + logger.error(f"Error getting workflow templates: {e}") + raise HTTPException(status_code=500, detail=str(e)) + +@router.post("/workflows/templates/{template_id}/create", response_model=WorkflowDefinition) +async def create_workflow_from_template( + template_id: str, + request: WorkflowExecuteRequest, + user_id: str = Depends(get_current_user_id_from_jwt), + x_project_id: Optional[str] = Header(None) +): + """Create a new workflow from a template.""" + try: + if not x_project_id: + raise HTTPException(status_code=400, detail="Project ID is required") + + client = await db.client + + # Get template + template_result = await client.table('workflows').select('*').eq('id', template_id).eq('is_template', True).execute() + if not template_result.data: + raise HTTPException(status_code=404, detail="Template not found") + + template_data = template_result.data[0] + + # Create new workflow from template + workflow_id = str(uuid.uuid4()) + now = datetime.now(timezone.utc) + + template_definition = template_data.get('definition', {}) + + workflow_data = { + 'id': workflow_id, + 'name': f"{template_data['name']} (Copy)", + 'description': template_data.get('description'), + 'project_id': x_project_id, + 'account_id': user_id, + 'created_by': user_id, + 'status': 'draft', + 'version': 1, + 'definition': { + 'steps': template_definition.get('steps', []), + 'entry_point': template_definition.get('entry_point', ''), + 'triggers': template_definition.get('triggers', []), + 'agent_id': template_definition.get('agent_id'), + 'max_execution_time': template_definition.get('max_execution_time', 3600), + 'max_retries': template_definition.get('max_retries', 3) + } + } + + result = await client.table('workflows').insert(workflow_data).execute() + + if not result.data: + raise HTTPException(status_code=500, detail="Failed to create workflow from template") + + data = result.data[0] + return _map_db_to_workflow_definition(data) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error creating workflow from template: {e}") + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/backend/workflows/converter.py b/backend/workflows/converter.py new file mode 100644 index 00000000..bb9a53fc --- /dev/null +++ b/backend/workflows/converter.py @@ -0,0 +1,261 @@ +from typing import List, Dict, Any, Optional +from .models import WorkflowNode, WorkflowEdge, WorkflowDefinition, WorkflowStep, WorkflowTrigger +import uuid +from utils.logger import logger + +class WorkflowConverter: + """Converts visual workflow flows into executable workflow definitions.""" + + def __init__(self): + pass + + def convert_flow_to_workflow( + self, + nodes: List[Dict[str, Any]], + edges: List[Dict[str, Any]], + metadata: Dict[str, Any] + ) -> WorkflowDefinition: + """ + Convert a visual workflow flow into an executable workflow definition. + + V1 Implementation: Generates a text prompt that describes the workflow + and creates a single agent step that executes the entire workflow. + """ + logger.info(f"Converting workflow flow with {len(nodes)} nodes and {len(edges)} edges") + + workflow_prompt = self._generate_workflow_prompt(nodes, edges) + entry_point = self._find_entry_point(nodes, edges) + + agent_step = WorkflowStep( + id="main_agent_step", + name="Workflow Agent", + description="Main agent that executes the workflow based on the visual flow", + type="TOOL", + config={ + "tool_name": "workflow_agent", + "system_prompt": workflow_prompt, + "agent_id": metadata.get("agent_id"), + "model": "anthropic/claude-3-5-sonnet-latest", + "max_iterations": 10 + }, + next_steps=[] + ) + + trigger = WorkflowTrigger( + type="MANUAL", + config={} + ) + + workflow = WorkflowDefinition( + name=metadata.get("name", "Untitled Workflow"), + description=metadata.get("description", "Generated from visual workflow"), + steps=[agent_step], + entry_point="main_agent_step", + triggers=[trigger], + project_id=metadata.get("project_id", ""), + agent_id=metadata.get("agent_id"), + is_template=metadata.get("is_template", False), + max_execution_time=metadata.get("max_execution_time", 3600), + max_retries=metadata.get("max_retries", 3) + ) + + return workflow + + def _generate_workflow_prompt(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> str: + """Generate a comprehensive system prompt that describes the workflow.""" + + prompt_parts = [ + "You are an AI agent executing a workflow. Follow these instructions carefully:", + "", + "## Workflow Overview", + "This workflow was created visually and consists of the following components:", + "" + ] + + node_descriptions = [] + agent_nodes = [] + tool_nodes = [] + + for node in nodes: + if node.get('type') == 'agentNode': + agent_nodes.append(node) + desc = self._describe_agent_node(node, edges) + node_descriptions.append(desc) + elif node.get('type') == 'toolConnectionNode': + tool_nodes.append(node) + desc = self._describe_tool_node(node, edges) + node_descriptions.append(desc) + else: + desc = self._describe_generic_node(node, edges) + node_descriptions.append(desc) + + prompt_parts.extend(node_descriptions) + prompt_parts.append("") + + prompt_parts.extend([ + "## Execution Instructions", + "", + "Execute this workflow by following these steps:", + "1. Start with the input or trigger conditions", + "2. Process each component in the logical order defined by the connections", + "3. Use the available tools as specified in the workflow", + "4. Follow the data flow between components", + "5. Provide clear output at each step", + "6. Handle errors gracefully and provide meaningful feedback", + "", + "## Available Tools", + "You have access to the following tools based on the workflow configuration:" + ]) + + for tool_node in tool_nodes: + tool_data = tool_node.get('data', {}) + tool_name = tool_data.get('nodeId', tool_data.get('label', 'Unknown Tool')) + tool_desc = tool_data.get('description', 'No description available') + prompt_parts.append(f"- **{tool_name}**: {tool_desc}") + + prompt_parts.extend([ + "", + "## Workflow Execution", + "When executing this workflow:", + "- Follow the logical flow defined by the visual connections", + "- Use tools in the order and manner specified", + "- Provide clear, step-by-step output", + "- If any step fails, explain what went wrong and suggest alternatives", + "- Complete the workflow by providing the expected output", + "", + "Begin execution when the user provides input or triggers the workflow." + ]) + + return "\n".join(prompt_parts) + + def _describe_agent_node(self, node: Dict[str, Any], edges: List[Dict[str, Any]]) -> str: + """Describe an agent node and its role in the workflow.""" + data = node.get('data', {}) + name = data.get('label', 'AI Agent') + instructions = data.get('instructions', 'No specific instructions provided') + model = data.get('model', 'Default model') + connected_tools = data.get('connectedTools', []) + tool_list = [tool.get('name', 'Unknown') for tool in connected_tools] + + input_connections = self._find_node_inputs(node.get('id'), edges) + output_connections = self._find_node_outputs(node.get('id'), edges) + + description = [ + f"### {name}", + f"**Role**: {instructions}", + f"**Model**: {model}", + ] + + if tool_list: + description.append(f"**Available Tools**: {', '.join(tool_list)}") + + if input_connections: + description.append(f"**Receives input from**: {', '.join(input_connections)}") + + if output_connections: + description.append(f"**Sends output to**: {', '.join(output_connections)}") + + description.append("") + return "\n".join(description) + + def _describe_tool_node(self, node: Dict[str, Any], edges: List[Dict[str, Any]]) -> str: + """Describe a tool node and its configuration.""" + data = node.get('data', {}) + name = data.get('label', 'Tool') + tool_id = data.get('nodeId', 'unknown_tool') + + input_connections = self._find_node_inputs(node.get('id'), edges) + output_connections = self._find_node_outputs(node.get('id'), edges) + + description = [ + f"### {name} Tool", + f"**Tool ID**: {tool_id}", + f"**Purpose**: Provides {name.lower()} functionality to the workflow", + ] + + if input_connections: + description.append(f"**Connected to agents**: {', '.join(input_connections)}") + + description.append("") + return "\n".join(description) + + def _describe_generic_node(self, node: Dict[str, Any], edges: List[Dict[str, Any]]) -> str: + """Describe a generic node.""" + data = node.get('data', {}) + name = data.get('label', 'Component') + node_type = node.get('type') + + description = [ + f"### {name}", + f"**Type**: {node_type}", + f"**Purpose**: {data.get('description', 'Workflow component')}", + "" + ] + + return "\n".join(description) + + def _find_node_inputs(self, node_id: str, edges: List[Dict[str, Any]]) -> List[str]: + """Find nodes that connect to this node as inputs.""" + inputs = [] + for edge in edges: + if edge.get('target') == node_id: + inputs.append(edge.get('source')) + return inputs + + def _find_node_outputs(self, node_id: str, edges: List[Dict[str, Any]]) -> List[str]: + """Find nodes that this node connects to as outputs.""" + outputs = [] + for edge in edges: + if edge.get('source') == node_id: + outputs.append(edge.get('target')) + return outputs + + def _find_entry_point(self, nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> str: + """Find the entry point of the workflow.""" + for node in nodes: + if node.get('type') == 'triggerNode': + return node.get('id') + + for node in nodes: + if node.get('type') == 'inputNode': + return node.get('id') + + node_ids = {node.get('id') for node in nodes} + nodes_with_inputs = {edge.get('target') for edge in edges} + root_nodes = node_ids - nodes_with_inputs + + if root_nodes: + return list(root_nodes)[0] + + return nodes[0].get('id') if nodes else "main_agent_step" + + + +def validate_workflow_flow(nodes: List[Dict[str, Any]], edges: List[Dict[str, Any]]) -> tuple[bool, List[str]]: + """Validate a workflow flow for common issues.""" + errors = [] + + if not nodes: + errors.append("Workflow must have at least one node") + return False, errors + + connected_nodes = set() + for edge in edges: + connected_nodes.add(edge.get('source')) + connected_nodes.add(edge.get('target')) + + node_ids = {node.get('id') for node in nodes} + disconnected = node_ids - connected_nodes + + if len(disconnected) > 1: + errors.append(f"Found disconnected nodes: {', '.join(disconnected)}") + + for edge in edges: + if edge.get('source') == edge.get('target'): + errors.append(f"Self-referencing edge found on node {edge.get('source')}") + + has_agent = any(node.get('type') == 'agentNode' for node in nodes) + if not has_agent: + errors.append("Workflow should have at least one agent node") + + return len(errors) == 0, errors \ No newline at end of file diff --git a/backend/workflows/executor.py b/backend/workflows/executor.py new file mode 100644 index 00000000..05f9e21e --- /dev/null +++ b/backend/workflows/executor.py @@ -0,0 +1,247 @@ +import asyncio +import uuid +import json +from datetime import datetime, timezone +from typing import Dict, Any, Optional, AsyncGenerator +from .models import WorkflowDefinition, WorkflowExecution +from agent.run import run_agent +from services.supabase import DBConnection +from utils.logger import logger + +class WorkflowExecutor: + """Executes workflows using the AgentPress agent system.""" + + def __init__(self, db: DBConnection): + self.db = db + + async def execute_workflow( + self, + workflow: WorkflowDefinition, + variables: Optional[Dict[str, Any]] = None, + thread_id: Optional[str] = None, + project_id: Optional[str] = None + ) -> AsyncGenerator[Dict[str, Any], None]: + """ + Execute a workflow definition. + + V1 Implementation: Generates a system prompt from the workflow + and executes it as a single agent call. + """ + if not thread_id: + thread_id = str(uuid.uuid4()) + + if not project_id: + project_id = workflow.project_id + + logger.info(f"Executing workflow {workflow.name} (ID: {workflow.id}) in thread {thread_id}") + + execution = WorkflowExecution( + id=str(uuid.uuid4()), + workflow_id=workflow.id or str(uuid.uuid4()), + status="running", + started_at=datetime.now(timezone.utc), + trigger_type="MANUAL", + trigger_data={}, + variables=variables or {} + ) + + try: + await self._store_execution(execution) + await self._create_workflow_thread(thread_id, project_id, workflow, variables) + + if not workflow.steps: + raise ValueError("Workflow has no steps defined") + + main_step = workflow.steps[0] + system_prompt = main_step.config.get("system_prompt", "") + + if variables: + variables_text = "\n\n## Workflow Variables\n" + variables_text += "The following variables are available for this workflow execution:\n" + for key, value in variables.items(): + variables_text += f"- **{key}**: {value}\n" + variables_text += "\nUse these variables as needed during workflow execution.\n" + system_prompt += variables_text + + agent_config = { + "name": f"Workflow Agent: {workflow.name}", + "description": workflow.description or "Generated workflow agent", + "system_prompt": system_prompt, + "agentpress_tools": { + "sb_files_tool": {"enabled": True, "description": "File operations"}, + "message_tool": {"enabled": True, "description": "Send messages"}, + "expand_msg_tool": {"enabled": True, "description": "Expand messages"} + }, + "configured_mcps": [], + "custom_mcps": [] + } + + async for response in run_agent( + thread_id=thread_id, + project_id=project_id, + stream=True, + model_name="anthropic/claude-3-5-sonnet-latest", + enable_thinking=False, + reasoning_effort="low", + enable_context_manager=True, + agent_config=agent_config, + max_iterations=5 + ): + yield self._transform_agent_response_to_workflow_update(response, execution.id) + + if response.get('type') == 'status': + status = response.get('status') + if status in ['completed', 'failed', 'stopped']: + execution.status = status.lower() + execution.completed_at = datetime.now(timezone.utc) + if status == 'failed': + execution.error = response.get('message', 'Workflow execution failed') + await self._update_execution(execution) + break + + if execution.status == "running": + execution.status = "completed" + execution.completed_at = datetime.now(timezone.utc) + await self._update_execution(execution) + + yield { + "type": "workflow_status", + "execution_id": execution.id, + "status": "completed", + "message": "Workflow completed successfully" + } + + except Exception as e: + logger.error(f"Error executing workflow {workflow.id}: {e}") + execution.status = "failed" + execution.completed_at = datetime.now(timezone.utc) + execution.error = str(e) + await self._update_execution(execution) + + yield { + "type": "workflow_status", + "execution_id": execution.id, + "status": "failed", + "error": str(e) + } + + def _transform_agent_response_to_workflow_update( + self, + agent_response: Dict[str, Any], + execution_id: str + ) -> Dict[str, Any]: + """Transform agent response into workflow execution update.""" + workflow_response = { + **agent_response, + "execution_id": execution_id, + "source": "workflow_executor" + } + if agent_response.get('type') == 'assistant': + workflow_response['type'] = 'workflow_step' + workflow_response['step_name'] = 'workflow_execution' + + elif agent_response.get('type') == 'tool_call': + workflow_response['type'] = 'workflow_tool_call' + + elif agent_response.get('type') == 'tool_result': + workflow_response['type'] = 'workflow_tool_result' + + return workflow_response + + async def _store_execution(self, execution: WorkflowExecution): + """Store workflow execution in database.""" + try: + client = await self.db.client + logger.info(f"Execution {execution.id} handled by API endpoint") + except Exception as e: + logger.error(f"Failed to store workflow execution: {e}") + + async def _update_execution(self, execution: WorkflowExecution): + """Update workflow execution in database.""" + try: + client = await self.db.client + + update_data = { + "status": execution.status, + "completed_at": execution.completed_at.isoformat() if execution.completed_at else None, + "error": execution.error + } + + await client.table('workflow_executions').update(update_data).eq('id', execution.id).execute() + logger.info(f"Updated workflow execution {execution.id} status to {execution.status}") + + except Exception as e: + logger.error(f"Failed to update workflow execution: {e}") + + async def get_execution_status(self, execution_id: str) -> Optional[WorkflowExecution]: + """Get the status of a workflow execution.""" + try: + client = await self.db.client + result = await client.table('workflow_executions').select('*').eq('id', execution_id).execute() + + if result.data: + data = result.data[0] + return WorkflowExecution( + id=data['id'], + workflow_id=data['workflow_id'], + status=data['status'], + started_at=datetime.fromisoformat(data['started_at']) if data['started_at'] else None, + completed_at=datetime.fromisoformat(data['completed_at']) if data['completed_at'] else None, + trigger_type=data.get('triggered_by', 'MANUAL'), + trigger_data={}, + variables=data.get('execution_context', {}), + error=data.get('error') + ) + + return None + + except Exception as e: + logger.error(f"Failed to get execution status: {e}") + return None + + async def _create_workflow_thread( + self, + thread_id: str, + project_id: str, + workflow: WorkflowDefinition, + variables: Optional[Dict[str, Any]] = None + ): + """Create a thread in the database for workflow execution.""" + try: + client = await self.db.client + project_result = await client.table('projects').select('account_id').eq('project_id', project_id).execute() + if not project_result.data: + raise ValueError(f"Project {project_id} not found") + + account_id = project_result.data[0]['account_id'] + + thread_data = { + "thread_id": thread_id, + "project_id": project_id, + "account_id": account_id, + "created_at": datetime.now(timezone.utc).isoformat() + } + + await client.table('threads').insert(thread_data).execute() + initial_message = f"Execute the workflow: {workflow.name}" + if workflow.description: + initial_message += f"\n\nDescription: {workflow.description}" + + if variables: + initial_message += f"\n\nVariables: {json.dumps(variables, indent=2)}" + + message_data = { + "message_id": str(uuid.uuid4()), + "thread_id": thread_id, + "type": "user", + "is_llm_message": True, + "content": json.dumps({"role": "user", "content": initial_message}), + "created_at": datetime.now(timezone.utc).isoformat() + } + + await client.table('messages').insert(message_data).execute() + logger.info(f"Created workflow thread {thread_id} for workflow {workflow.id}") + + except Exception as e: + logger.error(f"Failed to create workflow thread: {e}") + raise \ No newline at end of file diff --git a/backend/workflows/models.py b/backend/workflows/models.py new file mode 100644 index 00000000..f9c8ad2c --- /dev/null +++ b/backend/workflows/models.py @@ -0,0 +1,99 @@ +from pydantic import BaseModel +from typing import List, Dict, Any, Optional, Literal +from datetime import datetime + +class WorkflowStep(BaseModel): + id: str + name: str + description: Optional[str] = None + type: Literal['TOOL', 'MCP_TOOL', 'CONDITION', 'LOOP', 'PARALLEL', 'WAIT', 'WEBHOOK', 'TRANSFORM'] + config: Dict[str, Any] + next_steps: List[str] + error_handler: Optional[str] = None + +class WorkflowTrigger(BaseModel): + type: Literal['MANUAL', 'SCHEDULE', 'WEBHOOK', 'EVENT'] + config: Dict[str, Any] + +class WorkflowDefinition(BaseModel): + id: Optional[str] = None + name: str + description: Optional[str] = None + steps: List[WorkflowStep] + entry_point: str + triggers: List[WorkflowTrigger] + state: Literal['DRAFT', 'ACTIVE', 'PAUSED'] = 'DRAFT' + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + created_by: Optional[str] = None + project_id: str + agent_id: Optional[str] = None + is_template: bool = False + max_execution_time: int = 3600 + max_retries: int = 3 + +class WorkflowExecution(BaseModel): + id: Optional[str] = None + workflow_id: str + status: Literal['pending', 'running', 'completed', 'failed', 'cancelled'] = 'pending' + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + trigger_type: str + trigger_data: Optional[Dict[str, Any]] = None + variables: Optional[Dict[str, Any]] = None + error: Optional[str] = None + +class WorkflowNode(BaseModel): + id: str + type: str + position: Dict[str, float] + data: Dict[str, Any] + +class WorkflowEdge(BaseModel): + id: str + source: str + target: str + sourceHandle: Optional[str] = None + targetHandle: Optional[str] = None + type: Optional[str] = None + animated: Optional[bool] = None + label: Optional[str] = None + +class WorkflowFlow(BaseModel): + nodes: List[WorkflowNode] + edges: List[WorkflowEdge] + metadata: Dict[str, Any] + +class WorkflowCreateRequest(BaseModel): + name: str + description: Optional[str] = None + project_id: str + agent_id: Optional[str] = None + is_template: bool = False + max_execution_time: int = 3600 + max_retries: int = 3 + +class WorkflowUpdateRequest(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + state: Optional[Literal['DRAFT', 'ACTIVE', 'PAUSED']] = None + agent_id: Optional[str] = None + is_template: Optional[bool] = None + max_execution_time: Optional[int] = None + max_retries: Optional[int] = None + +class WorkflowExecuteRequest(BaseModel): + variables: Optional[Dict[str, Any]] = None + +class WorkflowConvertRequest(BaseModel): + nodes: List[WorkflowNode] + edges: List[WorkflowEdge] + metadata: Dict[str, Any] + +class WorkflowValidateRequest(BaseModel): + nodes: List[WorkflowNode] + edges: List[WorkflowEdge] + +class WorkflowValidateResponse(BaseModel): + valid: bool + errors: List[str] \ No newline at end of file