exec on stream as background task

This commit is contained in:
marko-kraemer 2025-04-05 11:29:44 +01:00
parent d94f87b0f4
commit 6d6dda1284
2 changed files with 91 additions and 30 deletions

View File

@@ -9,7 +9,6 @@ This module handles processing of LLM responses including:
 """
 import json
-import logging
 import asyncio
 import re
 import uuid
@@ -28,7 +27,8 @@ ToolExecutionStrategy = Literal["sequential", "parallel"]
 @dataclass
 class ProcessorConfig:
-    """Configuration for response processing and tool execution.
+    """
+    Configuration for response processing and tool execution.
 
     This class controls how the LLM's responses are processed, including how tool calls
     are detected, executed, and their results handled.
@@ -41,11 +41,10 @@ class ProcessorConfig:
         tool_execution_strategy: How to execute multiple tools ("sequential" or "parallel")
         xml_adding_strategy: How to add XML tool results to the conversation
     """
-    # Tool detection
     xml_tool_calling: bool = True
     native_tool_calling: bool = False
-    # Tool execution
     execute_tools: bool = True
     execute_on_stream: bool = False
     tool_execution_strategy: ToolExecutionStrategy = "sequential"
@@ -98,6 +97,9 @@ class ResponseProcessor:
         # For tracking tool results during streaming to add later
         tool_results_buffer = []
 
+        # For tracking pending tool executions
+        pending_tool_executions = []
+
         logger.info(f"Starting to process streaming response for thread {thread_id}")
         logger.info(f"Config: XML={config.xml_tool_calling}, Native={config.native_tool_calling}, "
                     f"Execute on stream={config.execute_on_stream}, Execution strategy={config.tool_execution_strategy}")
@@ -131,19 +133,18 @@ class ResponseProcessor:
                             # Parse and extract the tool call
                             tool_call = self._parse_xml_tool_call(xml_chunk)
                             if tool_call:
-                                # Execute tool if needed, but store results for later
+                                # Execute tool if needed, but in background
                                 if config.execute_tools and config.execute_on_stream:
-                                    result = await self._execute_tool(tool_call)
+                                    # Start tool execution as a background task
+                                    execution_task = asyncio.create_task(self._execute_tool(tool_call))
 
-                                    # Store result to add after assistant message
-                                    tool_results_buffer.append((tool_call, result))
+                                    # Store the task for later retrieval
+                                    pending_tool_executions.append({
+                                        "task": execution_task,
+                                        "tool_call": tool_call
+                                    })
 
-                                    # Yield tool execution result for client display
-                                    yield {
-                                        "type": "tool_result",
-                                        "name": tool_call["name"],
-                                        "result": str(result)
-                                    }
+                                    # Immediately continue processing more chunks
 
                 # Process native tool calls
                 if config.native_tool_calling and delta and hasattr(delta, 'tool_calls') and delta.tool_calls:
@@ -190,18 +191,74 @@ class ResponseProcessor:
                                     "arguments": json.loads(current_tool['function']['arguments']),
                                     "id": current_tool['id']
                                 }
-                                result = await self._execute_tool(tool_call_data)
 
-                                # Store result to add after assistant message
-                                tool_results_buffer.append((tool_call_data, result))
+                                # Start tool execution as a background task
+                                execution_task = asyncio.create_task(self._execute_tool(tool_call_data))
 
-                                # Yield tool execution result for client display
-                                yield {
-                                    "type": "tool_result",
-                                    "name": tool_call_data["name"],
-                                    "result": str(result)
-                                }
+                                # Store the task for later retrieval
+                                pending_tool_executions.append({
+                                    "task": execution_task,
+                                    "tool_call": tool_call_data
+                                })
+
+                                # Immediately continue processing more chunks
+
+            # Check for completed tool executions
+            completed_executions = []
+            for i, execution in enumerate(pending_tool_executions):
+                if execution["task"].done():
+                    try:
+                        # Get the result
+                        result = execution["task"].result()
+                        tool_call = execution["tool_call"]
+
+                        # Store result for later database updates
+                        tool_results_buffer.append((tool_call, result))
+
+                        # Yield tool execution result for client display
+                        yield {
+                            "type": "tool_result",
+                            "name": tool_call["name"],
+                            "result": str(result)
+                        }
+
+                        # Mark for removal
+                        completed_executions.append(i)
+                    except Exception as e:
+                        logger.error(f"Error getting tool execution result: {str(e)}")
+
+            # Remove completed executions from pending list (in reverse to maintain indices)
+            for i in sorted(completed_executions, reverse=True):
+                pending_tool_executions.pop(i)
+
+        # After streaming completes, wait for any remaining tool executions
+        if pending_tool_executions:
+            logger.info(f"Waiting for {len(pending_tool_executions)} pending tool executions to complete")
+
+            # Wait for all pending tasks to complete
+            pending_tasks = [execution["task"] for execution in pending_tool_executions]
+            done, _ = await asyncio.wait(pending_tasks)
+
+            # Process results
+            for execution in pending_tool_executions:
+                try:
+                    if execution["task"].done():
+                        result = execution["task"].result()
+                        tool_call = execution["tool_call"]
+
+                        # Store result for later
+                        tool_results_buffer.append((tool_call, result))
+
+                        # Yield tool execution result
+                        yield {
+                            "type": "tool_result",
+                            "name": tool_call["name"],
+                            "result": str(result)
+                        }
+                except Exception as e:
+                    logger.error(f"Error processing remaining tool execution: {str(e)}")
 
         # After streaming completes, process any remaining content and tool calls
         if accumulated_content:
             # Extract final complete tool calls for native format

View File

@@ -10,19 +10,14 @@ This module provides comprehensive conversation management, including:
 """
 import json
-import logging
-import asyncio
 import uuid
-import re
 from typing import List, Dict, Any, Optional, Type, Union, AsyncGenerator, Tuple, Callable, Literal
 from services.llm import make_llm_api_call
 from agentpress.tool import Tool, ToolResult
 from agentpress.tool_registry import ToolRegistry
 from agentpress.response_processor import (
     ResponseProcessor,
-    ProcessorConfig,
-    XmlAddingStrategy,
-    ToolExecutionStrategy
+    ProcessorConfig
 )
 from services.supabase import DBConnection
 from utils.logger import logger
@@ -318,6 +313,15 @@ class ThreadManager:
         logger.debug(f"Processor config: XML={processor_config.xml_tool_calling}, Native={processor_config.native_tool_calling}, "
                      f"Execute tools={processor_config.execute_tools}, Strategy={processor_config.tool_execution_strategy}")
 
+        # Check if native_tool_calling is enabled and throw an error if it is
+        if processor_config.native_tool_calling:
+            error_message = "Native tool calling is not supported in this version"
+            logger.error(error_message)
+            return {
+                "status": "error",
+                "message": error_message
+            }
+
         # 4. Prepare tools for LLM call
         openapi_tool_schemas = None
         if processor_config.native_tool_calling: