# suna/backend/scheduling/api.py
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks, Request
from typing import List, Optional
import logging
from datetime import datetime, timezone
import uuid

from .models import (
    WorkflowSchedule, ScheduleCreateRequest, ScheduleUpdateRequest,
    ScheduleListResponse, ScheduleTemplate, SCHEDULE_TEMPLATES,
    ScheduleExecutionLog, CronValidationRequest, CronValidationResponse
)
from .qstash_service import QStashService
from workflows.executor import WorkflowExecutor
from services.supabase import DBConnection
from flags.flags import is_enabled

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1/schedules", tags=["schedules"])

db = DBConnection()
workflow_executor = WorkflowExecutor(db)


def get_qstash_service() -> QStashService:
    return QStashService()


def get_workflow_executor() -> WorkflowExecutor:
    return workflow_executor
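
# Note (deployment assumption): with the prefix above, QStash schedules created
# by QStashService are expected to call back into
# POST /api/v1/schedules/trigger/{workflow_id} (handled further below); the
# absolute callback URL depends on where this service is hosted.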
@router.post("/", response_model=WorkflowSchedule)
async def create_schedule(
request: ScheduleCreateRequest,
qstash_service: QStashService = Depends(get_qstash_service)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Create a new workflow schedule"""
try:
schedule = await qstash_service.create_schedule(request)
logger.info(f"Created schedule {schedule.id} for workflow {schedule.workflow_id}")
return schedule
except Exception as e:
logger.error(f"Failed to create schedule: {e}")
raise HTTPException(status_code=400, detail=str(e))
@router.get("/", response_model=ScheduleListResponse)
async def list_schedules(
workflow_id: Optional[str] = None,
page: int = 1,
page_size: int = 20,
qstash_service: QStashService = Depends(get_qstash_service)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""List workflow schedules with optional filtering"""
try:
if page < 1:
page = 1
if page_size < 1 or page_size > 100:
page_size = 20
all_schedules = await qstash_service.list_schedules(workflow_id)
start_idx = (page - 1) * page_size
end_idx = start_idx + page_size
schedules = all_schedules[start_idx:end_idx]
return ScheduleListResponse(
schedules=schedules,
total=len(all_schedules),
page=page,
page_size=page_size
)
except Exception as e:
logger.error(f"Failed to list schedules: {e}")
raise HTTPException(status_code=500, detail=str(e))
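
# Pagination is an in-memory slice over the full schedule list returned by
# QStash: for page=3 and page_size=20, start_idx = (3 - 1) * 20 = 40, so the
# response carries all_schedules[40:60] while `total` reports the unsliced count.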
@router.get("/templates", response_model=List[ScheduleTemplate])
async def get_schedule_templates():
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Get predefined schedule templates"""
return SCHEDULE_TEMPLATES
@router.get("/{schedule_id}", response_model=WorkflowSchedule)
async def get_schedule(
schedule_id: str,
qstash_service: QStashService = Depends(get_qstash_service)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Get a specific schedule by ID"""
try:
schedule = await qstash_service.get_schedule(schedule_id)
if not schedule:
raise HTTPException(status_code=404, detail="Schedule not found")
return schedule
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to get schedule {schedule_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))


@router.put("/{schedule_id}", response_model=WorkflowSchedule)
async def update_schedule(
    schedule_id: str,
    request: ScheduleUpdateRequest,
    qstash_service: QStashService = Depends(get_qstash_service)
):
    """Update an existing schedule"""
    if not await is_enabled("workflows"):
        raise HTTPException(
            status_code=403,
            detail="This feature is not available at the moment."
        )
    try:
        schedule = await qstash_service.update_schedule(schedule_id, request)
        logger.info(f"Updated schedule {schedule_id}")
        return schedule
    except Exception as e:
        logger.error(f"Failed to update schedule {schedule_id}: {e}")
        raise HTTPException(status_code=400, detail=str(e))


@router.delete("/{schedule_id}")
async def delete_schedule(
    schedule_id: str,
    qstash_service: QStashService = Depends(get_qstash_service)
):
    """Delete a schedule"""
    if not await is_enabled("workflows"):
        raise HTTPException(
            status_code=403,
            detail="This feature is not available at the moment."
        )
    try:
        success = await qstash_service.delete_schedule(schedule_id)
        if not success:
            raise HTTPException(status_code=404, detail="Schedule not found or could not be deleted")
        logger.info(f"Deleted schedule {schedule_id}")
        return {"message": "Schedule deleted successfully"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to delete schedule {schedule_id}: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/{schedule_id}/pause")
async def pause_schedule(
schedule_id: str,
qstash_service: QStashService = Depends(get_qstash_service)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Pause a schedule"""
try:
success = await qstash_service.pause_schedule(schedule_id)
if not success:
raise HTTPException(status_code=404, detail="Schedule not found or could not be paused")
logger.info(f"Paused schedule {schedule_id}")
return {"message": "Schedule paused successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to pause schedule {schedule_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/{schedule_id}/resume")
async def resume_schedule(
schedule_id: str,
qstash_service: QStashService = Depends(get_qstash_service)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Resume a paused schedule"""
try:
success = await qstash_service.resume_schedule(schedule_id)
if not success:
raise HTTPException(status_code=404, detail="Schedule not found or could not be resumed")
logger.info(f"Resumed schedule {schedule_id}")
return {"message": "Schedule resumed successfully"}
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to resume schedule {schedule_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{schedule_id}/logs", response_model=List[ScheduleExecutionLog])
async def get_schedule_logs(
schedule_id: str,
limit: int = 50,
qstash_service: QStashService = Depends(get_qstash_service)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Get execution logs for a schedule"""
try:
if limit < 1 or limit > 1000:
limit = 50
logs = await qstash_service.get_schedule_logs(schedule_id, limit)
return logs
except Exception as e:
logger.error(f"Failed to get logs for schedule {schedule_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
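
# A minimal sketch of the webhook delivery the next endpoint expects from
# QStash. The header names come from the handler itself; the sample values and
# any additional headers QStash attaches are assumptions:
#
#   POST /api/v1/schedules/trigger/{workflow_id}
#   x-workflow-schedule: true
#   x-schedule-name: daily-report
#   x-schedule-description: Runs the reporting workflow every morning
#   Content-Type: application/json
#
#   {"report_date": "..."}   <- forwarded verbatim as trigger_data["payload"]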
@router.post("/trigger/{workflow_id}")
async def trigger_scheduled_workflow(
workflow_id: str,
request: Request,
background_tasks: BackgroundTasks,
workflow_executor: WorkflowExecutor = Depends(get_workflow_executor)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Webhook endpoint for QStash to trigger scheduled workflows"""
try:
logger.info(f"Received scheduled trigger for workflow {workflow_id}")
headers = dict(request.headers)
try:
body = await request.json()
except Exception:
body = {}
if not headers.get("x-workflow-schedule"):
logger.warning(f"Received non-schedule trigger for workflow {workflow_id}")
schedule_name = headers.get("x-schedule-name", "Unknown Schedule")
schedule_description = headers.get("x-schedule-description", "")
logger.info(f"Triggering workflow {workflow_id} from schedule '{schedule_name}'")
trigger_data = {
"trigger_type": "SCHEDULE",
"schedule_name": schedule_name,
"schedule_description": schedule_description,
"triggered_at": datetime.utcnow().isoformat(),
"qstash_headers": headers,
"payload": body
}
background_tasks.add_task(
execute_scheduled_workflow,
workflow_executor,
workflow_id,
trigger_data
)
return {
"message": "Workflow scheduled for execution",
"workflow_id": workflow_id,
"trigger_type": "SCHEDULE",
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Failed to trigger scheduled workflow {workflow_id}: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/trigger/{workflow_id}")
async def test_scheduled_workflow(workflow_id: str):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Test endpoint for scheduled workflow triggers (for debugging)"""
return {
"message": "Schedule trigger endpoint is working",
"workflow_id": workflow_id,
"timestamp": datetime.utcnow().isoformat()
}
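
# The background path below persists three records before handing off to the
# worker: a workflow_executions row, a thread (required by the agent_runs
# foreign key), and an agent_runs row used for frontend streaming.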


async def execute_scheduled_workflow(
    workflow_executor: WorkflowExecutor,
    workflow_id: str,
    trigger_data: dict
):
    """Execute a workflow triggered by a schedule using the background worker"""
    try:
        logger.info(f"Scheduling background execution for workflow {workflow_id}")
        # First, fetch the workflow definition from the database
        client = await db.client
        result = await client.table('workflows').select('*').eq('id', workflow_id).execute()
        if not result.data:
            logger.error(f"Workflow {workflow_id} not found in database")
            return
        # Convert the database record to a WorkflowDefinition
        from workflows.api import _map_db_to_workflow_definition
        workflow_data = result.data[0]
        workflow = _map_db_to_workflow_definition(workflow_data)
        logger.info(f"Loaded workflow: {workflow.name} (ID: {workflow.id})")
        # Extract variables from the trigger payload, if any
        variables = trigger_data.get('payload', {})
        if not isinstance(variables, dict):
            variables = {}
        # Add trigger metadata to the variables
        variables.update({
            'trigger_type': trigger_data.get('trigger_type', 'SCHEDULE'),
            'schedule_name': trigger_data.get('schedule_name', 'Unknown'),
            'triggered_at': trigger_data.get('triggered_at')
        })
        # Create the workflow execution record
        execution_id = str(uuid.uuid4())
        execution_data = {
            "id": execution_id,
            "workflow_id": workflow_id,
            "workflow_version": getattr(workflow, 'version', 1),
            "workflow_name": workflow.name,
            "execution_context": variables,
            "project_id": workflow.project_id,
            "account_id": workflow.created_by,
            "triggered_by": "SCHEDULE",
            "status": "pending",
            "started_at": datetime.now(timezone.utc).isoformat()
        }
        await client.table('workflow_executions').insert(execution_data).execute()
        logger.info(f"Created workflow execution record: {execution_id}")
        # Generate a thread_id for the execution
        thread_id = str(uuid.uuid4())
        # Create the thread first (required for the agent_runs foreign key)
        from workflows.api import _create_workflow_thread_for_api
        await _create_workflow_thread_for_api(thread_id, workflow.project_id, workflow, variables)
        logger.info(f"Created workflow thread: {thread_id}")
        # Create an agent run record for frontend streaming compatibility
        agent_run = await client.table('agent_runs').insert({
            "thread_id": thread_id,
            "status": "running",
            "started_at": datetime.now(timezone.utc).isoformat()
        }).execute()
        agent_run_id = agent_run.data[0]['id']
        logger.info(f"Created agent run for scheduled workflow: {agent_run_id}")
        # Prepare the workflow definition for the background worker
        if hasattr(workflow, 'model_dump'):
            workflow_dict = workflow.model_dump(mode='json')
        else:
            workflow_dict = workflow.dict()
        # Handle datetime serialization; model_dump(mode='json') already emits
        # ISO strings, so only convert values that are still datetime objects
        for key in ('created_at', 'updated_at'):
            value = workflow_dict.get(key)
            if isinstance(value, datetime):
                workflow_dict[key] = value.isoformat()
        # Send the workflow to the background worker
        from run_agent_background import run_workflow_background
        run_workflow_background.send(
            execution_id=execution_id,
            workflow_id=workflow_id,
            workflow_name=workflow.name,
            workflow_definition=workflow_dict,
            variables=variables,
            triggered_by="SCHEDULE",
            project_id=workflow.project_id,
            thread_id=thread_id,
            agent_run_id=agent_run_id
        )
        logger.info(f"Scheduled workflow {workflow_id} sent to background worker (execution_id: {execution_id})")
    except Exception as e:
        logger.error(f"Failed to schedule workflow {workflow_id} for background execution: {e}")
        # Don't raise: QStash retries failed webhooks, and we don't want it
        # retrying deliveries for workflows that no longer exist.
@router.post("/validate/cron", response_model=CronValidationResponse)
async def validate_cron_expression(request: CronValidationRequest):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Validate a cron expression and return next execution times"""
try:
import croniter
from datetime import datetime, timezone
# Validate the cron expression
base_time = datetime.now(timezone.utc)
cron = croniter.croniter(request.cron_expression, base_time)
# Get next 5 execution times
next_executions = []
for _ in range(5):
next_time = cron.get_next(datetime)
next_executions.append(next_time.isoformat())
return CronValidationResponse(
valid=True,
cron_expression=request.cron_expression,
next_executions=next_executions,
description=describe_cron_expression(request.cron_expression)
)
except Exception as e:
return CronValidationResponse(
valid=False,
cron_expression=request.cron_expression,
error=str(e)
)
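
# Example exchange (illustrative only; the timestamps depend on when the
# request is made):
#
#   POST /api/v1/schedules/validate/cron
#   {"cron_expression": "0 9 * * 1-5"}
#
#   -> {"valid": true,
#       "cron_expression": "0 9 * * 1-5",
#       "next_executions": [...five ISO-8601 timestamps...],
#       "description": "Runs at minute 0, at 9:00, from Monday to Friday"}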


def describe_cron_expression(cron_expression: str) -> str:
    """Generate a human-readable description of a cron expression"""
    try:
        parts = cron_expression.split()
        if len(parts) != 5:
            return "Custom cron expression"
        minute, hour, day, month, weekday = parts
        descriptions = []
        if minute == "*":
            descriptions.append("every minute")
        elif minute.startswith("*/"):
            interval = minute[2:]
            descriptions.append(f"every {interval} minutes")
        elif minute.isdigit():
            descriptions.append(f"at minute {minute}")
        if hour == "*":
            # Only add "every hour" when the minute field hasn't already
            # produced a sub-hourly description ("every minute" / "every N minutes")
            if not any(d.startswith("every") and "minute" in d for d in descriptions):
                descriptions.append("every hour")
        elif hour.startswith("*/"):
            interval = hour[2:]
            descriptions.append(f"every {interval} hours")
        elif hour.isdigit():
            descriptions.append(f"at {hour}:00")
        if day != "*":
            if day.startswith("*/"):
                interval = day[2:]
                descriptions.append(f"every {interval} days")
            elif day.isdigit():
                descriptions.append(f"on day {day} of the month")
        if weekday != "*":
            weekday_names = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]
            if weekday.isdigit():
                day_name = weekday_names[int(weekday)]
                descriptions.append(f"on {day_name}")
            elif "-" in weekday:
                start, end = weekday.split("-")
                start_name = weekday_names[int(start)]
                end_name = weekday_names[int(end)]
                descriptions.append(f"from {start_name} to {end_name}")
        if descriptions:
            return "Runs " + ", ".join(descriptions)
        return "Custom schedule"
    except Exception:
        return "Custom cron expression"
@router.post("/cleanup/orphaned-schedules")
async def cleanup_orphaned_schedules(
qstash_service: QStashService = Depends(get_qstash_service)
):
if not await is_enabled("workflows"):
raise HTTPException(
status_code=403,
detail="This feature is not available at the moment."
)
"""Clean up QStash schedules that point to deleted workflows"""
try:
logger.info("Starting cleanup of orphaned QStash schedules")
# Get all QStash schedules
all_schedules = await qstash_service.list_schedules()
logger.info(f"Found {len(all_schedules)} total schedules in QStash")
# Get all existing workflow IDs from database
client = await db.client
workflows_result = await client.table('workflows').select('id').execute()
existing_workflow_ids = {w['id'] for w in workflows_result.data}
logger.info(f"Found {len(existing_workflow_ids)} workflows in database")
orphaned_schedules = []
for schedule in all_schedules:
if schedule.workflow_id not in existing_workflow_ids:
orphaned_schedules.append(schedule)
logger.info(f"Found {len(orphaned_schedules)} orphaned schedules")
# Delete orphaned schedules
deleted_count = 0
for schedule in orphaned_schedules:
try:
success = await qstash_service.delete_schedule(schedule.id)
if success:
deleted_count += 1
logger.info(f"Deleted orphaned schedule {schedule.id} for workflow {schedule.workflow_id}")
else:
logger.warning(f"Failed to delete orphaned schedule {schedule.id}")
except Exception as e:
logger.error(f"Error deleting orphaned schedule {schedule.id}: {e}")
return {
"message": "Cleanup completed",
"total_schedules": len(all_schedules),
"orphaned_found": len(orphaned_schedules),
"deleted": deleted_count
}
except Exception as e:
logger.error(f"Failed to cleanup orphaned schedules: {e}")
raise HTTPException(status_code=500, detail=str(e))
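
# A sketch of a successful cleanup response (counts are illustrative):
#
#   {"message": "Cleanup completed", "total_schedules": 12,
#    "orphaned_found": 2, "deleted": 2}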