This commit is contained in:
marko-kraemer 2024-10-23 04:16:35 +02:00
parent 8a407efc27
commit 4362334a53
7 changed files with 170 additions and 323 deletions

View File

@ -1,6 +1,4 @@
dir_base_path=/Users/markokraemer/Projects/agentpress
OPENAI_API_KEY= OPENAI_API_KEY=
ANTHROPIC_API_KEY= ANTHROPIC_API_KEY=
GROQ_API_KEY= GROQ_API_KEY=
database_url=
api_key=
dir_base_path=/Users/markokraemer/Projects/agentpress

237
README.md
View File

@ -1,171 +1,107 @@
# AgentPress # AgentPress: Messages[] API on Steroids with Threads & Automatic Tool Execution
AgentPress is a powerful framework for creating AI agents, with the ThreadManager at its core. This system simplifies the process of building, configuring, and running AI agents that can engage in conversations, perform tasks, and interact with various tools. AgentPress is a lightweight, powerful utility for kickstarting your LLM App or AI Agent. It provides a simple way to manage message threads, execute LLM calls, and automatically handle tool interactions.
## Key Concept: ThreadManager ## Key Features
The ThreadManager is the central component of AgentPress. It manages conversation threads, handles tool integrations, and coordinates the execution of AI models. Here's why it's crucial: - **Thread Management**: Easily create, update, and manage message threads.
- **Automatic Tool Execution**: Define tools as Python classes and have them automatically called by the LLM.
- **Flexible LLM Integration**: Uses LiteLLM under the hood, allowing easy switching between different LLM providers.
1. **Conversation Management**: It creates and manages threads, allowing for coherent multi-turn conversations. ## Quick Start
2. **Tool Integration**: It integrates various tools that the AI can use to perform tasks.
3. **Model Execution**: It handles the execution of AI models, managing the context and responses.
4. **State Management**: It maintains the state of conversations and tool executions across multiple turns.
## How It Works 1. Clone the repository:
```
git clone https://github.com/your-username/agentpress.git
cd agentpress
```
1. **Create a ThreadManager**: This is your first step in using AgentPress. 2. Install dependencies:
2. **Add Tools**: Register any tools your agent might need. ```
3. **Create a Thread**: Each conversation or task execution is managed in a thread. pip install -r requirements.txt
4. **Run the Thread**: Execute the AI model within the context of the thread, optionally using tools. ```
## Standalone Example 3. Set up your environment variables (API keys, etc.) in a `.env` file.
Here's how to use the ThreadManager standalone: 4. Create a simple tool:
```python
from agentpress.tool import Tool, ToolResult, tool_schema
```python class CalculatorTool(Tool):
import asyncio @tool_schema({
from agentpress.thread_manager import ThreadManager "name": "add",
from tools.files_tool import FilesTool "description": "Add two numbers",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "number"},
"b": {"type": "number"}
},
"required": ["a", "b"]
}
})
async def add(self, a: float, b: float) -> ToolResult:
return self.success_response(f"The sum is {a + b}")
```
async def main(): 5. Use the ThreadManager to run a conversation:
# Create a ThreadManager instance ```python
thread_manager = ThreadManager() import asyncio
from agentpress.thread_manager import ThreadManager
# Add a tool async def main():
thread_manager.add_tool(FilesTool) manager = ThreadManager()
manager.add_tool(CalculatorTool)
thread_id = await manager.create_thread()
await manager.add_message(thread_id, {"role": "user", "content": "What's 2 + 2?"})
system_message = {"role": "system", "content": "You are a helpful assistant with calculation abilities."}
response = await manager.run_thread(
thread_id=thread_id,
system_message=system_message,
model_name="gpt-4o",
execute_model_tool_calls=True
)
print("Response:", response)
# Create a new thread asyncio.run(main())
thread_id = await thread_manager.create_thread() ```
# Add an initial message to the thread
await thread_manager.add_message(thread_id, {"role": "user", "content": "Create a file named 'hello.txt' with the content 'Hello, World!'"})
# Run the thread 6. Create an autonomous agent with multiple iterations:
response = await thread_manager.run_thread( ```python
thread_id=thread_id, import asyncio
system_message={"role": "system", "content": "You are a helpful assistant that can create and manage files."}, from agentpress.thread_manager import ThreadManager
model_name="gpt-4", from tools.files_tool import FilesTool
temperature=0.7,
max_tokens=150,
tool_choice="auto"
)
# Print the response async def run_autonomous_agent(max_iterations=5):
print(response) thread_manager = ThreadManager()
thread_id = await thread_manager.create_thread()
thread_manager.add_tool(FilesTool)
# You can continue the conversation by adding more messages and running the thread again system_message = {"role": "system", "content": "You are a helpful assistant that can create, read, update, and delete files."}
await thread_manager.add_message(thread_id, {"role": "user", "content": "Now read the contents of 'hello.txt'"})
response = await thread_manager.run_thread( for iteration in range(max_iterations):
thread_id=thread_id, print(f"Iteration {iteration + 1}/{max_iterations}")
system_message={"role": "system", "content": "You are a helpful assistant that can create and manage files."},
model_name="gpt-4", await thread_manager.add_message(thread_id, {"role": "user", "content": "Continue!"})
temperature=0.7,
max_tokens=150,
tool_choice="auto"
)
print(response) response = await thread_manager.run_thread(
thread_id=thread_id,
system_message=system_message,
model_name="anthropic/claude-3-5-sonnet-20240620",
temperature=0.7,
max_tokens=4096,
tool_choice="auto",
execute_tools_async=False,
execute_model_tool_calls=True
)
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(run_autonomous_agent())
``` ```
This example demonstrates how to: This example demonstrates how to create an autonomous agent that runs for a specified number of iterations. It uses the `FilesTool` to interact with the file system and showcases how to control the behavior of `run_thread` by adjusting parameters like `temperature`, `max_tokens`, and `tool_choice`. The agent creates files autonomously.
1. Create a ThreadManager
2. Add a tool (FilesTool)
3. Create a new thread
4. Add messages to the thread
5. Run the thread, which executes the AI model and potentially uses tools
6. Continue the conversation with additional messages and thread runs
## Building More Complex Agents
While the ThreadManager can be used standalone, it's also the foundation for building more complex agents. You can create custom agent behaviors by defining initialization, pre-iteration, post-iteration, and finalization steps, setting up loops for autonomous iterations, and implementing custom logic for when and how to run threads.
Here's an example of a more complex agent implementation using the `run_agent` function:
```python
async def run_agent(
thread_manager: ThreadManager,
thread_id: int,
max_iterations: int = 10
):
async def init():
# Initialization code here
pass
async def pre_iteration():
# Pre-iteration code here
pass
async def after_iteration():
# Post-iteration code here
await thread_manager.add_message(thread_id, {"role": "user", "content": "CREATE MORE RANDOM FILES WITH RANDOM CONTENTS. JUST CREATE IT NO QUESTIONS PLEASE."})
async def finalizer():
# Finalization code here
pass
await init()
iteration = 0
while iteration < max_iterations:
iteration += 1
await pre_iteration()
system_message = {"role": "system", "content": "You are a helpful assistant that can create, read, update, and delete files."}
model_name = "gpt-4"
response = await thread_manager.run_thread(
thread_id=thread_id,
system_message=system_message,
model_name=model_name,
temperature=0.7,
max_tokens=150,
tool_choice="auto",
additional_message=None,
execute_tools_async=False,
execute_model_tool_calls=True
)
await after_iteration()
await finalizer()
# Usage
if __name__ == "__main__":
async def main():
thread_manager = ThreadManager()
thread_id = await thread_manager.create_thread()
await thread_manager.add_message(thread_id, {"role": "user", "content": "Please create a file with a random name with the content 'Hello, world!'"})
thread_manager.add_tool(FilesTool)
await run_agent(
thread_manager=thread_manager,
thread_id=thread_id,
max_iterations=5
)
asyncio.run(main())
```
This more complex example shows how to:
1. Define custom behavior for different stages of the agent's execution
2. Set up a loop for multiple iterations
3. Use the ThreadManager within a larger agent structure
## Documentation
For more detailed information about the AgentPress components:
- `ThreadManager`: The core class that manages threads, tools, and model execution.
- `Tool`: Base class for creating custom tools that can be used by the AI.
- `ToolRegistry`: Manages the registration and retrieval of tools.
Refer to the comments in the source code files for comprehensive documentation on each component.
## Contributing ## Contributing
@ -176,3 +112,10 @@ We welcome contributions to AgentPress! Please feel free to submit issues, fork
[MIT License](LICENSE) [MIT License](LICENSE)
Built with ❤️ by [Kortix AI Corp](https://www.kortix.ai) Built with ❤️ by [Kortix AI Corp](https://www.kortix.ai)

View File

@ -1,10 +1,8 @@
from .config import settings from .config import settings
from .db import Database, Thread
from .llm import make_llm_api_call from .llm import make_llm_api_call
from .thread_manager import ThreadManager from .thread_manager import ThreadManager
# from .working_memory_manager import WorkingMemory
__all__ = [ __all__ = [
'settings', 'Database', 'Thread', 'settings',
'make_llm_api_call', 'ThreadManager' 'make_llm_api_call', 'ThreadManager'
] #'WorkingMemory' ]

View File

@ -3,13 +3,12 @@ from pydantic_settings import BaseSettings
from typing import Optional from typing import Optional
class Settings(BaseSettings): class Settings(BaseSettings):
database_url: str
openai_api_key: Optional[str] = None openai_api_key: Optional[str] = None
anthropic_api_key: Optional[str] = None anthropic_api_key: Optional[str] = None
groq_api_key: Optional[str] = None groq_api_key: Optional[str] = None
dir_base_path: str = '' dir_base_path: str = ''
workspace_dir: str = '' workspace_dir: str = ''
tools_dir: str = ''
class Config: class Config:
env_file = ".env" env_file = ".env"
@ -17,7 +16,6 @@ class Settings(BaseSettings):
def __init__(self, **values): def __init__(self, **values):
super().__init__(**values) super().__init__(**values)
self.workspace_dir = os.path.join(self.dir_base_path, 'workspace') self.workspace_dir = os.path.join(self.dir_base_path, 'workspace')
self.tools_dir = os.path.join(self.dir_base_path, 'tools')
os.makedirs(self.workspace_dir, exist_ok=True) os.makedirs(self.workspace_dir, exist_ok=True)
settings = Settings() settings = Settings()

View File

@ -1,9 +0,0 @@
# Available AI models
AI_MODELS = [
'gpt-4o',
'gpt-4o-mini',
'anthropic/claude-3-5-sonnet-20240620'
]
# Standard system message
STANDARD_SYSTEM_MESSAGE = """You are a friendly and intelligent assistant, always ready to help with a wide range of tasks. Your responses should be helpful, accurate, and tailored to the user's needs. If you're unsure about something, don't hesitate to ask for clarification. Your goal is to provide the best possible assistance to the user."""

View File

@ -1,85 +0,0 @@
from sqlalchemy import Column, Integer, String, Text, ForeignKey, Float, JSON, Boolean
from sqlalchemy.orm import relationship, declarative_base
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from agentpress.config import settings # Changed from Settings to settings
import os
from contextlib import asynccontextmanager
import uuid
from datetime import datetime
Base = declarative_base()
class Thread(Base):
__tablename__ = 'threads'
thread_id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
messages = Column(Text)
created_at = Column(Integer)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.created_at = int(datetime.utcnow().timestamp())
# class MemoryModule(Base):
# __tablename__ = 'memory_modules'
# id = Column(Integer, primary_key=True)
# thread_id = Column(Integer, ForeignKey('threads.thread_id'))
# module_name = Column(String)
# data = Column(Text)
# __table_args__ = (UniqueConstraint('thread_id', 'module_name', name='_thread_module_uc'),)
# thread = relationship("Thread", back_populates="memory_modules")
class Database:
def __init__(self):
db_url = f"{settings.database_url}"
self.engine = create_async_engine(db_url, echo=False)
self.SessionLocal = sessionmaker(
class_=AsyncSession, expire_on_commit=False, autocommit=False, autoflush=False, bind=self.engine
)
@asynccontextmanager
async def get_async_session(self):
async with self.SessionLocal() as session:
try:
yield session
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def create_tables(self):
async with self.engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def close(self):
await self.engine.dispose()
if __name__ == "__main__":
import asyncio
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def init_db():
logger.info("Initializing database...")
db = Database()
try:
await db.create_tables()
logger.info("Database tables created successfully.")
except Exception as e:
logger.error(f"Error creating database tables: {e}")
finally:
await db.close()
asyncio.run(init_db())

View File

@ -1,9 +1,8 @@
import json import json
import logging import logging
import asyncio import asyncio
import os
from typing import List, Dict, Any, Optional, Callable, Type from typing import List, Dict, Any, Optional, Callable, Type
from sqlalchemy import select
from agentpress.db import Database, Thread
from agentpress.llm import make_llm_api_call from agentpress.llm import make_llm_api_call
from datetime import datetime, UTC from datetime import datetime, UTC
from agentpress.tool import Tool, ToolResult from agentpress.tool import Tool, ToolResult
@ -11,9 +10,10 @@ from agentpress.tool_registry import ToolRegistry
import uuid import uuid
class ThreadManager: class ThreadManager:
def __init__(self, db: Optional[Database] = None): def __init__(self, threads_dir: str = 'threads'):
self.db = db if db is not None else Database() self.threads_dir = threads_dir
self.tool_registry = ToolRegistry() self.tool_registry = ToolRegistry()
os.makedirs(self.threads_dir, exist_ok=True)
def add_tool(self, tool_class: Type[Tool], function_names: Optional[List[str]] = None): def add_tool(self, tool_class: Type[Tool], function_names: Optional[List[str]] = None):
""" """
@ -23,71 +23,71 @@ class ThreadManager:
""" """
self.tool_registry.register_tool(tool_class, function_names) self.tool_registry.register_tool(tool_class, function_names)
async def create_thread(self) -> int: async def create_thread(self) -> str:
async with self.db.get_async_session() as session: thread_id = str(uuid.uuid4())
new_thread = Thread( thread_path = os.path.join(self.threads_dir, f"{thread_id}.json")
messages=json.dumps([]) with open(thread_path, 'w') as f:
) json.dump({"messages": []}, f)
session.add(new_thread) return thread_id
await session.commit()
await session.refresh(new_thread)
return new_thread.thread_id
async def add_message(self, thread_id: int, message_data: Dict[str, Any], images: Optional[List[Dict[str, Any]]] = None): async def add_message(self, thread_id: str, message_data: Dict[str, Any], images: Optional[List[Dict[str, Any]]] = None):
logging.info(f"Adding message to thread {thread_id} with images: {images}") logging.info(f"Adding message to thread {thread_id} with images: {images}")
async with self.db.get_async_session() as session: thread_path = os.path.join(self.threads_dir, f"{thread_id}.json")
thread = await session.get(Thread, thread_id)
if not thread: try:
raise ValueError(f"Thread with id {thread_id} not found") with open(thread_path, 'r') as f:
thread_data = json.load(f)
try:
messages = json.loads(thread.messages) messages = thread_data["messages"]
if message_data['role'] == 'user':
last_assistant_index = next((i for i in reversed(range(len(messages))) if messages[i]['role'] == 'assistant' and 'tool_calls' in messages[i]), None)
if message_data['role'] == 'user': if last_assistant_index is not None:
last_assistant_index = next((i for i in reversed(range(len(messages))) if messages[i]['role'] == 'assistant' and 'tool_calls' in messages[i]), None) tool_call_count = len(messages[last_assistant_index]['tool_calls'])
tool_response_count = sum(1 for msg in messages[last_assistant_index+1:] if msg['role'] == 'tool')
if last_assistant_index is not None: if tool_call_count != tool_response_count:
tool_call_count = len(messages[last_assistant_index]['tool_calls']) await self.cleanup_incomplete_tool_calls(thread_id)
tool_response_count = sum(1 for msg in messages[last_assistant_index+1:] if msg['role'] == 'tool')
if tool_call_count != tool_response_count:
await self.cleanup_incomplete_tool_calls(thread_id)
for key, value in message_data.items(): for key, value in message_data.items():
if isinstance(value, ToolResult): if isinstance(value, ToolResult):
message_data[key] = str(value) message_data[key] = str(value)
if images: if images:
if isinstance(message_data['content'], str): if isinstance(message_data['content'], str):
message_data['content'] = [{"type": "text", "text": message_data['content']}] message_data['content'] = [{"type": "text", "text": message_data['content']}]
elif not isinstance(message_data['content'], list): elif not isinstance(message_data['content'], list):
message_data['content'] = [] message_data['content'] = []
for image in images: for image in images:
image_content = { image_content = {
"type": "image_url", "type": "image_url",
"image_url": { "image_url": {
"url": f"data:{image['content_type']};base64,{image['base64']}", "url": f"data:{image['content_type']};base64,{image['base64']}",
"detail": "high" "detail": "high"
}
} }
message_data['content'].append(image_content) }
message_data['content'].append(image_content)
messages.append(message_data) messages.append(message_data)
thread.messages = json.dumps(messages) thread_data["messages"] = messages
await session.commit()
logging.info(f"Message added to thread {thread_id}: {message_data}") with open(thread_path, 'w') as f:
except Exception as e: json.dump(thread_data, f)
await session.rollback()
logging.error(f"Failed to add message to thread {thread_id}: {e}") logging.info(f"Message added to thread {thread_id}: {message_data}")
raise e except Exception as e:
logging.error(f"Failed to add message to thread {thread_id}: {e}")
raise e
async def list_messages(self, thread_id: int, hide_tool_msgs: bool = False, only_latest_assistant: bool = False, regular_list: bool = True) -> List[Dict[str, Any]]: async def list_messages(self, thread_id: str, hide_tool_msgs: bool = False, only_latest_assistant: bool = False, regular_list: bool = True) -> List[Dict[str, Any]]:
async with self.db.get_async_session() as session: thread_path = os.path.join(self.threads_dir, f"{thread_id}.json")
thread = await session.get(Thread, thread_id)
if not thread: try:
return [] with open(thread_path, 'r') as f:
messages = json.loads(thread.messages) thread_data = json.load(f)
messages = thread_data["messages"]
if only_latest_assistant: if only_latest_assistant:
for msg in reversed(messages): for msg in reversed(messages):
@ -111,8 +111,10 @@ class ThreadManager:
] ]
return filtered_messages return filtered_messages
except FileNotFoundError:
return []
async def cleanup_incomplete_tool_calls(self, thread_id: int): async def cleanup_incomplete_tool_calls(self, thread_id: str):
messages = await self.list_messages(thread_id) messages = await self.list_messages(thread_id)
last_assistant_message = next((m for m in reversed(messages) if m['role'] == 'assistant' and 'tool_calls' in m), None) last_assistant_message = next((m for m in reversed(messages) if m['role'] == 'assistant' and 'tool_calls' in m), None)
@ -134,16 +136,14 @@ class ThreadManager:
assistant_index = messages.index(last_assistant_message) assistant_index = messages.index(last_assistant_message)
messages[assistant_index+1:assistant_index+1] = failed_tool_results messages[assistant_index+1:assistant_index+1] = failed_tool_results
async with self.db.get_async_session() as session: thread_path = os.path.join(self.threads_dir, f"{thread_id}.json")
thread = await session.get(Thread, thread_id) with open(thread_path, 'w') as f:
if thread: json.dump({"messages": messages}, f)
thread.messages = json.dumps(messages)
await session.commit()
return True return True
return False return False
async def run_thread(self, thread_id: int, system_message: Dict[str, Any], model_name: str, temperature: float = 0, max_tokens: Optional[int] = None, tool_choice: str = "auto", additional_message: Optional[Dict[str, Any]] = None, execute_tools_async: bool = True, execute_model_tool_calls: bool = True, use_tools: bool = True) -> Dict[str, Any]: async def run_thread(self, thread_id: str, system_message: Dict[str, Any], model_name: str, temperature: float = 0, max_tokens: Optional[int] = None, tool_choice: str = "auto", additional_message: Optional[Dict[str, Any]] = None, execute_tools_async: bool = True, execute_model_tool_calls: bool = True, use_tools: bool = True) -> Dict[str, Any]:
messages = await self.list_messages(thread_id) messages = await self.list_messages(thread_id)
prepared_messages = [system_message] + messages prepared_messages = [system_message] + messages
@ -203,11 +203,11 @@ class ThreadManager:
} }
} }
async def handle_response_without_tools(self, thread_id: int, response: Any): async def handle_response_without_tools(self, thread_id: str, response: Any):
response_content = response.choices[0].message['content'] response_content = response.choices[0].message['content']
await self.add_message(thread_id, {"role": "assistant", "content": response_content}) await self.add_message(thread_id, {"role": "assistant", "content": response_content})
async def handle_response_with_tools(self, thread_id: int, response: Any, execute_tools_async: bool): async def handle_response_with_tools(self, thread_id: str, response: Any, execute_tools_async: bool):
try: try:
response_message = response.choices[0].message response_message = response.choices[0].message
tool_calls = response_message.get('tool_calls', []) tool_calls = response_message.get('tool_calls', [])
@ -306,9 +306,13 @@ class ThreadManager:
"content": str(function_response), "content": str(function_response),
} }
async def get_thread(self, thread_id: int) -> Optional[Thread]: async def get_thread(self, thread_id: str) -> Optional[Dict[str, Any]]:
async with self.db.get_async_session() as session: thread_path = os.path.join(self.threads_dir, f"{thread_id}.json")
return await session.get(Thread, thread_id) try:
with open(thread_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return None
if __name__ == "__main__": if __name__ == "__main__":
import asyncio import asyncio