From 6e1739698e2611a3bc93eee972883bb1c57e8e6f Mon Sep 17 00:00:00 2001 From: marko-kraemer Date: Sun, 17 Nov 2024 23:36:37 +0100 Subject: [PATCH] xml v1 wip --- agentpress/examples/example_agent/agent.py | 29 +- .../example_agent/tools/files_tool.py | 8 +- .../example_agent/workspace/index.html | 147 ++++--- .../example_agent/workspace/script.js | 24 ++ .../example_agent/workspace/styles.css | 386 +++++++++--------- agentpress/thread_llm_response_processor.py | 232 ++++++----- agentpress/thread_manager.py | 44 +- agentpress/xml_results_adder.py | 78 ++++ agentpress/xml_tool_executor.py | 131 ++++++ agentpress/xml_tool_parser.py | 189 +++++++++ 10 files changed, 848 insertions(+), 420 deletions(-) create mode 100644 agentpress/examples/example_agent/workspace/script.js create mode 100644 agentpress/xml_results_adder.py create mode 100644 agentpress/xml_tool_executor.py create mode 100644 agentpress/xml_tool_parser.py diff --git a/agentpress/examples/example_agent/agent.py b/agentpress/examples/example_agent/agent.py index 1d6d9b0e..c1e06f18 100644 --- a/agentpress/examples/example_agent/agent.py +++ b/agentpress/examples/example_agent/agent.py @@ -7,6 +7,9 @@ from tools.terminal_tool import TerminalTool import logging from typing import AsyncGenerator import sys +from agentpress.xml_tool_parser import XMLToolParser +from agentpress.xml_tool_executor import XMLToolExecutor +from agentpress.xml_results_adder import XMLResultsAdder async def run_agent(thread_id: str, max_iterations: int = 5): # Initialize managers and tools @@ -47,6 +50,22 @@ async def run_agent(thread_id: str, max_iterations: int = 5): iteration += 1 await pre_iteration() +# You are a world-class web developer who can create, edit, and delete files, and execute terminal commands. You write clean, well-structured code. + +# RESPONSE FORMAT: +# Use XML tags to specify file operations: + +# +# file contents here +# + +# +# updated file contents here +# + +# +# + system_message = { "role": "system", "content": """ @@ -119,10 +138,12 @@ Current development environment workspace state: execute_tools=True, stream=True, immediate_tool_execution=True, - parallel_tool_execution=True + parallel_tool_execution=True, + # tool_parser=XMLToolParser(), + # tool_executor=XMLToolExecutor(parallel=True), + # results_adder=XMLResultsAdder(thread_manager) ) - # Handle streaming response if isinstance(response, AsyncGenerator): print("\n🤖 Assistant is responding:") try: @@ -154,12 +175,10 @@ Current development environment workspace state: else: print("\n❌ Non-streaming response received:", response) - # Call after_iteration without arguments await after_iteration() await finalizer() - if __name__ == "__main__": async def main(): thread_manager = ThreadManager() @@ -169,7 +188,7 @@ if __name__ == "__main__": thread_id, { "role": "user", - "content": "Create a Crypto Trading Bot Platform. Use modern CSS styling. Make it look like FTX, the trusted and 100% safe crypto trading platform." + "content": "Create a modern, responsive landing page with HTML, CSS and JS." } ) diff --git a/agentpress/examples/example_agent/tools/files_tool.py b/agentpress/examples/example_agent/tools/files_tool.py index 2f0d54ff..15281bd0 100644 --- a/agentpress/examples/example_agent/tools/files_tool.py +++ b/agentpress/examples/example_agent/tools/files_tool.py @@ -123,16 +123,16 @@ class FilesTool(Tool): "type": "string", "description": "Path to the file to be created" }, - "content": { + "file_contents": { "type": "string", "description": "The content to write to the file" } }, - "required": ["file_path", "content"] + "required": ["file_path", "file_contents"] } } }) - async def create_file(self, file_path: str, content: str) -> ToolResult: + async def create_file(self, file_path: str, file_contents: str) -> ToolResult: try: full_path = os.path.join(self.workspace, file_path) if os.path.exists(full_path): @@ -140,7 +140,7 @@ class FilesTool(Tool): os.makedirs(os.path.dirname(full_path), exist_ok=True) with open(full_path, 'w') as f: - f.write(content) + f.write(file_contents) await self._update_workspace_state() return self.success_response(f"File '{file_path}' created successfully.") diff --git a/agentpress/examples/example_agent/workspace/index.html b/agentpress/examples/example_agent/workspace/index.html index 7419e105..6ef57d51 100644 --- a/agentpress/examples/example_agent/workspace/index.html +++ b/agentpress/examples/example_agent/workspace/index.html @@ -3,88 +3,87 @@ - CryptoBot Trading Platform + Modern Landing Page + -
- -
-
-
-
-
BTC/USDT
-
ETH/USDT
-
BNB/USDT
-
-
-
Price Chart Placeholder
-
+
+
+
+

Welcome to Our Platform

+

Simplify your workflow with our innovative solution

+ Get Started +
+
+ +
+
+
+ + + + +

Efficiency

+

Streamline your process with our first key feature

- -
-
- - - -
-
- - - - -
+
+ + + + +

Scalability

+

Enhance productivity with our second key feature

-
- -
-

Trading Bots

-
-
-

Trend Following Bot

-
- Performance: +12.5% - Active -
- -
-
-

Arbitrage Bot

-
- Performance: +8.3% - Inactive -
- -
-
-
-
- -
-
-
- +
+ +
+
+

About Us

+

We are dedicated to creating simple, powerful solutions that transform how you work.

+
+
+ +
+
+

Contact Us

+
+ + + + +
+
+
+
+ +
+

© 2023 Your Brand. All rights reserved.

+
\ No newline at end of file diff --git a/agentpress/examples/example_agent/workspace/script.js b/agentpress/examples/example_agent/workspace/script.js new file mode 100644 index 00000000..31b5400a --- /dev/null +++ b/agentpress/examples/example_agent/workspace/script.js @@ -0,0 +1,24 @@ +document.addEventListener('DOMContentLoaded', () => { + const menuToggle = document.querySelector('.menu-toggle'); + const navMenu = document.querySelector('.nav-menu'); + + menuToggle.addEventListener('click', () => { + navMenu.classList.toggle('active'); + menuToggle.classList.toggle('active'); + }); + + const navLinks = document.querySelectorAll('.nav-menu a'); + navLinks.forEach(link => { + link.addEventListener('click', () => { + navMenu.classList.remove('active'); + menuToggle.classList.remove('active'); + }); + }); + + const contactForm = document.querySelector('.contact-form form'); + contactForm.addEventListener('submit', (e) => { + e.preventDefault(); + alert('Thank you for your message! We will get back to you soon.'); + contactForm.reset(); + }); +}); \ No newline at end of file diff --git a/agentpress/examples/example_agent/workspace/styles.css b/agentpress/examples/example_agent/workspace/styles.css index a4622a4a..ae80a9e9 100644 --- a/agentpress/examples/example_agent/workspace/styles.css +++ b/agentpress/examples/example_agent/workspace/styles.css @@ -1,13 +1,8 @@ :root { - --bg-dark: #0b0e11; - --bg-darker: #121619; - --primary-color: #1b2028; - --accent-color: #2968ff; - --text-light: #ffffff; - --text-muted: #8a8f9b; - --green: #02c077; - --red: #f84960; - --border-color: #2c3035; + --primary-color: #3498db; + --secondary-color: #2ecc71; + --text-color: #333; + --background-color: #f4f4f4; } * { @@ -18,230 +13,225 @@ body { font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Oxygen, Ubuntu, Cantarell, 'Open Sans', 'Helvetica Neue', sans-serif; - background-color: var(--bg-dark); - color: var(--text-light); line-height: 1.6; + color: var(--text-color); + background-color: var(--background-color); } -.app-container { - max-width: 1400px; - margin: 0 auto; - padding: 0 15px; +header { + position: fixed; + width: 100%; + background-color: white; + box-shadow: 0 2px 4px rgba(0,0,0,0.1); + z-index: 1000; } -.navbar { +nav { display: flex; justify-content: space-between; align-items: center; - padding: 20px 0; - border-bottom: 1px solid var(--primary-color); + padding: 1rem 5%; + max-width: 1200px; + margin: 0 auto; } -.navbar .logo { - font-size: 24px; +nav .logo { + font-size: 1.5rem; font-weight: bold; - color: var(--accent-color); + color: var(--primary-color); } -.navbar nav ul { +nav ul { display: flex; list-style: none; } -.navbar nav ul li { - margin: 0 15px; +nav ul li { + margin-left: 1.5rem; } -.navbar nav ul li a { - color: var(--text-light); +nav ul li a { text-decoration: none; + color: var(--text-color); transition: color 0.3s ease; } -.navbar nav ul li a:hover { - color: var(--accent-color); +nav ul li a:hover { + color: var(--primary-color); } -.user-actions { - display: flex; - gap: 10px; -} - -.login-btn, .signup-btn { - padding: 8px 16px; - border: none; - border-radius: 4px; - cursor: pointer; - transition: background-color 0.3s ease; -} - -.login-btn { - background-color: var(--primary-color); - color: var(--text-light); -} - -.signup-btn { - background-color: var(--accent-color); - color: var(--text-light); -} - -.dashboard { - display: grid; - grid-template-columns: 3fr 1fr; - gap: 20px; - margin-top: 20px; -} - -.trading-panel { - background-color: var(--bg-darker); - border-radius: 8px; - padding: 20px; -} - -.market-overview { - margin-bottom: 20px; -} - -.crypto-pairs { - display: flex; - gap: 15px; - margin-bottom: 15px; -} - -.pair { - padding: 8px 15px; - background-color: var(--primary-color); - border-radius: 4px; - cursor: pointer; - opacity: 0.7; - transition: opacity 0.3s ease; -} - -.pair.active { - opacity: 1; - background-color: var(--accent-color); -} - -.price-chart { - background-color: var(--bg-dark); - height: 300px; +main { + max-width: 1200px; + margin: 0 auto; + padding: 0 5%; +} + +.hero { display: flex; + align-items: center; justify-content: center; - align-items: center; - border-radius: 8px; -} - -.trading-interface { - background-color: var(--bg-dark); - border-radius: 8px; - padding: 20px; -} - -.order-types { - display: flex; - gap: 15px; - margin-bottom: 20px; -} - -.order-type { - padding: 10px 20px; - background-color: var(--primary-color); - border: none; - color: var(--text-light); - border-radius: 4px; - cursor: pointer; - transition: background-color 0.3s ease; -} - -.order-type.active { - background-color: var(--accent-color); -} - -.order-form { - display: grid; - grid-template-columns: 1fr 1fr; - gap: 15px; -} - -.amount-input, .price-input { - padding: 10px; - background-color: var(--primary-color); - border: none; - color: var(--text-light); - border-radius: 4px; -} - -.buy-btn, .sell-btn { - padding: 12px; - border: none; - border-radius: 4px; - color: var(--text-light); - cursor: pointer; - transition: opacity 0.3s ease; -} - -.buy-btn { - background-color: green; -} - -.sell-btn { - background-color: red; -} - -.bot-management { - background-color: var(--bg-darker); - border-radius: 8px; - padding: 20px; -} - -.bot-list { - display: grid; - gap: 15px; -} - -.bot-card { - background-color: var(--bg-dark); - padding: 15px; - border-radius: 8px; - display: flex; - justify-content: space-between; - align-items: center; -} - -.bot-stats { - display: flex; - flex-direction: column; - align-items: flex-end; -} - -.configure-bot { - background-color: var(--accent-color); - color: var(--text-light); - border: none; - padding: 8px 15px; - border-radius: 4px; - cursor: pointer; -} - -footer { - margin-top: 20px; - padding: 20px 0; - border-top: 1px solid var(--primary-color); + height: 100vh; text-align: center; } -.legal-links { - margin-top: 10px; +.hero-content h1 { + font-size: 3rem; + margin-bottom: 1rem; + color: var(--primary-color); } -.legal-links a { - color: var(--text-muted); +.hero-content p { + font-size: 1.2rem; + margin-bottom: 2rem; +} + +.cta-button { + display: inline-block; + padding: 0.75rem 1.5rem; + background-color: var(--primary-color); + color: white; text-decoration: none; - margin: 0 10px; + border-radius: 5px; + transition: background-color 0.3s ease; } -@media (max-width: 1024px) { - .dashboard { +.cta-button:hover { + background-color: #2980b9; +} + +.features { + padding: 4rem 0; +} + +.feature-grid { + display: grid; + grid-template-columns: repeat(3, 1fr); + gap: 2rem; +} + +.feature { + text-align: center; + padding: 2rem; + background-color: white; + border-radius: 10px; + box-shadow: 0 4px 6px rgba(0,0,0,0.1); +} + +.feature h3 { + margin-bottom: 1rem; + color: var(--primary-color); +} + +.about, .contact { + padding: 4rem 0; + text-align: center; +} + +.contact-form { + max-width: 500px; + margin: 0 auto; +} + +.contact-form input, +.contact-form textarea { + width: 100%; + padding: 0.75rem; + margin-bottom: 1rem; + border: 1px solid #ddd; + border-radius: 5px; +} + +.contact-form button { + background-color: var(--secondary-color); + color: white; + border: none; + padding: 0.75rem 1.5rem; + border-radius: 5px; + cursor: pointer; + transition: background-color 0.3s ease; +} + +.contact-form button:hover { + background-color: #27ae60; +} + +footer { + background-color: var(--text-color); + color: white; + text-align: center; + padding: 1rem; +} + +@media screen and (max-width: 768px) { + .menu-toggle { + display: block; + cursor: pointer; + } + + .nav-menu { + display: none; + flex-direction: column; + width: 100%; + position: absolute; + top: 60px; + left: 0; + background-color: white; + padding: 1rem; + box-shadow: 0 2px 5px rgba(0,0,0,0.1); + } + + .nav-menu.active { + display: flex; + } + + .nav-menu li { + margin: 0.5rem 0; + text-align: center; + } + + .feature-grid { grid-template-columns: 1fr; } + + .hero-content h1 { + font-size: 2rem; + } +} + +.menu-toggle { + display: none; + flex-direction: column; + width: 25px; +} + +.menu-toggle span { + height: 3px; + width: 100%; + background-color: var(--text-color); + margin: 3px 0; + transition: 0.4s; +} + +.feature-icon { + width: 50px; + height: 50px; + stroke: var(--primary-color); + margin-bottom: 1rem; +} + +@keyframes fadeIn { + from { opacity: 0; transform: translateY(20px); } + to { opacity: 1; transform: translateY(0); } +} + +.feature { + animation: fadeIn 0.5s ease-out; +} + +.feature:nth-child(2) { + animation-delay: 0.2s; +} + +.feature:nth-child(3) { + animation-delay: 0.4s; } \ No newline at end of file diff --git a/agentpress/thread_llm_response_processor.py b/agentpress/thread_llm_response_processor.py index 9d30708c..6e3db91a 100644 --- a/agentpress/thread_llm_response_processor.py +++ b/agentpress/thread_llm_response_processor.py @@ -356,36 +356,77 @@ class StandardToolExecutor(ToolExecutorBase): return results +# --- Results Adder Base --- + +class ResultsAdderBase(ABC): + """Abstract base class for handling tool results and message processing.""" + + def __init__(self, thread_manager): + """Initialize with a ThreadManager instance. + + Args: + thread_manager: The ThreadManager instance to use for message operations + """ + self.add_message = thread_manager.add_message + self.update_message = thread_manager._update_message + self.list_messages = thread_manager.list_messages + self.message_added = False + + @abstractmethod + async def add_initial_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + pass + + @abstractmethod + async def update_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + pass + + @abstractmethod + async def add_tool_result(self, thread_id: str, result: Dict[str, Any]): + pass + +# --- Standard Results Adder Implementation --- + +class StandardResultsAdder(ResultsAdderBase): + """Standard implementation for handling tool results and message processing.""" + + def __init__(self, thread_manager): + """Initialize with ThreadManager instance.""" + super().__init__(thread_manager) # Use base class initialization + + async def add_initial_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + message = { + "role": "assistant", + "content": content + } + if tool_calls: + message["tool_calls"] = tool_calls + + await self.add_message(thread_id, message) + self.message_added = True + + async def update_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + if not self.message_added: + await self.add_initial_response(thread_id, content, tool_calls) + return + + message = { + "role": "assistant", + "content": content + } + if tool_calls: + message["tool_calls"] = tool_calls + + await self.update_message(thread_id, message) + + async def add_tool_result(self, thread_id: str, result: Dict[str, Any]): + messages = await self.list_messages(thread_id) + if not any(msg.get('tool_call_id') == result['tool_call_id'] for msg in messages): + await self.add_message(thread_id, result) + # --- Response Processor --- -@dataclass -class ProcessedResponse: - """Container for processed LLM response data.""" - content: str - tool_calls: Optional[List[Dict[str, Any]]] = None - tool_results: Optional[List[Dict[str, Any]]] = None - class StandardLLMResponseProcessor: - """Handles LLM response processing and tool execution management. - - This class coordinates the parsing of LLM responses and execution of tool calls, - managing state and message flow throughout the conversation. - - Attributes: - thread_id (str): Current thread identifier - tool_executor (StandardToolExecutor): Tool execution handler - tool_parser (StandardToolParser): Response parsing handler - available_functions (Dict): Available tool functions - add_message (Callable): Callback for adding messages - update_message (Callable): Callback for updating messages - list_messages (Callable): Callback for listing messages - threads_dir (str): Directory for thread storage - tool_calls_buffer (Dict): Buffer for incomplete tool calls - processed_tool_calls (Set): Set of executed tool call IDs - content_buffer (str): Buffer for accumulated content - tool_calls_accumulated (List): List of accumulated tool calls - message_added (bool): Flag for message addition status - """ + """Handles LLM response processing and tool execution management.""" def __init__( self, @@ -395,23 +436,35 @@ class StandardLLMResponseProcessor: update_message_callback: Callable = None, list_messages_callback: Callable = None, parallel_tool_execution: bool = True, - threads_dir: str = "threads" + threads_dir: str = "threads", + tool_parser: Optional[ToolParserBase] = None, + tool_executor: Optional[ToolExecutorBase] = None, + results_adder: Optional[ResultsAdderBase] = None, + thread_manager = None # Add thread_manager parameter ): self.thread_id = thread_id - self.tool_executor = StandardToolExecutor(parallel=parallel_tool_execution) - self.tool_parser = StandardToolParser() + self.tool_executor = tool_executor or StandardToolExecutor(parallel=parallel_tool_execution) + self.tool_parser = tool_parser or StandardToolParser() self.available_functions = available_functions or {} - self.add_message = add_message_callback - self.update_message = update_message_callback - self.list_messages = list_messages_callback self.threads_dir = threads_dir + # Create a minimal thread manager if none provided + if thread_manager is None and (add_message_callback and update_message_callback and list_messages_callback): + class MinimalThreadManager: + def __init__(self, add_msg, update_msg, list_msg): + self.add_message = add_msg + self._update_message = update_msg + self.list_messages = list_msg + thread_manager = MinimalThreadManager(add_message_callback, update_message_callback, list_messages_callback) + + # Initialize results adder + self.results_adder = results_adder or StandardResultsAdder(thread_manager) + # State tracking for streaming responses self.tool_calls_buffer = {} self.processed_tool_calls = set() self.content_buffer = "" self.tool_calls_accumulated = [] - self.message_added = False async def process_stream( self, @@ -419,23 +472,17 @@ class StandardLLMResponseProcessor: execute_tools: bool = True, immediate_execution: bool = True ) -> AsyncGenerator: - """ - Process streaming LLM response and handle tool execution. - Yields chunks immediately while managing message state and tool execution efficiently. - """ + """Process streaming LLM response and handle tool execution.""" pending_tool_calls = [] background_tasks = set() - tool_results = [] # Track tool results async def handle_message_management(chunk): try: - nonlocal tool_results - # Accumulate content if hasattr(chunk.choices[0].delta, 'content') and chunk.choices[0].delta.content: self.content_buffer += chunk.choices[0].delta.content - # Parse tool calls only if present in chunk + # Parse tool calls if present if hasattr(chunk.choices[0].delta, 'tool_calls'): parsed_message, is_complete = await self.tool_parser.parse_stream( chunk, @@ -451,26 +498,21 @@ class StandardLLMResponseProcessor: if tool_call['id'] not in self.processed_tool_calls ] - if new_tool_calls and immediate_execution: - results = await self.tool_executor.execute_tool_calls( - tool_calls=new_tool_calls, - available_functions=self.available_functions, - thread_id=self.thread_id, - executed_tool_calls=self.processed_tool_calls - ) - tool_results.extend(results) - for result in results: - self.processed_tool_calls.add(result['tool_call_id']) - elif new_tool_calls: - pending_tool_calls.extend(new_tool_calls) + if new_tool_calls: + if immediate_execution: + results = await self.tool_executor.execute_tool_calls( + tool_calls=new_tool_calls, + available_functions=self.available_functions, + thread_id=self.thread_id, + executed_tool_calls=self.processed_tool_calls + ) + for result in results: + await self.results_adder.add_tool_result(self.thread_id, result) + self.processed_tool_calls.add(result['tool_call_id']) + else: + pending_tool_calls.extend(new_tool_calls) - for result in tool_results: - if not any(msg.get('tool_call_id') == result['tool_call_id'] - for msg in await self.list_messages(self.thread_id)): - await self.add_message(self.thread_id, result) - tool_results = [] # Clear processed results - - # Then add/update assistant message + # Add/update assistant message message = { "role": "assistant", "content": self.content_buffer @@ -478,11 +520,19 @@ class StandardLLMResponseProcessor: if self.tool_calls_accumulated: message["tool_calls"] = self.tool_calls_accumulated - if not self.message_added: - await self.add_message(self.thread_id, message) - self.message_added = True + if not hasattr(self, '_message_added'): + await self.results_adder.add_initial_response( + self.thread_id, + self.content_buffer, + self.tool_calls_accumulated + ) + self._message_added = True else: - await self.update_message(self.thread_id, message) + await self.results_adder.update_response( + self.thread_id, + self.content_buffer, + self.tool_calls_accumulated + ) # Handle stream completion if chunk.choices[0].finish_reason: @@ -494,50 +544,39 @@ class StandardLLMResponseProcessor: executed_tool_calls=self.processed_tool_calls ) for result in results: - await self.add_message(self.thread_id, result) + await self.results_adder.add_tool_result(self.thread_id, result) self.processed_tool_calls.add(result['tool_call_id']) pending_tool_calls.clear() except Exception as e: logging.error(f"Error in background task: {e}") - return try: async for chunk in response_stream: - # Create and track background task task = asyncio.create_task(handle_message_management(chunk)) background_tasks.add(task) task.add_done_callback(background_tasks.discard) - - # Immediately yield the chunk yield chunk - # Wait for all background tasks to complete if background_tasks: await asyncio.gather(*background_tasks, return_exceptions=True) except Exception as e: logging.error(f"Error in stream processing: {e}") - # Clean up any remaining background tasks for task in background_tasks: if not task.done(): task.cancel() raise - async def process_response( - self, - response: Any, - execute_tools: bool = True - ) -> None: - """ - Process complete LLM response and execute tools. - - Handles non-streaming responses, parsing the complete response and - executing any tool calls according to the configured execution strategy. - """ + async def process_response(self, response: Any, execute_tools: bool = True) -> None: + """Process complete LLM response and execute tools.""" try: assistant_message = await self.tool_parser.parse_response(response) - await self.add_message(self.thread_id, assistant_message) + await self.results_adder.add_initial_response( + self.thread_id, + assistant_message['content'], + assistant_message.get('tool_calls') + ) if execute_tools and 'tool_calls' in assistant_message and assistant_message['tool_calls']: results = await self.tool_executor.execute_tool_calls( @@ -548,31 +587,10 @@ class StandardLLMResponseProcessor: ) for result in results: - await self.add_message(self.thread_id, result) + await self.results_adder.add_tool_result(self.thread_id, result) logging.info(f"Tool execution result: {result}") except Exception as e: logging.error(f"Error processing response: {e}") response_content = response.choices[0].message.get('content', '') - await self.add_message(self.thread_id, { - "role": "assistant", - "content": response_content or "" - }) - -class ThreadManager: - """Manages conversation threads with LLM models and tool execution. - - The ThreadManager provides comprehensive conversation management, handling - message threading, tool registration, and LLM interactions. - - Attributes: - threads_dir (str): Directory for storing thread files - tool_registry (ToolRegistry): Registry for managing available tools - - Key Features: - - Thread creation and management - - Message handling with support for text and images - - Tool registration and execution - - LLM interaction with streaming support - - Error handling and cleanup - """ + await self.results_adder.add_initial_response(self.thread_id, response_content) diff --git a/agentpress/thread_manager.py b/agentpress/thread_manager.py index a5c33486..b7fe9927 100644 --- a/agentpress/thread_manager.py +++ b/agentpress/thread_manager.py @@ -6,6 +6,9 @@ from agentpress.llm import make_llm_api_call from agentpress.tool import Tool, ToolResult from agentpress.tool_registry import ToolRegistry from agentpress.thread_llm_response_processor import StandardLLMResponseProcessor +from agentpress.thread_llm_response_processor import ToolParserBase +from agentpress.thread_llm_response_processor import ToolExecutorBase +from agentpress.thread_llm_response_processor import ResultsAdderBase import uuid class ThreadManager: @@ -218,45 +221,21 @@ class ThreadManager: execute_tools: bool = True, stream: bool = False, immediate_tool_execution: bool = True, - parallel_tool_execution: bool = True + parallel_tool_execution: bool = True, + tool_parser: Optional[ToolParserBase] = None, + tool_executor: Optional[ToolExecutorBase] = None, + results_adder: Optional[ResultsAdderBase] = None ) -> Union[Dict[str, Any], AsyncGenerator]: - """Run a conversation thread with specified parameters. - - Executes a conversation turn with the LLM, handling tool execution - and response processing based on the provided configuration. - - Args: - thread_id: Target thread identifier - system_message: System context message - model_name: LLM model identifier - temperature: Model temperature setting - max_tokens: Maximum response length - tool_choice: Tool selection mode - temporary_message: Optional temporary context - use_tools: Enable tool usage - execute_tools: Enable tool execution - stream: Enable response streaming - immediate_tool_execution: Execute tools immediately - parallel_tool_execution: Enable parallel execution - - Returns: - Union[Dict[str, Any], AsyncGenerator]: Response data or stream - - Raises: - Exception: For execution failures - """ + """Run a conversation thread with specified parameters.""" try: - # Get thread messages and prepare for LLM call messages = await self.list_messages(thread_id) prepared_messages = [system_message] + messages if temporary_message: prepared_messages.append(temporary_message) - # Configure tools if enabled tools = self.tool_registry.get_all_tool_schemas() if use_tools else None available_functions = self.tool_registry.get_available_functions() if use_tools else {} - # Initialize response processor with list_messages callback response_processor = StandardLLMResponseProcessor( thread_id=thread_id, available_functions=available_functions, @@ -264,10 +243,12 @@ class ThreadManager: update_message_callback=self._update_message, list_messages_callback=self.list_messages, parallel_tool_execution=parallel_tool_execution, - threads_dir=self.threads_dir + threads_dir=self.threads_dir, + tool_parser=tool_parser, # Use provided parser or default to Standard + tool_executor=tool_executor, # Use provided executor or default to Standard + results_adder=results_adder # Use provided adder or default to Standard ) - # Get LLM response llm_response = await self._run_thread_completion( messages=prepared_messages, model_name=model_name, @@ -285,7 +266,6 @@ class ThreadManager: immediate_execution=immediate_tool_execution ) - # Process non-streaming response await response_processor.process_response( response=llm_response, execute_tools=execute_tools diff --git a/agentpress/xml_results_adder.py b/agentpress/xml_results_adder.py new file mode 100644 index 00000000..7dd28342 --- /dev/null +++ b/agentpress/xml_results_adder.py @@ -0,0 +1,78 @@ +import logging +from typing import Dict, Any, List, Optional +from agentpress.thread_llm_response_processor import ResultsAdderBase + +class XMLResultsAdder(ResultsAdderBase): + """XML-specific implementation for handling tool results and message processing. + + This implementation combines tool calls and their results into a single XML-formatted + message, avoiding the need for separate tool_calls and tool_results messages. + """ + + def __init__(self, thread_manager): + super().__init__(thread_manager) + self.pending_tool_results = {} + + def _format_xml_response(self, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None) -> str: + """Format the response content with XML tool results.""" + response_parts = [] + + # Add any non-XML content first + non_xml_content = [] + lines = content.split('\n') + for line in lines: + if not (line.strip().startswith('<') and line.strip().endswith('>')): + non_xml_content.append(line) + if non_xml_content: + response_parts.append('\n'.join(non_xml_content)) + + # Add XML blocks with their results + if tool_calls: + for tool_call in tool_calls: + tool_id = tool_call['id'] + if tool_id in self.pending_tool_results: + result = self.pending_tool_results[tool_id] + response_parts.append( + f"\n" + f"{result}\n" + f"" + ) + + return '\n\n'.join(response_parts) + + async def add_initial_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + """Add initial response with XML formatting.""" + formatted_content = self._format_xml_response(content, tool_calls) + message = { + "role": "assistant", + "content": formatted_content + } + await self.add_message(thread_id, message) + self.message_added = True + + async def update_response(self, thread_id: str, content: str, tool_calls: Optional[List[Dict[str, Any]]] = None): + """Update response with XML formatting.""" + if not self.message_added: + await self.add_initial_response(thread_id, content, tool_calls) + return + + formatted_content = self._format_xml_response(content, tool_calls) + message = { + "role": "assistant", + "content": formatted_content + } + await self.update_message(thread_id, message) + + async def add_tool_result(self, thread_id: str, result: Dict[str, Any]): + """Store tool result for inclusion in the XML response.""" + tool_call_id = result['tool_call_id'] + self.pending_tool_results[tool_call_id] = result['content'] + + # Update the message to include the new result + messages = await self.list_messages(thread_id) + for msg in reversed(messages): + if msg['role'] == 'assistant': + content = msg['content'] + tool_calls = msg.get('tool_calls', []) + await self.update_response(thread_id, content, tool_calls) + break \ No newline at end of file diff --git a/agentpress/xml_tool_executor.py b/agentpress/xml_tool_executor.py new file mode 100644 index 00000000..90393577 --- /dev/null +++ b/agentpress/xml_tool_executor.py @@ -0,0 +1,131 @@ +from typing import List, Dict, Any, Set, Callable, Optional +import asyncio +import json +import logging +from agentpress.thread_llm_response_processor import ToolExecutorBase +from agentpress.tool import ToolResult + +class XMLToolExecutor(ToolExecutorBase): + def __init__(self, parallel: bool = True): + self.parallel = parallel + + async def execute_tool_calls( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Optional[Set[str]] = None + ) -> List[Dict[str, Any]]: + if executed_tool_calls is None: + executed_tool_calls = set() + + if self.parallel: + return await self._execute_parallel( + tool_calls, + available_functions, + thread_id, + executed_tool_calls + ) + else: + return await self._execute_sequential( + tool_calls, + available_functions, + thread_id, + executed_tool_calls + ) + + async def _execute_parallel( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Set[str] + ) -> List[Dict[str, Any]]: + async def execute_single_tool(tool_call: Dict[str, Any]) -> Dict[str, Any]: + if tool_call['id'] in executed_tool_calls: + return None + + try: + function_name = tool_call['function']['name'] + function_args = tool_call['function']['arguments'] + if isinstance(function_args, str): + function_args = json.loads(function_args) + + function_to_call = available_functions.get(function_name) + if not function_to_call: + error_msg = f"Function {function_name} not found" + logging.error(error_msg) + return { + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(ToolResult(success=False, output=error_msg)) + } + + result = await function_to_call(**function_args) + executed_tool_calls.add(tool_call['id']) + + return { + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(result) + } + except Exception as e: + error_msg = f"Error executing {function_name}: {str(e)}" + logging.error(error_msg) + return { + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(ToolResult(success=False, output=error_msg)) + } + + tasks = [execute_single_tool(tool_call) for tool_call in tool_calls] + results = await asyncio.gather(*tasks) + return [r for r in results if r is not None] + + async def _execute_sequential( + self, + tool_calls: List[Dict[str, Any]], + available_functions: Dict[str, Callable], + thread_id: str, + executed_tool_calls: Set[str] + ) -> List[Dict[str, Any]]: + results = [] + for tool_call in tool_calls: + if tool_call['id'] in executed_tool_calls: + continue + + try: + function_name = tool_call['function']['name'] + function_args = tool_call['function']['arguments'] + if isinstance(function_args, str): + function_args = json.loads(function_args) + + function_to_call = available_functions.get(function_name) + if not function_to_call: + error_msg = f"Function {function_name} not found" + logging.error(error_msg) + result = ToolResult(success=False, output=error_msg) + else: + result = await function_to_call(**function_args) + executed_tool_calls.add(tool_call['id']) + + results.append({ + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(result) + }) + except Exception as e: + error_msg = f"Error executing {function_name}: {str(e)}" + logging.error(error_msg) + results.append({ + "role": "tool", + "tool_call_id": tool_call['id'], + "name": function_name, + "content": str(ToolResult(success=False, output=error_msg)) + }) + + return results \ No newline at end of file diff --git a/agentpress/xml_tool_parser.py b/agentpress/xml_tool_parser.py new file mode 100644 index 00000000..d09b34a4 --- /dev/null +++ b/agentpress/xml_tool_parser.py @@ -0,0 +1,189 @@ +import logging +from typing import Dict, Any, Optional +from agentpress.thread_llm_response_processor import ToolParserBase +import json +import re + +class XMLToolParser(ToolParserBase): + def __init__(self): + self.current_tag = None + self.current_content = [] + self.file_path = None + + async def parse_response(self, response: Any) -> Dict[str, Any]: + response_message = response.choices[0].message + content = response_message.get('content') or "" + + message = { + "role": "assistant", + "content": content, + } + + tool_calls = [] + try: + xml_chunks = self._extract_xml_chunks(content) + for xml_chunk in xml_chunks: + tool_call = self._parse_xml_to_tool_call(xml_chunk) + if tool_call: + tool_calls.append(tool_call) + + if tool_calls: + message["tool_calls"] = tool_calls + + except Exception as e: + logging.error(f"Error parsing XML response: {e}") + + return message + + async def parse_stream(self, response_chunk: Any, tool_calls_buffer: Dict[int, Dict]) -> tuple[Optional[Dict[str, Any]], bool]: + content_chunk = "" + is_complete = False + + if hasattr(response_chunk.choices[0], 'delta'): + delta = response_chunk.choices[0].delta + + if hasattr(delta, 'content') and delta.content: + content_chunk = delta.content + tool_calls_buffer.setdefault('xml_buffer', '') + tool_calls_buffer['xml_buffer'] += content_chunk + + # Process any complete XML tags + tool_calls = self._process_streaming_xml(tool_calls_buffer['xml_buffer']) + if tool_calls: + # Clear processed content from buffer + last_end_tag = max( + tool_calls_buffer['xml_buffer'].rfind(''), + tool_calls_buffer['xml_buffer'].rfind(''), + tool_calls_buffer['xml_buffer'].rfind('') + ) + if last_end_tag > -1: + tool_calls_buffer['xml_buffer'] = tool_calls_buffer['xml_buffer'][last_end_tag + 1:] + + return { + "role": "assistant", + "content": content_chunk, + "tool_calls": tool_calls + }, is_complete + + if hasattr(response_chunk.choices[0], 'finish_reason') and response_chunk.choices[0].finish_reason: + is_complete = True + if 'xml_buffer' in tool_calls_buffer: + tool_calls = self._process_streaming_xml(tool_calls_buffer['xml_buffer']) + if tool_calls: + return { + "role": "assistant", + "content": content_chunk, + "tool_calls": tool_calls + }, is_complete + + return None, is_complete + + def _process_streaming_xml(self, content: str) -> list[Dict[str, Any]]: + tool_calls = [] + + # Find complete XML tags + start_tags = ['', '', ''] + + for start_tag in start_tags: + start_idx = content.find(start_tag) + if start_idx >= 0: + # Find corresponding end tag + tag_type = start_tag[1:] # Remove '<' + end_tag = f"" + end_idx = content.find(end_tag, start_idx) + + if end_idx >= 0: + # Extract complete XML chunk + xml_chunk = content[start_idx:end_idx + len(end_tag)] + try: + tool_call = self._parse_xml_to_tool_call(xml_chunk) + if tool_call: + tool_calls.append(tool_call) + except Exception as e: + logging.error(f"Error parsing streaming XML chunk: {e}") + + return tool_calls + + def _extract_xml_chunks(self, content: str) -> list[str]: + chunks = [] + current_chunk = [] + in_tag = False + + lines = content.split('\n') + for line in lines: + if any(tag in line for tag in ['', '', '']): + chunks.append('\n'.join(current_chunk)) + current_chunk = [] + in_tag = False + + if current_chunk and in_tag: + chunks.append('\n'.join(current_chunk)) + + return chunks + + def _parse_xml_to_tool_call(self, xml_chunk: str) -> Optional[Dict[str, Any]]: + try: + # Extract file path from the opening tag + file_path_match = re.search(r'file_path="([^"]+)"', xml_chunk) + if not file_path_match: + return None + + file_path = file_path_match.group(1) + + # Extract content between tags + content_match = re.search(r'>(.*?)]+>$', xml_chunk, re.DOTALL) + if not content_match: + return None + + content = content_match.group(1).strip() + + # Determine operation type + if '