From cf13f3af31744063264b4a233ae4348a8e899d80 Mon Sep 17 00:00:00 2001 From: marko-kraemer Date: Sat, 5 Apr 2025 13:25:03 +0100 Subject: [PATCH] response processor orga refactor --- backend/agent/workspace/index.html | 495 ++++++++++++++++++++++- backend/agentpress/response_processor.py | 286 +++++++++---- backend/test_parallel.py | 80 ---- backend/test_wait_tool.py | 140 ------- 4 files changed, 682 insertions(+), 319 deletions(-) delete mode 100644 backend/test_parallel.py delete mode 100644 backend/test_wait_tool.py diff --git a/backend/agent/workspace/index.html b/backend/agent/workspace/index.html index e1fe48fa..0d9c663c 100644 --- a/backend/agent/workspace/index.html +++ b/backend/agent/workspace/index.html @@ -3,34 +3,493 @@ - Flappy Bird Test + Minecraft Clone - +
+
+
+
+
+
+
+ + + + \ No newline at end of file diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py index c5dac5bf..df5360e3 100644 --- a/backend/agentpress/response_processor.py +++ b/backend/agentpress/response_processor.py @@ -13,7 +13,7 @@ import asyncio import re import uuid from typing import List, Dict, Any, Optional, Tuple, AsyncGenerator, Callable, Union, Literal -from dataclasses import dataclass +from dataclasses import dataclass, field from agentpress.tool import Tool, ToolResult from agentpress.tool_registry import ToolRegistry @@ -25,6 +25,15 @@ XmlAddingStrategy = Literal["user_message", "assistant_message", "inline_edit"] # Type alias for tool execution strategy ToolExecutionStrategy = Literal["sequential", "parallel"] +@dataclass +class ToolExecutionContext: + """Context for a tool execution including call details, result, and display info.""" + tool_call: Dict[str, Any] + tool_index: int + result: Optional[ToolResult] = None + display_name: Optional[str] = None + error: Optional[Exception] = None + @dataclass class ProcessorConfig: """ @@ -103,6 +112,9 @@ class ResponseProcessor: # Tool execution index counter tool_execution_index = 0 + # Tool index for remaining tools (used at end of function) + tool_index = 0 + logger.info(f"Starting to process streaming response for thread {thread_id}") logger.info(f"Config: XML={config.xml_tool_calling}, Native={config.native_tool_calling}, " f"Execute on stream={config.execute_on_stream}, Execution strategy={config.tool_execution_strategy}") @@ -110,7 +122,6 @@ class ResponseProcessor: try: async for chunk in llm_response: # Default content to yield - content_to_yield = None if hasattr(chunk, 'choices') and chunk.choices: delta = chunk.choices[0].delta if hasattr(chunk.choices[0], 'delta') else None @@ -136,16 +147,16 @@ class ResponseProcessor: # Parse and extract the tool call tool_call = self._parse_xml_tool_call(xml_chunk) if tool_call: + # Create a context for this tool execution + context = self._create_tool_context( + tool_call=tool_call, + tool_index=tool_execution_index + ) + # Execute tool if needed, but in background if config.execute_tools and config.execute_on_stream: # Yield tool execution start message - yield { - "type": "tool_status", - "status": "started", - "name": tool_call["name"], - "message": f"Starting execution of {tool_call['name']}", - "tool_index": tool_execution_index - } + yield self._yield_tool_started(context) # Start tool execution as a background task execution_task = asyncio.create_task(self._execute_tool(tool_call)) @@ -154,7 +165,8 @@ class ResponseProcessor: pending_tool_executions.append({ "task": execution_task, "tool_call": tool_call, - "tool_index": tool_execution_index + "tool_index": tool_execution_index, + "context": context }) # Increment the tool execution index @@ -208,14 +220,14 @@ class ResponseProcessor: "id": current_tool['id'] } + # Create a context for this tool execution + context = self._create_tool_context( + tool_call=tool_call_data, + tool_index=tool_execution_index + ) + # Yield tool execution start message - yield { - "type": "tool_status", - "status": "started", - "name": tool_call_data["name"], - "message": f"Starting execution of {tool_call_data['name']}", - "tool_index": tool_execution_index - } + yield self._yield_tool_started(context) # Start tool execution as a background task execution_task = asyncio.create_task(self._execute_tool(tool_call_data)) @@ -224,7 +236,8 @@ class ResponseProcessor: pending_tool_executions.append({ "task": execution_task, "tool_call": tool_call_data, - "tool_index": tool_execution_index + "tool_index": tool_execution_index, + "context": context }) # Increment the tool execution index @@ -245,22 +258,19 @@ class ResponseProcessor: # Store result for later database updates tool_results_buffer.append((tool_call, result)) - # Yield tool status message first - yield { - "type": "tool_status", - "status": "completed" if result.success else "failed", - "name": tool_call["name"], - "message": f"Tool {tool_call['name']} {'completed successfully' if result.success else 'failed'}", - "tool_index": tool_index - } + # Get or create the context + if "context" in execution: + context = execution["context"] + context.result = result + else: + context = self._create_tool_context(tool_call, tool_index) + context.result = result - # Yield tool execution result for client display - yield { - "type": "tool_result", - "name": tool_call["name"], - "result": str(result), - "tool_index": tool_index - } + # Yield tool status message first + yield self._yield_tool_completed(context) + + # Yield tool execution result + yield self._yield_tool_result(context) # Mark for removal completed_executions.append(i) @@ -270,14 +280,16 @@ class ResponseProcessor: tool_call = execution["tool_call"] tool_index = execution.get("tool_index", -1) - # Yield error status - yield { - "type": "tool_status", - "status": "error", - "name": tool_call["name"], - "message": f"Error executing tool: {str(e)}", - "tool_index": tool_index - } + # Get or create the context + if "context" in execution: + context = execution["context"] + context.error = e + else: + context = self._create_tool_context(tool_call, tool_index) + context.error = e + + # Yield error status for the tool + yield self._yield_tool_error(context) # Mark for removal completed_executions.append(i) @@ -305,33 +317,37 @@ class ResponseProcessor: # Store result for later tool_results_buffer.append((tool_call, result)) - # Yield tool status message first - yield { - "type": "tool_status", - "status": "completed" if result.success else "failed", - "name": tool_call["name"], - "message": f"Tool {tool_call['name']} {'completed successfully' if result.success else 'failed'}", - "tool_index": tool_index - } + # Get or create the context + if "context" in execution: + context = execution["context"] + context.result = result + else: + context = self._create_tool_context(tool_call, tool_index) + context.result = result + + # Yield tool status message first + yield self._yield_tool_completed(context) # Yield tool execution result - yield { - "type": "tool_result", - "name": tool_call["name"], - "result": str(result), - "tool_index": tool_index - } + yield self._yield_tool_result(context) except Exception as e: logger.error(f"Error processing remaining tool execution: {str(e)}") # Yield error status for the tool if "tool_call" in execution: tool_call = execution["tool_call"] tool_index = execution.get("tool_index", -1) + # Get or create the context + if "context" in execution: + context = execution["context"] + context.error = e + else: + context = self._create_tool_context(tool_call, tool_index) + context.error = e + formatted_result = self._format_xml_tool_result(tool_call, result) yield { - "type": "tool_status", - "status": "error", - "name": tool_call.get("name", "unknown"), - "message": f"Error processing tool result: {str(e)}", + "type": "tool_result", + "name": context.display_name, + "result": formatted_result, "tool_index": tool_index } @@ -373,6 +389,16 @@ class ResponseProcessor: result, config.xml_adding_strategy ) + + # Create context for tool result + context = self._create_tool_context(tool_call, tool_index) + context.result = result + + # Yield tool execution result + yield self._yield_tool_result(context) + + # Increment tool index for next tool + tool_index += 1 # Execute any remaining tool calls if not done during streaming if config.execute_tools and not config.execute_on_stream: @@ -414,12 +440,15 @@ class ResponseProcessor: config.xml_adding_strategy ) + # Create context for tool result + context = self._create_tool_context(tool_call, tool_index) + context.result = result + # Yield tool execution result - yield { - "type": "tool_result", - "name": tool_call["name"], - "result": str(result) - } + yield self._yield_tool_result(context) + + # Increment tool index for next tool + tool_index += 1 except Exception as e: logger.error(f"Error processing stream: {str(e)}", exc_info=True) @@ -445,6 +474,8 @@ class ResponseProcessor: # Extract content and tool calls from response content = "" tool_calls = [] + # Tool execution counter + tool_index = 0 if hasattr(llm_response, 'choices') and llm_response.choices: response_message = llm_response.choices[0].message if hasattr(llm_response.choices[0], 'message') else None @@ -509,12 +540,15 @@ class ResponseProcessor: config.xml_adding_strategy ) + # Create context for tool result + context = self._create_tool_context(tool_call, tool_index) + context.result = result + # Yield tool execution result - yield { - "type": "tool_result", - "name": tool_call["name"], - "result": str(result) - } + yield self._yield_tool_result(context) + + # Increment tool index for next tool + tool_index += 1 except Exception as e: logger.error(f"Error processing response: {str(e)}", exc_info=True) @@ -924,18 +958,12 @@ class ResponseProcessor: # Determine message role based on strategy result_role = "user" if strategy == "user_message" else "assistant" - # Determine if this is an XML tool - is_xml_tool = False - if hasattr(self.tool_registry, 'xml_tools'): - is_xml_tool = tool_call["name"] in self.tool_registry.xml_tools + # Create a context for consistent formatting + context = self._create_tool_context(tool_call, 0) # Index doesn't matter for DB + context.result = result - # Format the content based on tool type - if is_xml_tool: - # For XML tools, use simple content - content = str(result) - else: - # For native tools, include context about which tool was called - content = f"Result for {tool_call['name']}: {str(result)}" + # Format the content using the formatting helper + content = self._format_xml_tool_result(tool_call, result) # Add the message with the appropriate role result_message = { @@ -952,4 +980,100 @@ class ResponseProcessor: "content": str(result) }) except Exception as e2: - logger.error(f"Failed even with fallback message: {str(e2)}", exc_info=True) \ No newline at end of file + logger.error(f"Failed even with fallback message: {str(e2)}", exc_info=True) + + + def _format_xml_tool_result(self, tool_call: Dict[str, Any], result: ToolResult) -> str: + """Format a tool result as a simple XML tag. + + Args: + tool_call: The tool call that was executed + result: The result of the tool execution + + Returns: + String containing the XML-formatted result or standard format if not XML + """ + xml_tag_name = self._get_tool_display_name(tool_call["name"]) + + # If display name is same as method name, it's not an XML tool + if xml_tag_name == tool_call["name"]: + return f"Result for {tool_call['name']}: {str(result)}" + + # Format as simple XML tag without attributes + xml_output = f"<{xml_tag_name}> {str(result)} " + return xml_output + + def _get_tool_display_name(self, method_name: str) -> str: + """Get the display name for a tool (XML tag name if applicable, or method name).""" + if not hasattr(self.tool_registry, 'xml_tools'): + return method_name + + # Check if this method corresponds to an XML tool + for tag_name, xml_tool_info in self.tool_registry.xml_tools.items(): + if xml_tool_info.get('method') == method_name: + return tag_name + + # Default to the method name if no XML tag found + return method_name + + # At class level, define a method for yielding tool results + def _yield_tool_result(self, context: ToolExecutionContext) -> Dict[str, Any]: + """Format and return a tool result message.""" + if not context.result: + return { + "type": "tool_result", + "name": context.display_name, + "result": "No result available", + "tool_index": context.tool_index + } + + formatted_result = self._format_xml_tool_result(context.tool_call, context.result) + return { + "type": "tool_result", + "name": context.display_name, + "result": formatted_result, + "tool_index": context.tool_index + } + + def _create_tool_context(self, tool_call: Dict[str, Any], tool_index: int) -> ToolExecutionContext: + """Create a tool execution context with display name populated.""" + context = ToolExecutionContext( + tool_call=tool_call, + tool_index=tool_index + ) + context.display_name = self._get_tool_display_name(tool_call["name"]) + return context + + def _yield_tool_started(self, context: ToolExecutionContext) -> Dict[str, Any]: + """Format and return a tool started status message.""" + return { + "type": "tool_status", + "status": "started", + "name": context.display_name, + "message": f"Starting execution of {context.display_name}", + "tool_index": context.tool_index + } + + def _yield_tool_completed(self, context: ToolExecutionContext) -> Dict[str, Any]: + """Format and return a tool completed/failed status message.""" + if not context.result: + return self._yield_tool_error(context) + + return { + "type": "tool_status", + "status": "completed" if context.result.success else "failed", + "name": context.display_name, + "message": f"Tool {context.display_name} {'completed successfully' if context.result.success else 'failed'}", + "tool_index": context.tool_index + } + + def _yield_tool_error(self, context: ToolExecutionContext) -> Dict[str, Any]: + """Format and return a tool error status message.""" + error_msg = str(context.error) if context.error else "Unknown error" + return { + "type": "tool_status", + "status": "error", + "name": context.display_name, + "message": f"Error executing tool: {error_msg}", + "tool_index": context.tool_index + } \ No newline at end of file diff --git a/backend/test_parallel.py b/backend/test_parallel.py deleted file mode 100644 index d0505f3f..00000000 --- a/backend/test_parallel.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -Simplified test for sequential vs parallel tool execution. -""" - -import os -import asyncio -import sys -from dotenv import load_dotenv -from agentpress.thread_manager import ThreadManager -from agentpress.response_processor import ProcessorConfig -from agent.tools.wait_tool import WaitTool -from agentpress.tool import ToolResult - -# Load environment variables -load_dotenv() - -async def test_execution(): - """Directly test sequential vs parallel execution.""" - print("\n" + "="*80) - print("🧪 TESTING PARALLEL VS SEQUENTIAL EXECUTION") - print("="*80 + "\n") - - # Initialize ThreadManager and register tools - thread_manager = ThreadManager() - thread_manager.add_tool(WaitTool) - - # Create wait tool calls - wait_tool_calls = [ - {"name": "wait", "arguments": {"seconds": 2, "message": "Wait tool 1"}}, - {"name": "wait", "arguments": {"seconds": 2, "message": "Wait tool 2"}}, - {"name": "wait", "arguments": {"seconds": 2, "message": "Wait tool 3"}} - ] - - # Test sequential execution - print("🔄 Testing Sequential Execution") - print("-"*60) - sequential_start = asyncio.get_event_loop().time() - sequential_results = await thread_manager.response_processor._execute_tools( - wait_tool_calls, - sequential=True, - execution_strategy="sequential" - ) - sequential_end = asyncio.get_event_loop().time() - sequential_time = sequential_end - sequential_start - - print(f"Sequential execution completed in {sequential_time:.2f} seconds") - print() - - # Test parallel execution - print("⚡ Testing Parallel Execution") - print("-"*60) - parallel_start = asyncio.get_event_loop().time() - parallel_results = await thread_manager.response_processor._execute_tools( - wait_tool_calls, - sequential=False, - execution_strategy="parallel" - ) - parallel_end = asyncio.get_event_loop().time() - parallel_time = parallel_end - parallel_start - - print(f"Parallel execution completed in {parallel_time:.2f} seconds") - print() - - # Report results - print("\n" + "="*80) - print(f"🧮 RESULTS SUMMARY") - print("="*80) - print(f"Sequential: {sequential_time:.2f} seconds") - print(f"Parallel: {parallel_time:.2f} seconds") - print(f"Speedup: {sequential_time/parallel_time:.2f}x faster") - -if __name__ == "__main__": - try: - asyncio.run(test_execution()) - except KeyboardInterrupt: - print("\n\n❌ Test interrupted by user") - sys.exit(1) - except Exception as e: - print(f"\n\n❌ Error during test: {str(e)}") - sys.exit(1) \ No newline at end of file diff --git a/backend/test_wait_tool.py b/backend/test_wait_tool.py deleted file mode 100644 index 5f12da59..00000000 --- a/backend/test_wait_tool.py +++ /dev/null @@ -1,140 +0,0 @@ -""" -Test script for demonstrating sequential vs parallel tool execution strategies. - -This script creates a conversation thread and tests both execution strategies -with multiple wait tool calls to clearly demonstrate the difference. -""" - -import os -import asyncio -import sys -from dotenv import load_dotenv -from agentpress.thread_manager import ThreadManager -from agentpress.response_processor import ProcessorConfig -from agent.tools.wait_tool import WaitTool - -# Load environment variables -load_dotenv() - -TOOL_XML_SEQUENTIAL = """ -Here are some examples of using the wait tool: - -This is sequential wait 1 -This is sequential wait 2 -This is sequential wait 3 - -Now wait sequence: - -""" - -TOOL_XML_PARALLEL = """ -Here are some examples of using the wait tool: - -This is parallel wait 1 -This is parallel wait 2 -This is parallel wait 3 - -Now wait sequence: - -""" - -async def test_execution_strategies(): - """Test both sequential and parallel execution strategies.""" - print("\n" + "="*80) - print("🧪 TESTING TOOL EXECUTION STRATEGIES") - print("="*80 + "\n") - - # Initialize ThreadManager and register tools - thread_manager = ThreadManager() - thread_manager.add_tool(WaitTool) - - # Create a test thread - thread_id = await thread_manager.create_thread() - print(f"🧵 Created test thread: {thread_id}\n") - - # Add system message - await thread_manager.add_message( - thread_id, - { - "role": "system", - "content": "You are a testing assistant that will execute wait commands." - } - ) - - # Test both strategies - test_cases = [ - {"name": "Sequential", "strategy": "sequential", "content": TOOL_XML_SEQUENTIAL}, - {"name": "Parallel", "strategy": "parallel", "content": TOOL_XML_PARALLEL} - ] - - for test in test_cases: - print("\n" + "-"*60) - print(f"🔍 Testing {test['name']} Execution Strategy") - print("-"*60 + "\n") - - # Add special assistant message with tool calls - # This simulates an LLM response with tool calls - await thread_manager.add_message( - thread_id, - { - "role": "assistant", - "content": test["content"] - } - ) - - start_time = asyncio.get_event_loop().time() - print(f"⏱️ Starting execution with {test['strategy']} strategy at {start_time:.2f}s") - - # Process the response with appropriate strategy - config = ProcessorConfig( - xml_tool_calling=True, - native_tool_calling=False, - execute_tools=True, - execute_on_stream=False, - tool_execution_strategy=test["strategy"] - ) - - # Get the last message to process - messages = await thread_manager.get_messages(thread_id) - last_message = messages[-1] - - # Create a simple non-streaming response object - class MockResponse: - def __init__(self, content): - self.choices = [type('obj', (object,), { - 'message': type('obj', (object,), { - 'content': content - }) - })] - - mock_response = MockResponse(last_message["content"]) - - # Process using the response processor - async for chunk in thread_manager.response_processor.process_non_streaming_response( - llm_response=mock_response, - thread_id=thread_id, - config=config - ): - if chunk.get('type') == 'tool_result': - elapsed = asyncio.get_event_loop().time() - start_time - print(f"⏱️ [{elapsed:.2f}s] Tool result: {chunk['name']}") - print(f" {chunk['result']}") - print() - - end_time = asyncio.get_event_loop().time() - elapsed = end_time - start_time - print(f"\n⏱️ {test['name']} execution completed in {elapsed:.2f} seconds") - - print("\n" + "="*80) - print("✅ Testing completed") - print("="*80 + "\n") - -if __name__ == "__main__": - try: - asyncio.run(test_execution_strategies()) - except KeyboardInterrupt: - print("\n\n❌ Test interrupted by user") - sys.exit(1) - except Exception as e: - print(f"\n\n❌ Error during test: {str(e)}") - sys.exit(1) \ No newline at end of file