From 389883f18529b4d20c42cdafc06ec848c7761a83 Mon Sep 17 00:00:00 2001
From: asemyanov
Date: Thu, 21 Aug 2025 22:54:22 +0200
Subject: [PATCH] BIG FEATURE: agent calls another agent

---
 backend/agent/config_helper.py              |   4 +-
 backend/agent/run.py                        |  31 +-
 backend/agent/suna_config.py                |   4 +-
 backend/agent/tools/agent_discovery_tool.py | 209 ++++++++++
 backend/agent/tools/agent_execution_tool.py | 396 ++++++++++++++++++++
 5 files changed, 640 insertions(+), 4 deletions(-)
 create mode 100644 backend/agent/tools/agent_discovery_tool.py
 create mode 100644 backend/agent/tools/agent_execution_tool.py

diff --git a/backend/agent/config_helper.py b/backend/agent/config_helper.py
index c1127529..b97a9b4c 100644
--- a/backend/agent/config_helper.py
+++ b/backend/agent/config_helper.py
@@ -213,7 +213,9 @@ def _get_default_agentpress_tools() -> Dict[str, bool]:
         "mcp_search_tool": True,
         "credential_profile_tool": True,
         "workflow_tool": True,
-        "trigger_tool": True
+        "trigger_tool": True,
+        "agent_discovery_tool": True,
+        "agent_execution_tool": True
     }
 
 
diff --git a/backend/agent/run.py b/backend/agent/run.py
index 309ec315..865f0e25 100644
--- a/backend/agent/run.py
+++ b/backend/agent/run.py
@@ -37,6 +37,8 @@ from agentpress.tool import SchemaType
 from agent.tools.sb_sheets_tool import SandboxSheetsTool
 from agent.tools.sb_web_dev_tool import SandboxWebDevTool
 from agent.tools.sb_upload_file_tool import SandboxUploadFileTool
+from agent.tools.agent_discovery_tool import AgentDiscoveryTool
+from agent.tools.agent_execution_tool import AgentExecutionTool
 
 load_dotenv()
 
@@ -59,10 +61,11 @@ class AgentConfig:
 
 
 class ToolManager:
-    def __init__(self, thread_manager: ThreadManager, project_id: str, thread_id: str):
+    def __init__(self, thread_manager: ThreadManager, project_id: str, thread_id: str, account_id: Optional[str] = None):
         self.thread_manager = thread_manager
         self.project_id = project_id
         self.thread_id = thread_id
+        self.account_id = account_id
 
     def register_all_tools(self, agent_id: Optional[str] = None, disabled_tools: Optional[List[str]] = None):
         """Register all available tools by default, with optional exclusions.
@@ -88,6 +91,9 @@ class ToolManager:
         if agent_id:
             self._register_agent_builder_tools(agent_id, disabled_tools)
 
+        # Always register agent communication tools
+        self._register_agent_communication_tools(disabled_tools)
+
         # Browser tool
         self._register_browser_tool(disabled_tools)
 
@@ -152,6 +158,27 @@ class ToolManager:
                 self.thread_manager.add_tool(tool_class, thread_manager=self.thread_manager, db_connection=db, agent_id=agent_id)
                 logger.debug(f"Registered {tool_name}")
 
+    def _register_agent_communication_tools(self, disabled_tools: List[str]):
+        """Register agent-to-agent communication tools."""
+        if not self.account_id:
+            logger.warning("Agent communication tools disabled: account_id not available")
+            return
+
+        agent_comm_tools = [
+            ('agent_discovery_tool', AgentDiscoveryTool),
+            ('agent_execution_tool', AgentExecutionTool),
+        ]
+
+        for tool_name, tool_class in agent_comm_tools:
+            if tool_name not in disabled_tools:
+                self.thread_manager.add_tool(
+                    tool_class,
+                    project_id=self.project_id,
+                    thread_manager=self.thread_manager,
+                    account_id=self.account_id
+                )
+                logger.debug(f"Registered {tool_name}")
+
     def _register_browser_tool(self, disabled_tools: List[str]):
         """Register browser tool."""
         if 'browser_tool' not in disabled_tools:
@@ -471,7 +498,7 @@ class AgentRunner:
             logger.debug(f"No sandbox found for project {self.config.project_id}; will create lazily when needed")
 
     async def setup_tools(self):
-        tool_manager = ToolManager(self.thread_manager, self.config.project_id, self.config.thread_id)
+        tool_manager = ToolManager(self.thread_manager, self.config.project_id, self.config.thread_id, self.account_id)
 
         # Determine agent ID for agent builder tools
         agent_id = None
diff --git a/backend/agent/suna_config.py b/backend/agent/suna_config.py
index 8ceca09b..c5b599b3 100644
--- a/backend/agent/suna_config.py
+++ b/backend/agent/suna_config.py
@@ -29,7 +29,9 @@ SUNA_CONFIG = {
         "mcp_search_tool": True,
         "credential_profile_tool": True,
         "workflow_tool": True,
-        "trigger_tool": True
+        "trigger_tool": True,
+        "agent_discovery_tool": True,
+        "agent_execution_tool": True
     },
     "is_default": True
 }
diff --git a/backend/agent/tools/agent_discovery_tool.py b/backend/agent/tools/agent_discovery_tool.py
new file mode 100644
index 00000000..af0f165b
--- /dev/null
+++ b/backend/agent/tools/agent_discovery_tool.py
@@ -0,0 +1,209 @@
+from agentpress.tool import ToolResult, openapi_schema, usage_example
+from agentpress.thread_manager import ThreadManager
+from sandbox.tool_base import SandboxToolsBase
+from utils.logger import logger
+from typing import Optional
+import json
+
+
+def _field(record, key):
+    """Read ``key`` from a record that may be a dict or an attribute-style object."""
+    if isinstance(record, dict):
+        return record.get(key)
+    return getattr(record, key, None)
+
+
+class AgentDiscoveryTool(SandboxToolsBase):
+    """
+    Tool for discovering and listing available agents and their workflows.
+
+    Allows agents to discover other agents in the same account and view their capabilities.
+    This enables agent-to-agent communication and coordination within the platform.
+    """
+
+    def __init__(self, project_id: str, thread_manager: ThreadManager, account_id: str):
+        super().__init__(project_id, thread_manager)
+        self.account_id = account_id
+
+    @openapi_schema({
+        "type": "function",
+        "function": {
+            "name": "list_available_agents",
+            "description": "List all available agents in the current account that can be called by this agent. Returns agent IDs, names, descriptions, and basic information.",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "include_self": {
+                        "type": "boolean",
+                        "description": "Whether to include the current agent in the results",
+                        "default": False
+                    },
+                    "search": {
+                        "type": "string",
+                        "description": "Optional search term to filter agents by name or description"
+                    }
+                },
+                "required": []
+            }
+        }
+    })
+    # NOTE(review): this example holds only bare parameter values; the call markup around
+    # them looks to have been stripped from this copy of the patch - restore it before merging.
+    @usage_example('''
+
+
+        false
+        data analysis
+
+
+        ''')
+    async def list_available_agents(self, include_self: bool = False, search: Optional[str] = None) -> ToolResult:
+        """List the account's agents, optionally filtered by ``search``.
+
+        Only the first page (up to 100 agents) returned by ``get_agents`` is
+        listed. The calling agent is left out unless ``include_self`` is true.
+        """
+        try:
+            # Imported lazily; report a tool failure if the API module is unavailable
+            try:
+                from agent.api import get_agents
+            except ImportError as e:
+                logger.error(f"Failed to import get_agents: {str(e)}")
+                return self.fail_response("Agent discovery service is not available")
+
+            # Call the existing endpoint to get agents
+            response = await get_agents(
+                user_id=self.account_id,
+                page=1,
+                limit=100,  # First page only; accounts with more agents are truncated
+                search=search,
+                sort_by="created_at",
+                sort_order="desc",
+                has_default=None,
+                has_mcp_tools=None,
+                has_agentpress_tools=None,
+                tools=None
+            )
+
+            # Handle both dict and object response formats
+            if hasattr(response, 'agents'):
+                agents = response.agents
+            elif isinstance(response, dict) and 'agents' in response:
+                agents = response['agents']
+            else:
+                logger.error(f"Unexpected response format from get_agents: {type(response)}")
+                return self.fail_response("Error: Unexpected response format from get_agents")
+
+            if not agents:
+                return self.success_response("No agents found in your account.")
+
+            # Resolved once: the calling agent's ID, if the thread manager exposes one
+            current_agent_id = getattr(self.thread_manager, 'current_agent_id', None)
+
+            # Format agents for easy consumption
+            agent_list = []
+            for agent in agents:
+                agent_id = _field(agent, 'agent_id')
+
+                # Skip self if not requested
+                if not include_self and agent_id == current_agent_id:
+                    continue
+
+                agent_list.append({
+                    "agent_id": agent_id,
+                    "name": _field(agent, 'name'),
+                    "description": _field(agent, 'description') or "No description available"
+                })
+
+            result = {
+                "total_agents": len(agent_list),
+                "agents": agent_list,
+                "note": "Use 'call_agent' tool with the agent_id to invoke any of these agents"
+            }
+
+            logger.info(f"Listed {len(agent_list)} agents via native tool")
+            return self.success_response(result)
+
+        except Exception as e:
+            logger.error(f"Error in list_available_agents: {str(e)}")
+            return self.fail_response(f"Error listing agents: {str(e)}")
+
+    @openapi_schema({
+        "type": "function",
+        "function": {
+            "name": "get_agent_workflows",
+            "description": "Get all available workflows for a specific agent. Workflows are pre-configured execution paths with specific parameters.",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "agent_id": {
+                        "type": "string",
+                        "description": "The ID of the agent to get workflows for"
+                    }
+                },
+                "required": ["agent_id"]
+            }
+        }
+    })
+    # NOTE(review): this example holds only bare parameter values; the call markup around
+    # them looks to have been stripped from this copy of the patch - restore it before merging.
+    @usage_example('''
+
+
+        agent_12345
+
+
+        ''')
+    async def get_agent_workflows(self, agent_id: str) -> ToolResult:
+        """List the workflows configured for ``agent_id``, newest first."""
+        try:
+            # Verify agent access
+            await self._verify_agent_access(agent_id)
+
+            # Get workflows from database
+            client = await self.thread_manager.db.client
+            result = await client.table('agent_workflows').select('*').eq('agent_id', agent_id).order('created_at', desc=True).execute()
+
+            if not result.data:
+                return self.success_response({
+                    "agent_id": agent_id,
+                    "workflows": [],
+                    "message": f"No workflows found for agent {agent_id}. This agent can only be run with custom prompts."
+                })
+
+            # `or` instead of a .get() default: a key present with value None would bypass the default
+            workflows = [
+                {
+                    "workflow_id": workflow['id'],
+                    "name": workflow['name'],
+                    "description": workflow.get('description') or 'No description available',
+                    "status": workflow.get('status') or 'unknown'
+                }
+                for workflow in result.data
+            ]
+
+            result_data = {
+                "agent_id": agent_id,
+                "total_workflows": len(workflows),
+                "workflows": workflows,
+                "note": "Use 'call_agent' tool with execution_mode='workflow' and workflow_id to run a specific workflow"
+            }
+
+            logger.info(f"Listed {len(workflows)} workflows for agent {agent_id} via native tool")
+            return self.success_response(result_data)
+
+        except Exception as e:
+            logger.error(f"Error in get_agent_workflows: {str(e)}")
+            return self.fail_response(f"Error listing workflows: {str(e)}")
+
+    async def _verify_agent_access(self, agent_id: str):
+        """Raise ValueError unless ``agent_id`` belongs to this tool's account."""
+        try:
+            client = await self.thread_manager.db.client
+            result = await client.table('agents').select('agent_id').eq('agent_id', agent_id).eq('account_id', self.account_id).execute()
+        except Exception as e:
+            logger.error(f"Database error in verify_agent_access: {str(e)}")
+            raise ValueError("Database connection error") from e
+
+        if not result.data:
+            raise ValueError("Agent not found or access denied")
\ No newline at end of file
diff --git a/backend/agent/tools/agent_execution_tool.py b/backend/agent/tools/agent_execution_tool.py
new file mode 100644
index 00000000..47880fd8
--- /dev/null
+++ b/backend/agent/tools/agent_execution_tool.py
@@ -0,0 +1,396 @@
+from agentpress.tool import ToolResult, openapi_schema, usage_example
+from agentpress.thread_manager import ThreadManager
+from sandbox.tool_base import SandboxToolsBase
+from utils.logger import logger
+from typing import Optional
+import json
+import asyncio
+from datetime import datetime, timezone
+
+
+class AgentExecutionError(Exception):
+    """Raised when a delegated agent run or workflow does not yield a usable result."""
+
+
+class AgentExecutionTool(SandboxToolsBase):
+    """
+    Tool for executing other agents with prompts or workflows.
+
+    Enables agent-to-agent communication by allowing one agent to call another
+    and receive the results. Supports both custom prompt execution and workflow execution.
+    """
+
+    def __init__(self, project_id: str, thread_manager: ThreadManager, account_id: str):
+        super().__init__(project_id, thread_manager)
+        self.account_id = account_id
+
+    def _extract_last_message(self, full_output: str) -> str:
+        """Return the tail of ``full_output``, starting up to three lines before its last substantial line."""
+        if not full_output.strip():
+            return "No output received"
+
+        lines = full_output.strip().split('\n')
+
+        # Walk backwards by position. Looking the line up by value would land on
+        # the first duplicate and return far more than the intended tail.
+        for index in range(len(lines) - 1, -1, -1):
+            candidate = lines[index].strip()
+            if candidate and not candidate.startswith(('#', '```')):
+                return '\n'.join(lines[max(0, index - 3):]).strip()
+
+        # Fallback: return last 20% of the output
+        return full_output[-len(full_output)//5:].strip() if len(full_output) > 100 else full_output
+
+    def _truncate_from_end(self, text: str, max_tokens: int) -> str:
+        """Truncate text from the beginning, keeping the end."""
+        if max_tokens <= 0:
+            return ""
+
+        max_chars = max_tokens * 4  # Rough token estimation
+
+        if len(text) <= max_chars:
+            return text
+
+        truncated = text[-max_chars:]
+        return f"...[truncated {len(text) - max_chars} characters]...\n{truncated}"
+
+    def _get_fallback_model(self, requested_model: Optional[str] = None) -> str:
+        """Return ``requested_model`` if it names a known provider, else the fallback model."""
+        known_providers = ('openrouter', 'anthropic', 'openai', 'google')
+        if requested_model and any(provider in requested_model.lower() for provider in known_providers):
+            return requested_model
+
+        # Use a reliable free-tier model as fallback
+        fallback_model = "openrouter/google/gemini-2.5-flash"
+        if requested_model and requested_model != fallback_model:
+            logger.info(f"Model {requested_model} not validated, using fallback: {fallback_model}")
+
+        return fallback_model
+
+    @staticmethod
+    def _clamp_int(value, low: int, high: int, default: int) -> int:
+        """Coerce ``value`` to int and clamp it to ``[low, high]``; non-numeric input becomes ``default``."""
+        try:
+            number = int(value)
+        except (TypeError, ValueError):
+            number = default
+        return max(low, min(high, number))
+
+    @openapi_schema({
+        "type": "function",
+        "function": {
+            "name": "call_agent",
+            "description": "Execute another agent with a custom prompt or workflow. This allows inter-agent communication and delegation of tasks to specialized agents.",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "agent_id": {
+                        "type": "string",
+                        "description": "The ID of the agent to call"
+                    },
+                    "message": {
+                        "type": "string",
+                        "description": "The message/prompt to send to the agent"
+                    },
+                    "execution_mode": {
+                        "type": "string",
+                        "enum": ["prompt", "workflow"],
+                        "description": "Either 'prompt' for custom prompt execution or 'workflow' for workflow execution",
+                        "default": "prompt"
+                    },
+                    "workflow_id": {
+                        "type": "string",
+                        "description": "Required when execution_mode is 'workflow' - the ID of the workflow to run"
+                    },
+                    "model_name": {
+                        "type": "string",
+                        "description": "Model to use for the agent execution. If not specified, uses the agent's configured model or fallback."
+                    },
+                    "timeout": {
+                        "type": "integer",
+                        "description": "Maximum time to wait for agent response in seconds",
+                        "default": 60,
+                        "minimum": 10,
+                        "maximum": 300
+                    },
+                    "max_tokens": {
+                        "type": "integer",
+                        "description": "Maximum tokens in response",
+                        "default": 1000,
+                        "minimum": 100,
+                        "maximum": 4000
+                    },
+                    "output_mode": {
+                        "type": "string",
+                        "enum": ["last_message", "full"],
+                        "description": "How to format output: 'last_message' (default) extracts key results, 'full' returns complete output",
+                        "default": "last_message"
+                    }
+                },
+                "required": ["agent_id", "message"]
+            }
+        }
+    })
+    # NOTE(review): these examples hold only bare parameter values; the call markup around
+    # them looks to have been stripped from this copy of the patch - restore it before merging.
+    @usage_example('''
+
+
+        data_analyst_agent_123
+        Analyze the sales data from Q3 and provide key insights
+        prompt
+        120
+
+
+
+
+
+
+        report_generator_456
+        Generate monthly report with latest metrics
+        workflow
+        monthly_report_workflow_789
+
+
+        ''')
+    async def call_agent(
+        self,
+        agent_id: str,
+        message: str,
+        execution_mode: str = "prompt",
+        workflow_id: Optional[str] = None,
+        model_name: Optional[str] = None,
+        timeout: int = 60,
+        max_tokens: int = 1000,
+        output_mode: str = "last_message"
+    ) -> ToolResult:
+        """Run another agent in this account and return its (possibly truncated) output.
+
+        Failures of the delegated run (missing or inactive workflow, failed or
+        cancelled run, timeout with no reply) are reported as a failed tool
+        result rather than as text inside a successful one.
+        """
+        try:
+            # Validate execution mode and workflow parameters
+            if execution_mode not in ("prompt", "workflow"):
+                return self.fail_response("Error: execution_mode must be either 'prompt' or 'workflow'")
+
+            if execution_mode == "workflow" and not workflow_id:
+                return self.fail_response("Error: workflow_id is required when execution_mode is 'workflow'")
+
+            # Verify agent access
+            await self._verify_agent_access(agent_id)
+
+            # Apply model fallback logic
+            model_name = self._get_fallback_model(model_name)
+
+            # Tool arguments may arrive as strings; coerce before clamping to the schema bounds
+            timeout = self._clamp_int(timeout, 10, 300, 60)
+            max_tokens = self._clamp_int(max_tokens, 100, 4000, 1000)
+
+            # Validate output mode
+            if output_mode not in ("last_message", "full"):
+                output_mode = "last_message"
+
+            logger.info(f"Calling agent {agent_id} in {execution_mode} mode with timeout {timeout}s")
+
+            if execution_mode == "workflow":
+                raw_result = await self._execute_agent_workflow(agent_id, workflow_id, message, model_name, timeout)
+            else:
+                raw_result = await self._execute_agent_prompt(agent_id, message, model_name, timeout)
+
+            # Structured (non-string) output is serialised so the text helpers below can handle it
+            if not isinstance(raw_result, str):
+                raw_result = json.dumps(raw_result, ensure_ascii=False, default=str)
+
+            # Process the output based on the requested mode
+            if output_mode == "last_message":
+                processed_result = self._extract_last_message(raw_result)
+            else:
+                processed_result = raw_result
+
+            # Apply token limiting
+            final_result = self._truncate_from_end(processed_result, max_tokens)
+
+            # Return structured result
+            response_data = {
+                "agent_id": agent_id,
+                "execution_mode": execution_mode,
+                "workflow_id": workflow_id if execution_mode == "workflow" else None,
+                "model_used": model_name,
+                "output_mode": output_mode,
+                "max_tokens": max_tokens,
+                "result": final_result,
+                "timestamp": datetime.now(timezone.utc).isoformat()
+            }
+
+            logger.info(f"Agent call completed for agent {agent_id} in {execution_mode} mode")
+            return self.success_response(response_data)
+
+        except Exception as e:
+            logger.error(f"Error calling agent {agent_id}: {str(e)}")
+            return self.fail_response(f"Error calling agent: {str(e)}")
+
+    async def _execute_agent_workflow(self, agent_id: str, workflow_id: str, message: str, model_name: str, timeout: int) -> str:
+        """Run ``workflow_id`` of ``agent_id``; raise AgentExecutionError if it cannot run or fails."""
+        client = await self.thread_manager.db.client
+
+        # Verify workflow exists and is active
+        workflow_result = await client.table('agent_workflows').select('*').eq('id', workflow_id).eq('agent_id', agent_id).execute()
+        if not workflow_result.data:
+            raise AgentExecutionError(f"Workflow {workflow_id} not found for agent {agent_id}")
+
+        workflow = workflow_result.data[0]
+        if workflow.get('status') != 'active':
+            raise AgentExecutionError(f"Workflow {workflow['name']} is not active (status: {workflow.get('status')})")
+
+        # Only the import is guarded, so an ImportError raised while the workflow
+        # itself runs is not mistaken for a missing execution service.
+        try:
+            from triggers.execution_service import execute_workflow
+        except ImportError:
+            logger.warning("Execution service not available, using fallback workflow execution")
+            # Fallback: Create a thread and run the agent with workflow context
+            return await self._execute_agent_with_thread(agent_id, message, model_name, timeout, workflow)
+
+        # Execute the workflow with the provided message
+        execution_result = await execute_workflow(
+            workflow_id=workflow_id,
+            agent_id=agent_id,
+            input_data={"message": message},
+            user_id=self.account_id
+        )
+
+        if not execution_result.get('success'):
+            raise AgentExecutionError(f"Workflow execution failed: {execution_result.get('error', 'Unknown error')}")
+
+        return execution_result.get('output', f"Workflow '{workflow['name']}' executed successfully")
+
+    async def _execute_agent_prompt(self, agent_id: str, message: str, model_name: str, timeout: int) -> str:
+        """Execute an agent with a custom prompt."""
+        return await self._execute_agent_with_thread(agent_id, message, model_name, timeout)
+
+    async def _execute_agent_with_thread(self, agent_id: str, message: str, model_name: str, timeout: int, workflow: Optional[dict] = None) -> str:
+        """Start ``agent_id`` on a fresh thread seeded with ``message`` and wait for its reply."""
+        # Import existing agent execution functions
+        from agent.api import create_thread, add_message_to_thread, start_agent, AgentStartRequest
+
+        # Create thread name based on execution type
+        thread_name = f"Workflow: {workflow['name']}" if workflow else "Agent-to-Agent Call"
+
+        # Create a new thread
+        thread_response = await create_thread(name=thread_name, user_id=self.account_id)
+        thread_id = thread_response.get('thread_id') if isinstance(thread_response, dict) else thread_response.thread_id
+
+        # Prepare message with workflow context if needed
+        final_message = message
+        if workflow:
+            workflow_context = f"Executing workflow '{workflow['name']}'"
+            if workflow.get('description'):
+                workflow_context += f": {workflow['description']}"
+            final_message = f"{workflow_context}\n\nUser message: {message}"
+
+        # Add the message to the thread
+        await add_message_to_thread(
+            thread_id=thread_id,
+            message=final_message,
+            user_id=self.account_id
+        )
+
+        # Start the agent
+        agent_request = AgentStartRequest(
+            agent_id=agent_id,
+            enable_thinking=False,
+            stream=False,
+            model_name=model_name
+        )
+
+        await start_agent(
+            thread_id=thread_id,
+            body=agent_request,
+            user_id=self.account_id
+        )
+
+        # Wait for agent completion and get response
+        return await self._poll_for_completion(thread_id, timeout)
+
+    @staticmethod
+    def _assistant_text(msg: dict) -> Optional[str]:
+        """Return the text of a ``messages`` row if it is an assistant message, else None."""
+        content = msg.get('content')
+        if not content:
+            return None
+        try:
+            parsed_content = json.loads(content) if isinstance(content, str) else content
+            if parsed_content.get('role') == 'assistant':
+                return parsed_content.get('content', '')
+        except (ValueError, TypeError, AttributeError):
+            # Content is not a JSON object; fall back to the row's own type field
+            if msg.get('type') == 'assistant':
+                return content
+        return None
+
+    async def _latest_assistant_message(self, client, thread_id: str, limit: int = 10) -> Optional[str]:
+        """Return the newest assistant message among the thread's last ``limit`` rows, or None."""
+        messages_result = await client.table('messages').select('*').eq('thread_id', thread_id).order('created_at', desc=True).limit(limit).execute()
+        for msg in messages_result.data or []:
+            text = self._assistant_text(msg)
+            if text is not None:
+                return text
+        return None
+
+    async def _poll_for_completion(self, thread_id: str, timeout: int) -> str:
+        """Wait for the thread's latest run to finish and return the agent's last reply.
+
+        Raises AgentExecutionError if the run failed, was cancelled, completed
+        without an assistant message, or produced nothing before ``timeout``
+        seconds elapsed. On timeout a reply that already exists is returned.
+        """
+        client = await self.thread_manager.db.client
+
+        poll_interval = 2
+        loop = asyncio.get_running_loop()
+        # Deadline on the loop clock, so time spent in the queries counts against the timeout
+        deadline = loop.time() + timeout
+
+        while True:
+            # Check if agent run is complete by checking agent_runs table
+            runs_result = await client.table('agent_runs').select('status, error').eq('thread_id', thread_id).order('created_at', desc=True).limit(1).execute()
+
+            if runs_result.data:
+                run = runs_result.data[0]
+                status = run['status']
+                if status == 'failed':
+                    raise AgentExecutionError(f"Agent execution failed: {run.get('error') or 'Unknown error'}")
+                if status == 'cancelled':
+                    raise AgentExecutionError("Agent execution was cancelled")
+                if status == 'completed':
+                    # Read the reply only once the run is done, so an intermediate
+                    # assistant turn is not returned as the final answer
+                    reply = await self._latest_assistant_message(client, thread_id)
+                    if reply is None:
+                        raise AgentExecutionError(f"No response received from agent. Thread ID: {thread_id}")
+                    return reply
+
+            remaining = deadline - loop.time()
+            if remaining <= 0:
+                break
+            await asyncio.sleep(min(poll_interval, remaining))
+
+        # Timeout fallback - return whatever the agent has said so far
+        reply = await self._latest_assistant_message(client, thread_id)
+        if reply is None:
+            raise AgentExecutionError(f"Agent execution timed out after {timeout}s. Thread ID: {thread_id}")
+        return reply
+
+    async def _verify_agent_access(self, agent_id: str):
+        """Raise ValueError unless ``agent_id`` belongs to this tool's account."""
+        try:
+            client = await self.thread_manager.db.client
+            result = await client.table('agents').select('agent_id').eq('agent_id', agent_id).eq('account_id', self.account_id).execute()
+        except Exception as e:
+            logger.error(f"Database error in verify_agent_access: {str(e)}")
+            raise ValueError("Database connection error") from e
+
+        if not result.data:
+            raise ValueError("Agent not found or access denied")
\ No newline at end of file