This commit is contained in:
marko-kraemer 2025-04-23 05:20:05 +01:00
parent 5a0e9f8a38
commit 6829fef699
3 changed files with 96 additions and 325 deletions

View File

@ -372,192 +372,6 @@ async def start_agent(
return {"agent_run_id": agent_run_id, "status": "running"}
@router.post("/create-and-start")
async def create_project_and_start_agent(
request: Request,
user_id: str = Depends(get_current_user_id)
):
"""
Create a project, thread, and start an agent in a single atomic operation.
This prevents race conditions in sandbox creation.
"""
logger.info(f"Creating project and starting agent for user: {user_id}")
# Parse request body
body = await request.json()
# Extract parameters
project_name = body.get("name", "New Project")
project_description = body.get("description", "")
user_message = body.get("message", "")
model_name = body.get("model_name", "anthropic/claude-3-7-sonnet-latest")
enable_thinking = body.get("enable_thinking", False)
reasoning_effort = body.get("reasoning_effort", "low")
stream = body.get("stream", True)
enable_context_manager = body.get("enable_context_manager", False)
# Connect to the database
client = await db.client
# Check if the user has access to an account
user_accounts = await client.table('account_users').select('account_id').eq('user_id', user_id).execute()
if not user_accounts.data:
raise HTTPException(status_code=403, detail="User has no account access")
# Get the user's personal account (or first account if no personal account)
account_id = None
for acc in user_accounts.data:
account_data = await client.table('accounts').select('personal_account').eq('account_id', acc['account_id']).execute()
if account_data.data and account_data.data[0].get('personal_account'):
account_id = acc['account_id']
break
if not account_id:
# If no personal account, use first one
account_id = user_accounts.data[0]['account_id']
# Check billing status
can_run, message, subscription = await check_billing_status(client, account_id)
if not can_run:
raise HTTPException(status_code=402, detail={
"message": message,
"subscription": subscription
})
try:
# Create the project
project = await client.table('projects').insert({
"name": project_name,
"description": project_description,
"account_id": account_id,
"created_by": user_id,
"created_at": datetime.now(timezone.utc).isoformat()
}).execute()
project_id = project.data[0]['project_id']
logger.info(f"Created new project: {project_id}")
# Create a sandbox for the project (with race condition protection)
sandbox_pass = str(uuid.uuid4())
sandbox = create_sandbox(sandbox_pass)
logger.info(f"Created new sandbox for project {project_id} with preview: {sandbox.get_preview_link(6080)}/vnc_lite.html?password={sandbox_pass}")
sandbox_id = sandbox.id
# Get preview links
vnc_link = sandbox.get_preview_link(6080)
website_link = sandbox.get_preview_link(8080)
# Extract the actual URLs and token from the preview link objects
vnc_url = vnc_link.url if hasattr(vnc_link, 'url') else str(vnc_link).split("url='")[1].split("'")[0]
website_url = website_link.url if hasattr(website_link, 'url') else str(website_link).split("url='")[1].split("'")[0]
# Extract token if available
token = None
if hasattr(vnc_link, 'token'):
token = vnc_link.token
elif "token='" in str(vnc_link):
token = str(vnc_link).split("token='")[1].split("'")[0]
# Immediately update the project with sandbox info to avoid race conditions
await client.table('projects').update({
'sandbox': {
'id': sandbox_id,
'pass': sandbox_pass,
'vnc_preview': vnc_url,
'sandbox_url': website_url,
'token': token
}
}).eq('project_id', project_id).execute()
# Create a thread for the project
thread = await client.table('threads').insert({
"name": project_name,
"project_id": project_id,
"account_id": account_id,
"created_by": user_id,
"created_at": datetime.now(timezone.utc).isoformat()
}).execute()
thread_id = thread.data[0]['thread_id']
logger.info(f"Created new thread: {thread_id} for project: {project_id}")
# Add the user message to the thread
if user_message:
await client.table('messages').insert({
"thread_id": thread_id,
"role": "user",
"content": user_message,
"created_at": datetime.now(timezone.utc).isoformat()
}).execute()
logger.info(f"Added user message to thread: {thread_id}")
# Check if there is already an active agent run for this project
active_run_id = await check_for_active_project_agent_run(client, project_id)
# If there's an active run, stop it first
if active_run_id:
logger.info(f"Stopping existing agent run {active_run_id} before starting new one")
await stop_agent_run(active_run_id)
# Create a new agent run
agent_run = await client.table('agent_runs').insert({
"thread_id": thread_id,
"status": "running",
"started_at": datetime.now(timezone.utc).isoformat()
}).execute()
agent_run_id = agent_run.data[0]['id']
logger.info(f"Created new agent run: {agent_run_id}")
# Initialize in-memory storage for this agent run
active_agent_runs[agent_run_id] = []
# Register this run in Redis with TTL
try:
await redis.set(
f"active_run:{instance_id}:{agent_run_id}",
"running",
ex=redis.REDIS_KEY_TTL
)
except Exception as e:
logger.warning(f"Failed to register agent run in Redis, continuing without Redis tracking: {str(e)}")
# Run the agent in the background
task = asyncio.create_task(
run_agent_background(
agent_run_id=agent_run_id,
thread_id=thread_id,
instance_id=instance_id,
project_id=project_id,
sandbox=sandbox,
model_name=MODEL_NAME_ALIASES.get(model_name, model_name),
enable_thinking=enable_thinking,
reasoning_effort=reasoning_effort,
stream=stream,
enable_context_manager=enable_context_manager
)
)
# Set a callback to clean up when task is done
task.add_done_callback(
lambda _: asyncio.create_task(
_cleanup_agent_run(agent_run_id)
)
)
# Return all the created resources
return {
"project_id": project_id,
"thread_id": thread_id,
"agent_run_id": agent_run_id,
"status": "running"
}
except Exception as e:
logger.error(f"Error in create_project_and_start_agent: {str(e)}", exc_info=True)
# Re-raise as HTTP exception
raise HTTPException(status_code=500, detail=f"Failed to create project and start agent: {str(e)}")
@router.post("/agent-run/{agent_run_id}/stop")
async def stop_agent(agent_run_id: str, user_id: str = Depends(get_current_user_id)):
"""Stop a running agent."""

View File

@ -5,7 +5,7 @@ import { Skeleton } from "@/components/ui/skeleton";
import { useRouter } from 'next/navigation';
import { Menu } from "lucide-react";
import { ChatInput } from '@/components/thread/chat-input';
import { createProject, addUserMessage, startAgent, createThread, createProjectAndStartAgent } from "@/lib/api";
import { createProject, addUserMessage, startAgent, createThread } from "@/lib/api";
import { generateThreadName } from "@/lib/actions/threads";
import { useIsMobile } from "@/hooks/use-mobile";
import { useSidebar } from "@/components/ui/sidebar";
@ -22,7 +22,6 @@ function DashboardContent() {
const [inputValue, setInputValue] = useState("");
const [isSubmitting, setIsSubmitting] = useState(false);
const [autoSubmit, setAutoSubmit] = useState(false);
const [isSubmitDisabled, setIsSubmitDisabled] = useState(false);
const { billingError, handleBillingError, clearBillingError } = useBillingError();
const router = useRouter();
const isMobile = useIsMobile();
@ -31,134 +30,128 @@ function DashboardContent() {
const personalAccount = accounts?.find(account => account.personal_account);
const handleSubmit = async (message: string, options?: { model_name?: string; enable_thinking?: boolean }) => {
if (!message.trim() || isSubmitting || isSubmitDisabled) return;
if (!message.trim() || isSubmitting) return;
setIsSubmitting(true);
setIsSubmitDisabled(true); // Disable submit button immediately
try {
// Generate a name for the project using GPT
const projectName = await generateThreadName(message);
// Use the new unified API endpoint to create project, thread, and start agent in one call
const result = await createProjectAndStartAgent({
// 1. Create a new project with the GPT-generated name
const newAgent = await createProject({
name: projectName,
description: "",
message: message.trim(),
model_name: options?.model_name,
enable_thinking: options?.enable_thinking,
stream: true
});
// If successful, clear the pending prompt
localStorage.removeItem(PENDING_PROMPT_KEY);
// 2. Create a new thread for this project
const thread = await createThread(newAgent.id);
// 3. Add the user message to the thread
await addUserMessage(thread.thread_id, message.trim());
// Navigate to the new agent's thread page
router.push(`/agents/${result.thread_id}`);
} catch (error: any) {
// Check specifically for billing errors (402 Payment Required)
if (error.message?.includes('(402)') || error?.status === 402) {
console.log("Billing error detected:", error);
try {
// 4. Start the agent with the thread ID
const agentRun = await startAgent(thread.thread_id, {
model_name: options?.model_name,
enable_thinking: options?.enable_thinking,
stream: true
});
// Try to extract the error details from the error object
try {
// Try to parse the error.response or the error itself
let errorDetails;
// If successful, clear the pending prompt
localStorage.removeItem(PENDING_PROMPT_KEY);
// 5. Navigate to the new agent's thread page
router.push(`/agents/${thread.thread_id}`);
} catch (error: any) {
// Check specifically for billing errors (402 Payment Required)
if (error.message?.includes('(402)') || error?.status === 402) {
console.log("Billing error detected:", error);
// First attempt: check if error.data exists and has a detail property
if (error.data?.detail) {
errorDetails = error.data.detail;
console.log("Extracted billing error details from error.data.detail:", errorDetails);
}
// Second attempt: check if error.detail exists directly
else if (error.detail) {
errorDetails = error.detail;
console.log("Extracted billing error details from error.detail:", errorDetails);
}
// Third attempt: try to parse the error text if it's JSON
else if (typeof error.text === 'function') {
const text = await error.text();
console.log("Extracted error text:", text);
try {
const parsed = JSON.parse(text);
errorDetails = parsed.detail || parsed;
console.log("Parsed error text as JSON:", errorDetails);
} catch (e) {
// Not JSON, use regex to extract info
console.log("Error text is not valid JSON");
// Try to extract the error details from the error object
try {
// Try to parse the error.response or the error itself
let errorDetails;
// First attempt: check if error.data exists and has a detail property
if (error.data?.detail) {
errorDetails = error.data.detail;
console.log("Extracted billing error details from error.data.detail:", errorDetails);
}
// Second attempt: check if error.detail exists directly
else if (error.detail) {
errorDetails = error.detail;
console.log("Extracted billing error details from error.detail:", errorDetails);
}
}
// If we still don't have details, try to extract from the error message
if (!errorDetails && error.message) {
const match = error.message.match(/Monthly limit of (\d+) minutes reached/);
if (match) {
const minutes = parseInt(match[1]);
errorDetails = {
message: error.message,
subscription: {
price_id: "price_1RGJ9GG6l1KZGqIroxSqgphC", // Free tier by default
plan_name: "Free",
current_usage: minutes / 60, // Convert to hours
limit: minutes / 60 // Convert to hours
}
};
console.log("Extracted billing error details from error message:", errorDetails);
}
}
// Handle the billing error with the details we extracted
if (errorDetails) {
console.log("Handling billing error with extracted details:", errorDetails);
handleBillingError(errorDetails);
} else {
// Fallback with generic billing error
console.log("Using fallback generic billing error");
handleBillingError({
message: "You've reached your monthly usage limit. Please upgrade your plan.",
subscription: {
price_id: "price_1RGJ9GG6l1KZGqIroxSqgphC", // Free tier
plan_name: "Free"
// Third attempt: try to parse the error text if it's JSON
else if (typeof error.text === 'function') {
const text = await error.text();
console.log("Extracted error text:", text);
try {
const parsed = JSON.parse(text);
errorDetails = parsed.detail || parsed;
console.log("Parsed error text as JSON:", errorDetails);
} catch (e) {
// Not JSON, use regex to extract info
console.log("Error text is not valid JSON");
}
}
// If we still don't have details, try to extract from the error message
if (!errorDetails && error.message) {
const match = error.message.match(/Monthly limit of (\d+) minutes reached/);
if (match) {
const minutes = parseInt(match[1]);
errorDetails = {
message: error.message,
subscription: {
price_id: "price_1RGJ9GG6l1KZGqIroxSqgphC", // Free tier by default
plan_name: "Free",
current_usage: minutes / 60, // Convert to hours
limit: minutes / 60 // Convert to hours
}
};
console.log("Extracted billing error details from error message:", errorDetails);
}
}
// Handle the billing error with the details we extracted
if (errorDetails) {
console.log("Handling billing error with extracted details:", errorDetails);
handleBillingError(errorDetails);
} else {
// Fallback with generic billing error
console.log("Using fallback generic billing error");
handleBillingError({
message: "You've reached your monthly usage limit. Please upgrade your plan.",
subscription: {
price_id: "price_1RGJ9GG6l1KZGqIroxSqgphC", // Free tier
plan_name: "Free"
}
});
}
} catch (parseError) {
console.error("Error parsing billing error details:", parseError);
// Fallback with generic error
handleBillingError({
message: "You've reached your monthly usage limit. Please upgrade your plan."
});
}
} catch (parseError) {
console.error("Error parsing billing error details:", parseError);
// Fallback with generic error
handleBillingError({
message: "You've reached your monthly usage limit. Please upgrade your plan."
});
// Don't rethrow - we've handled this error with the billing alert
setIsSubmitting(false);
return; // Exit handleSubmit
}
// Don't rethrow - we've handled this error with the billing alert
setIsSubmitting(false);
setIsSubmitDisabled(false); // Re-enable submit button on error
return; // Exit handleSubmit
// Rethrow any non-billing errors
throw error;
}
// Rethrow any non-billing errors
throw error;
} catch (error) {
console.error("Error creating agent:", error);
setIsSubmitting(false);
setIsSubmitDisabled(false); // Re-enable submit button on error
}
};
// Reset the submit disabled state after 2 seconds to prevent accidental double submission
useEffect(() => {
let timer: NodeJS.Timeout;
if (isSubmitDisabled) {
timer = setTimeout(() => {
setIsSubmitDisabled(false);
}, 2000);
}
return () => {
if (timer) clearTimeout(timer);
};
}, [isSubmitDisabled]);
// Check for pending prompt in localStorage on mount
useEffect(() => {
// Use a small delay to ensure we're fully mounted
@ -257,4 +250,3 @@ export default function DashboardPage() {
</Suspense>
);
}

View File

@ -1039,38 +1039,3 @@ export const getPublicProjects = async (): Promise<Project[]> => {
}
};
export async function createProjectAndStartAgent(data: {
name: string;
description: string;
message: string;
model_name?: string;
enable_thinking?: boolean;
reasoning_effort?: string;
stream?: boolean;
enable_context_manager?: boolean;
}) {
const supabase = createClient();
const { data: { session } } = await supabase.auth.getSession();
if (!session?.access_token) {
throw new Error('No access token available');
}
const response = await fetch(`${API_URL}/agent/create-and-start`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${session.access_token}`,
},
body: JSON.stringify(data),
});
if (!response.ok) {
const error = await response.text();
console.error("Error creating project and starting agent:", error);
throw new Error(`Failed to create project and start agent (${response.status}): ${error}`);
}
return await response.json();
}