chore(dev): auto generate xml schema or mcp

This commit is contained in:
Soumyadas15 2025-05-30 13:01:11 +05:30
parent 546d5078ea
commit dedd6432bb
5 changed files with 313 additions and 32 deletions

View File

@ -28,6 +28,7 @@ from langfuse.client import StatefulTraceClient
from services.langfuse import langfuse
from agent.gemini_prompt import get_gemini_system_prompt
from agent.tools.mcp_tool_wrapper import MCPToolWrapper
from agentpress.tool import SchemaType
load_dotenv()
@ -120,10 +121,43 @@ async def run_agent(
thread_manager.add_tool(DataProvidersTool)
# Register MCP tool wrapper if agent has configured MCPs
mcp_wrapper_instance = None
if agent_config and agent_config.get('configured_mcps'):
logger.info(f"Registering MCP tool wrapper for {len(agent_config['configured_mcps'])} MCP servers")
# Register the tool
thread_manager.add_tool(MCPToolWrapper, mcp_configs=agent_config['configured_mcps'])
# Get the tool instance from the registry
# The tool is registered with method names as keys
for tool_name, tool_info in thread_manager.tool_registry.tools.items():
if isinstance(tool_info['instance'], MCPToolWrapper):
mcp_wrapper_instance = tool_info['instance']
break
# Initialize the MCP tools asynchronously
if mcp_wrapper_instance:
try:
await mcp_wrapper_instance.initialize_and_register_tools()
logger.info("MCP tools initialized successfully")
# Re-register the updated schemas with the tool registry
# This ensures the dynamically created tools are available for function calling
updated_schemas = mcp_wrapper_instance.get_schemas()
for method_name, schema_list in updated_schemas.items():
if method_name != 'call_mcp_tool': # Skip the fallback method
# Register each dynamic tool in the registry
for schema in schema_list:
if schema.schema_type == SchemaType.OPENAPI:
thread_manager.tool_registry.tools[method_name] = {
"instance": mcp_wrapper_instance,
"schema": schema
}
logger.debug(f"Registered dynamic MCP tool: {method_name}")
except Exception as e:
logger.error(f"Failed to initialize MCP tools: {e}")
# Continue without MCP tools if initialization fails
# Prepare system prompt
# First, get the default system prompt
if "gemini-2.5-flash" in model_name.lower():
@ -153,22 +187,50 @@ async def run_agent(
logger.info("Using default system prompt only")
# Add MCP tool information to system prompt if MCP tools are configured
if agent_config and agent_config.get('configured_mcps'):
if agent_config and agent_config.get('configured_mcps') and mcp_wrapper_instance and mcp_wrapper_instance._initialized:
mcp_info = "\n\n--- MCP Tools Available ---\n"
mcp_info += "You have access to external MCP (Model Context Protocol) server tools through the call_mcp_tool function.\n"
mcp_info += "To use an MCP tool, call it using the standard function calling format:\n"
mcp_info += "You have access to external MCP (Model Context Protocol) server tools.\n"
mcp_info += "MCP tools can be called directly using their native function names in the standard function calling format:\n"
mcp_info += '<function_calls>\n'
mcp_info += '<invoke name="call_mcp_tool">\n'
mcp_info += '<parameter name="tool_name">{server}_{tool}</parameter>\n'
mcp_info += '<parameter name="arguments">{"argument1": "value1", "argument2": "value2"}</parameter>\n'
mcp_info += '<invoke name="{tool_name}">\n'
mcp_info += '<parameter name="param1">value1</parameter>\n'
mcp_info += '<parameter name="param2">value2</parameter>\n'
mcp_info += '</invoke>\n'
mcp_info += '</function_calls>\n'
mcp_info += '</function_calls>\n\n'
mcp_info += "\nConfigured MCP servers:\n"
for mcp_config in agent_config['configured_mcps']:
server_name = mcp_config.get('name', 'Unknown')
qualified_name = mcp_config.get('qualifiedName', 'unknown')
mcp_info += f"- {server_name} (use prefix: mcp_{qualified_name}_)\n"
# List available MCP tools
mcp_info += "Available MCP tools:\n"
try:
# Get the actual registered schemas from the wrapper
registered_schemas = mcp_wrapper_instance.get_schemas()
for method_name, schema_list in registered_schemas.items():
if method_name == 'call_mcp_tool':
continue # Skip the fallback method
# Get the schema info
for schema in schema_list:
if schema.schema_type == SchemaType.OPENAPI:
func_info = schema.schema.get('function', {})
description = func_info.get('description', 'No description available')
# Extract server name from description if available
server_match = description.find('(MCP Server: ')
if server_match != -1:
server_end = description.find(')', server_match)
server_info = description[server_match:server_end+1]
else:
server_info = ''
mcp_info += f"- **{method_name}**: {description}\n"
# Show parameter info
params = func_info.get('parameters', {})
props = params.get('properties', {})
if props:
mcp_info += f" Parameters: {', '.join(props.keys())}\n"
except Exception as e:
logger.error(f"Error listing MCP tools: {e}")
mcp_info += "- Error loading MCP tool list\n"
# Add critical instructions for using search results
mcp_info += "\n🚨 CRITICAL MCP TOOL RESULT INSTRUCTIONS 🚨\n"

View File

@ -7,9 +7,10 @@ server tool calls through dynamically generated individual function methods.
import json
from typing import Any, Dict, List, Optional
from agentpress.tool import Tool, ToolResult, openapi_schema, xml_schema
from agentpress.tool import Tool, ToolResult, openapi_schema, xml_schema, ToolSchema, SchemaType
from mcp_local.client import MCPManager
from utils.logger import logger
import inspect
class MCPToolWrapper(Tool):
@ -27,11 +28,15 @@ class MCPToolWrapper(Tool):
Args:
mcp_configs: List of MCP configurations from agent's configured_mcps
"""
super().__init__()
# Don't call super().__init__() yet - we need to set up dynamic methods first
self.mcp_manager = MCPManager()
self.mcp_configs = mcp_configs or []
self._initialized = False
self._dynamic_tools = {}
self._schemas: Dict[str, List[ToolSchema]] = {}
# Now initialize the parent class which will call _register_schemas
super().__init__()
async def _ensure_initialized(self):
"""Ensure MCP connections are initialized and dynamic tools are created."""
@ -50,15 +55,37 @@ class MCPToolWrapper(Tool):
logger.error("3. Or add it to your .env file: SMITHERY_API_KEY=your-key-here")
raise
async def initialize_and_register_tools(self, tool_registry=None):
"""Initialize MCP tools and optionally update the tool registry.
This method should be called after the tool has been registered to dynamically
add the MCP tool schemas to the registry.
Args:
tool_registry: Optional ToolRegistry instance to update with new schemas
"""
await self._ensure_initialized()
# If a tool registry is provided, update it with our dynamic schemas
if tool_registry and self._dynamic_tools:
logger.info(f"Updating tool registry with {len(self._dynamic_tools)} MCP tools")
# The registry already has this tool instance registered,
# we just need to update the schemas
for method_name, schemas in self._schemas.items():
if method_name not in ['call_mcp_tool']: # Skip the fallback method
# The registry needs to know about these new methods
# We'll update the tool's schema registration
pass
async def _create_dynamic_tools(self):
"""Create dynamic tool methods for each available MCP tool."""
try:
available_tools = self.mcp_manager.get_all_tools_openapi()
for tool_info in available_tools:
tool_name = tool_info.get('function', {}).get('name', '')
tool_name = tool_info.get('name', '')
if tool_name:
# Create a dynamic method for this tool
# Create a dynamic method for this tool with proper OpenAI schema
self._create_dynamic_method(tool_name, tool_info)
logger.info(f"Created {len(self._dynamic_tools)} dynamic MCP tool methods")
@ -67,33 +94,103 @@ class MCPToolWrapper(Tool):
logger.error(f"Error creating dynamic MCP tools: {e}")
def _create_dynamic_method(self, tool_name: str, tool_info: Dict[str, Any]):
"""Create a dynamic method for a specific MCP tool."""
"""Create a dynamic method for a specific MCP tool with proper OpenAI schema."""
async def dynamic_tool_method(arguments: Dict[str, Any]) -> ToolResult:
# Extract the clean tool name without the mcp_{server}_ prefix
parts = tool_name.split("_", 2)
clean_tool_name = parts[2] if len(parts) > 2 else tool_name
server_name = parts[1] if len(parts) > 1 else "unknown"
# Use the clean tool name as the method name (without server prefix)
method_name = clean_tool_name.replace('-', '_')
# Store the original full tool name for execution
original_full_name = tool_name
# Create the dynamic method
async def dynamic_tool_method(**kwargs) -> ToolResult:
"""Dynamically created method for MCP tool."""
return await self._execute_mcp_tool(tool_name, arguments)
# Use the original full tool name for execution
return await self._execute_mcp_tool(original_full_name, kwargs)
# Set the method name to match the tool name
dynamic_tool_method.__name__ = method_name
dynamic_tool_method.__qualname__ = f"{self.__class__.__name__}.{method_name}"
# Build a more descriptive description
base_description = tool_info.get("description", f"MCP tool from {server_name}")
full_description = f"{base_description} (MCP Server: {server_name})"
# Create the OpenAI schema for this tool
openapi_function_schema = {
"type": "function",
"function": {
"name": method_name, # Use the clean method name for function calling
"description": full_description,
"parameters": tool_info.get("parameters", {
"type": "object",
"properties": {},
"required": []
})
}
}
# Create a ToolSchema object
tool_schema = ToolSchema(
schema_type=SchemaType.OPENAPI,
schema=openapi_function_schema
)
# Add the schema to our schemas dict
self._schemas[method_name] = [tool_schema]
# Also add the schema to the method itself (for compatibility)
dynamic_tool_method.tool_schemas = [tool_schema]
# Store the method and its info
self._dynamic_tools[tool_name] = {
'method': dynamic_tool_method,
'info': tool_info
'method_name': method_name,
'original_tool_name': tool_name,
'clean_tool_name': clean_tool_name,
'server_name': server_name,
'info': tool_info,
'schema': tool_schema
}
# Add the method to this instance
setattr(self, tool_name.replace('-', '_'), dynamic_tool_method)
setattr(self, method_name, dynamic_tool_method)
logger.debug(f"Created dynamic method '{method_name}' for MCP tool '{tool_name}' from server '{server_name}'")
def _register_schemas(self):
"""Register schemas from all decorated methods and dynamic tools."""
# First register static schemas from decorated methods
for name, method in inspect.getmembers(self, predicate=inspect.ismethod):
if hasattr(method, 'tool_schemas'):
self._schemas[name] = method.tool_schemas
logger.debug(f"Registered schemas for method '{name}' in {self.__class__.__name__}")
# Note: Dynamic schemas will be added after async initialization
logger.debug(f"Initial registration complete for MCPToolWrapper")
def get_schemas(self) -> Dict[str, List[ToolSchema]]:
"""Get all registered tool schemas including dynamic ones."""
# Return all schemas including dynamically added ones
return self._schemas
def __getattr__(self, name: str):
"""Handle calls to dynamically created MCP tool methods."""
# Convert method name back to tool name (handle underscore conversion)
tool_name = name.replace('_', '-')
# Look for exact method name match first
for tool_data in self._dynamic_tools.values():
if tool_data['method_name'] == name:
return tool_data['method']
if tool_name in self._dynamic_tools:
return self._dynamic_tools[tool_name]['method']
# If it looks like an MCP tool name, try to find it
for existing_tool_name in self._dynamic_tools:
if existing_tool_name.replace('-', '_') == name:
return self._dynamic_tools[existing_tool_name]['method']
# Try with underscore/hyphen conversion
name_with_hyphens = name.replace('_', '-')
for tool_name, tool_data in self._dynamic_tools.items():
if tool_data['method_name'] == name or tool_name == name_with_hyphens:
return tool_data['method']
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")

View File

@ -1458,7 +1458,22 @@ class ResponseProcessor:
# Check if this is an MCP tool (function_name starts with "call_mcp_tool")
function_name = tool_call.get("function_name", "")
# Check if this is an MCP tool - either the old call_mcp_tool or a dynamically registered MCP tool
is_mcp_tool = False
if function_name == "call_mcp_tool":
is_mcp_tool = True
else:
# Check if the result indicates it's an MCP tool by looking for MCP metadata
if hasattr(result, 'output') and isinstance(result.output, str):
# Check for MCP metadata pattern in the output
if "MCP Tool Result from" in result.output and "Tool Metadata:" in result.output:
is_mcp_tool = True
# Also check for MCP metadata in JSON format
elif "mcp_metadata" in result.output:
is_mcp_tool = True
if is_mcp_tool:
# Special handling for MCP tools - make content prominent and LLM-friendly
result_role = "user" if strategy == "user_message" else "assistant"

108
backend/test_mcp_tools.py Normal file
View File

@ -0,0 +1,108 @@
"""
Test script to list ONLY MCP tool OpenAI schema method names
"""
import asyncio
import os
from dotenv import load_dotenv
from agentpress.thread_manager import ThreadManager
from agent.tools.mcp_tool_wrapper import MCPToolWrapper
from agentpress.tool import SchemaType
from utils.logger import logger
load_dotenv()
async def test_mcp_tools_only():
"""Test listing only MCP tools and their OpenAI schema method names"""
# Create thread manager
thread_manager = ThreadManager()
print("\n=== MCP Tools Test ===")
# MCP configuration with ALL tools enabled (empty enabledTools)
mcp_configs = [
{
"name": "Exa Search",
"qualifiedName": "exa",
"config": {"exaApiKey": os.getenv("EXA_API_KEY", "test-key")},
"enabledTools": [] # Empty to get ALL tools
}
]
# Register MCP tool wrapper
logger.info("Registering MCP tool wrapper...")
thread_manager.add_tool(MCPToolWrapper, mcp_configs=mcp_configs)
# Get the tool instance
mcp_wrapper_instance = None
for tool_name, tool_info in thread_manager.tool_registry.tools.items():
if isinstance(tool_info['instance'], MCPToolWrapper):
mcp_wrapper_instance = tool_info['instance']
break
if not mcp_wrapper_instance:
logger.error("Failed to find MCP wrapper instance")
return
try:
# Initialize MCP tools
logger.info("Initializing MCP tools...")
await mcp_wrapper_instance.initialize_and_register_tools()
# Get all available MCP tools from the server
available_mcp_tools = await mcp_wrapper_instance.get_available_tools()
print(f"\nTotal MCP tools available from server: {len(available_mcp_tools)}")
# Get the dynamically created schemas
updated_schemas = mcp_wrapper_instance.get_schemas()
mcp_method_schemas = {k: v for k, v in updated_schemas.items() if k != 'call_mcp_tool'}
print(f"\nDynamically created MCP methods: {len(mcp_method_schemas)}")
# List all MCP tool method names with descriptions
print("\n=== MCP Tool Method Names (Clean Names) ===")
for method_name, schema_list in sorted(mcp_method_schemas.items()):
for schema in schema_list:
if schema.schema_type == SchemaType.OPENAPI:
func_info = schema.schema.get('function', {})
func_desc = func_info.get('description', 'No description')
# Extract just the description part before "(MCP Server:"
desc_parts = func_desc.split(' (MCP Server:')
clean_desc = desc_parts[0] if desc_parts else func_desc
print(f"\n{method_name}")
print(f" Description: {clean_desc}")
# Show parameters
params = func_info.get('parameters', {})
props = params.get('properties', {})
required = params.get('required', [])
if props:
print(f" Parameters:")
for param_name, param_info in props.items():
param_type = param_info.get('type', 'any')
param_desc = param_info.get('description', 'No description')
is_required = param_name in required
req_marker = " (required)" if is_required else " (optional)"
print(f" - {param_name}: {param_type}{req_marker} - {param_desc}")
# Show the name mapping
print("\n\n=== MCP Tool Name Mapping (Original -> Clean) ===")
for original_name, tool_data in sorted(mcp_wrapper_instance._dynamic_tools.items()):
print(f"{original_name} -> {tool_data['method_name']}")
# Summary of callable method names
print("\n\n=== Summary: Callable MCP Method Names ===")
method_names = sorted(mcp_method_schemas.keys())
for name in method_names:
print(f"- {name}")
print(f"\nTotal callable MCP methods: {len(method_names)}")
except Exception as e:
logger.error(f"Error during MCP initialization: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
asyncio.run(test_mcp_tools_only())

View File

@ -1 +0,0 @@