chore(dev): workflows feature-flagged

This commit is contained in:
Soumyadas15 2025-06-23 17:29:19 +05:30
commit 29429ab8cf
27 changed files with 1091 additions and 170 deletions

View File

@ -63,7 +63,9 @@ jobs:
script: |
cd /home/suna/backend
git pull
docker compose up -d --build
docker compose build
docker compose restart redis
docker compose up -d
- name: Deploy to prod
if: steps.get_tag_name.outputs.environment == 'prod'
@ -75,4 +77,6 @@ jobs:
script: |
cd /home/suna/backend
git pull
docker compose -f docker-compose.yml -f docker-compose.prod.yml up -d --build
docker compose -f docker-compose.yml -f docker-compose.prod.yml build
docker compose -f docker-compose.yml -f docker-compose.prod.yml restart redis
docker compose -f docker-compose.yml -f docker-compose.prod.yml up -d

2
.gitignore vendored
View File

@ -189,7 +189,6 @@ supabase/.temp/storage-version
**/.prompts/
**/__pycache__/
.env.scripts
redis_data
@ -200,3 +199,4 @@ rabbitmq_data
.setup_env.json
backend/.test_token_compression.py
backend/test_token_compression_data.py

View File

@ -54,7 +54,7 @@ CMD ["sh", "-c", "gunicorn api:app \
--worker-connections $WORKER_CONNECTIONS \
--worker-tmp-dir /dev/shm \
--preload \
--log-level info \
--log-level critical \
--access-logfile - \
--error-logfile - \
--capture-output \

View File

@ -15,7 +15,7 @@ from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
from services import redis
from utils.auth_utils import get_current_user_id_from_jwt, get_user_id_from_stream_auth, verify_thread_access
from utils.logger import logger
from utils.logger import logger, structlog
from services.billing import check_billing_status, can_use_model
from utils.config import config
from sandbox.sandbox import create_sandbox, delete_sandbox, get_or_start_sandbox
@ -371,6 +371,9 @@ async def start_agent(
user_id: str = Depends(get_current_user_id_from_jwt)
):
"""Start an agent for a specific thread in the background."""
structlog.contextvars.bind_contextvars(
thread_id=thread_id,
)
global instance_id # Ensure instance_id is accessible
if not instance_id:
raise HTTPException(status_code=500, detail="Agent API not initialized with instance ID")
@ -402,6 +405,13 @@ async def start_agent(
account_id = thread_data.get('account_id')
thread_agent_id = thread_data.get('agent_id')
thread_metadata = thread_data.get('metadata', {})
structlog.contextvars.bind_contextvars(
project_id=project_id,
account_id=account_id,
thread_agent_id=thread_agent_id,
thread_metadata=thread_metadata,
)
# Check if this is an agent builder thread
is_agent_builder = thread_metadata.get('is_agent_builder', False)
@ -563,6 +573,9 @@ async def start_agent(
"started_at": datetime.now(timezone.utc).isoformat()
}).execute()
agent_run_id = agent_run.data[0]['id']
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
logger.info(f"Created new agent run: {agent_run_id}")
# Register this run in Redis with TTL using instance ID
@ -572,6 +585,8 @@ async def start_agent(
except Exception as e:
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
request_id = structlog.contextvars.get_contextvars().get('request_id')
# Run the agent in the background
run_agent_background.send(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
@ -581,7 +596,8 @@ async def start_agent(
stream=body.stream, enable_context_manager=body.enable_context_manager,
agent_config=agent_config, # Pass agent configuration
is_agent_builder=is_agent_builder,
target_agent_id=target_agent_id
target_agent_id=target_agent_id,
request_id=request_id,
)
return {"agent_run_id": agent_run_id, "status": "running"}
@ -589,6 +605,9 @@ async def start_agent(
@router.post("/agent-run/{agent_run_id}/stop")
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Stop a running agent."""
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
logger.info(f"Received request to stop agent run: {agent_run_id}")
client = await db.client
await get_agent_run_with_access_check(client, agent_run_id, user_id)
@ -598,16 +617,22 @@ async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_
@router.get("/thread/{thread_id}/agent-runs")
async def get_agent_runs(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Get all agent runs for a thread."""
structlog.contextvars.bind_contextvars(
thread_id=thread_id,
)
logger.info(f"Fetching agent runs for thread: {thread_id}")
client = await db.client
await verify_thread_access(client, thread_id, user_id)
agent_runs = await client.table('agent_runs').select('*').eq("thread_id", thread_id).order('created_at', desc=True).execute()
agent_runs = await client.table('agent_runs').select('id, thread_id, status, started_at, completed_at, error, created_at, updated_at').eq("thread_id", thread_id).order('created_at', desc=True).execute()
logger.debug(f"Found {len(agent_runs.data)} agent runs for thread: {thread_id}")
return {"agent_runs": agent_runs.data}
@router.get("/agent-run/{agent_run_id}")
async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Get agent run status and responses."""
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
logger.info(f"Fetching agent run details: {agent_run_id}")
client = await db.client
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
@ -624,6 +649,9 @@ async def get_agent_run(agent_run_id: str, user_id: str = Depends(get_current_us
@router.get("/thread/{thread_id}/agent", response_model=ThreadAgentResponse)
async def get_thread_agent(thread_id: str, user_id: str = Depends(get_current_user_id_from_jwt)):
"""Get the agent details for a specific thread."""
structlog.contextvars.bind_contextvars(
thread_id=thread_id,
)
logger.info(f"Fetching agent details for thread: {thread_id}")
client = await db.client
@ -713,6 +741,11 @@ async def stream_agent_run(
user_id = await get_user_id_from_stream_auth(request, token)
agent_run_data = await get_agent_run_with_access_check(client, agent_run_id, user_id)
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
user_id=user_id,
)
response_list_key = f"agent_run:{agent_run_id}:responses"
response_channel = f"agent_run:{agent_run_id}:new_response"
control_channel = f"agent_run:{agent_run_id}:control" # Global control channel
@ -739,13 +772,17 @@ async def stream_agent_run(
initial_yield_complete = True
# 2. Check run status *after* yielding initial data
run_status = await client.table('agent_runs').select('status').eq("id", agent_run_id).maybe_single().execute()
run_status = await client.table('agent_runs').select('status', 'thread_id').eq("id", agent_run_id).maybe_single().execute()
current_status = run_status.data.get('status') if run_status.data else None
if current_status != 'running':
logger.info(f"Agent run {agent_run_id} is not running (status: {current_status}). Ending stream.")
yield f"data: {json.dumps({'type': 'status', 'status': 'completed'})}\n\n"
return
structlog.contextvars.bind_contextvars(
thread_id=run_status.data.get('thread_id'),
)
# 3. Set up Pub/Sub listeners for new responses and control signals
pubsub_response = await redis.create_pubsub()
@ -1047,11 +1084,20 @@ async def initiate_agent_with_files(
"account_id": account_id,
"created_at": datetime.now(timezone.utc).isoformat()
}
structlog.contextvars.bind_contextvars(
thread_id=thread_data["thread_id"],
project_id=project_id,
account_id=account_id,
)
# Store the agent_id in the thread if we have one
if agent_config:
thread_data["agent_id"] = agent_config['agent_id']
logger.info(f"Storing agent_id {agent_config['agent_id']} in thread")
structlog.contextvars.bind_contextvars(
agent_id=agent_config['agent_id'],
)
# Store agent builder metadata if this is an agent builder session
if is_agent_builder:
@ -1060,6 +1106,9 @@ async def initiate_agent_with_files(
"target_agent_id": target_agent_id
}
logger.info(f"Storing agent builder metadata in thread: target_agent_id={target_agent_id}")
structlog.contextvars.bind_contextvars(
target_agent_id=target_agent_id,
)
thread = await client.table('threads').insert(thread_data).execute()
thread_id = thread.data[0]['thread_id']
@ -1141,6 +1190,9 @@ async def initiate_agent_with_files(
}).execute()
agent_run_id = agent_run.data[0]['id']
logger.info(f"Created new agent run: {agent_run_id}")
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
)
# Register run in Redis
instance_key = f"active_run:{instance_id}:{agent_run_id}"
@ -1149,6 +1201,8 @@ async def initiate_agent_with_files(
except Exception as e:
logger.warning(f"Failed to register agent run in Redis ({instance_key}): {str(e)}")
request_id = structlog.contextvars.get_contextvars().get('request_id')
# Run agent in background
run_agent_background.send(
agent_run_id=agent_run_id, thread_id=thread_id, instance_id=instance_id,
@ -1158,7 +1212,8 @@ async def initiate_agent_with_files(
stream=stream, enable_context_manager=enable_context_manager,
agent_config=agent_config, # Pass agent configuration
is_agent_builder=is_agent_builder,
target_agent_id=target_agent_id
target_agent_id=target_agent_id,
request_id=request_id,
)
return {"thread_id": thread_id, "agent_run_id": agent_run_id}

View File

@ -566,7 +566,7 @@ For casual conversation and social interactions:
<function_calls>
<invoke name="ask">
<parameter name="attachments">file1,file2,file3</parameter>
<parameter name="message">Your question or message here</parameter>
<parameter name="text">Your question or message here</parameter>
</invoke>
</function_calls>
* **MANDATORY RULE: If you have created ANY files during this conversation, you MUST include them as attachments when using the ask tool**
@ -582,7 +582,7 @@ For casual conversation and social interactions:
<function_calls>
<invoke name="ask">
<parameter name="attachments">main.py,README.md,config.json,notes.txt</parameter>
<parameter name="message">Your question or message here</parameter>
<parameter name="text">Your question or message here</parameter>
</invoke>
</function_calls>
* Verify that ALL visual outputs have been attached before proceeding
@ -639,7 +639,7 @@ First, I'll create a todo list to organize our work and make sure we cover all a
<function_calls>
<invoke name="create_file">
<parameter name="file_path">todo.md</parameter>
<parameter name="content"># Japan Trip Planning Todo
<parameter name="file_contents"># Japan Trip Planning Todo
## Research Phase
- [ ] Research flight options from Seattle to Japan
@ -785,7 +785,7 @@ Based on all the research, I'm now going to create a detailed itinerary file:
<function_calls>
<invoke name="create_file">
<parameter name="file_path">Japan_Itinerary.md</parameter>
<parameter name="content">
<parameter name="file_contents">
# 7-Day Japan Itinerary: April 15-23, 2025
### A Journey Through History, Culture, and Romance
@ -950,7 +950,7 @@ Now I'll start creating the HTML travel handbook for your trip. First, let's set
<function_calls>
<invoke name="create_file">
<parameter name="file_path">travel_handbook/styles.css</parameter>
<parameter name="content">
<parameter name="file_contents">
/* Japan Travel Handbook CSS */
:root {
--primary-color: #d53e4f;
@ -1244,7 +1244,7 @@ Now, let's create the HTML structure for the travel handbook:
<function_calls>
<invoke name="create_file">
<parameter name="file_path">travel_handbook/index.html</parameter>
<parameter name="content">
<parameter name="file_contents">
<!DOCTYPE html>
<html lang="en">
<head>
@ -1709,7 +1709,7 @@ Let's combine all our work into a comprehensive package for you to download. Fir
<function_calls>
<invoke name="ask">
<parameter name="attachments">Japan_Itinerary.md,travel_handbook/index.html,travel_handbook/styles.css,japan_trip_package.zip</parameter>
<parameter name="message">
<parameter name="text">
# Japan Trip Itinerary & Travel Handbook
I've completed your 7-day Japan itinerary and travel handbook! Here's what I've created:

View File

@ -569,7 +569,7 @@ For casual conversation and social interactions:
<function_calls>
<invoke name="ask">
<parameter name="attachments">file1, file2, file3</parameter>
<parameter name="message">Your question or message here</parameter>
<parameter name="text">Your question or message here</parameter>
</invoke>
</function_calls>
* This includes but is not limited to: HTML files, PDF documents, markdown files, images, data visualizations, presentations, reports, dashboards, and UI mockups

View File

@ -692,7 +692,7 @@ Let's update our todo list to mark the testing as complete:
<function_calls>
<invoke name="ask">
<parameter name="attachments">index.html,style.css,script.js</parameter>
<parameter name="message">I've built a classic Pong game for you! The game features:
<parameter name="text">I've built a classic Pong game for you! The game features:
1. Player vs Computer gameplay
2. Responsive design that works on different screen sizes

View File

@ -1028,7 +1028,7 @@ Let's combine all our work into a comprehensive package for you to download. Fir
<function_calls>
<invoke name="ask">
<parameter name="attachments">Japan_Itinerary.md,travel_handbook/index.html,travel_handbook/styles.css,japan_trip_package.zip</parameter>
<parameter name="message"># Japan Trip Itinerary & Travel Handbook
<parameter name="text"># Japan Trip Itinerary & Travel Handbook
I've completed your 7-day Japan itinerary and travel handbook! Here's what I've created:

View File

@ -420,7 +420,7 @@ Based on my research, I recommend replacing the API_KEY placeholder with a real
<function_calls>
<invoke name="ask">
<parameter name="attachments">index.html,style.css,script.js</parameter>
<parameter name="message">I've created a simple weather dashboard that:
<parameter name="text">I've created a simple weather dashboard that:
1. Shows current weather conditions for any city
2. Displays a 5-day forecast

View File

@ -68,10 +68,12 @@ class ThreadManager:
content = msg['content']
if isinstance(content, str) and "ToolResult" in content: return True
if isinstance(content, dict) and "tool_execution" in content: return True
if isinstance(content, dict) and "interactive_elements" in content: return True
if isinstance(content, str):
try:
parsed_content = json.loads(content)
if isinstance(parsed_content, dict) and "tool_execution" in parsed_content: return True
if isinstance(parsed_content, dict) and "interactive_elements" in content: return True
except (json.JSONDecodeError, TypeError):
pass
return False
@ -240,14 +242,98 @@ class ThreadManager:
logger.info(f"_compress_messages: {uncompressed_total_token_count} -> {compressed_token_count}") # Log the token compression for debugging later
if max_iterations <= 0:
logger.warning(f"_compress_messages: Max iterations reached")
logger.warning(f"_compress_messages: Max iterations reached, omitting messages")
result = self._compress_messages_by_omitting_messages(messages, llm_model, max_tokens)
return result
if (compressed_token_count > max_tokens):
logger.warning(f"Further token compression is needed: {compressed_token_count} > {max_tokens}")
result = self._compress_messages(messages, llm_model, max_tokens, int(token_threshold / 2), max_iterations - 1)
return result
return self._middle_out_messages(result)
def _compress_messages_by_omitting_messages(
self,
messages: List[Dict[str, Any]],
llm_model: str,
max_tokens: Optional[int] = 41000,
removal_batch_size: int = 10,
min_messages_to_keep: int = 10
) -> List[Dict[str, Any]]:
"""Compress the messages by omitting messages from the middle.
Args:
messages: List of messages to compress
llm_model: Model name for token counting
max_tokens: Maximum allowed tokens
removal_batch_size: Number of messages to remove per iteration
min_messages_to_keep: Minimum number of messages to preserve
"""
if not messages:
return messages
result = messages
result = self._remove_meta_messages(result)
# Early exit if no compression needed
initial_token_count = token_counter(model=llm_model, messages=result)
max_allowed_tokens = max_tokens or (100 * 1000)
if initial_token_count <= max_allowed_tokens:
return result
# Separate system message (assumed to be first) from conversation messages
system_message = messages[0] if messages and messages[0].get('role') == 'system' else None
conversation_messages = result[1:] if system_message else result
safety_limit = 500
current_token_count = initial_token_count
while current_token_count > max_allowed_tokens and safety_limit > 0:
safety_limit -= 1
if len(conversation_messages) <= min_messages_to_keep:
logger.warning(f"Cannot compress further: only {len(conversation_messages)} messages remain (min: {min_messages_to_keep})")
break
# Calculate removal strategy based on current message count
if len(conversation_messages) > (removal_batch_size * 2):
# Remove from middle, keeping recent and early context
middle_start = len(conversation_messages) // 2 - (removal_batch_size // 2)
middle_end = middle_start + removal_batch_size
conversation_messages = conversation_messages[:middle_start] + conversation_messages[middle_end:]
else:
# Remove from earlier messages, preserving recent context
messages_to_remove = min(removal_batch_size, len(conversation_messages) // 2)
if messages_to_remove > 0:
conversation_messages = conversation_messages[messages_to_remove:]
else:
# Can't remove any more messages
break
# Recalculate token count
messages_to_count = ([system_message] + conversation_messages) if system_message else conversation_messages
current_token_count = token_counter(model=llm_model, messages=messages_to_count)
# Prepare final result
final_messages = ([system_message] + conversation_messages) if system_message else conversation_messages
final_token_count = token_counter(model=llm_model, messages=final_messages)
logger.info(f"_compress_messages_by_omitting_messages: {initial_token_count} -> {final_token_count} tokens ({len(messages)} -> {len(final_messages)} messages)")
return final_messages
def _middle_out_messages(self, messages: List[Dict[str, Any]], max_messages: int = 320) -> List[Dict[str, Any]]:
"""Remove messages from the middle of the list, keeping max_messages total."""
if len(messages) <= max_messages:
return messages
# Keep half from the beginning and half from the end
keep_start = max_messages // 2
keep_end = max_messages - keep_start
return messages[:keep_start] + messages[-keep_end:]
def add_tool(self, tool_class: Type[Tool], function_names: Optional[List[str]] = None, **kwargs):
"""Add a tool to the ThreadManager."""
@ -316,15 +402,36 @@ class ThreadManager:
try:
# result = await client.rpc('get_llm_formatted_messages', {'p_thread_id': thread_id}).execute()
result = await client.table('messages').select('message_id, content').eq('thread_id', thread_id).eq('is_llm_message', True).order('created_at').execute()
# Fetch messages in batches of 1000 to avoid overloading the database
all_messages = []
batch_size = 1000
offset = 0
while True:
result = await client.table('messages').select('message_id, content').eq('thread_id', thread_id).eq('is_llm_message', True).order('created_at').range(offset, offset + batch_size - 1).execute()
if not result.data or len(result.data) == 0:
break
all_messages.extend(result.data)
# If we got fewer than batch_size records, we've reached the end
if len(result.data) < batch_size:
break
offset += batch_size
# Use all_messages instead of result.data in the rest of the method
result_data = all_messages
# Parse the returned data which might be stringified JSON
if not result.data:
if not result_data:
return []
# Return properly parsed JSON objects
messages = []
for item in result.data:
for item in result_data:
if isinstance(item['content'], str):
try:
parsed_item = json.loads(item['content'])

View File

@ -1,7 +1,7 @@
from fastapi import FastAPI, Request, HTTPException, Response, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
import sentry
import sentry # Keep this import here, right after fastapi imports
from contextlib import asynccontextmanager
from agentpress.thread_manager import ThreadManager
from services.supabase import DBConnection
@ -9,12 +9,13 @@ from datetime import datetime, timezone
from dotenv import load_dotenv
from utils.config import config, EnvMode
import asyncio
from utils.logger import logger
from utils.logger import logger, structlog
import time
from collections import OrderedDict
from typing import Dict, Any
from pydantic import BaseModel
import uuid
# Import the agent API module
from agent import api as agent_api
from sandbox import api as sandbox_api
@ -89,13 +90,23 @@ app = FastAPI(lifespan=lifespan)
@app.middleware("http")
async def log_requests_middleware(request: Request, call_next):
structlog.contextvars.clear_contextvars()
request_id = str(uuid.uuid4())
start_time = time.time()
client_ip = request.client.host
method = request.method
url = str(request.url)
path = request.url.path
query_params = str(request.query_params)
structlog.contextvars.bind_contextvars(
request_id=request_id,
client_ip=client_ip,
method=method,
path=path,
query_params=query_params
)
# Log the incoming request
logger.info(f"Request started: {method} {path} from {client_ip} | Query: {query_params}")

View File

@ -10,7 +10,7 @@ services:
memory: 32G
worker:
command: python -m dramatiq --processes 40 --threads 8 run_agent_background
command: python -m dramatiq --skip-logging --processes 40 --threads 8 run_agent_background
deploy:
resources:
limits:

View File

@ -42,7 +42,7 @@ services:
build:
context: .
dockerfile: Dockerfile
command: python -m dramatiq --processes 4 --threads 4 run_agent_background run_workflow_background
command: python -m dramatiq --skip-logging --processes 4 --threads 4 run_agent_background run_workflow_background
env_file:
- .env
volumes:

14
backend/poetry.lock generated
View File

@ -3209,6 +3209,18 @@ files = [
requests = {version = ">=2.20", markers = "python_version >= \"3.0\""}
typing_extensions = {version = ">=4.5.0", markers = "python_version >= \"3.7\""}
[[package]]
name = "structlog"
version = "25.4.0"
description = "Structured Logging for Python"
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "structlog-25.4.0-py3-none-any.whl", hash = "sha256:fe809ff5c27e557d14e613f45ca441aabda051d119ee5a0102aaba6ce40eed2c"},
{file = "structlog-25.4.0.tar.gz", hash = "sha256:186cd1b0a8ae762e29417095664adf1d6a31702160a46dacb7796ea82f7409e4"},
]
[[package]]
name = "supabase"
version = "2.15.0"
@ -3904,4 +3916,4 @@ testing = ["coverage[toml]", "zope.event", "zope.testing"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "99368309d41431f2104a182b680e9361409121e7a6df3ead3f14173dbaf9d066"
content-hash = "22e38245da1c442147f6b870361b56f898ec3f3b87cbe09c3f82e9c96c621ae7"

View File

@ -60,6 +60,7 @@ httpx = "^0.28.0"
aiohttp = "^3.9.0"
email-validator = "^2.0.0"
mailtrap = "^2.0.1"
structlog = "^25.4.0"
cryptography = "^41.0.0"
apscheduler = "^3.10.0"

View File

@ -43,7 +43,11 @@ mcp_use>=1.0.0
aiohttp>=3.9.0
email-validator>=2.0.0
mailtrap>=2.0.1
<<<<<<< HEAD
cryptography>=41.0.0
apscheduler>=3.10.0
croniter>=1.4.0
qstash>=2.0.0
=======
structlog==25.4.0
>>>>>>> main

View File

@ -6,7 +6,7 @@ from datetime import datetime, timezone
from typing import Optional
from services import redis
from agent.run import run_agent
from utils.logger import logger
from utils.logger import logger, structlog
import dramatiq
import uuid
from agentpress.thread_manager import ThreadManager
@ -57,9 +57,18 @@ async def run_agent_background(
enable_context_manager: bool,
agent_config: Optional[dict] = None,
is_agent_builder: Optional[bool] = False,
target_agent_id: Optional[str] = None
target_agent_id: Optional[str] = None,
request_id: Optional[str] = None,
):
"""Run the agent in the background using Redis for state."""
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
agent_run_id=agent_run_id,
thread_id=thread_id,
request_id=request_id,
)
try:
await initialize()
except Exception as e:
@ -88,6 +97,16 @@ async def run_agent_background(
sentry.sentry.set_tag("thread_id", thread_id)
logger.info(f"Starting background agent run: {agent_run_id} for thread: {thread_id} (Instance: {instance_id})")
logger.info({
"model_name": model_name,
"enable_thinking": enable_thinking,
"reasoning_effort": reasoning_effort,
"stream": stream,
"enable_context_manager": enable_context_manager,
"agent_config": agent_config,
"is_agent_builder": is_agent_builder,
"target_agent_id": target_agent_id,
})
logger.info(f"🚀 Using model: {model_name} (thinking: {enable_thinking}, reasoning_effort: {reasoning_effort})")
if agent_config:
logger.info(f"Using custom agent: {agent_config.get('name', 'Unknown')}")

View File

@ -135,7 +135,7 @@ async def delete_sandbox(sandbox_id: str):
sandbox = daytona.get(sandbox_id)
# Delete the sandbox
daytona.remove(sandbox)
daytona.delete(sandbox)
logger.info(f"Successfully deleted sandbox {sandbox_id}")
return True

View File

@ -195,8 +195,11 @@ async def calculate_monthly_usage(client, user_id: str) -> float:
if run['completed_at']:
end_time = datetime.fromisoformat(run['completed_at'].replace('Z', '+00:00')).timestamp()
else:
# For running jobs, use current time
end_time = now_ts
# if the start time is more than an hour ago, don't consider that time in total. else use the current time
if start_time < now_ts - 3600:
continue
else:
end_time = now_ts
total_seconds += (end_time - start_time)

View File

@ -129,6 +129,10 @@ def prepare_params(
# "anthropic-beta": "max-tokens-3-5-sonnet-2024-07-15"
"anthropic-beta": "output-128k-2025-02-19"
}
params["fallbacks"] = [{
"model": "openrouter/anthropic/claude-sonnet-4",
"messages": messages,
}]
logger.debug("Added Claude-specific headers")
# Add OpenRouter-specific parameters

View File

@ -3,6 +3,7 @@ from fastapi import HTTPException, Request
from typing import Optional
import jwt
from jwt.exceptions import PyJWTError
from utils.logger import structlog
# This function extracts the user ID from Supabase JWT
async def get_current_user_id_from_jwt(request: Request) -> str:
@ -48,6 +49,9 @@ async def get_current_user_id_from_jwt(request: Request) -> str:
)
sentry.sentry.set_user({ "id": user_id })
structlog.contextvars.bind_contextvars(
user_id=user_id
)
return user_id
except PyJWTError:
@ -121,8 +125,11 @@ async def get_user_id_from_stream_auth(
# For Supabase JWT, we just need to decode and extract the user ID
payload = jwt.decode(token, options={"verify_signature": False})
user_id = payload.get('sub')
sentry.sentry.set_user({ "id": user_id })
if user_id:
sentry.sentry.set_user({ "id": user_id })
structlog.contextvars.bind_contextvars(
user_id=user_id
)
return user_id
except Exception:
pass
@ -213,6 +220,11 @@ async def get_optional_user_id(request: Request) -> Optional[str]:
# Supabase stores the user ID in the 'sub' claim
user_id = payload.get('sub')
if user_id:
sentry.sentry.set_user({ "id": user_id })
structlog.contextvars.bind_contextvars(
user_id=user_id
)
return user_id
except PyJWTError:

View File

@ -1,131 +1,28 @@
"""
Centralized logging configuration for AgentPress.
import structlog, logging, os
This module provides a unified logging interface with:
- Structured JSON logging for better parsing
- Log levels for different environments
- Correlation IDs for request tracing
- Contextual information for debugging
"""
ENV_MODE = os.getenv("ENV_MODE", "LOCAL")
import logging
import json
import sys
import os
from datetime import datetime, timezone
from contextvars import ContextVar
from functools import wraps
import traceback
from logging.handlers import RotatingFileHandler
renderer = [structlog.processors.JSONRenderer()]
if ENV_MODE.lower() == "local".lower():
renderer = [structlog.dev.ConsoleRenderer()]
from utils.config import config, EnvMode
# Context variable for request correlation ID
request_id: ContextVar[str] = ContextVar('request_id', default='')
class JSONFormatter(logging.Formatter):
"""Custom JSON formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
"""Format log record as JSON with contextual information."""
log_data = {
'timestamp': datetime.now(timezone.utc).replace(tzinfo=None).isoformat(),
'level': record.levelname,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
'request_id': request_id.get(),
'thread_id': getattr(record, 'thread_id', None),
'correlation_id': getattr(record, 'correlation_id', None)
}
# Add extra fields if present
if hasattr(record, 'extra'):
log_data.update(record.extra)
# Add exception info if present
if record.exc_info:
log_data['exception'] = {
'type': str(record.exc_info[0].__name__),
'message': str(record.exc_info[1]),
'traceback': traceback.format_exception(*record.exc_info)
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.dict_tracebacks,
structlog.processors.CallsiteParameterAdder(
{
structlog.processors.CallsiteParameter.FILENAME,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
}
return json.dumps(log_data)
),
structlog.processors.TimeStamper(fmt="iso"),
structlog.contextvars.merge_contextvars,
*renderer,
],
cache_logger_on_first_use=True,
)
def setup_logger(name: str = 'agentpress') -> logging.Logger:
"""
Set up a centralized logger with both file and console handlers.
Args:
name: The name of the logger
Returns:
logging.Logger: Configured logger instance
"""
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# Create logs directory if it doesn't exist
log_dir = os.path.join(os.getcwd(), 'logs')
try:
if not os.path.exists(log_dir):
os.makedirs(log_dir)
print(f"Created log directory at: {log_dir}")
except Exception as e:
print(f"Error creating log directory: {e}")
return logger
# File handler with rotation
try:
log_file = os.path.join(log_dir, f'{name}_{datetime.now().strftime("%Y%m%d")}.log')
file_handler = RotatingFileHandler(
log_file,
maxBytes=10*1024*1024, # 10MB
backupCount=5,
encoding='utf-8'
)
file_handler.setLevel(logging.DEBUG)
# Create formatters
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
)
file_handler.setFormatter(file_formatter)
# Add file handler to logger
logger.addHandler(file_handler)
print(f"Added file handler for: {log_file}")
except Exception as e:
print(f"Error setting up file handler: {e}")
# Console handler - WARNING in production, DEBUG in other environments
try:
console_handler = logging.StreamHandler(sys.stdout)
if config.ENV_MODE == EnvMode.PRODUCTION:
console_handler.setLevel(logging.WARNING)
else:
console_handler.setLevel(logging.DEBUG)
console_formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(name)s - %(message)s'
)
console_handler.setFormatter(console_formatter)
# Add console handler to logger
logger.addHandler(console_handler)
logger.info(f"Added console handler with level: {console_handler.level}")
logger.info(f"Log file will be created at: {log_dir}")
except Exception as e:
print(f"Error setting up console handler: {e}")
# # Test logging
# logger.debug("Logger setup complete - DEBUG test")
# logger.info("Logger setup complete - INFO test")
# logger.warning("Logger setup complete - WARNING test")
return logger
# Create default logger instance
logger = setup_logger()
logger: structlog.stdlib.BoundLogger = structlog.get_logger(level=logging.DEBUG)

View File

@ -0,0 +1,388 @@
import asyncio
import argparse
from dotenv import load_dotenv
load_dotenv(".env")
from services.supabase import DBConnection
from daytona_sdk import Sandbox
from sandbox.sandbox import daytona, create_sandbox, delete_sandbox
from utils.logger import logger
db_connection = None
db = None
async def get_db():
global db_connection, db
if db_connection is None or db is None:
db_connection = DBConnection()
db = await db_connection.client
return db
async def get_project(project_id: str):
db = await get_db()
project = (
await db.schema("public")
.from_("projects")
.select("*")
.eq("project_id", project_id)
.maybe_single()
.execute()
)
return project.data
async def get_threads(project_id: str):
db = await get_db()
threads = (
await db.schema("public")
.from_("threads")
.select("*")
.eq("project_id", project_id)
.execute()
)
return threads.data
async def copy_thread(thread_id: str, account_id: str, project_id: str):
db = await get_db()
thread = (
await db.schema("public")
.from_("threads")
.select("*")
.eq("thread_id", thread_id)
.maybe_single()
.execute()
)
if not thread.data:
raise Exception(f"Thread {thread_id} not found")
thread_data = thread.data
new_thread = (
await db.schema("public")
.from_("threads")
.insert(
{
"account_id": account_id,
"project_id": project_id,
"is_public": thread_data["is_public"],
"agent_id": thread_data["agent_id"],
"metadata": thread_data["metadata"] or {},
}
)
.execute()
)
return new_thread.data[0]
async def copy_project(project_id: str, to_user_id: str, sandbox_data: dict):
db = await get_db()
project = await get_project(project_id)
to_user = await get_user(to_user_id)
if not project:
raise Exception(f"Project {project_id} not found")
if not to_user:
raise Exception(f"User {to_user_id} not found")
result = (
await db.schema("public")
.from_("projects")
.insert(
{
"name": project["name"],
"description": project["description"],
"account_id": to_user["id"],
"is_public": project["is_public"],
"sandbox": sandbox_data,
}
)
.execute()
)
return result.data[0]
async def copy_agent_runs(thread_id: str, new_thread_id: str):
db = await get_db()
agent_runs = (
await db.schema("public")
.from_("agent_runs")
.select("*")
.eq("thread_id", thread_id)
.execute()
)
async def copy_single_agent_run(agent_run, new_thread_id, db):
new_agent_run = (
await db.schema("public")
.from_("agent_runs")
.insert(
{
"thread_id": new_thread_id,
"status": agent_run["status"],
"started_at": agent_run["started_at"],
"completed_at": agent_run["completed_at"],
"responses": agent_run["responses"],
"error": agent_run["error"],
}
)
.execute()
)
return new_agent_run.data[0]
tasks = [
copy_single_agent_run(agent_run, new_thread_id, db)
for agent_run in agent_runs.data
]
new_agent_runs = await asyncio.gather(*tasks)
return new_agent_runs
async def copy_messages(thread_id: str, new_thread_id: str):
db = await get_db()
messages_data = []
offset = 0
batch_size = 1000
while True:
batch = (
await db.schema("public")
.from_("messages")
.select("*")
.eq("thread_id", thread_id)
.range(offset, offset + batch_size - 1)
.execute()
)
if not batch.data:
break
messages_data.extend(batch.data)
if len(batch.data) < batch_size:
break
offset += batch_size
async def copy_single_message(message, new_thread_id, db):
new_message = (
await db.schema("public")
.from_("messages")
.insert(
{
"thread_id": new_thread_id,
"type": message["type"],
"is_llm_message": message["is_llm_message"],
"content": message["content"],
"metadata": message["metadata"],
"created_at": message["created_at"],
"updated_at": message["updated_at"],
}
)
.execute()
)
return new_message.data[0]
tasks = []
for message in messages_data:
tasks.append(copy_single_message(message, new_thread_id, db))
# Process tasks in batches to avoid overwhelming the database
batch_size = 100
new_messages = []
for i in range(0, len(tasks), batch_size):
batch_tasks = tasks[i : i + batch_size]
batch_results = await asyncio.gather(*batch_tasks)
new_messages.extend(batch_results)
# Add delay between batches
if i + batch_size < len(tasks):
await asyncio.sleep(1)
return new_messages
async def get_user(user_id: str):
db = await get_db()
user = await db.auth.admin.get_user_by_id(user_id)
return user.user.model_dump()
async def copy_sandbox(sandbox_id: str, password: str, project_id: str) -> Sandbox:
sandbox = daytona.find_one(sandbox_id=sandbox_id)
if not sandbox:
raise Exception(f"Sandbox {sandbox_id} not found")
# TODO: Currently there's no way to create a copy of a sandbox, so we will create a new one
new_sandbox = create_sandbox(password, project_id)
return new_sandbox
async def main():
"""Main function to run the script."""
# Parse command line arguments
parser = argparse.ArgumentParser(description="Create copy of a project")
parser.add_argument(
"--project-id", type=str, help="Project ID to copy", required=True
)
parser.add_argument(
"--new-user-id",
type=str,
default=None,
help="[OPTIONAL] User ID to copy the project to",
required=False,
)
args = parser.parse_args()
# Initialize variables for cleanup
new_sandbox = None
new_project = None
new_threads = []
new_agent_runs = []
new_messages = []
try:
project = await get_project(args.project_id)
if not project:
raise Exception(f"Project {args.project_id} not found")
to_user_id = args.new_user_id or project["account_id"]
to_user = await get_user(to_user_id)
logger.info(
f"Project: {project['project_id']} ({project['name']}) -> User: {to_user['id']} ({to_user['email']})"
)
new_sandbox = await copy_sandbox(
project["sandbox"]["id"], project["sandbox"]["pass"], args.project_id
)
if new_sandbox:
vnc_link = new_sandbox.get_preview_link(6080)
website_link = new_sandbox.get_preview_link(8080)
vnc_url = (
vnc_link.url
if hasattr(vnc_link, "url")
else str(vnc_link).split("url='")[1].split("'")[0]
)
website_url = (
website_link.url
if hasattr(website_link, "url")
else str(website_link).split("url='")[1].split("'")[0]
)
token = None
if hasattr(vnc_link, "token"):
token = vnc_link.token
elif "token='" in str(vnc_link):
token = str(vnc_link).split("token='")[1].split("'")[0]
else:
raise Exception("Failed to create new sandbox")
sandbox_data = {
"id": new_sandbox.id,
"pass": project["sandbox"]["pass"],
"token": token,
"vnc_preview": vnc_url,
"sandbox_url": website_url,
}
logger.info(f"New sandbox: {new_sandbox.id}")
new_project = await copy_project(
project["project_id"], to_user["id"], sandbox_data
)
logger.info(f"New project: {new_project['project_id']} ({new_project['name']})")
threads = await get_threads(project["project_id"])
if threads:
for thread in threads:
new_thread = await copy_thread(
thread["thread_id"], to_user["id"], new_project["project_id"]
)
new_threads.append(new_thread)
logger.info(f"New threads: {len(new_threads)}")
for i in range(len(new_threads)):
runs = await copy_agent_runs(
threads[i]["thread_id"], new_threads[i]["thread_id"]
)
new_agent_runs.extend(runs)
logger.info(f"New agent runs: {len(new_agent_runs)}")
for i in range(len(new_threads)):
messages = await copy_messages(
threads[i]["thread_id"], new_threads[i]["thread_id"]
)
new_messages.extend(messages)
logger.info(f"New messages: {len(new_messages)}")
else:
logger.info("No threads found for this project")
except Exception as e:
db = await get_db()
# Clean up any resources that were created before the error
if new_sandbox:
try:
logger.info(f"Cleaning up sandbox: {new_sandbox.id}")
await delete_sandbox(new_sandbox.id)
except Exception as cleanup_error:
logger.error(
f"Error cleaning up sandbox {new_sandbox.id}: {cleanup_error}"
)
if new_messages:
for message in new_messages:
try:
logger.info(f"Cleaning up message: {message['message_id']}")
await db.table("messages").delete().eq(
"message_id", message["message_id"]
).execute()
except Exception as cleanup_error:
logger.error(
f"Error cleaning up message {message['message_id']}: {cleanup_error}"
)
if new_agent_runs:
for agent_run in new_agent_runs:
try:
logger.info(f"Cleaning up agent run: {agent_run['id']}")
await db.table("agent_runs").delete().eq(
"id", agent_run["id"]
).execute()
except Exception as cleanup_error:
logger.error(
f"Error cleaning up agent run {agent_run['id']}: {cleanup_error}"
)
if new_threads:
for thread in new_threads:
try:
logger.info(f"Cleaning up thread: {thread['thread_id']}")
await db.table("threads").delete().eq(
"thread_id", thread["thread_id"]
).execute()
except Exception as cleanup_error:
logger.error(
f"Error cleaning up thread {thread['thread_id']}: {cleanup_error}"
)
if new_project:
try:
logger.info(f"Cleaning up project: {new_project['project_id']}")
await db.table("projects").delete().eq(
"project_id", new_project["project_id"]
).execute()
except Exception as cleanup_error:
logger.error(
f"Error cleaning up project {new_project['project_id']}: {cleanup_error}"
)
await DBConnection.disconnect()
raise e
finally:
await DBConnection.disconnect()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,400 @@
import asyncio
import argparse
import json
import os
from datetime import datetime
from dotenv import load_dotenv
load_dotenv(".env")
from services.supabase import DBConnection
from daytona_sdk import Sandbox
from sandbox.sandbox import daytona, create_sandbox, delete_sandbox
from utils.logger import logger
db_connection = None
db = None
async def get_db():
global db_connection, db
if db_connection is None or db is None:
db_connection = DBConnection()
db = await db_connection.client
return db
async def get_project(project_id: str):
db = await get_db()
project = (
await db.schema("public")
.from_("projects")
.select("*")
.eq("project_id", project_id)
.maybe_single()
.execute()
)
return project.data
async def get_threads(project_id: str):
db = await get_db()
threads = (
await db.schema("public")
.from_("threads")
.select("*")
.eq("project_id", project_id)
.execute()
)
return threads.data
async def get_agent_runs(thread_id: str):
db = await get_db()
agent_runs = (
await db.schema("public")
.from_("agent_runs")
.select("*")
.eq("thread_id", thread_id)
.execute()
)
return agent_runs.data
async def get_messages(thread_id: str):
db = await get_db()
messages_data = []
offset = 0
batch_size = 1000
while True:
batch = (
await db.schema("public")
.from_("messages")
.select("*")
.eq("thread_id", thread_id)
.range(offset, offset + batch_size - 1)
.execute()
)
if not batch.data:
break
messages_data.extend(batch.data)
if len(batch.data) < batch_size:
break
offset += batch_size
return messages_data
async def get_user(user_id: str):
db = await get_db()
user = await db.auth.admin.get_user_by_id(user_id)
return user.user.model_dump()
async def export_project_to_file(project_id: str, output_file: str):
"""Export all project data to a JSON file."""
try:
logger.info(f"Starting export of project {project_id}")
# Get project data
project = await get_project(project_id)
if not project:
raise Exception(f"Project {project_id} not found")
logger.info(f"Exporting project: {project['name']}")
# Get threads
threads = await get_threads(project_id)
logger.info(f"Found {len(threads)} threads")
# Get agent runs and messages for each thread
threads_data = []
for thread in threads:
thread_data = dict(thread)
# Get agent runs for this thread
agent_runs = await get_agent_runs(thread["thread_id"])
thread_data["agent_runs"] = agent_runs
# Get messages for this thread
messages = await get_messages(thread["thread_id"])
thread_data["messages"] = messages
threads_data.append(thread_data)
logger.info(f"Thread {thread['thread_id']}: {len(agent_runs)} runs, {len(messages)} messages")
# Prepare export data
export_data = {
"export_metadata": {
"export_date": datetime.now().isoformat(),
"project_id": project_id,
"project_name": project["name"]
},
"project": project,
"threads": threads_data
}
# Write to file
os.makedirs(os.path.dirname(output_file), exist_ok=True)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(export_data, f, indent=2, ensure_ascii=False, default=str)
logger.info(f"Project exported successfully to {output_file}")
logger.info(f"Export summary: 1 project, {len(threads_data)} threads")
return export_data
except Exception as e:
logger.error(f"Error exporting project: {e}")
raise e
finally:
await DBConnection.disconnect()
async def import_project_from_file(input_file: str, to_user_id: str = None, create_new_sandbox: bool = True):
"""Import project data from a JSON file and create a new project."""
new_sandbox = None
new_project = None
new_threads = []
new_agent_runs = []
new_messages = []
try:
logger.info(f"Starting import from {input_file}")
# Read data from file
with open(input_file, 'r', encoding='utf-8') as f:
import_data = json.load(f)
project_data = import_data["project"]
threads_data = import_data["threads"]
logger.info(f"Importing project: {project_data['name']}")
logger.info(f"Found {len(threads_data)} threads to import")
# Determine target user
to_user_id = to_user_id or project_data["account_id"]
to_user = await get_user(to_user_id)
logger.info(f"Target user: {to_user['id']} ({to_user['email']})")
# Create new sandbox if requested
if create_new_sandbox:
logger.info("Creating new sandbox...")
new_sandbox = create_sandbox(project_data["sandbox"]["pass"], project_data["project_id"])
if new_sandbox:
vnc_link = new_sandbox.get_preview_link(6080)
website_link = new_sandbox.get_preview_link(8080)
vnc_url = (
vnc_link.url
if hasattr(vnc_link, "url")
else str(vnc_link).split("url='")[1].split("'")[0]
)
website_url = (
website_link.url
if hasattr(website_link, "url")
else str(website_link).split("url='")[1].split("'")[0]
)
token = None
if hasattr(vnc_link, "token"):
token = vnc_link.token
elif "token='" in str(vnc_link):
token = str(vnc_link).split("token='")[1].split("'")[0]
sandbox_data = {
"id": new_sandbox.id,
"pass": project_data["sandbox"]["pass"],
"token": token,
"vnc_preview": vnc_url,
"sandbox_url": website_url,
}
logger.info(f"New sandbox created: {new_sandbox.id}")
else:
raise Exception("Failed to create new sandbox")
else:
# Use existing sandbox data
sandbox_data = project_data["sandbox"]
logger.info("Using existing sandbox data")
# Create new project
db = await get_db()
result = (
await db.schema("public")
.from_("projects")
.insert(
{
"name": project_data["name"],
"description": project_data["description"],
"account_id": to_user["id"],
"is_public": project_data["is_public"],
"sandbox": sandbox_data,
}
)
.execute()
)
new_project = result.data[0]
logger.info(f"New project created: {new_project['project_id']} ({new_project['name']})")
# Import threads
for thread_data in threads_data:
# Create new thread
new_thread = (
await db.schema("public")
.from_("threads")
.insert(
{
"account_id": to_user["id"],
"project_id": new_project["project_id"],
"is_public": thread_data["is_public"],
"agent_id": thread_data["agent_id"],
"metadata": thread_data["metadata"] or {},
}
)
.execute()
)
new_thread = new_thread.data[0]
new_threads.append(new_thread)
# Create agent runs for this thread
for agent_run_data in thread_data.get("agent_runs", []):
new_agent_run = (
await db.schema("public")
.from_("agent_runs")
.insert(
{
"thread_id": new_thread["thread_id"],
"status": agent_run_data["status"],
"started_at": agent_run_data["started_at"],
"completed_at": agent_run_data["completed_at"],
"responses": agent_run_data["responses"],
"error": agent_run_data["error"],
}
)
.execute()
)
new_agent_runs.append(new_agent_run.data[0])
# Create messages for this thread in batches
messages = thread_data.get("messages", [])
batch_size = 100
for i in range(0, len(messages), batch_size):
batch_messages = messages[i:i + batch_size]
message_inserts = []
for message_data in batch_messages:
message_inserts.append({
"thread_id": new_thread["thread_id"],
"type": message_data["type"],
"is_llm_message": message_data["is_llm_message"],
"content": message_data["content"],
"metadata": message_data["metadata"],
"created_at": message_data["created_at"],
"updated_at": message_data["updated_at"],
})
if message_inserts:
batch_result = (
await db.schema("public")
.from_("messages")
.insert(message_inserts)
.execute()
)
new_messages.extend(batch_result.data)
# Add delay between batches
if i + batch_size < len(messages):
await asyncio.sleep(0.5)
logger.info(f"Thread imported: {len(thread_data.get('agent_runs', []))} runs, {len(messages)} messages")
logger.info(f"Import completed successfully!")
logger.info(f"Summary: 1 project, {len(new_threads)} threads, {len(new_agent_runs)} agent runs, {len(new_messages)} messages")
return {
"project": new_project,
"threads": new_threads,
"agent_runs": new_agent_runs,
"messages": new_messages
}
except Exception as e:
logger.error(f"Error importing project: {e}")
# Clean up any resources that were created before the error
db = await get_db()
if new_sandbox:
try:
logger.info(f"Cleaning up sandbox: {new_sandbox.id}")
await delete_sandbox(new_sandbox.id)
except Exception as cleanup_error:
logger.error(f"Error cleaning up sandbox {new_sandbox.id}: {cleanup_error}")
if new_messages:
for message in new_messages:
try:
await db.table("messages").delete().eq("message_id", message["message_id"]).execute()
except Exception as cleanup_error:
logger.error(f"Error cleaning up message {message['message_id']}: {cleanup_error}")
if new_agent_runs:
for agent_run in new_agent_runs:
try:
await db.table("agent_runs").delete().eq("id", agent_run["id"]).execute()
except Exception as cleanup_error:
logger.error(f"Error cleaning up agent run {agent_run['id']}: {cleanup_error}")
if new_threads:
for thread in new_threads:
try:
await db.table("threads").delete().eq("thread_id", thread["thread_id"]).execute()
except Exception as cleanup_error:
logger.error(f"Error cleaning up thread {thread['thread_id']}: {cleanup_error}")
if new_project:
try:
await db.table("projects").delete().eq("project_id", new_project["project_id"]).execute()
except Exception as cleanup_error:
logger.error(f"Error cleaning up project {new_project['project_id']}: {cleanup_error}")
await DBConnection.disconnect()
raise e
finally:
await DBConnection.disconnect()
async def main():
"""Main function to run the script."""
parser = argparse.ArgumentParser(description="Export/Import project data")
parser.add_argument("action", choices=["export", "import"], help="Action to perform")
parser.add_argument("--project-id", type=str, help="Project ID to export (required for export)")
parser.add_argument("--file", type=str, help="File path for export/import", required=True)
parser.add_argument("--user-id", type=str, help="User ID to import project to (optional for import)")
parser.add_argument("--no-sandbox", action="store_true", help="Don't create new sandbox during import")
args = parser.parse_args()
try:
if args.action == "export":
if not args.project_id:
raise Exception("--project-id is required for export")
await export_project_to_file(args.project_id, args.file)
elif args.action == "import":
create_new_sandbox = not args.no_sandbox
await import_project_from_file(args.file, args.user_id, create_new_sandbox)
except Exception as e:
logger.error(f"Script failed: {e}")
raise e
if __name__ == "__main__":
asyncio.run(main())

View File

@ -57,7 +57,7 @@ services:
build:
context: ./backend
dockerfile: Dockerfile
command: python -m dramatiq run_agent_background run_workflow_background
command: python -m dramatiq --skip-logging run_agent_background run_workflow_background
volumes:
- ./backend/.env:/app/.env:ro
env_file:

View File

@ -1359,7 +1359,7 @@ export const siteConfig = {
),
image:
'https://images.unsplash.com/photo-1532153975070-2e9ab71f1b14?ixlib=rb-4.0.3&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=2400&q=80',
url: 'https://suna.so/share/bf6a819b-6af5-4ef7-b861-16e5261ceeb0',
url: 'https://suna.so/share/2a147a3a-3778-4624-8285-42474c8c1c9c',
},
{
id: 'speaker-prospecting',
@ -1485,7 +1485,7 @@ export const siteConfig = {
),
image:
'https://images.unsplash.com/photo-1552581234-26160f608093?ixlib=rb-4.0.3&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=2400&q=80',
url: 'https://suna.so/share/a01744fc-6b33-434c-9d4e-67d7e820297c',
url: 'https://suna.so/share/c3472df7-adc1-4d5f-9927-4f8f513ec2fe',
},
{
id: 'seo-analysis',
@ -1501,7 +1501,7 @@ export const siteConfig = {
viewBox="0 0 24 24"
fill="none"
xmlns="http://www.w3.org/2000/svg"
>
>
<path
d="M4.75 11.75L10.25 6.25L14.75 10.75L19.25 6.25"
stroke="currentColor"
@ -1534,7 +1534,7 @@ export const siteConfig = {
),
image:
'https://images.unsplash.com/photo-1611974789855-9c2a0a7236a3?ixlib=rb-4.0.3&ixid=MnwxMjA3fDB8MHxwaG90by1wYWdlfHx8fGVufDB8fHx8&auto=format&fit=crop&w=2400&q=80',
url: 'https://suna.so/share/59be8603-3225-4c15-a948-ab976e5912f6',
url: 'https://suna.so/share/cf756e02-fee9-4281-a0e4-76ac850f1ac9',
},
{
id: 'personal-trip',

View File

@ -336,7 +336,7 @@ def collect_llm_api_keys():
for i, model in enumerate(model_aliases['ANTHROPIC'], 1):
print(f"{Colors.CYAN}[{i}] {Colors.GREEN}{model}{Colors.ENDC}")
model_choice = input("Select default model (1-3) or press Enter for claude-3-7-sonnet: ").strip()
model_choice = input("Select default model (1-3) or press Enter for claude-3-7-sonnet ").strip()
if not model_choice or model_choice == '1':
model_info['default_model'] = 'anthropic/claude-3-7-sonnet-latest'
elif model_choice.isdigit() and 1 <= int(model_choice) <= len(model_aliases['ANTHROPIC']):
@ -615,6 +615,10 @@ def setup_supabase():
supabase_url = line.strip().split('=', 1)[1]
break
# Add this check
if not supabase_url:
raise RuntimeError("SUPABASE_URL not found in environment or backend/.env file.")
project_ref = None
if supabase_url:
# Extract project reference from URL (format: https://[project_ref].supabase.co)