chore(dev): mcp tools

This commit is contained in:
Soumyadas15 2025-05-29 23:27:03 +05:30
parent a476e71727
commit fa463fb09c
2 changed files with 116 additions and 56 deletions

View File

@ -158,13 +158,12 @@ async def run_agent(
mcp_info += "You have access to external MCP (Model Context Protocol) server tools through the call_mcp_tool function.\n" mcp_info += "You have access to external MCP (Model Context Protocol) server tools through the call_mcp_tool function.\n"
mcp_info += "To use an MCP tool, call it using the standard function calling format:\n" mcp_info += "To use an MCP tool, call it using the standard function calling format:\n"
mcp_info += '<function_calls>\n' mcp_info += '<function_calls>\n'
mcp_info += '<invoke name={server}_{tool}>\n' mcp_info += '<invoke name="call_mcp_tool">\n'
mcp_info += '<parameter name="tool_name">mcp_{server}_{tool}</parameter>\n' mcp_info += '<parameter name="tool_name">{server}_{tool}</parameter>\n'
mcp_info += '<parameter name="arguments">{"argument1": "value1", "argument2": "value2"}</parameter>\n' mcp_info += '<parameter name="arguments">{"argument1": "value1", "argument2": "value2"}</parameter>\n'
mcp_info += '</invoke>\n' mcp_info += '</invoke>\n'
mcp_info += '</function_calls>\n' mcp_info += '</function_calls>\n'
# List configured MCP servers
mcp_info += "\nConfigured MCP servers:\n" mcp_info += "\nConfigured MCP servers:\n"
for mcp_config in agent_config['configured_mcps']: for mcp_config in agent_config['configured_mcps']:
server_name = mcp_config.get('name', 'Unknown') server_name = mcp_config.get('name', 'Unknown')

View File

@ -2,7 +2,7 @@
MCP Tool Wrapper for AgentPress MCP Tool Wrapper for AgentPress
This module provides a generic tool wrapper that handles all MCP (Model Context Protocol) This module provides a generic tool wrapper that handles all MCP (Model Context Protocol)
server tool calls through a single AgentPress tool interface. server tool calls through dynamically generated individual function methods.
""" """
import json import json
@ -14,10 +14,10 @@ from utils.logger import logger
class MCPToolWrapper(Tool): class MCPToolWrapper(Tool):
""" """
A generic tool wrapper that handles all MCP server tool calls. A generic tool wrapper that dynamically creates individual methods for each MCP tool.
This tool acts as a proxy, forwarding tool calls to the appropriate MCP server This tool creates separate function calls for each MCP tool while routing them all
based on the tool name prefix. through the same underlying implementation.
""" """
def __init__(self, mcp_configs: Optional[List[Dict[str, Any]]] = None): def __init__(self, mcp_configs: Optional[List[Dict[str, Any]]] = None):
@ -31,13 +31,15 @@ class MCPToolWrapper(Tool):
self.mcp_manager = MCPManager() self.mcp_manager = MCPManager()
self.mcp_configs = mcp_configs or [] self.mcp_configs = mcp_configs or []
self._initialized = False self._initialized = False
self._dynamic_tools = {}
async def _ensure_initialized(self): async def _ensure_initialized(self):
"""Ensure MCP connections are initialized.""" """Ensure MCP connections are initialized and dynamic tools are created."""
if not self._initialized and self.mcp_configs: if not self._initialized and self.mcp_configs:
logger.info(f"Initializing MCP connections for {len(self.mcp_configs)} servers") logger.info(f"Initializing MCP connections for {len(self.mcp_configs)} servers")
try: try:
await self.mcp_manager.connect_all(self.mcp_configs) await self.mcp_manager.connect_all(self.mcp_configs)
await self._create_dynamic_tools()
self._initialized = True self._initialized = True
except ValueError as e: except ValueError as e:
if "SMITHERY_API_KEY" in str(e): if "SMITHERY_API_KEY" in str(e):
@ -48,54 +50,64 @@ class MCPToolWrapper(Tool):
logger.error("3. Or add it to your .env file: SMITHERY_API_KEY=your-key-here") logger.error("3. Or add it to your .env file: SMITHERY_API_KEY=your-key-here")
raise raise
async def _create_dynamic_tools(self):
"""Create dynamic tool methods for each available MCP tool."""
try:
available_tools = self.mcp_manager.get_all_tools_openapi()
for tool_info in available_tools:
tool_name = tool_info.get('function', {}).get('name', '')
if tool_name:
# Create a dynamic method for this tool
self._create_dynamic_method(tool_name, tool_info)
logger.info(f"Created {len(self._dynamic_tools)} dynamic MCP tool methods")
except Exception as e:
logger.error(f"Error creating dynamic MCP tools: {e}")
def _create_dynamic_method(self, tool_name: str, tool_info: Dict[str, Any]):
"""Create a dynamic method for a specific MCP tool."""
async def dynamic_tool_method(arguments: Dict[str, Any]) -> ToolResult:
"""Dynamically created method for MCP tool."""
return await self._execute_mcp_tool(tool_name, arguments)
# Store the method and its info
self._dynamic_tools[tool_name] = {
'method': dynamic_tool_method,
'info': tool_info
}
# Add the method to this instance
setattr(self, tool_name.replace('-', '_'), dynamic_tool_method)
def __getattr__(self, name: str):
"""Handle calls to dynamically created MCP tool methods."""
# Convert method name back to tool name (handle underscore conversion)
tool_name = name.replace('_', '-')
if tool_name in self._dynamic_tools:
return self._dynamic_tools[tool_name]['method']
# If it looks like an MCP tool name, try to find it
for existing_tool_name in self._dynamic_tools:
if existing_tool_name.replace('-', '_') == name:
return self._dynamic_tools[existing_tool_name]['method']
raise AttributeError(f"'{self.__class__.__name__}' object has no attribute '{name}'")
async def get_available_tools(self) -> List[Dict[str, Any]]: async def get_available_tools(self) -> List[Dict[str, Any]]:
"""Get all available MCP tools in OpenAPI format.""" """Get all available MCP tools in OpenAPI format."""
await self._ensure_initialized() await self._ensure_initialized()
return self.mcp_manager.get_all_tools_openapi() return self.mcp_manager.get_all_tools_openapi()
@openapi_schema({ async def _execute_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult:
"type": "function",
"function": {
"name": "call_mcp_tool",
"description": "Execute a tool from any connected MCP server. This is a generic wrapper that forwards calls to MCP tools. The tool_name should be in the format 'mcp_{server}_{tool}' where {server} is the MCP server's qualified name and {tool} is the specific tool name.",
"parameters": {
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "The full MCP tool name in format 'mcp_{server}_{tool}', e.g., 'mcp_exa_web_search_exa'"
},
"arguments": {
"type": "object",
"description": "The arguments to pass to the MCP tool, as a JSON object. The required arguments depend on the specific tool being called.",
"additionalProperties": True
}
},
"required": ["tool_name", "arguments"]
}
}
})
@xml_schema(
tag_name="call-mcp-tool",
mappings=[
{"param_name": "tool_name", "node_type": "attribute", "path": "."},
{"param_name": "arguments", "node_type": "content", "path": "."}
],
example='''
<function_calls>
<invoke name="exa_web_search">
<parameter name="tool_name">mcp_exa_web_search_exa</parameter>
<parameter name="arguments">{"query": "latest developments in AI", "num_results": 10}</parameter>
</invoke>
</function_calls>
'''
)
async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult:
""" """
Execute an MCP tool call. Execute an MCP tool call (internal implementation).
Args: Args:
tool_name: The full MCP tool name (e.g., "mcp_exa_web_search_exa") tool_name: The MCP tool name (e.g., "mcp_exa_web_search_exa")
arguments: The arguments to pass to the tool arguments: The arguments to pass to the tool
Returns: Returns:
@ -107,13 +119,6 @@ class MCPToolWrapper(Tool):
logger.info(f"Executing MCP tool {tool_name} with args: {arguments}") logger.info(f"Executing MCP tool {tool_name} with args: {arguments}")
# Parse tool name to extract server and tool info
parts = tool_name.split("_", 2)
if len(parts) != 3 or parts[0] != "mcp":
return self.fail_response(f"Invalid MCP tool name format: {tool_name}. Expected format: mcp_{{server}}_{{tool}}")
_, server_name, original_tool_name = parts
# Parse arguments if they're provided as a JSON string # Parse arguments if they're provided as a JSON string
if isinstance(arguments, str): if isinstance(arguments, str):
try: try:
@ -124,6 +129,11 @@ class MCPToolWrapper(Tool):
# Execute the tool through MCP manager # Execute the tool through MCP manager
result = await self.mcp_manager.execute_tool(tool_name, arguments) result = await self.mcp_manager.execute_tool(tool_name, arguments)
# Parse tool name to extract server and tool info for metadata
parts = tool_name.split("_", 2)
server_name = parts[1] if len(parts) > 1 else "unknown"
original_tool_name = parts[2] if len(parts) > 2 else tool_name
# Enhance the result with metadata for better frontend display # Enhance the result with metadata for better frontend display
enhanced_result = { enhanced_result = {
"mcp_metadata": { "mcp_metadata": {
@ -194,6 +204,57 @@ class MCPToolWrapper(Tool):
return self.fail_response(json.dumps(error_result, indent=2)) return self.fail_response(json.dumps(error_result, indent=2))
# Keep the original call_mcp_tool method as a fallback
@openapi_schema({
"type": "function",
"function": {
"name": "call_mcp_tool",
"description": "Execute a tool from any connected MCP server. This is a fallback wrapper that forwards calls to MCP tools. The tool_name should be in the format 'mcp_{server}_{tool}' where {server} is the MCP server's qualified name and {tool} is the specific tool name.",
"parameters": {
"type": "object",
"properties": {
"tool_name": {
"type": "string",
"description": "The full MCP tool name in format 'mcp_{server}_{tool}', e.g., 'mcp_exa_web_search_exa'"
},
"arguments": {
"type": "object",
"description": "The arguments to pass to the MCP tool, as a JSON object. The required arguments depend on the specific tool being called.",
"additionalProperties": True
}
},
"required": ["tool_name", "arguments"]
}
}
})
@xml_schema(
tag_name="call-mcp-tool",
mappings=[
{"param_name": "tool_name", "node_type": "attribute", "path": "."},
{"param_name": "arguments", "node_type": "content", "path": "."}
],
example='''
<function_calls>
<invoke name="call_mcp_tool">
<parameter name="tool_name">mcp_exa_web_search_exa</parameter>
<parameter name="arguments">{"query": "latest developments in AI", "num_results": 10}</parameter>
</invoke>
</function_calls>
'''
)
async def call_mcp_tool(self, tool_name: str, arguments: Dict[str, Any]) -> ToolResult:
"""
Execute an MCP tool call (fallback method).
Args:
tool_name: The full MCP tool name (e.g., "mcp_exa_web_search_exa")
arguments: The arguments to pass to the tool
Returns:
ToolResult with the tool execution result
"""
return await self._execute_mcp_tool(tool_name, arguments)
async def cleanup(self): async def cleanup(self):
"""Disconnect all MCP servers.""" """Disconnect all MCP servers."""
if self._initialized: if self._initialized: