mirror of https://github.com/kortix-ai/suna.git
765 lines
43 KiB
Python
765 lines
43 KiB
Python
import os
|
|
import json
|
|
import asyncio
|
|
from typing import Optional
|
|
|
|
# from agent.tools.message_tool import MessageTool
|
|
from agent.tools.message_tool import MessageTool
|
|
from agent.tools.sb_deploy_tool import SandboxDeployTool
|
|
from agent.tools.sb_expose_tool import SandboxExposeTool
|
|
from agent.tools.web_search_tool import SandboxWebSearchTool
|
|
from dotenv import load_dotenv
|
|
from utils.config import config
|
|
from flags.flags import is_enabled
|
|
from agent.agent_builder_prompt import get_agent_builder_prompt
|
|
from agentpress.thread_manager import ThreadManager
|
|
from agentpress.response_processor import ProcessorConfig
|
|
from agent.tools.sb_shell_tool import SandboxShellTool
|
|
from agent.tools.sb_files_tool import SandboxFilesTool
|
|
from agent.tools.sb_browser_tool import SandboxBrowserTool
|
|
from agent.tools.data_providers_tool import DataProvidersTool
|
|
from agent.tools.expand_msg_tool import ExpandMessageTool
|
|
from agent.prompt import get_system_prompt
|
|
from utils.logger import logger
|
|
from utils.auth_utils import get_account_id_from_thread
|
|
from services.billing import check_billing_status
|
|
from agent.tools.sb_vision_tool import SandboxVisionTool
|
|
from agent.tools.sb_image_edit_tool import SandboxImageEditTool
|
|
from services.langfuse import langfuse
|
|
from langfuse.client import StatefulTraceClient
|
|
from services.langfuse import langfuse
|
|
from agent.gemini_prompt import get_gemini_system_prompt
|
|
from agent.tools.mcp_tool_wrapper import MCPToolWrapper
|
|
from agentpress.tool import SchemaType
|
|
|
|
load_dotenv()
|
|
|
|
|
|
def _get_suna_builder_enhancement():
|
|
"""
|
|
Returns additional system prompt content for Suna when self-configuration is enabled.
|
|
This enhances Suna's default capabilities with agent builder functionality.
|
|
"""
|
|
return """
|
|
# 🔧 SELF-CONFIGURATION CAPABILITIES
|
|
|
|
You now have the ability to configure and enhance yourself! When users ask you to modify your capabilities, add integrations, create workflows, or set up automation, you can use these advanced tools:
|
|
|
|
## 🛠️ Available Self-Configuration Tools
|
|
|
|
### Agent Configuration (`update_agent`)
|
|
- Add MCP integrations to connect with external services
|
|
- Create and manage workflows for structured processes
|
|
- Set up triggers for scheduled automation
|
|
- Configure credential profiles for secure service connections
|
|
|
|
### MCP Integration Tools
|
|
- `search_mcp_servers`: Find integrations for specific services (Gmail, Slack, GitHub, etc.)
|
|
- `get_popular_mcp_servers`: Browse trending integrations
|
|
- `get_mcp_server_tools`: Explore integration capabilities
|
|
- `configure_mcp_server`: Set up external service connections
|
|
|
|
### Credential Management
|
|
- `create_credential_profile`: Set up secure connections to external services
|
|
- `connect_credential_profile`: Generate connection links for users
|
|
- `check_profile_connection`: Verify service connections
|
|
- `configure_profile_for_agent`: Add connected services to your configuration
|
|
|
|
### Workflow & Automation
|
|
- `create_workflow`: Design multi-step automated processes
|
|
- `create_scheduled_trigger`: Set up time-based automation
|
|
- `get_workflows` / `get_scheduled_triggers`: Review existing automation
|
|
|
|
## 🎯 When Users Request Configuration Changes
|
|
|
|
**If a user asks you to:**
|
|
- "Add Gmail integration" → Search for Gmail MCP, create credential profile, guide connection
|
|
- "Set up daily reports" → Create workflow + scheduled trigger
|
|
- "Connect to Slack" → Find Slack integration, set up credential profile
|
|
- "Automate [task]" → Design appropriate workflow/trigger combination
|
|
- "Add [service] capabilities" → Search for relevant MCP servers
|
|
|
|
**Important Guidelines:**
|
|
- Always search for integrations before creating credential profiles
|
|
- Guide users through connection processes step-by-step
|
|
- Explain what each integration enables and why it's useful
|
|
- Test connections before finalizing configurations
|
|
- Only enable tools that users specifically request or need
|
|
|
|
## 🌟 Self-Configuration Philosophy
|
|
|
|
You are Suna, and you can now evolve and adapt based on user needs. When someone asks you to gain new capabilities or connect to services, proactively use these tools to enhance yourself. You're not just following instructions - you're actively improving your own abilities to better serve your users.
|
|
|
|
Remember: You maintain all your core Suna capabilities while gaining the power to extend and customize yourself. This makes you more helpful and adaptable to each user's specific needs.
|
|
"""
|
|
|
|
|
|
async def run_agent(
|
|
thread_id: str,
|
|
project_id: str,
|
|
stream: bool,
|
|
thread_manager: Optional[ThreadManager] = None,
|
|
native_max_auto_continues: int = 25,
|
|
max_iterations: int = 100,
|
|
model_name: str = "anthropic/claude-sonnet-4-20250514",
|
|
enable_thinking: Optional[bool] = False,
|
|
reasoning_effort: Optional[str] = 'low',
|
|
enable_context_manager: bool = True,
|
|
agent_config: Optional[dict] = None,
|
|
trace: Optional[StatefulTraceClient] = None,
|
|
is_agent_builder: Optional[bool] = False,
|
|
target_agent_id: Optional[str] = None
|
|
):
|
|
"""Run the development agent with specified configuration."""
|
|
logger.info(f"🚀 Starting agent with model: {model_name}")
|
|
if agent_config:
|
|
logger.info(f"Using custom agent: {agent_config.get('name', 'Unknown')}")
|
|
|
|
if not trace:
|
|
trace = langfuse.trace(name="run_agent", session_id=thread_id, metadata={"project_id": project_id})
|
|
thread_manager = ThreadManager(trace=trace, is_agent_builder=is_agent_builder or False, target_agent_id=target_agent_id, agent_config=agent_config)
|
|
|
|
client = await thread_manager.db.client
|
|
|
|
# Get account ID from thread for billing checks
|
|
account_id = await get_account_id_from_thread(client, thread_id)
|
|
if not account_id:
|
|
raise ValueError("Could not determine account ID for thread")
|
|
|
|
# Get sandbox info from project
|
|
project = await client.table('projects').select('*').eq('project_id', project_id).execute()
|
|
if not project.data or len(project.data) == 0:
|
|
raise ValueError(f"Project {project_id} not found")
|
|
|
|
project_data = project.data[0]
|
|
sandbox_info = project_data.get('sandbox', {})
|
|
if not sandbox_info.get('id'):
|
|
raise ValueError(f"No sandbox found for project {project_id}")
|
|
|
|
# Initialize tools with project_id instead of sandbox object
|
|
# This ensures each tool independently verifies it's operating on the correct project
|
|
|
|
# Get enabled tools from agent config, or use defaults
|
|
enabled_tools = None
|
|
if agent_config and 'agentpress_tools' in agent_config:
|
|
enabled_tools = agent_config['agentpress_tools']
|
|
logger.info(f"Using custom tool configuration from agent")
|
|
|
|
|
|
# Check if this is Suna (default agent) and enable builder capabilities for self-configuration
|
|
suna_builder_enabled = False
|
|
if agent_config and agent_config.get('is_suna_default', False):
|
|
logger.info("Detected Suna default agent - enabling self-configuration capabilities")
|
|
suna_builder_enabled = True
|
|
|
|
from agent.tools.agent_builder_tools.agent_config_tool import AgentConfigTool
|
|
from agent.tools.agent_builder_tools.mcp_search_tool import MCPSearchTool
|
|
from agent.tools.agent_builder_tools.credential_profile_tool import CredentialProfileTool
|
|
from agent.tools.agent_builder_tools.workflow_tool import WorkflowTool
|
|
from agent.tools.agent_builder_tools.trigger_tool import TriggerTool
|
|
from services.supabase import DBConnection
|
|
db = DBConnection()
|
|
|
|
# Use Suna's own agent ID for self-configuration
|
|
suna_agent_id = agent_config['agent_id']
|
|
|
|
thread_manager.add_tool(AgentConfigTool, thread_manager=thread_manager, db_connection=db, agent_id=suna_agent_id)
|
|
thread_manager.add_tool(MCPSearchTool, thread_manager=thread_manager, db_connection=db, agent_id=suna_agent_id)
|
|
thread_manager.add_tool(CredentialProfileTool, thread_manager=thread_manager, db_connection=db, agent_id=suna_agent_id)
|
|
thread_manager.add_tool(WorkflowTool, thread_manager=thread_manager, db_connection=db, agent_id=suna_agent_id)
|
|
thread_manager.add_tool(TriggerTool, thread_manager=thread_manager, db_connection=db, agent_id=suna_agent_id)
|
|
|
|
logger.info(f"Enabled Suna self-configuration with agent ID: {suna_agent_id}")
|
|
|
|
# Original agent builder logic for custom agents (preserved)
|
|
if is_agent_builder:
|
|
from agent.tools.agent_builder_tools.agent_config_tool import AgentConfigTool
|
|
from agent.tools.agent_builder_tools.mcp_search_tool import MCPSearchTool
|
|
from agent.tools.agent_builder_tools.credential_profile_tool import CredentialProfileTool
|
|
from agent.tools.agent_builder_tools.workflow_tool import WorkflowTool
|
|
from agent.tools.agent_builder_tools.trigger_tool import TriggerTool
|
|
from services.supabase import DBConnection
|
|
db = DBConnection()
|
|
|
|
thread_manager.add_tool(AgentConfigTool, thread_manager=thread_manager, db_connection=db, agent_id=target_agent_id)
|
|
thread_manager.add_tool(MCPSearchTool, thread_manager=thread_manager, db_connection=db, agent_id=target_agent_id)
|
|
thread_manager.add_tool(CredentialProfileTool, thread_manager=thread_manager, db_connection=db, agent_id=target_agent_id)
|
|
thread_manager.add_tool(WorkflowTool, thread_manager=thread_manager, db_connection=db, agent_id=target_agent_id)
|
|
thread_manager.add_tool(TriggerTool, thread_manager=thread_manager, db_connection=db, agent_id=target_agent_id)
|
|
|
|
|
|
if enabled_tools is None:
|
|
logger.info("No agent specified - registering all tools for full Suna capabilities")
|
|
thread_manager.add_tool(SandboxShellTool, project_id=project_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(SandboxFilesTool, project_id=project_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(SandboxBrowserTool, project_id=project_id, thread_id=thread_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(SandboxDeployTool, project_id=project_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(SandboxExposeTool, project_id=project_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(ExpandMessageTool, thread_id=thread_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(MessageTool)
|
|
thread_manager.add_tool(SandboxWebSearchTool, project_id=project_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(SandboxVisionTool, project_id=project_id, thread_id=thread_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(SandboxImageEditTool, project_id=project_id, thread_id=thread_id, thread_manager=thread_manager)
|
|
if config.RAPID_API_KEY:
|
|
thread_manager.add_tool(DataProvidersTool)
|
|
else:
|
|
logger.info("Custom agent specified - registering only enabled tools")
|
|
thread_manager.add_tool(ExpandMessageTool, thread_id=thread_id, thread_manager=thread_manager)
|
|
thread_manager.add_tool(MessageTool)
|
|
if enabled_tools.get('sb_shell_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(SandboxShellTool, project_id=project_id, thread_manager=thread_manager)
|
|
if enabled_tools.get('sb_files_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(SandboxFilesTool, project_id=project_id, thread_manager=thread_manager)
|
|
if enabled_tools.get('sb_browser_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(SandboxBrowserTool, project_id=project_id, thread_id=thread_id, thread_manager=thread_manager)
|
|
if enabled_tools.get('sb_deploy_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(SandboxDeployTool, project_id=project_id, thread_manager=thread_manager)
|
|
if enabled_tools.get('sb_expose_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(SandboxExposeTool, project_id=project_id, thread_manager=thread_manager)
|
|
if enabled_tools.get('web_search_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(SandboxWebSearchTool, project_id=project_id, thread_manager=thread_manager)
|
|
if enabled_tools.get('sb_vision_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(SandboxVisionTool, project_id=project_id, thread_id=thread_id, thread_manager=thread_manager)
|
|
if config.RAPID_API_KEY and enabled_tools.get('data_providers_tool', {}).get('enabled', False):
|
|
thread_manager.add_tool(DataProvidersTool)
|
|
|
|
# Register MCP tool wrapper if agent has configured MCPs or custom MCPs
|
|
mcp_wrapper_instance = None
|
|
if agent_config:
|
|
# Merge configured_mcps and custom_mcps
|
|
all_mcps = []
|
|
|
|
# Add standard configured MCPs
|
|
if agent_config.get('configured_mcps'):
|
|
all_mcps.extend(agent_config['configured_mcps'])
|
|
|
|
# Add custom MCPs
|
|
if agent_config.get('custom_mcps'):
|
|
for custom_mcp in agent_config['custom_mcps']:
|
|
# Transform custom MCP to standard format
|
|
custom_type = custom_mcp.get('customType', custom_mcp.get('type', 'sse'))
|
|
|
|
# For Pipedream MCPs, ensure we have the user ID and proper config
|
|
if custom_type == 'pipedream':
|
|
# Get user ID from thread
|
|
if 'config' not in custom_mcp:
|
|
custom_mcp['config'] = {}
|
|
|
|
# Get external_user_id from profile if not present
|
|
if not custom_mcp['config'].get('external_user_id'):
|
|
profile_id = custom_mcp['config'].get('profile_id')
|
|
if profile_id:
|
|
try:
|
|
from pipedream.profiles import get_profile_manager
|
|
from services.supabase import DBConnection
|
|
profile_db = DBConnection()
|
|
profile_manager = get_profile_manager(profile_db)
|
|
|
|
# Get the profile to retrieve external_user_id
|
|
profile = await profile_manager.get_profile(account_id, profile_id)
|
|
if profile:
|
|
custom_mcp['config']['external_user_id'] = profile.external_user_id
|
|
logger.info(f"Retrieved external_user_id from profile {profile_id} for Pipedream MCP")
|
|
else:
|
|
logger.error(f"Could not find profile {profile_id} for Pipedream MCP")
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving external_user_id from profile {profile_id}: {e}")
|
|
|
|
if 'headers' in custom_mcp['config'] and 'x-pd-app-slug' in custom_mcp['config']['headers']:
|
|
custom_mcp['config']['app_slug'] = custom_mcp['config']['headers']['x-pd-app-slug']
|
|
|
|
mcp_config = {
|
|
'name': custom_mcp['name'],
|
|
'qualifiedName': f"custom_{custom_type}_{custom_mcp['name'].replace(' ', '_').lower()}",
|
|
'config': custom_mcp['config'],
|
|
'enabledTools': custom_mcp.get('enabledTools', []),
|
|
'instructions': custom_mcp.get('instructions', ''),
|
|
'isCustom': True,
|
|
'customType': custom_type
|
|
}
|
|
all_mcps.append(mcp_config)
|
|
|
|
if all_mcps:
|
|
logger.info(f"Registering MCP tool wrapper for {len(all_mcps)} MCP servers (including {len(agent_config.get('custom_mcps', []))} custom)")
|
|
thread_manager.add_tool(MCPToolWrapper, mcp_configs=all_mcps)
|
|
|
|
for tool_name, tool_info in thread_manager.tool_registry.tools.items():
|
|
if isinstance(tool_info['instance'], MCPToolWrapper):
|
|
mcp_wrapper_instance = tool_info['instance']
|
|
break
|
|
|
|
if mcp_wrapper_instance:
|
|
try:
|
|
await mcp_wrapper_instance.initialize_and_register_tools()
|
|
logger.info("MCP tools initialized successfully")
|
|
updated_schemas = mcp_wrapper_instance.get_schemas()
|
|
logger.info(f"MCP wrapper has {len(updated_schemas)} schemas available")
|
|
for method_name, schema_list in updated_schemas.items():
|
|
if method_name != 'call_mcp_tool':
|
|
for schema in schema_list:
|
|
if schema.schema_type == SchemaType.OPENAPI:
|
|
thread_manager.tool_registry.tools[method_name] = {
|
|
"instance": mcp_wrapper_instance,
|
|
"schema": schema
|
|
}
|
|
logger.info(f"Registered dynamic MCP tool: {method_name}")
|
|
|
|
# Log all registered tools for debugging
|
|
all_tools = list(thread_manager.tool_registry.tools.keys())
|
|
logger.info(f"All registered tools after MCP initialization: {all_tools}")
|
|
mcp_tools = [tool for tool in all_tools if tool not in ['call_mcp_tool', 'sb_files_tool', 'message_tool', 'expand_msg_tool', 'web_search_tool', 'sb_shell_tool', 'sb_vision_tool', 'sb_browser_tool', 'computer_use_tool', 'data_providers_tool', 'sb_deploy_tool', 'sb_expose_tool', 'update_agent_tool']]
|
|
logger.info(f"MCP tools registered: {mcp_tools}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize MCP tools: {e}")
|
|
# Continue without MCP tools if initialization fails
|
|
|
|
# Prepare system prompt
|
|
# First, get the default system prompt
|
|
if "gemini-2.5-flash" in model_name.lower() and "gemini-2.5-pro" not in model_name.lower():
|
|
default_system_content = get_gemini_system_prompt()
|
|
else:
|
|
# Use the original prompt - the LLM can only use tools that are registered
|
|
default_system_content = get_system_prompt()
|
|
|
|
# Add sample response for non-anthropic models
|
|
if "anthropic" not in model_name.lower():
|
|
sample_response_path = os.path.join(os.path.dirname(__file__), 'sample_responses/1.txt')
|
|
with open(sample_response_path, 'r') as file:
|
|
sample_response = file.read()
|
|
default_system_content = default_system_content + "\n\n <sample_assistant_response>" + sample_response + "</sample_assistant_response>"
|
|
|
|
# Handle custom agent system prompt
|
|
if agent_config and agent_config.get('system_prompt'):
|
|
custom_system_prompt = agent_config['system_prompt'].strip()
|
|
|
|
# Special case: If this is Suna with builder capabilities enabled, enhance the default prompt
|
|
if suna_builder_enabled:
|
|
system_content = custom_system_prompt + "\n\n" + _get_suna_builder_enhancement()
|
|
logger.info(f"Using Suna default system prompt with self-configuration capabilities")
|
|
else:
|
|
# Completely replace the default system prompt with the custom one
|
|
# This prevents confusion and tool hallucination
|
|
system_content = custom_system_prompt
|
|
logger.info(f"Using ONLY custom agent system prompt for: {agent_config.get('name', 'Unknown')}")
|
|
elif is_agent_builder:
|
|
system_content = get_agent_builder_prompt()
|
|
logger.info("Using agent builder system prompt")
|
|
else:
|
|
# Use just the default system prompt
|
|
system_content = default_system_content
|
|
logger.info("Using default system prompt only")
|
|
|
|
if await is_enabled("knowledge_base"):
|
|
try:
|
|
from services.supabase import DBConnection
|
|
kb_db = DBConnection()
|
|
kb_client = await kb_db.client
|
|
|
|
current_agent_id = agent_config.get('agent_id') if agent_config else None
|
|
|
|
kb_result = await kb_client.rpc('get_combined_knowledge_base_context', {
|
|
'p_thread_id': thread_id,
|
|
'p_agent_id': current_agent_id,
|
|
'p_max_tokens': 4000
|
|
}).execute()
|
|
|
|
if kb_result.data and kb_result.data.strip():
|
|
logger.info(f"Adding combined knowledge base context to system prompt for thread {thread_id}, agent {current_agent_id}")
|
|
system_content += "\n\n" + kb_result.data
|
|
else:
|
|
logger.debug(f"No knowledge base context found for thread {thread_id}, agent {current_agent_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving knowledge base context for thread {thread_id}: {e}")
|
|
|
|
|
|
if agent_config and (agent_config.get('configured_mcps') or agent_config.get('custom_mcps')) and mcp_wrapper_instance and mcp_wrapper_instance._initialized:
|
|
mcp_info = "\n\n--- MCP Tools Available ---\n"
|
|
mcp_info += "You have access to external MCP (Model Context Protocol) server tools.\n"
|
|
mcp_info += "MCP tools can be called directly using their native function names in the standard function calling format:\n"
|
|
mcp_info += '<function_calls>\n'
|
|
mcp_info += '<invoke name="{tool_name}">\n'
|
|
mcp_info += '<parameter name="param1">value1</parameter>\n'
|
|
mcp_info += '<parameter name="param2">value2</parameter>\n'
|
|
mcp_info += '</invoke>\n'
|
|
mcp_info += '</function_calls>\n\n'
|
|
|
|
# List available MCP tools
|
|
mcp_info += "Available MCP tools:\n"
|
|
try:
|
|
# Get the actual registered schemas from the wrapper
|
|
registered_schemas = mcp_wrapper_instance.get_schemas()
|
|
for method_name, schema_list in registered_schemas.items():
|
|
if method_name == 'call_mcp_tool':
|
|
continue # Skip the fallback method
|
|
|
|
# Get the schema info
|
|
for schema in schema_list:
|
|
if schema.schema_type == SchemaType.OPENAPI:
|
|
func_info = schema.schema.get('function', {})
|
|
description = func_info.get('description', 'No description available')
|
|
# Extract server name from description if available
|
|
server_match = description.find('(MCP Server: ')
|
|
if server_match != -1:
|
|
server_end = description.find(')', server_match)
|
|
server_info = description[server_match:server_end+1]
|
|
else:
|
|
server_info = ''
|
|
|
|
mcp_info += f"- **{method_name}**: {description}\n"
|
|
|
|
# Show parameter info
|
|
params = func_info.get('parameters', {})
|
|
props = params.get('properties', {})
|
|
if props:
|
|
mcp_info += f" Parameters: {', '.join(props.keys())}\n"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error listing MCP tools: {e}")
|
|
mcp_info += "- Error loading MCP tool list\n"
|
|
|
|
# Add critical instructions for using search results
|
|
mcp_info += "\n🚨 CRITICAL MCP TOOL RESULT INSTRUCTIONS 🚨\n"
|
|
mcp_info += "When you use ANY MCP (Model Context Protocol) tools:\n"
|
|
mcp_info += "1. ALWAYS read and use the EXACT results returned by the MCP tool\n"
|
|
mcp_info += "2. For search tools: ONLY cite URLs, sources, and information from the actual search results\n"
|
|
mcp_info += "3. For any tool: Base your response entirely on the tool's output - do NOT add external information\n"
|
|
mcp_info += "4. DO NOT fabricate, invent, hallucinate, or make up any sources, URLs, or data\n"
|
|
mcp_info += "5. If you need more information, call the MCP tool again with different parameters\n"
|
|
mcp_info += "6. When writing reports/summaries: Reference ONLY the data from MCP tool results\n"
|
|
mcp_info += "7. If the MCP tool doesn't return enough information, explicitly state this limitation\n"
|
|
mcp_info += "8. Always double-check that every fact, URL, and reference comes from the MCP tool output\n"
|
|
mcp_info += "\nIMPORTANT: MCP tool results are your PRIMARY and ONLY source of truth for external data!\n"
|
|
mcp_info += "NEVER supplement MCP results with your training data or make assumptions beyond what the tools provide.\n"
|
|
|
|
system_content += mcp_info
|
|
|
|
system_message = { "role": "system", "content": system_content }
|
|
|
|
iteration_count = 0
|
|
continue_execution = True
|
|
|
|
latest_user_message = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'user').order('created_at', desc=True).limit(1).execute()
|
|
if latest_user_message.data and len(latest_user_message.data) > 0:
|
|
data = latest_user_message.data[0]['content']
|
|
if isinstance(data, str):
|
|
data = json.loads(data)
|
|
if trace:
|
|
trace.update(input=data['content'])
|
|
|
|
while continue_execution and iteration_count < max_iterations:
|
|
iteration_count += 1
|
|
logger.info(f"🔄 Running iteration {iteration_count} of {max_iterations}...")
|
|
|
|
# Billing check on each iteration - still needed within the iterations
|
|
can_run, message, subscription = await check_billing_status(client, account_id)
|
|
if not can_run:
|
|
error_msg = f"Billing limit reached: {message}"
|
|
if trace:
|
|
trace.event(name="billing_limit_reached", level="ERROR", status_message=(f"{error_msg}"))
|
|
# Yield a special message to indicate billing limit reached
|
|
yield {
|
|
"type": "status",
|
|
"status": "stopped",
|
|
"message": error_msg
|
|
}
|
|
break
|
|
# Check if last message is from assistant using direct Supabase query
|
|
latest_message = await client.table('messages').select('*').eq('thread_id', thread_id).in_('type', ['assistant', 'tool', 'user']).order('created_at', desc=True).limit(1).execute()
|
|
if latest_message.data and len(latest_message.data) > 0:
|
|
message_type = latest_message.data[0].get('type')
|
|
if message_type == 'assistant':
|
|
logger.info(f"Last message was from assistant, stopping execution")
|
|
if trace:
|
|
trace.event(name="last_message_from_assistant", level="DEFAULT", status_message=(f"Last message was from assistant, stopping execution"))
|
|
continue_execution = False
|
|
break
|
|
|
|
# ---- Temporary Message Handling (Browser State & Image Context) ----
|
|
temporary_message = None
|
|
temp_message_content_list = [] # List to hold text/image blocks
|
|
|
|
# Get the latest browser_state message
|
|
latest_browser_state_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'browser_state').order('created_at', desc=True).limit(1).execute()
|
|
if latest_browser_state_msg.data and len(latest_browser_state_msg.data) > 0:
|
|
try:
|
|
browser_content = latest_browser_state_msg.data[0]["content"]
|
|
if isinstance(browser_content, str):
|
|
browser_content = json.loads(browser_content)
|
|
screenshot_base64 = browser_content.get("screenshot_base64")
|
|
screenshot_url = browser_content.get("image_url")
|
|
|
|
# Create a copy of the browser state without screenshot data
|
|
browser_state_text = browser_content.copy()
|
|
browser_state_text.pop('screenshot_base64', None)
|
|
browser_state_text.pop('image_url', None)
|
|
|
|
if browser_state_text:
|
|
temp_message_content_list.append({
|
|
"type": "text",
|
|
"text": f"The following is the current state of the browser:\n{json.dumps(browser_state_text, indent=2)}"
|
|
})
|
|
|
|
# Only add screenshot if model is not Gemini, Anthropic, or OpenAI
|
|
if 'gemini' in model_name.lower() or 'anthropic' in model_name.lower() or 'openai' in model_name.lower():
|
|
# Prioritize screenshot_url if available
|
|
if screenshot_url:
|
|
temp_message_content_list.append({
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": screenshot_url,
|
|
"format": "image/jpeg"
|
|
}
|
|
})
|
|
if trace:
|
|
trace.event(name="screenshot_url_added_to_temporary_message", level="DEFAULT", status_message=(f"Screenshot URL added to temporary message."))
|
|
elif screenshot_base64:
|
|
# Fallback to base64 if URL not available
|
|
temp_message_content_list.append({
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:image/jpeg;base64,{screenshot_base64}",
|
|
}
|
|
})
|
|
if trace:
|
|
trace.event(name="screenshot_base64_added_to_temporary_message", level="WARNING", status_message=(f"Screenshot base64 added to temporary message. Prefer screenshot_url if available."))
|
|
else:
|
|
logger.warning("Browser state found but no screenshot data.")
|
|
if trace:
|
|
trace.event(name="browser_state_found_but_no_screenshot_data", level="WARNING", status_message=(f"Browser state found but no screenshot data."))
|
|
else:
|
|
logger.warning("Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message.")
|
|
if trace:
|
|
trace.event(name="model_is_gemini_anthropic_or_openai", level="WARNING", status_message=(f"Model is Gemini, Anthropic, or OpenAI, so not adding screenshot to temporary message."))
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error parsing browser state: {e}")
|
|
if trace:
|
|
trace.event(name="error_parsing_browser_state", level="ERROR", status_message=(f"{e}"))
|
|
|
|
# Get the latest image_context message (NEW)
|
|
latest_image_context_msg = await client.table('messages').select('*').eq('thread_id', thread_id).eq('type', 'image_context').order('created_at', desc=True).limit(1).execute()
|
|
if latest_image_context_msg.data and len(latest_image_context_msg.data) > 0:
|
|
try:
|
|
image_context_content = latest_image_context_msg.data[0]["content"] if isinstance(latest_image_context_msg.data[0]["content"], dict) else json.loads(latest_image_context_msg.data[0]["content"])
|
|
base64_image = image_context_content.get("base64")
|
|
mime_type = image_context_content.get("mime_type")
|
|
file_path = image_context_content.get("file_path", "unknown file")
|
|
|
|
if base64_image and mime_type:
|
|
temp_message_content_list.append({
|
|
"type": "text",
|
|
"text": f"Here is the image you requested to see: '{file_path}'"
|
|
})
|
|
temp_message_content_list.append({
|
|
"type": "image_url",
|
|
"image_url": {
|
|
"url": f"data:{mime_type};base64,{base64_image}",
|
|
}
|
|
})
|
|
else:
|
|
logger.warning(f"Image context found for '{file_path}' but missing base64 or mime_type.")
|
|
|
|
await client.table('messages').delete().eq('message_id', latest_image_context_msg.data[0]["message_id"]).execute()
|
|
except Exception as e:
|
|
logger.error(f"Error parsing image context: {e}")
|
|
if trace:
|
|
trace.event(name="error_parsing_image_context", level="ERROR", status_message=(f"{e}"))
|
|
|
|
# If we have any content, construct the temporary_message
|
|
if temp_message_content_list:
|
|
temporary_message = {"role": "user", "content": temp_message_content_list}
|
|
# logger.debug(f"Constructed temporary message with {len(temp_message_content_list)} content blocks.")
|
|
# ---- End Temporary Message Handling ----
|
|
|
|
# Set max_tokens based on model
|
|
max_tokens = None
|
|
if "sonnet" in model_name.lower():
|
|
# Claude 3.5 Sonnet has a limit of 8192 tokens
|
|
max_tokens = 8192
|
|
elif "gpt-4" in model_name.lower():
|
|
max_tokens = 4096
|
|
elif "gemini-2.5-pro" in model_name.lower():
|
|
# Gemini 2.5 Pro has 64k max output tokens
|
|
max_tokens = 64000
|
|
elif "kimi-k2" in model_name.lower():
|
|
# Kimi-K2 has 120K context, set reasonable max output tokens
|
|
max_tokens = 8192
|
|
|
|
generation = trace.generation(name="thread_manager.run_thread") if trace else None
|
|
try:
|
|
# Make the LLM call and process the response
|
|
response = await thread_manager.run_thread(
|
|
thread_id=thread_id,
|
|
system_prompt=system_message,
|
|
stream=stream,
|
|
llm_model=model_name,
|
|
llm_temperature=0,
|
|
llm_max_tokens=max_tokens,
|
|
tool_choice="auto",
|
|
max_xml_tool_calls=1,
|
|
temporary_message=temporary_message,
|
|
processor_config=ProcessorConfig(
|
|
xml_tool_calling=True,
|
|
native_tool_calling=False,
|
|
execute_tools=True,
|
|
execute_on_stream=True,
|
|
tool_execution_strategy="parallel",
|
|
xml_adding_strategy="user_message"
|
|
),
|
|
native_max_auto_continues=native_max_auto_continues,
|
|
include_xml_examples=True,
|
|
enable_thinking=enable_thinking,
|
|
reasoning_effort=reasoning_effort,
|
|
enable_context_manager=enable_context_manager,
|
|
generation=generation
|
|
)
|
|
|
|
if isinstance(response, dict) and "status" in response and response["status"] == "error":
|
|
logger.error(f"Error response from run_thread: {response.get('message', 'Unknown error')}")
|
|
if trace:
|
|
trace.event(name="error_response_from_run_thread", level="ERROR", status_message=(f"{response.get('message', 'Unknown error')}"))
|
|
yield response
|
|
break
|
|
|
|
# Track if we see ask, complete, or web-browser-takeover tool calls
|
|
last_tool_call = None
|
|
agent_should_terminate = False
|
|
|
|
# Process the response
|
|
error_detected = False
|
|
full_response = ""
|
|
try:
|
|
# Check if response is iterable (async generator) or a dict (error case)
|
|
if hasattr(response, '__aiter__') and not isinstance(response, dict):
|
|
async for chunk in response:
|
|
# If we receive an error chunk, we should stop after this iteration
|
|
if isinstance(chunk, dict) and chunk.get('type') == 'status' and chunk.get('status') == 'error':
|
|
logger.error(f"Error chunk detected: {chunk.get('message', 'Unknown error')}")
|
|
if trace:
|
|
trace.event(name="error_chunk_detected", level="ERROR", status_message=(f"{chunk.get('message', 'Unknown error')}"))
|
|
error_detected = True
|
|
yield chunk # Forward the error chunk
|
|
continue # Continue processing other chunks but don't break yet
|
|
|
|
# Check for termination signal in status messages
|
|
if chunk.get('type') == 'status':
|
|
try:
|
|
# Parse the metadata to check for termination signal
|
|
metadata = chunk.get('metadata', {})
|
|
if isinstance(metadata, str):
|
|
metadata = json.loads(metadata)
|
|
|
|
if metadata.get('agent_should_terminate'):
|
|
agent_should_terminate = True
|
|
logger.info("Agent termination signal detected in status message")
|
|
if trace:
|
|
trace.event(name="agent_termination_signal_detected", level="DEFAULT", status_message="Agent termination signal detected in status message")
|
|
|
|
# Extract the tool name from the status content if available
|
|
content = chunk.get('content', {})
|
|
if isinstance(content, str):
|
|
content = json.loads(content)
|
|
|
|
if content.get('function_name'):
|
|
last_tool_call = content['function_name']
|
|
elif content.get('xml_tag_name'):
|
|
last_tool_call = content['xml_tag_name']
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error parsing status message for termination check: {e}")
|
|
|
|
# Check for XML versions like <ask>, <complete>, or <web-browser-takeover> in assistant content chunks
|
|
if chunk.get('type') == 'assistant' and 'content' in chunk:
|
|
try:
|
|
# The content field might be a JSON string or object
|
|
content = chunk.get('content', '{}')
|
|
if isinstance(content, str):
|
|
assistant_content_json = json.loads(content)
|
|
else:
|
|
assistant_content_json = content
|
|
|
|
# The actual text content is nested within
|
|
assistant_text = assistant_content_json.get('content', '')
|
|
full_response += assistant_text
|
|
if isinstance(assistant_text, str):
|
|
if '</ask>' in assistant_text or '</complete>' in assistant_text or '</web-browser-takeover>' in assistant_text:
|
|
if '</ask>' in assistant_text:
|
|
xml_tool = 'ask'
|
|
elif '</complete>' in assistant_text:
|
|
xml_tool = 'complete'
|
|
elif '</web-browser-takeover>' in assistant_text:
|
|
xml_tool = 'web-browser-takeover'
|
|
|
|
last_tool_call = xml_tool
|
|
logger.info(f"Agent used XML tool: {xml_tool}")
|
|
if trace:
|
|
trace.event(name="agent_used_xml_tool", level="DEFAULT", status_message=(f"Agent used XML tool: {xml_tool}"))
|
|
|
|
except json.JSONDecodeError:
|
|
# Handle cases where content might not be valid JSON
|
|
logger.warning(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}")
|
|
if trace:
|
|
trace.event(name="warning_could_not_parse_assistant_content_json", level="WARNING", status_message=(f"Warning: Could not parse assistant content JSON: {chunk.get('content')}"))
|
|
except Exception as e:
|
|
logger.error(f"Error processing assistant chunk: {e}")
|
|
if trace:
|
|
trace.event(name="error_processing_assistant_chunk", level="ERROR", status_message=(f"Error processing assistant chunk: {e}"))
|
|
|
|
yield chunk
|
|
else:
|
|
# Response is not iterable, likely an error dict
|
|
logger.error(f"Response is not iterable: {response}")
|
|
error_detected = True
|
|
|
|
# Check if we should stop based on the last tool call or error
|
|
if error_detected:
|
|
logger.info(f"Stopping due to error detected in response")
|
|
if trace:
|
|
trace.event(name="stopping_due_to_error_detected_in_response", level="DEFAULT", status_message=(f"Stopping due to error detected in response"))
|
|
if generation:
|
|
generation.end(output=full_response, status_message="error_detected", level="ERROR")
|
|
break
|
|
|
|
if agent_should_terminate or last_tool_call in ['ask', 'complete', 'web-browser-takeover']:
|
|
logger.info(f"Agent decided to stop with tool: {last_tool_call}")
|
|
if trace:
|
|
trace.event(name="agent_decided_to_stop_with_tool", level="DEFAULT", status_message=(f"Agent decided to stop with tool: {last_tool_call}"))
|
|
if generation:
|
|
generation.end(output=full_response, status_message="agent_stopped")
|
|
continue_execution = False
|
|
|
|
except Exception as e:
|
|
# Just log the error and re-raise to stop all iterations
|
|
error_msg = f"Error during response streaming: {str(e)}"
|
|
logger.error(f"Error: {error_msg}")
|
|
if trace:
|
|
trace.event(name="error_during_response_streaming", level="ERROR", status_message=(f"Error during response streaming: {str(e)}"))
|
|
if generation:
|
|
generation.end(output=full_response, status_message=error_msg, level="ERROR")
|
|
yield {
|
|
"type": "status",
|
|
"status": "error",
|
|
"message": error_msg
|
|
}
|
|
# Stop execution immediately on any error
|
|
break
|
|
|
|
except Exception as e:
|
|
# Just log the error and re-raise to stop all iterations
|
|
error_msg = f"Error running thread: {str(e)}"
|
|
logger.error(f"Error: {error_msg}")
|
|
if trace:
|
|
trace.event(name="error_running_thread", level="ERROR", status_message=(f"Error running thread: {str(e)}"))
|
|
yield {
|
|
"type": "status",
|
|
"status": "error",
|
|
"message": error_msg
|
|
}
|
|
# Stop execution immediately on any error
|
|
break
|
|
if generation:
|
|
generation.end(output=full_response)
|
|
|
|
asyncio.create_task(asyncio.to_thread(lambda: langfuse.flush())) |