suna/agentpress/examples/example_agent/tools/files_tool.py

216 lines
7.9 KiB
Python
Raw Normal View History

2024-10-06 01:04:15 +08:00
import os
import asyncio
2024-11-03 01:47:24 +08:00
from pathlib import Path
2024-10-23 09:42:38 +08:00
from agentpress.tool import Tool, ToolResult, tool_schema
2024-10-30 00:21:19 +08:00
from agentpress.state_manager import StateManager
2024-10-06 01:04:15 +08:00
class FilesTool(Tool):
    """Tool exposing create/delete/str_replace file operations on a workspace directory.

    Every path argument is interpreted relative to ``self.workspace`` and is
    rejected if it would resolve outside of it. After each successful
    mutation the text content of all non-excluded workspace files is stored
    in the state manager under the ``"files"`` key.
    """

    # Excluded files, directories, and extensions
    EXCLUDED_FILES = {
        ".DS_Store",
        ".gitignore",
        "package-lock.json",
        "postcss.config.js",
        "postcss.config.mjs",
        "jsconfig.json",
        "components.json",
        "tsconfig.tsbuildinfo",
        "tsconfig.json",
    }

    EXCLUDED_DIRS = {
        "node_modules",
        ".next",
        "dist",
        "build",
        ".git"
    }

    EXCLUDED_EXT = {
        ".ico",
        ".svg",
        ".png",
        ".jpg",
        ".jpeg",
        ".gif",
        ".bmp",
        ".tiff",
        ".webp",
        ".db",
        ".sql"
    }

    def __init__(self):
        """Create the workspace directory and schedule the initial state scan."""
        super().__init__()
        self.workspace = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'workspace')
        os.makedirs(self.workspace, exist_ok=True)
        self.state_manager = StateManager("state.json")
        self.SNIPPET_LINES = 4  # Number of context lines to show around edits

        # create_task() needs a running loop and the loop only keeps a weak
        # reference to tasks, so hold on to the task ourselves. When the tool
        # is built from synchronous code the scan is deferred: every mutating
        # operation refreshes the state anyway.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            self._init_task = None
        else:
            self._init_task = loop.create_task(self._init_workspace_state())

    def _resolve_path(self, file_path: str) -> str:
        """Return the absolute path for *file_path* inside the workspace.

        Raises:
            ValueError: if the path would land outside the workspace
                (absolute path, ``..`` traversal, or another drive).
        """
        root = os.path.abspath(self.workspace)
        candidate = os.path.abspath(os.path.join(root, file_path))
        try:
            inside = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # commonpath() raises when the paths are on different drives.
            inside = False
        if not inside:
            raise ValueError(f"Path '{file_path}' is outside the workspace")
        return candidate

    def _should_exclude_file(self, rel_path: str) -> bool:
        """Check if a file should be excluded based on path, name, or extension"""
        # Check filename
        filename = os.path.basename(rel_path)
        if filename in self.EXCLUDED_FILES:
            return True

        # Check directory: compare whole path components so that e.g.
        # "builder/" is not mistaken for the excluded "build/".
        if any(part in self.EXCLUDED_DIRS for part in Path(rel_path).parent.parts):
            return True

        # Check extension
        _, ext = os.path.splitext(filename)
        return ext.lower() in self.EXCLUDED_EXT

    async def _init_workspace_state(self):
        """Initialize or update the workspace state in JSON"""
        files_state = {}

        # Walk through workspace and record all files
        for root, dirs, files in os.walk(self.workspace):
            # Prune excluded directories in place so os.walk never descends
            # into them (their files would be skipped below anyway).
            dirs[:] = [d for d in dirs if d not in self.EXCLUDED_DIRS]

            for file in files:
                full_path = os.path.join(root, file)
                rel_path = os.path.relpath(full_path, self.workspace)

                # Skip excluded files
                if self._should_exclude_file(rel_path):
                    continue

                try:
                    with open(full_path, 'r') as f:
                        content = f.read()
                    files_state[rel_path] = {
                        "content": content
                    }
                except UnicodeDecodeError:
                    # Must precede the generic handler, otherwise it is unreachable.
                    print(f"Skipping binary file: {rel_path}")
                except Exception as e:
                    print(f"Error reading file {rel_path}: {e}")

        await self.state_manager.set("files", files_state)

    async def _update_workspace_state(self):
        """Update the workspace state after any file operation"""
        await self._init_workspace_state()

    @staticmethod
    def _occurrence_lines(content: str, needle: str) -> list:
        """Return the 1-based line numbers on which occurrences of *needle* start.

        Occurrences are scanned without overlap (like ``str.count``) and each
        line is reported once, so multi-line needles are located correctly.
        """
        step = max(len(needle), 1)  # always advance, even for an empty needle
        line_numbers = []
        pos = content.find(needle)
        while pos != -1:
            line_no = content.count('\n', 0, pos) + 1
            if not line_numbers or line_numbers[-1] != line_no:
                line_numbers.append(line_no)
            pos = content.find(needle, pos + step)
        return line_numbers

    @tool_schema({
        "name": "create_file",
        "description": "Create a new file with the provided contents at a given path in the workspace",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to the file to be created."},
                "content": {"type": "string", "description": "The content to write to the file"}
            },
            "required": ["file_path", "content"]
        }
    })
    async def create_file(self, file_path: str, content: str) -> ToolResult:
        """Create *file_path* (relative to the workspace) containing *content*.

        Returns a failure response if the file already exists, if the path
        escapes the workspace, or if writing fails; parent directories are
        created as needed.
        """
        try:
            full_path = self._resolve_path(file_path)
            if os.path.exists(full_path):
                return self.fail_response(f"File '{file_path}' already exists. Use update_file to modify existing files.")

            os.makedirs(os.path.dirname(full_path), exist_ok=True)
            with open(full_path, 'w') as f:
                f.write(content)

            await self._update_workspace_state()
            return self.success_response(f"File '{file_path}' created successfully.")
        except Exception as e:
            return self.fail_response(f"Error creating file: {str(e)}")

    @tool_schema({
        "name": "delete_file",
        "description": "Delete a file at the given path in the workspace.",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to the file to be deleted."}
            },
            "required": ["file_path"]
        }
    })
    async def delete_file(self, file_path: str) -> ToolResult:
        """Delete *file_path* (relative to the workspace).

        Returns a failure response if the path escapes the workspace or the
        file cannot be removed (for example because it does not exist).
        """
        try:
            full_path = self._resolve_path(file_path)
            os.remove(full_path)

            await self._update_workspace_state()
            return self.success_response(f"File '{file_path}' deleted successfully.")
        except Exception as e:
            return self.fail_response(f"Error deleting file: {str(e)}")

    @tool_schema({
        "name": "str_replace",
        "description": "Replace a string with another string in a file",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to the file"},
                "old_str": {"type": "string", "description": "String to replace"},
                "new_str": {"type": "string", "description": "Replacement string"}
            },
            "required": ["file_path", "old_str", "new_str"]
        }
    })
    async def str_replace(self, file_path: str, old_str: str, new_str: str) -> ToolResult:
        """Replace the single occurrence of *old_str* in *file_path* with *new_str*.

        Fails if the file is missing, if *old_str* is absent, or if it occurs
        more than once (the starting line of each occurrence is reported).
        On success the response carries a snippet of ``SNIPPET_LINES`` lines
        of context around the edit.

        NOTE: tabs are expanded to spaces in the file content and in both
        strings before matching, so the rewritten file no longer contains tabs.
        """
        try:
            full_path = Path(self._resolve_path(file_path))
            if not full_path.exists():
                return self.fail_response(f"File '{file_path}' does not exist")

            content = full_path.read_text().expandtabs()
            old_str = old_str.expandtabs()
            new_str = new_str.expandtabs()

            occurrences = content.count(old_str)
            if occurrences == 0:
                return self.fail_response(f"String '{old_str}' not found in file")
            if occurrences > 1:
                lines = self._occurrence_lines(content, old_str)
                return self.fail_response(f"Multiple occurrences found in lines {lines}. Please ensure string is unique")

            # Perform replacement
            new_content = content.replace(old_str, new_str)
            full_path.write_text(new_content)

            # Keep the stored state in sync, as create_file/delete_file do.
            await self._update_workspace_state()

            # Show snippet around the edit
            replacement_line = content.split(old_str)[0].count('\n')
            start_line = max(0, replacement_line - self.SNIPPET_LINES)
            end_line = replacement_line + self.SNIPPET_LINES + new_str.count('\n')
            snippet = '\n'.join(new_content.split('\n')[start_line:end_line + 1])

            return self.success_response(f"Replacement successful. Snippet of changes:\n{snippet}")

        except Exception as e:
            return self.fail_response(f"Error replacing string: {str(e)}")
2024-10-06 01:04:15 +08:00
if __name__ == "__main__":
    async def test_files_tool():
        """Smoke-test FilesTool: create a file, edit it, delete it, then edit again.

        The final str_replace targets the deleted file and is expected to
        return a failure response rather than raise.
        """
        files_tool = FilesTool()
        test_file_path = "test_file.txt"
        test_content = "This is a test file."
        updated_content = "This is an updated test file."

        print(f"Using workspace directory: {files_tool.workspace}")

        # Test create_file
        create_result = await files_tool.create_file(test_file_path, test_content)
        print("Create file result:", create_result)

        # Test str_replace on the freshly created file
        replace_result = await files_tool.str_replace(test_file_path, test_content, updated_content)
        print("Replace string result:", replace_result)

        # Test delete_file
        delete_result = await files_tool.delete_file(test_file_path)
        print("Delete file result:", delete_result)

        # Test str_replace after delete (should fail); FilesTool has no
        # read_file method, so the edit operation is used as the probe.
        replace_deleted_result = await files_tool.str_replace(test_file_path, updated_content, test_content)
        print("Replace in deleted file result:", replace_deleted_result)

    asyncio.run(test_files_tool())