From 6eb516e61de5bf1f4c946416194df57191b46077 Mon Sep 17 00:00:00 2001 From: marko-kraemer Date: Mon, 7 Apr 2025 18:35:40 +0100 Subject: [PATCH] native tool call wip --- backend/agent/prompt.py | 7 +- backend/agent/run.py | 12 +- backend/agent/tools/wait_tool.py | 176 ---------------- backend/agent/workspace/index.html | 233 +++++++++++++++++++++ backend/agentpress/response_processor.py | 251 ++++++++++++++--------- backend/agentpress/thread_manager.py | 80 +++++++- backend/services/llm.py | 2 +- backend/tests/test_simple_tools.py | 28 ++- 8 files changed, 499 insertions(+), 290 deletions(-) delete mode 100644 backend/agent/tools/wait_tool.py create mode 100644 backend/agent/workspace/index.html diff --git a/backend/agent/prompt.py b/backend/agent/prompt.py index d8a2046b..0970a8b3 100644 --- a/backend/agent/prompt.py +++ b/backend/agent/prompt.py @@ -22,7 +22,6 @@ Remember: 5. Focus on providing accurate, helpful information 6. Consider context and user needs in your responses 7. Handle ambiguity gracefully by asking clarifying questions when needed -8. ISSUE ONLY ONE SINGLE XML TOOL CALL AT A TIME - complete one action before proceeding to the next You have access to these tools through XML-based tool calling: @@ -38,12 +37,12 @@ You have access to these tools through XML-based tool calling: """ + +#Wait for each action to complete before proceeding to the next one. RESPONSE_FORMAT = """ RESPONSE FORMAT – STRICTLY Output XML tags for tool calling -You must only use ONE tool call at a time. Wait for each action to complete before proceeding to the next one. - file contents here @@ -82,5 +81,5 @@ def get_system_prompt(): ''' Returns the system prompt with XML tool usage instructions. ''' - return SYSTEM_PROMPT + return SYSTEM_PROMPT #+ RESPONSE_FORMAT \ No newline at end of file diff --git a/backend/agent/run.py b/backend/agent/run.py index b0655b10..118fd64d 100644 --- a/backend/agent/run.py +++ b/backend/agent/run.py @@ -4,7 +4,6 @@ import uuid from agentpress.thread_manager import ThreadManager from agent.tools.files_tool import FilesTool from agent.tools.terminal_tool import TerminalTool -from agent.tools.wait_tool import WaitTool # from agent.tools.search_tool import CodeSearchTool from typing import Optional from agent.prompt import get_system_prompt @@ -23,7 +22,6 @@ async def run_agent(thread_id: str, stream: bool = True, thread_manager: Optiona print("Adding tools to thread manager...") thread_manager.add_tool(FilesTool) thread_manager.add_tool(TerminalTool) - thread_manager.add_tool(WaitTool) # thread_manager.add_tool(CodeSearchTool) system_message = { @@ -31,7 +29,12 @@ async def run_agent(thread_id: str, stream: bool = True, thread_manager: Optiona "content": get_system_prompt() } - model_name = "bedrock/anthropic.claude-3-7-sonnet-20250219-v1:0" #groq/deepseek-r1-distill-llama-70b + model_name = "anthropic/claude-3-5-sonnet-latest" + + #anthropic/claude-3-7-sonnet-latest + #openai/gpt-4o + #groq/deepseek-r1-distill-llama-70b + #bedrock/anthropic.claude-3-7-sonnet-20250219-v1:0 files_tool = FilesTool() @@ -55,12 +58,13 @@ Current development environment workspace state: llm_model=model_name, llm_temperature=0.1, llm_max_tokens=8000, + tool_choice="any", processor_config=ProcessorConfig( xml_tool_calling=False, native_tool_calling=True, execute_tools=True, execute_on_stream=True, - tool_execution_strategy="sequential", + tool_execution_strategy="parallel", xml_adding_strategy="user_message" ) ) diff --git a/backend/agent/tools/wait_tool.py b/backend/agent/tools/wait_tool.py 
deleted file mode 100644 index 2b9e8bd3..00000000 --- a/backend/agent/tools/wait_tool.py +++ /dev/null @@ -1,176 +0,0 @@ -""" -Wait Tool for testing sequential vs parallel tool execution. - -This tool provides methods with configurable delays to test and demonstrate -the different tool execution strategies in AgentPress. -""" - -import asyncio -from agentpress.tool import Tool, ToolResult, openapi_schema, xml_schema -from utils.logger import logger - - -class WaitTool(Tool): - """Tool that introduces configurable delays. - - This tool is useful for testing and demonstrating sequential vs parallel - tool execution strategies by creating observable delays. - """ - - def __init__(self): - """Initialize the WaitTool.""" - super().__init__() - logger.info("Initialized WaitTool for testing execution strategies") - - @xml_schema( - tag_name="wait", - mappings=[ - {"param_name": "seconds", "node_type": "attribute", "path": "."}, - {"param_name": "message", "node_type": "content", "path": "."} - ], - example=''' - This will wait for 3 seconds - ''' - ) - @openapi_schema({ - "type": "function", - "function": { - "name": "wait", - "description": "Wait for a specified number of seconds", - "parameters": { - "type": "object", - "properties": { - "seconds": { - "type": "number", - "description": "Number of seconds to wait" - }, - "message": { - "type": "string", - "description": "Message to include in the result" - } - }, - "required": ["seconds"] - } - } - }) - async def wait(self, seconds: float, message: str = "") -> ToolResult: - """Wait for the specified number of seconds. - - Args: - seconds: Number of seconds to wait - message: Optional message to include in result - - Returns: - ToolResult with success status and timing information - """ - try: - # Limit the wait time to a reasonable range - seconds = min(max(0.5, float(seconds)), 10.0) - - logger.info(f"WaitTool: Starting wait for {seconds} seconds") - start_time = asyncio.get_event_loop().time() - - # Perform the actual wait - await asyncio.sleep(seconds) - - end_time = asyncio.get_event_loop().time() - elapsed = end_time - start_time - - logger.info(f"WaitTool: Completed wait of {elapsed:.2f} seconds") - - # Format the result - if message: - result = f"Waited for {elapsed:.2f} seconds with message: {message}" - else: - result = f"Waited for {elapsed:.2f} seconds" - - return self.success_response(result) - - except Exception as e: - logger.error(f"WaitTool error: {str(e)}") - return self.fail_response(f"Error during wait operation: {str(e)}") - - @xml_schema( - tag_name="wait-sequence", - mappings=[ - {"param_name": "count", "node_type": "attribute", "path": "."}, - {"param_name": "seconds", "node_type": "attribute", "path": "."}, - {"param_name": "label", "node_type": "attribute", "path": "."} - ], - example=''' - - ''' - ) - @openapi_schema({ - "type": "function", - "function": { - "name": "wait_sequence", - "description": "Execute a sequence of waits with the same duration", - "parameters": { - "type": "object", - "properties": { - "count": { - "type": "integer", - "description": "Number of sequential waits to perform" - }, - "seconds": { - "type": "number", - "description": "Duration of each wait in seconds" - }, - "label": { - "type": "string", - "description": "Label to identify this wait sequence" - } - }, - "required": ["count", "seconds"] - } - } - }) - async def wait_sequence(self, count: int, seconds: float, label: str = "Sequence") -> ToolResult: - """Perform a sequence of waits with progress reporting. 
- - Args: - count: Number of sequential waits to perform - seconds: Duration of each wait in seconds - label: Label to identify this wait sequence - - Returns: - ToolResult with success status and sequence information - """ - try: - # Validate and limit parameters - count = min(max(1, int(count)), 5) - seconds = min(max(0.5, float(seconds)), 5.0) - - logger.info(f"WaitTool: Starting wait sequence '{label}' with {count} iterations of {seconds}s each") - - # Perform the sequential waits - result_parts = [] - total_time = 0 - - for i in range(count): - start = asyncio.get_event_loop().time() - - # Log the wait start - logger.info(f"WaitTool: Sequence '{label}' - Starting iteration {i+1}/{count}") - - # Perform the wait - await asyncio.sleep(seconds) - - # Calculate elapsed time - elapsed = asyncio.get_event_loop().time() - start - total_time += elapsed - - # Add to results - result_parts.append(f"Step {i+1}/{count}: Waited {elapsed:.2f}s") - logger.info(f"WaitTool: Sequence '{label}' - Completed iteration {i+1}/{count} in {elapsed:.2f}s") - - # Compile the final result - result = f"Wait Sequence '{label}' completed in {total_time:.2f} seconds:\n" - result += "\n".join(result_parts) - - return self.success_response(result) - - except Exception as e: - logger.error(f"WaitTool error in sequence '{label}': {str(e)}") - return self.fail_response(f"Error during wait sequence: {str(e)}") \ No newline at end of file diff --git a/backend/agent/workspace/index.html b/backend/agent/workspace/index.html new file mode 100644 index 00000000..0b75ffa7 --- /dev/null +++ b/backend/agent/workspace/index.html @@ -0,0 +1,233 @@ + + + + + + Innovative Solutions | Transform Your Business + + + + + + + + +
+ [index.html: agent-generated demo landing page (233 lines). The markup did not survive extraction; only the text content remains. Recovered sections: hero "Transform Your Business with Smart Solutions"; features "Why Choose Us" (Fast Implementation, Secure & Reliable, Scalable Solution, 24/7 Support); benefits "Benefits That Drive Success" (automated task management, real-time collaboration tools, integrated communication systems); testimonials "What Our Clients Say" (John Smith, CEO, Tech Solutions Inc.; Sarah Johnson, Operations Director, Global Corp); pricing "Simple, Transparent Pricing" (Starter $29/month; Enterprise, custom); contact section "Ready to Get Started?" with a signup form; footer.]
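The run.py changes earlier in this patch switch the agent from XML tool calling to native function calling, force a tool call with tool_choice="any", and let tools execute in parallel while the response streams. A minimal sketch of that configuration and of consuming the resulting stream, assuming the ProcessorConfig import path and the thread_id/stream keyword names (the other arguments appear verbatim in this diff):

```python
# Sketch only: enabling native tool calling with streaming execution.
from agentpress.thread_manager import ThreadManager
from agentpress.response_processor import ProcessorConfig  # import path assumed

async def stream_agent_turn(thread_manager: ThreadManager, thread_id: str) -> None:
    response = await thread_manager.run_thread(
        thread_id=thread_id,               # keyword name assumed
        stream=True,                       # keyword name assumed
        llm_model="anthropic/claude-3-5-sonnet-latest",
        llm_temperature=0.1,
        llm_max_tokens=8000,
        tool_choice="any",                 # force the model to emit a tool call
        processor_config=ProcessorConfig(
            xml_tool_calling=False,
            native_tool_calling=True,
            execute_tools=True,
            execute_on_stream=True,        # run tools as soon as their arguments complete
            tool_execution_strategy="parallel",
            xml_adding_strategy="user_message",
        ),
    )
    # The processor yields typed dicts; "tool_call_chunk" and "finish" are introduced in this patch.
    async for chunk in response:
        if chunk.get("type") == "finish":
            print("finish_reason:", chunk.get("finish_reason"))
```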
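The response_processor.py hunks that follow rebuild native tool calls from streaming deltas: fragments are keyed by the delta's index, argument strings are concatenated, and a call counts as complete once its accumulated arguments parse as JSON. A self-contained sketch of that buffering, assuming OpenAI/litellm-style delta objects:

```python
import json
import uuid
from typing import Any, Dict, Optional

# Partial tool calls keyed by streaming index, mirroring tool_calls_buffer in the diff.
tool_calls_buffer: Dict[int, Dict[str, Any]] = {}

def accumulate_tool_call_delta(tool_call: Any) -> Optional[Dict[str, Any]]:
    """Merge one streamed tool-call delta; return the parsed call once complete.

    `tool_call` is assumed to follow the OpenAI/litellm streaming delta shape
    (id, index, function.name, function.arguments as string fragments).
    """
    idx = getattr(tool_call, "index", 0) or 0
    entry = tool_calls_buffer.setdefault(idx, {
        "id": getattr(tool_call, "id", None) or str(uuid.uuid4()),
        "type": "function",
        "function": {"name": None, "arguments": ""},
    })

    if getattr(tool_call, "id", None):
        entry["id"] = tool_call.id
    fn = getattr(tool_call, "function", None)
    if fn is not None:
        if getattr(fn, "name", None):
            entry["function"]["name"] = fn.name
        if getattr(fn, "arguments", None):
            entry["function"]["arguments"] += fn.arguments

    # A call is complete once the concatenated argument fragments parse as JSON.
    if entry["id"] and entry["function"]["name"]:
        try:
            arguments = json.loads(entry["function"]["arguments"])
        except json.JSONDecodeError:
            return None  # arguments still streaming in
        return {"id": entry["id"],
                "function_name": entry["function"]["name"],
                "arguments": arguments}
    return None
```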
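When execute_on_stream is enabled with the parallel strategy, each completed call is launched as an asyncio background task so the processor can keep consuming chunks, and the pending tasks are awaited once the stream ends. A simplified sketch of that pattern (the real code also yields start/result status messages and tracks a per-call context):

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Optional

async def drain_stream_executing_tools(
    chunks: Any,                                               # async iterator of streaming chunks
    extract_complete_call: Callable[[Any], Optional[Dict[str, Any]]],
    execute_tool: Callable[[Dict[str, Any]], Awaitable[Any]],  # stand-in for the registry-backed executor
) -> List[Dict[str, Any]]:
    """Launch each completed tool call as a background task and keep streaming."""
    pending: List[Dict[str, Any]] = []
    async for chunk in chunks:
        choices = getattr(chunk, "choices", None)
        delta = getattr(choices[0], "delta", None) if choices else None
        for tool_call in (getattr(delta, "tool_calls", None) or []):
            call = extract_complete_call(tool_call)
            if call is not None:
                # Parallel strategy: do not await here, so the stream keeps flowing.
                pending.append({
                    "tool_call": call,
                    "task": asyncio.create_task(execute_tool(call)),
                })

    # Once the stream is exhausted, gather whatever is still running.
    return [{"tool_call": p["tool_call"], "result": await p["task"]} for p in pending]
```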
+ + + + \ No newline at end of file diff --git a/backend/agentpress/response_processor.py b/backend/agentpress/response_processor.py index 33e9291e..b651453b 100644 --- a/backend/agentpress/response_processor.py +++ b/backend/agentpress/response_processor.py @@ -113,6 +113,9 @@ class ResponseProcessor: # Tool index counter for tracking all tool executions tool_index = 0 + # Track finish reason + finish_reason = None + logger.info(f"Starting to process streaming response for thread {thread_id}") logger.info(f"Config: XML={config.xml_tool_calling}, Native={config.native_tool_calling}, " f"Execute on stream={config.execute_on_stream}, Execution strategy={config.tool_execution_strategy}") @@ -121,6 +124,11 @@ class ResponseProcessor: async for chunk in llm_response: # Default content to yield + # Check for finish_reason + if hasattr(chunk, 'choices') and chunk.choices and hasattr(chunk.choices[0], 'finish_reason') and chunk.choices[0].finish_reason: + finish_reason = chunk.choices[0].finish_reason + logger.info(f"Detected finish_reason: {finish_reason}") + if hasattr(chunk, 'choices') and chunk.choices: delta = chunk.choices[0].delta if hasattr(chunk.choices[0], 'delta') else None @@ -172,108 +180,109 @@ class ResponseProcessor: # Immediately continue processing more chunks - # Process native tool calls - if config.native_tool_calling and delta and hasattr(delta, 'tool_calls') and delta.tool_calls: - for tool_call in delta.tool_calls: - # Yield the raw tool call chunk directly to the stream - # Safely extract tool call data even if model_dump isn't available - tool_call_data = {} + # Process native tool calls + if config.native_tool_calling and delta and hasattr(delta, 'tool_calls') and delta.tool_calls: + for tool_call in delta.tool_calls: + # Yield the raw tool call chunk directly to the stream + # Safely extract tool call data even if model_dump isn't available + tool_call_data = {} + + if hasattr(tool_call, 'model_dump'): + # Use model_dump if available (OpenAI client) + tool_call_data = tool_call.model_dump() + else: + # Manual extraction if model_dump not available + if hasattr(tool_call, 'id'): + tool_call_data['id'] = tool_call.id + if hasattr(tool_call, 'index'): + tool_call_data['index'] = tool_call.index + if hasattr(tool_call, 'type'): + tool_call_data['type'] = tool_call.type + if hasattr(tool_call, 'function'): + tool_call_data['function'] = {} + if hasattr(tool_call.function, 'name'): + tool_call_data['function']['name'] = tool_call.function.name + if hasattr(tool_call.function, 'arguments'): + # Ensure arguments is a string + tool_call_data['function']['arguments'] = tool_call.function.arguments if isinstance(tool_call.function.arguments, str) else json.dumps(tool_call.function.arguments) + + # Yield the chunk data + yield { + "type": "tool_call_chunk", + "tool_call": tool_call_data + } + + # Log the tool call chunk for debugging + # logger.debug(f"Yielded native tool call chunk: {tool_call_data}") + + if not hasattr(tool_call, 'function'): + continue - if hasattr(tool_call, 'model_dump'): - # Use model_dump if available (OpenAI client) - tool_call_data = tool_call.model_dump() - else: - # Manual extraction if model_dump not available - if hasattr(tool_call, 'id'): - tool_call_data['id'] = tool_call.id - if hasattr(tool_call, 'index'): - tool_call_data['index'] = tool_call.index - if hasattr(tool_call, 'type'): - tool_call_data['type'] = tool_call.type - if hasattr(tool_call, 'function'): - tool_call_data['function'] = {} - if hasattr(tool_call.function, 'name'): - 
tool_call_data['function']['name'] = tool_call.function.name - if hasattr(tool_call.function, 'arguments'): - tool_call_data['function']['arguments'] = tool_call.function.arguments - - # Yield the chunk data - yield { - "type": "tool_call_chunk", - "tool_call": tool_call_data + idx = tool_call.index if hasattr(tool_call, 'index') else 0 + + # Initialize or update tool call in buffer + if idx not in tool_calls_buffer: + tool_calls_buffer[idx] = { + 'id': tool_call.id if hasattr(tool_call, 'id') and tool_call.id else str(uuid.uuid4()), + 'type': 'function', + 'function': { + 'name': tool_call.function.name if hasattr(tool_call.function, 'name') and tool_call.function.name else None, + 'arguments': '' + } + } + + current_tool = tool_calls_buffer[idx] + if hasattr(tool_call, 'id') and tool_call.id: + current_tool['id'] = tool_call.id + if hasattr(tool_call.function, 'name') and tool_call.function.name: + current_tool['function']['name'] = tool_call.function.name + if hasattr(tool_call.function, 'arguments') and tool_call.function.arguments: + current_tool['function']['arguments'] += tool_call.function.arguments + + # Check if we have a complete tool call + has_complete_tool_call = False + if (current_tool['id'] and + current_tool['function']['name'] and + current_tool['function']['arguments']): + try: + json.loads(current_tool['function']['arguments']) + has_complete_tool_call = True + except json.JSONDecodeError: + pass + + if has_complete_tool_call and config.execute_tools and config.execute_on_stream: + # Execute this tool call + tool_call_data = { + "function_name": current_tool['function']['name'], + "arguments": json.loads(current_tool['function']['arguments']), + "id": current_tool['id'] } - # Log the tool call chunk for debugging - logger.debug(f"Yielded native tool call chunk: {tool_call_data}") + # Create a context for this tool execution + context = self._create_tool_context( + tool_call=tool_call_data, + tool_index=tool_index + ) - if not hasattr(tool_call, 'function'): - continue - - idx = tool_call.index if hasattr(tool_call, 'index') else 0 + # Yield tool execution start message + yield self._yield_tool_started(context) - # Initialize or update tool call in buffer - if idx not in tool_calls_buffer: - tool_calls_buffer[idx] = { - 'id': tool_call.id if hasattr(tool_call, 'id') and tool_call.id else str(uuid.uuid4()), - 'type': 'function', - 'function': { - 'name': tool_call.function.name if hasattr(tool_call.function, 'name') and tool_call.function.name else None, - 'arguments': '' - } - } + # Start tool execution as a background task + execution_task = asyncio.create_task(self._execute_tool(tool_call_data)) - current_tool = tool_calls_buffer[idx] - if hasattr(tool_call, 'id') and tool_call.id: - current_tool['id'] = tool_call.id - if hasattr(tool_call.function, 'name') and tool_call.function.name: - current_tool['function']['name'] = tool_call.function.name - if hasattr(tool_call.function, 'arguments') and tool_call.function.arguments: - current_tool['function']['arguments'] += tool_call.function.arguments + # Store the task for later retrieval + pending_tool_executions.append({ + "task": execution_task, + "tool_call": tool_call_data, + "tool_index": tool_index, + "context": context + }) - # Check if we have a complete tool call - has_complete_tool_call = False - if (current_tool['id'] and - current_tool['function']['name'] and - current_tool['function']['arguments']): - try: - json.loads(current_tool['function']['arguments']) - has_complete_tool_call = True - except 
json.JSONDecodeError: - pass + # Increment the tool index + tool_index += 1 - if has_complete_tool_call and config.execute_tools and config.execute_on_stream: - # Execute this tool call - tool_call_data = { - "function_name": current_tool['function']['name'], - "arguments": json.loads(current_tool['function']['arguments']), - "id": current_tool['id'] - } - - # Create a context for this tool execution - context = self._create_tool_context( - tool_call=tool_call_data, - tool_index=tool_index - ) - - # Yield tool execution start message - yield self._yield_tool_started(context) - - # Start tool execution as a background task - execution_task = asyncio.create_task(self._execute_tool(tool_call_data)) - - # Store the task for later retrieval - pending_tool_executions.append({ - "task": execution_task, - "tool_call": tool_call_data, - "tool_index": tool_index, - "context": context - }) - - # Increment the tool index - tool_index += 1 - - # Immediately continue processing more chunks - + # Immediately continue processing more chunks + # Check for completed tool executions completed_executions = [] for i, execution in enumerate(pending_tool_executions): @@ -485,6 +494,13 @@ class ResponseProcessor: # Increment tool index for next tool tool_index += 1 + + # Finally, if we detected a finish reason, yield it + if finish_reason: + yield { + "type": "finish", + "finish_reason": finish_reason + } except Exception as e: logger.error(f"Error processing stream: {str(e)}", exc_info=True) @@ -542,7 +558,7 @@ class ResponseProcessor: "type": "function", "function": { "name": tool_call.function.name, - "arguments": tool_call.function.arguments + "arguments": tool_call.function.arguments if isinstance(tool_call.function.arguments, str) else json.dumps(tool_call.function.arguments) } }) @@ -1001,7 +1017,48 @@ class ResponseProcessor: ): """Add a tool result to the thread based on the specified format.""" try: - # Always add results as user or assistant messages, never as 'tool' role + # Check if this is a native function call (has id field) + if "id" in tool_call: + # Format as a proper tool message according to OpenAI spec + function_name = tool_call.get("function_name", "") + + # Format the tool result content - tool role needs string content + if isinstance(result, str): + content = result + elif hasattr(result, 'output'): + # If it's a ToolResult object + if isinstance(result.output, dict) or isinstance(result.output, list): + # If output is already a dict or list, convert to JSON string + content = json.dumps(result.output) + else: + # Otherwise just use the string representation + content = str(result.output) + else: + # Fallback to string representation of the whole result + content = str(result) + + logger.info(f"Formatted tool result content: {content[:100]}...") + + # Create the tool response message with proper format + tool_message = { + "role": "tool", + "tool_call_id": tool_call["id"], + "name": function_name, + "content": content + } + + logger.info(f"Adding native tool result for tool_call_id={tool_call['id']} with role=tool") + + # Add as a tool message + await self.add_message( + thread_id=thread_id, + type="tool", # Special type for tool responses + content=tool_message, + is_llm_message=True + ) + return + + # For XML and other non-native tools, continue with the original logic # Determine message role based on strategy result_role = "user" if strategy == "user_message" else "assistant" @@ -1040,7 +1097,6 @@ class ResponseProcessor: except Exception as e2: logger.error(f"Failed even with 
fallback message: {str(e2)}", exc_info=True) - def _format_xml_tool_result(self, tool_call: Dict[str, Any], result: ToolResult) -> str: """Format a tool result as an XML tag or plain text. @@ -1060,7 +1116,6 @@ class ResponseProcessor: function_name = tool_call["function_name"] return f"Result for {function_name}: {str(result)}" - # At class level, define a method for yielding tool results def _yield_tool_result(self, context: ToolExecutionContext) -> Dict[str, Any]: """Format and return a tool result message.""" diff --git a/backend/agentpress/thread_manager.py b/backend/agentpress/thread_manager.py index 6952eb42..be87b511 100644 --- a/backend/agentpress/thread_manager.py +++ b/backend/agentpress/thread_manager.py @@ -58,7 +58,7 @@ class ThreadManager: Args: thread_id: The ID of the thread to add the message to. - type: The type of the message (e.g., 'text', 'image_url', 'tool_call'). + type: The type of the message (e.g., 'text', 'image_url', 'tool_call', 'tool', 'user', 'assistant'). content: The content of the message. Can be a dictionary, list, or string. It will be stored as JSONB in the database. is_llm_message: Flag indicating if the message originated from the LLM. @@ -115,7 +115,18 @@ class ThreadManager: logger.error(f"Failed to parse message: {item}") else: messages.append(item) - + + # Ensure tool_calls have properly formatted function arguments + for message in messages: + if message.get('tool_calls'): + for tool_call in message['tool_calls']: + if isinstance(tool_call, dict) and 'function' in tool_call: + # Ensure function.arguments is a string + if 'arguments' in tool_call['function'] and not isinstance(tool_call['function']['arguments'], str): + # Log and fix the issue + logger.warning(f"Found non-string arguments in tool_call, converting to string") + tool_call['function']['arguments'] = json.dumps(tool_call['function']['arguments']) + return messages except Exception as e: @@ -214,7 +225,70 @@ class ThreadManager: tool_choice=tool_choice if processor_config.native_tool_calling else None, stream=stream ) - logger.debug("Successfully received LLM API response") + logger.debug("Successfully received raw LLM API response stream/object") + + # # --- BEGIN ADDED DEBUG LOGGING --- + # async def logging_stream_wrapper(response_stream): + # stream_ended = False + # final_chunk_metadata = None + # last_chunk = None # Store the last received chunk + # try: + # chunk_count = 0 + # async for chunk in response_stream: + # chunk_count += 1 + # last_chunk = chunk # Keep track of the last chunk + + # # Try to access potential finish reason or metadata directly from chunk + # finish_reason = None + # if hasattr(chunk, 'choices') and chunk.choices and hasattr(chunk.choices[0], 'finish_reason'): + # finish_reason = chunk.choices[0].finish_reason + + # logger.debug(f"--> Raw Chunk {chunk_count}: Type={type(chunk)}, FinishReason={finish_reason}, Content={getattr(chunk.choices[0].delta, 'content', None)}, ToolCalls={getattr(chunk.choices[0].delta, 'tool_calls', None)}") + + # # Store metadata if it contains finish_reason + # if finish_reason: + # final_chunk_metadata = {"finish_reason": finish_reason} + # logger.info(f"--> Raw Stream: Detected finish_reason='{finish_reason}' in chunk {chunk_count}") + + # yield chunk + # stream_ended = True + # logger.info(f"--> Raw Stream: Finished iterating naturally after {chunk_count} chunks.") + # except Exception as e: + # logger.error(f"--> Raw Stream: Error during iteration: {str(e)}", exc_info=True) + # stream_ended = True # Assume ended on error + # 
raise + # finally: + # if not stream_ended: + # logger.warning("--> Raw Stream: Exited wrapper unexpectedly (maybe client stopped iterating?)") + + # # Log the entire last chunk received + # if last_chunk: + # try: + # # Try converting to dict if it's an object with model_dump + # last_chunk_data = last_chunk.model_dump() if hasattr(last_chunk, 'model_dump') else vars(last_chunk) + # logger.info(f"--> Raw Stream: Last Raw Chunk Received: {last_chunk_data}") + # except Exception as log_ex: + # logger.warning(f"--> Raw Stream: Could not serialize last chunk for logging: {log_ex}") + # logger.info(f"--> Raw Stream: Last Raw Chunk (repr): {repr(last_chunk)}") + # else: + # logger.warning("--> Raw Stream: No chunks were received or stored.") + + # # Attempt to get final metadata if stream has an attribute for it (depends on litellm/provider) + # final_metadata = getattr(response_stream, 'response_metadata', {}) + # if final_chunk_metadata: # Prioritize finish_reason found in-stream + # final_metadata.update(final_chunk_metadata) + # logger.info(f"--> Raw Stream: Final Metadata (if available): {final_metadata}") + + # # Wrap the stream only if it's streaming mode + # if stream and hasattr(raw_llm_response, '__aiter__'): + # llm_response = logging_stream_wrapper(raw_llm_response) + # logger.debug("Wrapped raw LLM stream with logging wrapper.") + # else: + # # If not streaming, just use the raw response (might be a dict/object) + # llm_response = raw_llm_response + # logger.debug("Not wrapping non-streaming LLM response.") + # # --- END ADDED DEBUG LOGGING --- + except Exception as e: logger.error(f"Failed to make LLM API call: {str(e)}", exc_info=True) raise diff --git a/backend/services/llm.py b/backend/services/llm.py index 58d0c545..2a16e75d 100644 --- a/backend/services/llm.py +++ b/backend/services/llm.py @@ -19,6 +19,7 @@ import litellm from utils.logger import logger # litellm.set_verbose=True +litellm.modify_params=True # Constants MAX_RETRIES = 3 @@ -311,4 +312,3 @@ if __name__ == "__main__": print("\nāœ… integration test completed successfully!") else: print("\nāŒ Bedrock integration test failed!") - diff --git a/backend/tests/test_simple_tools.py b/backend/tests/test_simple_tools.py index e776792d..6947d7c3 100644 --- a/backend/tests/test_simple_tools.py +++ b/backend/tests/test_simple_tools.py @@ -94,8 +94,8 @@ async def test_streaming_tool_call(): """Test tool calling with streaming to observe behavior.""" # Setup conversation messages = [ - {"role": "system", "content": "You are a helpful assistant with access to file management tools."}, - {"role": "user", "content": "Create an HTML file named hello.html with a simple Hello World message."} + {"role": "system", "content": "You are a helpful assistant with access to file management tools. YOU ALWAYS USE MULTIPLE TOOL FUNCTION CALLS AT ONCE. 
YOU NEVER USE ONE TOOL FUNCTION CALL AT A TIME."}, + {"role": "user", "content": "Create 10 random files with different extensions and content."} ] print("\n=== Testing streaming tool call ===\n") @@ -105,10 +105,10 @@ async def test_streaming_tool_call(): print("Sending streaming request...") stream_response = await make_llm_api_call( messages=messages, - model_name="gpt-4o", + model_name="anthropic/claude-3-5-sonnet-latest", temperature=0.0, tools=[CREATE_FILE_SCHEMA], - tool_choice={"type": "function", "function": {"name": "create_file"}}, + tool_choice="auto", stream=True ) @@ -123,10 +123,12 @@ async def test_streaming_tool_call(): # Storage for accumulated tool calls tool_calls = [] + last_chunk = None # Variable to store the last chunk # Process each chunk async for chunk in stream_response: chunk_count += 1 + last_chunk = chunk # Keep track of the last chunk # Print chunk number and type print(f"\n--- Chunk {chunk_count} ---") @@ -203,6 +205,24 @@ async def test_streaming_tool_call(): print(f"Error parsing arguments: {str(e)}") else: print("\nNo tool calls accumulated from streaming response.") + + # --- Added logging for last chunk and finish reason --- + finish_reason = None + if last_chunk: + try: + if hasattr(last_chunk, 'choices') and last_chunk.choices: + finish_reason = last_chunk.choices[0].finish_reason + last_chunk_data = last_chunk.model_dump() if hasattr(last_chunk, 'model_dump') else vars(last_chunk) + print("\n--- Last Chunk Received ---") + print(f"Finish Reason: {finish_reason}") + print(f"Raw Last Chunk Data: {json.dumps(last_chunk_data, indent=2)}") + except Exception as log_ex: + print("\n--- Error logging last chunk ---") + print(f"Error: {log_ex}") + print(f"Last Chunk (repr): {repr(last_chunk)}") + else: + print("\n--- No last chunk recorded ---") + # --- End added logging --- except Exception as e: logger.error(f"Error in streaming test: {str(e)}", exc_info=True)
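The _add_tool_result change above stores results of native calls as proper tool-role messages instead of user or assistant text, matching the chat-completions convention that every tool_call id is answered by a message with role "tool", the same tool_call_id, and string content. A sketch of that formatting step, using the ToolResult .output handling shown in the diff:

```python
import json
from typing import Any, Dict

def format_native_tool_result_message(tool_call: Dict[str, Any], result: Any) -> Dict[str, Any]:
    """Build an OpenAI-style tool message for a native function call.

    `tool_call` must carry the provider-issued id; `result` may be a plain string
    or a ToolResult-like object exposing an `.output` attribute, as in the diff.
    """
    if isinstance(result, str):
        content = result
    elif hasattr(result, "output"):
        output = result.output
        # The tool role requires string content, so serialize structured output.
        content = json.dumps(output) if isinstance(output, (dict, list)) else str(output)
    else:
        content = str(result)

    return {
        "role": "tool",
        "tool_call_id": tool_call["id"],
        "name": tool_call.get("function_name", ""),
        "content": content,
    }
```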
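Both the thread_manager.py retrieval fix and the non-streaming path in response_processor.py coerce function.arguments back to a string, since the JSONB round-trip can hand back parsed dicts while providers expect arguments as serialized JSON. A small sketch of that normalization over plain message dicts:

```python
import json
from typing import Any, Dict, List

def normalize_tool_call_arguments(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Ensure every assistant tool_call carries string arguments.

    Mirrors the fix in ThreadManager's message retrieval: arguments stored as
    dicts are re-serialized before the messages are sent back to the provider.
    """
    for message in messages:
        for tool_call in message.get("tool_calls") or []:
            function = tool_call.get("function") if isinstance(tool_call, dict) else None
            if not function:
                continue
            args = function.get("arguments")
            if args is not None and not isinstance(args, str):
                function["arguments"] = json.dumps(args)
    return messages
```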
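The updated test keeps the last streamed chunk so it can report the run's finish_reason; with native tool calling, "tool_calls" indicates the model stopped to invoke tools while "stop" is a plain completion. A hedged helper for that check, written against litellm-style chunks:

```python
from typing import Any, Optional

async def last_finish_reason(stream: Any) -> Optional[str]:
    """Consume a chat-completions stream and return the final finish_reason seen."""
    finish_reason = None
    async for chunk in stream:
        choices = getattr(chunk, "choices", None)
        if choices and getattr(choices[0], "finish_reason", None):
            finish_reason = choices[0].finish_reason
    return finish_reason

# e.g. with the test's stream_response:
#   reason = await last_finish_reason(stream_response)
#   "tool_calls" -> the model stopped to call tools; "stop" -> plain completion
```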