diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py index ad4afd07..a8ab17d9 100644 --- a/backend/agentpress/response_processor.py +++ b/backend/agentpress/response_processor.py @@ -9,7 +9,6 @@ This module handles processing of LLM responses including: """ import json -import logging import asyncio import re import uuid @@ -28,7 +27,8 @@ ToolExecutionStrategy = Literal["sequential", "parallel"] @dataclass class ProcessorConfig: - """Configuration for response processing and tool execution. + """ + Configuration for response processing and tool execution. This class controls how the LLM's responses are processed, including how tool calls are detected, executed, and their results handled. @@ -41,11 +41,10 @@ class ProcessorConfig: tool_execution_strategy: How to execute multiple tools ("sequential" or "parallel") xml_adding_strategy: How to add XML tool results to the conversation """ - # Tool detection + xml_tool_calling: bool = True native_tool_calling: bool = False - - # Tool execution + execute_tools: bool = True execute_on_stream: bool = False tool_execution_strategy: ToolExecutionStrategy = "sequential" @@ -98,6 +97,9 @@ class ResponseProcessor: # For tracking tool results during streaming to add later tool_results_buffer = [] + # For tracking pending tool executions + pending_tool_executions = [] + logger.info(f"Starting to process streaming response for thread {thread_id}") logger.info(f"Config: XML={config.xml_tool_calling}, Native={config.native_tool_calling}, " f"Execute on stream={config.execute_on_stream}, Execution strategy={config.tool_execution_strategy}") @@ -131,19 +133,18 @@ class ResponseProcessor: # Parse and extract the tool call tool_call = self._parse_xml_tool_call(xml_chunk) if tool_call: - # Execute tool if needed, but store results for later + # Execute tool if needed, but in background if config.execute_tools and config.execute_on_stream: - result = await self._execute_tool(tool_call) + # Start tool execution as a background task + execution_task = asyncio.create_task(self._execute_tool(tool_call)) - # Store result to add after assistant message - tool_results_buffer.append((tool_call, result)) + # Store the task for later retrieval + pending_tool_executions.append({ + "task": execution_task, + "tool_call": tool_call + }) - # Yield tool execution result for client display - yield { - "type": "tool_result", - "name": tool_call["name"], - "result": str(result) - } + # Immediately continue processing more chunks # Process native tool calls if config.native_tool_calling and delta and hasattr(delta, 'tool_calls') and delta.tool_calls: @@ -190,18 +191,74 @@ class ResponseProcessor: "arguments": json.loads(current_tool['function']['arguments']), "id": current_tool['id'] } - result = await self._execute_tool(tool_call_data) - # Store result to add after assistant message - tool_results_buffer.append((tool_call_data, result)) + # Start tool execution as a background task + execution_task = asyncio.create_task(self._execute_tool(tool_call_data)) - # Yield tool execution result for client display - yield { - "type": "tool_result", - "name": tool_call_data["name"], - "result": str(result) - } + # Store the task for later retrieval + pending_tool_executions.append({ + "task": execution_task, + "tool_call": tool_call_data + }) + + # Immediately continue processing more chunks + # Check for completed tool executions + completed_executions = [] + for i, execution in enumerate(pending_tool_executions): + if execution["task"].done(): + try: + # Get the result + result = execution["task"].result() + tool_call = execution["tool_call"] + + # Store result for later database updates + tool_results_buffer.append((tool_call, result)) + + # Yield tool execution result for client display + yield { + "type": "tool_result", + "name": tool_call["name"], + "result": str(result) + } + + # Mark for removal + completed_executions.append(i) + + except Exception as e: + logger.error(f"Error getting tool execution result: {str(e)}") + + # Remove completed executions from pending list (in reverse to maintain indices) + for i in sorted(completed_executions, reverse=True): + pending_tool_executions.pop(i) + + # After streaming completes, wait for any remaining tool executions + if pending_tool_executions: + logger.info(f"Waiting for {len(pending_tool_executions)} pending tool executions to complete") + + # Wait for all pending tasks to complete + pending_tasks = [execution["task"] for execution in pending_tool_executions] + done, _ = await asyncio.wait(pending_tasks) + + # Process results + for execution in pending_tool_executions: + try: + if execution["task"].done(): + result = execution["task"].result() + tool_call = execution["tool_call"] + + # Store result for later + tool_results_buffer.append((tool_call, result)) + + # Yield tool execution result + yield { + "type": "tool_result", + "name": tool_call["name"], + "result": str(result) + } + except Exception as e: + logger.error(f"Error processing remaining tool execution: {str(e)}") + # After streaming completes, process any remaining content and tool calls if accumulated_content: # Extract final complete tool calls for native format diff --git a/backend/agentpress/thread_manager.py b/backend/agentpress/thread_manager.py index 55639182..6440699d 100644 --- a/backend/agentpress/thread_manager.py +++ b/backend/agentpress/thread_manager.py @@ -10,19 +10,14 @@ This module provides comprehensive conversation management, including: """ import json -import logging -import asyncio import uuid -import re from typing import List, Dict, Any, Optional, Type, Union, AsyncGenerator, Tuple, Callable, Literal from services.llm import make_llm_api_call from agentpress.tool import Tool, ToolResult from agentpress.tool_registry import ToolRegistry from agentpress.response_processor import ( ResponseProcessor, - ProcessorConfig, - XmlAddingStrategy, - ToolExecutionStrategy + ProcessorConfig ) from services.supabase import DBConnection from utils.logger import logger @@ -318,6 +313,15 @@ class ThreadManager: logger.debug(f"Processor config: XML={processor_config.xml_tool_calling}, Native={processor_config.native_tool_calling}, " f"Execute tools={processor_config.execute_tools}, Strategy={processor_config.tool_execution_strategy}") + # Check if native_tool_calling is enabled and throw an error if it is + if processor_config.native_tool_calling: + error_message = "Native tool calling is not supported in this version" + logger.error(error_message) + return { + "status": "error", + "message": error_message + } + # 4. Prepare tools for LLM call openapi_tool_schemas = None if processor_config.native_tool_calling: