diff --git a/backend/pipedream/domain/services.py b/backend/pipedream/domain/services.py
index 08714416..12e0c57b 100644
--- a/backend/pipedream/domain/services.py
+++ b/backend/pipedream/domain/services.py
@@ -12,7 +12,7 @@ class ExternalUserIdGeneratorService(ABC):
class MCPQualifiedNameService(ABC):
-
+
@abstractmethod
def generate(self, app_slug: AppSlug) -> str:
pass
diff --git a/backend/triggers/api.py b/backend/triggers/api.py
index ce98033d..f1a94eae 100644
--- a/backend/triggers/api.py
+++ b/backend/triggers/api.py
@@ -3,7 +3,9 @@ from fastapi.responses import JSONResponse
from typing import List, Optional, Dict, Any
from pydantic import BaseModel
import os
-from datetime import datetime
+from datetime import datetime, timezone
+import croniter
+import pytz
from .support.factory import TriggerModuleFactory
from .support.exceptions import TriggerError, ConfigurationError, ProviderError
@@ -53,6 +55,7 @@ class TriggerResponse(BaseModel):
webhook_url: Optional[str]
created_at: str
updated_at: str
+ config: Dict[str, Any]
class ProviderResponse(BaseModel):
@@ -65,10 +68,29 @@ class ProviderResponse(BaseModel):
config_schema: Dict[str, Any]
+class UpcomingRun(BaseModel):
+ trigger_id: str
+ trigger_name: str
+ trigger_type: str
+ next_run_time: str
+ next_run_time_local: str
+ timezone: str
+ cron_expression: str
+ execution_type: str
+ agent_prompt: Optional[str] = None
+ workflow_id: Optional[str] = None
+ is_active: bool
+ human_readable: str
+
+
+class UpcomingRunsResponse(BaseModel):
+ upcoming_runs: List[UpcomingRun]
+ total_count: int
+
+
def initialize(database: DBConnection):
global db, trigger_service, execution_service, provider_service
db = database
- # Initialize workflows API with DB connection
set_workflows_db_connection(database)
@@ -170,7 +192,8 @@ async def get_agent_triggers(
is_active=trigger.is_active,
webhook_url=webhook_url,
created_at=trigger.metadata.created_at.isoformat(),
- updated_at=trigger.metadata.updated_at.isoformat()
+ updated_at=trigger.metadata.updated_at.isoformat(),
+ config=trigger.config.config
))
return responses
@@ -179,6 +202,159 @@ async def get_agent_triggers(
raise HTTPException(status_code=500, detail="Internal server error")
+@router.get("/agents/{agent_id}/upcoming-runs", response_model=UpcomingRunsResponse)
+async def get_agent_upcoming_runs(
+ agent_id: str,
+ limit: int = Query(10, ge=1, le=50),
+ user_id: str = Depends(get_current_user_id_from_jwt)
+):
+ if not await is_enabled("agent_triggers"):
+ raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
+
+ await verify_agent_access(agent_id, user_id)
+
+ try:
+ trigger_svc, _, _ = await get_services()
+ triggers = await trigger_svc.get_agent_triggers(agent_id)
+ schedule_triggers = [
+ trigger for trigger in triggers
+ if trigger.is_active and trigger.trigger_type.value == "schedule"
+ ]
+
+ upcoming_runs = []
+ now = datetime.now(timezone.utc)
+ for trigger in schedule_triggers:
+ config = trigger.config.config
+ cron_expression = config.get('cron_expression')
+ user_timezone = config.get('timezone', 'UTC')
+
+ if not cron_expression:
+ continue
+
+ try:
+ next_run = _get_next_run_time(cron_expression, user_timezone)
+ if not next_run:
+ continue
+
+ local_tz = pytz.timezone(user_timezone)
+ next_run_local = next_run.astimezone(local_tz)
+
+ human_readable = _get_human_readable_schedule(cron_expression, user_timezone)
+
+ upcoming_runs.append(UpcomingRun(
+ trigger_id=trigger.trigger_id,
+ trigger_name=trigger.config.name,
+ trigger_type=trigger.trigger_type.value,
+ next_run_time=next_run.isoformat(),
+ next_run_time_local=next_run_local.isoformat(),
+ timezone=user_timezone,
+ cron_expression=cron_expression,
+ execution_type=config.get('execution_type', 'agent'),
+ agent_prompt=config.get('agent_prompt'),
+ workflow_id=config.get('workflow_id'),
+ is_active=trigger.is_active,
+ human_readable=human_readable
+ ))
+
+ except Exception as e:
+ logger.warning(f"Error calculating next run for trigger {trigger.trigger_id}: {e}")
+ continue
+
+ upcoming_runs.sort(key=lambda x: x.next_run_time)
+ upcoming_runs = upcoming_runs[:limit]
+
+ return UpcomingRunsResponse(
+ upcoming_runs=upcoming_runs,
+ total_count=len(upcoming_runs)
+ )
+
+ except Exception as e:
+ logger.error(f"Error getting upcoming runs: {e}")
+ raise HTTPException(status_code=500, detail="Internal server error")
+
+
+def _get_next_run_time(cron_expression: str, user_timezone: str) -> Optional[datetime]:
+ try:
+ tz = pytz.timezone(user_timezone)
+ now_local = datetime.now(tz)
+
+ cron = croniter.croniter(cron_expression, now_local)
+
+ next_run_local = cron.get_next(datetime)
+ next_run_utc = next_run_local.astimezone(timezone.utc)
+
+ return next_run_utc
+
+ except Exception as e:
+ logger.error(f"Error calculating next run time: {e}")
+ return None
+
+
+def _get_human_readable_schedule(cron_expression: str, user_timezone: str) -> str:
+ try:
+ patterns = {
+ '*/5 * * * *': 'Every 5 minutes',
+ '*/10 * * * *': 'Every 10 minutes',
+ '*/15 * * * *': 'Every 15 minutes',
+ '*/30 * * * *': 'Every 30 minutes',
+ '0 * * * *': 'Every hour',
+ '0 */2 * * *': 'Every 2 hours',
+ '0 */4 * * *': 'Every 4 hours',
+ '0 */6 * * *': 'Every 6 hours',
+ '0 */12 * * *': 'Every 12 hours',
+ '0 0 * * *': 'Daily at midnight',
+ '0 9 * * *': 'Daily at 9:00 AM',
+ '0 12 * * *': 'Daily at 12:00 PM',
+ '0 18 * * *': 'Daily at 6:00 PM',
+ '0 9 * * 1-5': 'Weekdays at 9:00 AM',
+ '0 9 * * 1': 'Every Monday at 9:00 AM',
+ '0 9 * * 2': 'Every Tuesday at 9:00 AM',
+ '0 9 * * 3': 'Every Wednesday at 9:00 AM',
+ '0 9 * * 4': 'Every Thursday at 9:00 AM',
+ '0 9 * * 5': 'Every Friday at 9:00 AM',
+ '0 9 * * 6': 'Every Saturday at 9:00 AM',
+ '0 9 * * 0': 'Every Sunday at 9:00 AM',
+ '0 9 1 * *': 'Monthly on the 1st at 9:00 AM',
+ '0 9 15 * *': 'Monthly on the 15th at 9:00 AM',
+ '0 9,17 * * *': 'Daily at 9:00 AM and 5:00 PM',
+ '0 10 * * 0,6': 'Weekends at 10:00 AM',
+ }
+
+ if cron_expression in patterns:
+ description = patterns[cron_expression]
+ if user_timezone != 'UTC':
+ description += f" ({user_timezone})"
+ return description
+
+ parts = cron_expression.split()
+ if len(parts) != 5:
+ return f"Custom schedule: {cron_expression}"
+
+ minute, hour, day, month, weekday = parts
+
+ if minute.isdigit() and hour == '*' and day == '*' and month == '*' and weekday == '*':
+ return f"Every hour at :{minute.zfill(2)}"
+
+ if minute.isdigit() and hour.isdigit() and day == '*' and month == '*' and weekday == '*':
+ time_str = f"{hour.zfill(2)}:{minute.zfill(2)}"
+ description = f"Daily at {time_str}"
+ if user_timezone != 'UTC':
+ description += f" ({user_timezone})"
+ return description
+
+ if minute.isdigit() and hour.isdigit() and day == '*' and month == '*' and weekday == '1-5':
+ time_str = f"{hour.zfill(2)}:{minute.zfill(2)}"
+ description = f"Weekdays at {time_str}"
+ if user_timezone != 'UTC':
+ description += f" ({user_timezone})"
+ return description
+
+ return f"Custom schedule: {cron_expression}"
+
+ except Exception:
+ return f"Custom schedule: {cron_expression}"
+
+
@router.post("/agents/{agent_id}/triggers", response_model=TriggerResponse)
async def create_agent_trigger(
agent_id: str,
@@ -217,7 +393,8 @@ async def create_agent_trigger(
is_active=trigger.is_active,
webhook_url=webhook_url,
created_at=trigger.metadata.created_at.isoformat(),
- updated_at=trigger.metadata.updated_at.isoformat()
+ updated_at=trigger.metadata.updated_at.isoformat(),
+ config=trigger.config.config
)
except (ValueError, ConfigurationError, ProviderError) as e:
@@ -261,7 +438,8 @@ async def get_trigger(
is_active=trigger.is_active,
webhook_url=webhook_url,
created_at=trigger.metadata.created_at.isoformat(),
- updated_at=trigger.metadata.updated_at.isoformat()
+ updated_at=trigger.metadata.updated_at.isoformat(),
+ config=trigger.config.config
)
except Exception as e:
logger.error(f"Error getting trigger: {e}")
@@ -311,7 +489,8 @@ async def update_trigger(
is_active=updated_trigger.is_active,
webhook_url=webhook_url,
created_at=updated_trigger.metadata.created_at.isoformat(),
- updated_at=updated_trigger.metadata.updated_at.isoformat()
+ updated_at=updated_trigger.metadata.updated_at.isoformat(),
+ config=updated_trigger.config.config
)
except (ValueError, ConfigurationError, ProviderError) as e:
@@ -331,7 +510,6 @@ async def delete_trigger(
try:
trigger_svc, _, _ = await get_services()
-
trigger = await trigger_svc.get_trigger(trigger_id)
if not trigger:
raise HTTPException(status_code=404, detail="Trigger not found")
@@ -339,7 +517,6 @@ async def delete_trigger(
await verify_agent_access(trigger.agent_id, user_id)
success = await trigger_svc.delete_trigger(trigger_id)
-
if not success:
raise HTTPException(status_code=404, detail="Trigger not found")
@@ -360,7 +537,6 @@ async def trigger_webhook(
try:
trigger_svc, execution_svc, _ = await get_services()
-
try:
raw_data = await request.json()
except:
@@ -368,9 +544,6 @@ async def trigger_webhook(
result = await trigger_svc.process_trigger_event(trigger_id, raw_data)
- logger.info(f"Webhook trigger result: success={result.success}, should_execute_agent={result.should_execute_agent}, should_execute_workflow={result.should_execute_workflow}")
- logger.info(f"Trigger result details: workflow_id={result.workflow_id}, agent_prompt={result.agent_prompt}")
-
if not result.success:
return JSONResponse(
status_code=400,
@@ -427,96 +600,3 @@ async def trigger_webhook(
status_code=500,
content={"success": False, "error": "Internal server error"}
)
-
-
-@router.post("/qstash/webhook")
-async def qstash_webhook(request: Request):
- if not await is_enabled("agent_triggers"):
- raise HTTPException(status_code=403, detail="Agent triggers are not enabled")
-
- try:
- headers = dict(request.headers)
- logger.info(f"QStash webhook received with headers: {headers}")
-
- try:
- raw_data = await request.json()
- except:
- raw_data = {}
-
- logger.info(f"QStash webhook payload: {raw_data}")
- trigger_id = raw_data.get('trigger_id')
- if not trigger_id:
- logger.error("No trigger_id found in QStash webhook payload")
- return JSONResponse(
- status_code=400,
- content={"error": "trigger_id is required in webhook payload"}
- )
-
- raw_data.update({
- "webhook_source": "qstash",
- "webhook_headers": headers,
- "webhook_timestamp": datetime.now(timezone.utc).isoformat()
- })
-
- trigger_svc, execution_svc, _ = await get_services()
-
- result = await trigger_svc.process_trigger_event(trigger_id, raw_data)
-
- logger.info(f"QStash trigger result: success={result.success}, should_execute_agent={result.should_execute_agent}, should_execute_workflow={result.should_execute_workflow}")
-
- if not result.success:
- return JSONResponse(
- status_code=400,
- content={"success": False, "error": result.error_message}
- )
-
- if result.should_execute_agent or result.should_execute_workflow:
- trigger = await trigger_svc.get_trigger(trigger_id)
- if trigger:
- logger.info(f"Executing QStash trigger for agent {trigger.agent_id}")
-
- from .domain.entities import TriggerEvent
- event = TriggerEvent(
- trigger_id=trigger_id,
- agent_id=trigger.agent_id,
- trigger_type=trigger.trigger_type,
- raw_data=raw_data
- )
-
- execution_result = await execution_svc.execute_trigger_result(
- agent_id=trigger.agent_id,
- trigger_result=result,
- trigger_event=event
- )
-
- logger.info(f"QStash execution result: {execution_result}")
-
- return JSONResponse(content={
- "success": True,
- "message": "QStash trigger processed and execution started",
- "execution": execution_result,
- "trigger_id": trigger_id,
- "agent_id": trigger.agent_id
- })
- else:
- logger.warning(f"QStash trigger {trigger_id} not found for execution")
- return JSONResponse(
- status_code=404,
- content={"error": f"Trigger {trigger_id} not found"}
- )
-
- logger.info(f"QStash webhook processed but no execution needed")
- return JSONResponse(content={
- "success": True,
- "message": "QStash trigger processed successfully (no execution needed)",
- "trigger_id": trigger_id
- })
-
- except Exception as e:
- logger.error(f"Error processing QStash webhook: {e}")
- import traceback
- logger.error(f"QStash webhook error traceback: {traceback.format_exc()}")
- return JSONResponse(
- status_code=500,
- content={"error": "Internal server error"}
- )
diff --git a/backend/triggers/core.py b/backend/triggers/core.py
index 4fa42d36..b3dd7036 100644
--- a/backend/triggers/core.py
+++ b/backend/triggers/core.py
@@ -208,7 +208,7 @@ class TriggerManager:
name="Schedule",
description="Schedule agent or workflow execution using Cloudflare Workers and cron expressions",
trigger_type="schedule",
- provider_class="triggers.providers.schedule_provider.ScheduleTriggerProvider",
+ provider_class="triggers.infrastructure.providers.schedule_provider.ScheduleTriggerProvider",
webhook_enabled=True,
config_schema={
"type": "object",
diff --git a/backend/triggers/infrastructure/providers/schedule_provider.py b/backend/triggers/infrastructure/providers/schedule_provider.py
index 50616bb1..d0a2add0 100644
--- a/backend/triggers/infrastructure/providers/schedule_provider.py
+++ b/backend/triggers/infrastructure/providers/schedule_provider.py
@@ -3,6 +3,7 @@ import json
import os
from datetime import datetime, timezone
from typing import Dict, Any, Optional
+import pytz
from qstash.client import QStash
from utils.logger import logger
from ...domain.entities import TriggerProvider, TriggerEvent, TriggerResult, Trigger
@@ -41,6 +42,14 @@ class ScheduleTriggerProvider(TriggerProvider):
if 'workflow_id' not in config:
raise ValueError("workflow_id is required for workflow execution")
+ # Validate timezone if provided
+ user_timezone = config.get('timezone', 'UTC')
+ if user_timezone != 'UTC':
+ try:
+ pytz.timezone(user_timezone)
+ except pytz.UnknownTimeZoneError:
+ raise ValueError(f"Invalid timezone: {user_timezone}")
+
try:
import croniter
croniter.croniter(config['cron_expression'])
@@ -51,6 +60,64 @@ class ScheduleTriggerProvider(TriggerProvider):
return config
+ def _convert_cron_to_utc(self, cron_expression: str, user_timezone: str) -> str:
+ try:
+ import croniter
+ parts = cron_expression.split()
+ if len(parts) != 5:
+ logger.warning(f"Invalid cron expression format: {cron_expression}")
+ return cron_expression
+
+ minute, hour, day, month, weekday = parts
+
+ if minute.startswith('*/') and hour == '*':
+ return cron_expression
+
+ if hour == '*' or minute == '*':
+ return cron_expression
+
+ try:
+ user_tz = pytz.timezone(user_timezone)
+ utc_tz = pytz.UTC
+ from datetime import datetime as dt
+ now = dt.now(user_tz)
+
+ if hour.isdigit() and minute.isdigit():
+ user_time = user_tz.localize(dt(now.year, now.month, now.day, int(hour), int(minute)))
+ utc_time = user_time.astimezone(utc_tz)
+ utc_minute = str(utc_time.minute)
+ utc_hour = str(utc_time.hour)
+
+                    return cron_expression if (day != '*' or weekday != '*') and utc_time.date() != user_time.date() else f"{utc_minute} {utc_hour} {day} {month} {weekday}"
+
+ elif ',' in hour and minute.isdigit():
+ hours = hour.split(',')
+ utc_hours = []
+ for h in hours:
+ if h.isdigit():
+ user_time = user_tz.localize(dt(now.year, now.month, now.day, int(h), int(minute)))
+ utc_time = user_time.astimezone(utc_tz)
+ utc_hours.append(str(utc_time.hour))
+
+ if utc_hours:
+ utc_minute = str(utc_time.minute)
+                        return cron_expression if (day != '*' or weekday != '*') and utc_time.date() != user_time.date() else f"{utc_minute} {','.join(utc_hours)} {day} {month} {weekday}"
+
+ elif '-' in hour and minute.isdigit():
+ pass
+
+ except Exception as e:
+ logger.warning(f"Failed to convert timezone for cron expression {cron_expression}: {e}")
+
+ return cron_expression
+
+ except ImportError:
+ logger.warning("croniter not available for cron expression validation")
+ return cron_expression
+ except Exception as e:
+ logger.error(f"Error converting cron expression to UTC: {e}")
+ return cron_expression
+
async def setup_trigger(self, trigger: Trigger) -> bool:
if not self._qstash:
logger.error("QStash client not available")
@@ -58,31 +125,45 @@ class ScheduleTriggerProvider(TriggerProvider):
try:
webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook"
-
cron_expression = trigger.config.config['cron_expression']
+ execution_type = trigger.config.config.get('execution_type', 'agent')
+ user_timezone = trigger.config.config.get('timezone', 'UTC')
+
+ if user_timezone != 'UTC':
+ cron_expression = self._convert_cron_to_utc(cron_expression, user_timezone)
+ logger.info(f"Converted cron expression from {user_timezone} to UTC: {trigger.config.config['cron_expression']} -> {cron_expression}")
payload = {
"trigger_id": trigger.trigger_id,
"agent_id": trigger.agent_id,
- "execution_type": trigger.config.config.get('execution_type', 'agent'),
+ "execution_type": execution_type,
"agent_prompt": trigger.config.config.get('agent_prompt'),
"workflow_id": trigger.config.config.get('workflow_id'),
"workflow_input": trigger.config.config.get('workflow_input', {}),
"timestamp": datetime.now(timezone.utc).isoformat()
}
+ headers = {
+ "Content-Type": "application/json",
+ "X-Trigger-Source": "schedule"
+ }
+
+ if config.ENV_MODE == EnvMode.STAGING:
+ vercel_bypass_key = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "")
+ if vercel_bypass_key:
+ headers["X-Vercel-Protection-Bypass"] = vercel_bypass_key
+
schedule_id = await asyncio.to_thread(
self._qstash.schedule.create,
destination=webhook_url,
cron=cron_expression,
body=json.dumps(payload),
- headers={
- "Content-Type": "application/json",
- "X-Trigger-Source": "schedule"
- }
+ headers=headers,
+ retries=3,
+ delay="5s"
)
-
- logger.info(f"Created QStash schedule {schedule_id} for trigger {trigger.trigger_id}")
+ trigger.config.config['qstash_schedule_id'] = schedule_id
+ logger.info(f"Created QStash schedule {schedule_id} for trigger {trigger.trigger_id} with cron: {cron_expression}")
return True
except Exception as e:
@@ -91,9 +172,20 @@ class ScheduleTriggerProvider(TriggerProvider):
async def teardown_trigger(self, trigger: Trigger) -> bool:
if not self._qstash:
+ logger.warning("QStash client not available, skipping teardown")
return True
try:
+ schedule_id = trigger.config.config.get('qstash_schedule_id')
+ if schedule_id:
+ try:
+ await asyncio.to_thread(self._qstash.schedule.delete, schedule_id)
+ logger.info(f"Deleted QStash schedule {schedule_id} for trigger {trigger.trigger_id}")
+ return True
+ except Exception as e:
+ logger.warning(f"Failed to delete QStash schedule {schedule_id} by ID: {e}")
+
+ logger.info(f"Attempting to find and delete QStash schedule for trigger {trigger.trigger_id} by webhook URL")
schedules = await asyncio.to_thread(self._qstash.schedule.list)
webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook"
@@ -102,8 +194,9 @@ class ScheduleTriggerProvider(TriggerProvider):
if schedule.get('destination') == webhook_url:
await asyncio.to_thread(self._qstash.schedule.delete, schedule['scheduleId'])
logger.info(f"Deleted QStash schedule {schedule['scheduleId']} for trigger {trigger.trigger_id}")
- break
+ return True
+ logger.warning(f"No QStash schedule found for trigger {trigger.trigger_id}")
return True
except Exception as e:
@@ -156,16 +249,30 @@ class ScheduleTriggerProvider(TriggerProvider):
async def health_check(self, trigger: Trigger) -> bool:
if not self._qstash:
+ logger.warning("QStash client not available for health check")
return False
try:
+ schedule_id = trigger.config.config.get('qstash_schedule_id')
+ if schedule_id:
+ try:
+ schedule = await asyncio.to_thread(self._qstash.schedule.get, schedule_id)
+ is_healthy = schedule is not None
+ logger.info(f"Health check for trigger {trigger.trigger_id} using schedule ID {schedule_id}: {'healthy' if is_healthy else 'unhealthy'}")
+ return is_healthy
+ except Exception as e:
+ logger.warning(f"Failed to check health for QStash schedule {schedule_id} by ID: {e}")
+
+ logger.info(f"Attempting health check for trigger {trigger.trigger_id} by webhook URL")
webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook"
schedules = await asyncio.to_thread(self._qstash.schedule.list)
for schedule in schedules:
if schedule.get('destination') == webhook_url:
+ logger.info(f"Health check for trigger {trigger.trigger_id}: healthy (found schedule)")
return True
+ logger.warning(f"Health check for trigger {trigger.trigger_id}: no schedule found")
return False
except Exception as e:
@@ -179,20 +286,43 @@ class ScheduleTriggerProvider(TriggerProvider):
return await self.setup_trigger(trigger)
async def update_trigger(self, trigger: Trigger) -> bool:
- await self.teardown_trigger(trigger)
- if trigger.is_active:
- return await self.setup_trigger(trigger)
- return True
+ if not self._qstash:
+ logger.warning("QStash client not available for trigger update")
+ return True
+
+ try:
+ logger.info(f"Updating QStash schedule for trigger {trigger.trigger_id}")
+ teardown_success = await self.teardown_trigger(trigger)
+ if not teardown_success:
+ logger.warning(f"Failed to teardown existing schedule for trigger {trigger.trigger_id}, proceeding with setup")
+
+ if trigger.is_active:
+ setup_success = await self.setup_trigger(trigger)
+ if setup_success:
+ logger.info(f"Successfully updated QStash schedule for trigger {trigger.trigger_id}")
+ else:
+ logger.error(f"Failed to setup updated schedule for trigger {trigger.trigger_id}")
+ return setup_success
+ else:
+ logger.info(f"Trigger {trigger.trigger_id} is inactive, skipping schedule setup")
+ return True
+
+ except Exception as e:
+ logger.error(f"Error updating QStash schedule for trigger {trigger.trigger_id}: {e}")
+ return False
def get_webhook_url(self, trigger_id: str, base_url: str) -> Optional[str]:
return f"{base_url}/api/triggers/{trigger_id}/webhook"
async def list_schedules(self) -> list:
if not self._qstash:
+ logger.warning("QStash client not available for listing schedules")
return []
try:
- return await asyncio.to_thread(self._qstash.schedule.list)
+ schedules = await asyncio.to_thread(self._qstash.schedule.list)
+ logger.info(f"Successfully retrieved {len(schedules)} schedules from QStash")
+ return schedules
except Exception as e:
logger.error(f"Failed to list QStash schedules: {e}")
return []
\ No newline at end of file
diff --git a/backend/triggers/providers/__init__.py b/backend/triggers/providers/__init__.py
deleted file mode 100644
index e64e0a04..00000000
--- a/backend/triggers/providers/__init__.py
+++ /dev/null
@@ -1,5 +0,0 @@
-from .schedule_provider import ScheduleTriggerProvider
-
-__all__ = [
- 'ScheduleTriggerProvider'
-]
\ No newline at end of file
diff --git a/backend/triggers/providers/schedule_provider.py b/backend/triggers/providers/schedule_provider.py
deleted file mode 100644
index 3e9bbbf6..00000000
--- a/backend/triggers/providers/schedule_provider.py
+++ /dev/null
@@ -1,294 +0,0 @@
-import asyncio
-import json
-import os
-from datetime import datetime, timezone
-from typing import Dict, Any, Optional
-from qstash.client import QStash
-from utils.logger import logger
-from ..core import TriggerProvider, TriggerType, TriggerEvent, TriggerResult, TriggerConfig, ProviderDefinition
-from utils.config import config, EnvMode
-
-class ScheduleTriggerProvider(TriggerProvider):
- def __init__(self, provider_definition: Optional[ProviderDefinition] = None):
- super().__init__(TriggerType.SCHEDULE, provider_definition)
-
- self.qstash_token = os.getenv("QSTASH_TOKEN")
- self.webhook_base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:3000")
-
- if not self.qstash_token:
- logger.warning("QSTASH_TOKEN not found. QStash provider will not work without it.")
- self.qstash = None
- else:
- self.qstash = QStash(token=self.qstash_token)
-
- async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]:
- if not self.qstash:
- raise ValueError("QSTASH_TOKEN environment variable is required for QStash scheduling")
-
- if 'cron_expression' not in config:
- raise ValueError("cron_expression is required for QStash schedule triggers")
-
- execution_type = config.get('execution_type', 'agent')
- if execution_type not in ['agent', 'workflow']:
- raise ValueError("execution_type must be either 'agent' or 'workflow'")
-
- if execution_type == 'agent':
- if 'agent_prompt' not in config:
- raise ValueError("agent_prompt is required for agent execution")
- elif execution_type == 'workflow':
- if 'workflow_id' not in config:
- raise ValueError("workflow_id is required for workflow execution")
-
- try:
- import croniter
- croniter.croniter(config['cron_expression'])
- except ImportError:
- raise ValueError("croniter package is required for cron expressions. Please install it with: pip install croniter")
- except Exception as e:
- raise ValueError(f"Invalid cron expression: {str(e)}")
-
- return config
-
- async def setup_trigger(self, trigger_config: TriggerConfig) -> bool:
- if config.ENV_MODE == EnvMode.STAGING:
- vercel_bypass_key = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "")
- else:
- vercel_bypass_key = ""
- try:
- webhook_url = f"{self.webhook_base_url}/api/triggers/qstash/webhook"
- execution_type = trigger_config.config.get('execution_type', 'agent')
-
- webhook_payload = {
- "trigger_id": trigger_config.trigger_id,
- "agent_id": trigger_config.agent_id,
- "execution_type": execution_type,
- "schedule_name": trigger_config.name,
- "cron_expression": trigger_config.config['cron_expression'],
- "event_type": "scheduled",
- "provider": "qstash"
- }
-
- if execution_type == 'agent':
- webhook_payload["agent_prompt"] = trigger_config.config['agent_prompt']
- elif execution_type == 'workflow':
- webhook_payload["workflow_id"] = trigger_config.config['workflow_id']
- webhook_payload["workflow_input"] = trigger_config.config.get('workflow_input', {})
- schedule_id = await asyncio.to_thread(
- self.qstash.schedule.create,
- destination=webhook_url,
- cron=trigger_config.config['cron_expression'],
- body=json.dumps(webhook_payload),
- headers={
- "Content-Type": "application/json",
- "X-Schedule-Provider": "qstash",
- "X-Trigger-ID": trigger_config.trigger_id,
- "X-Agent-ID": trigger_config.agent_id,
- "X-Vercel-Protection-Bypass": vercel_bypass_key
- },
- retries=3,
- delay="5s"
- )
- trigger_config.config['qstash_schedule_id'] = schedule_id
- logger.info(f"Successfully created QStash schedule {schedule_id} for trigger {trigger_config.trigger_id}")
- return True
-
- except Exception as e:
- logger.error(f"Error setting up QStash scheduled trigger {trigger_config.trigger_id}: {e}")
- return False
-
- async def teardown_trigger(self, trigger_config: TriggerConfig) -> bool:
- try:
- schedule_id = trigger_config.config.get('qstash_schedule_id')
- if not schedule_id:
- logger.warning(f"No QStash schedule ID found for trigger {trigger_config.trigger_id}")
- return True
- await asyncio.to_thread(
- self.qstash.schedule.delete,
- schedule_id=schedule_id
- )
- logger.info(f"Successfully deleted QStash schedule {schedule_id}")
- return True
-
- except Exception as e:
- logger.error(f"Error removing QStash scheduled trigger {trigger_config.trigger_id}: {e}")
- return False
-
- async def process_event(self, event: TriggerEvent) -> TriggerResult:
- try:
- raw_data = event.raw_data
- execution_type = raw_data.get('execution_type', 'agent')
-
- execution_variables = {
- 'scheduled_at': event.timestamp.isoformat(),
- 'trigger_id': event.trigger_id,
- 'agent_id': event.agent_id,
- 'schedule_name': raw_data.get('schedule_name', 'Scheduled Task'),
- 'execution_source': 'qstash',
- 'execution_type': execution_type,
- 'cron_expression': raw_data.get('cron_expression'),
- 'qstash_message_id': raw_data.get('messageId')
- }
-
- if execution_type == 'workflow':
- workflow_id = raw_data.get('workflow_id')
- workflow_input = raw_data.get('workflow_input', {})
-
- return TriggerResult(
- success=True,
- should_execute_workflow=True,
- workflow_id=workflow_id,
- workflow_input=workflow_input,
- execution_variables=execution_variables
- )
- else:
- agent_prompt = raw_data.get('agent_prompt', 'Execute scheduled task')
-
- return TriggerResult(
- success=True,
- should_execute_agent=True,
- agent_prompt=agent_prompt,
- execution_variables=execution_variables
- )
-
- except Exception as e:
- return TriggerResult(
- success=False,
- error_message=f"Error processing QStash scheduled trigger event: {str(e)}"
- )
-
- async def health_check(self, trigger_config: TriggerConfig) -> bool:
- try:
- schedule_id = trigger_config.config.get('qstash_schedule_id')
- if not schedule_id:
- return False
-
- schedule = await asyncio.to_thread(
- self.qstash.schedule.get,
- schedule_id=schedule_id
- )
-
- return getattr(schedule, 'is_active', False)
-
- except Exception as e:
- logger.error(f"Health check failed for QStash scheduled trigger {trigger_config.trigger_id}: {e}")
- return False
-
- async def pause_trigger(self, trigger_config: TriggerConfig) -> bool:
- try:
- schedule_id = trigger_config.config.get('qstash_schedule_id')
- if not schedule_id:
- return False
-
- await asyncio.to_thread(
- self.qstash.schedules.pause,
- schedule_id=schedule_id
- )
-
- logger.info(f"Successfully paused QStash schedule {schedule_id}")
- return True
-
- except Exception as e:
- logger.error(f"Error pausing QStash schedule: {e}")
- return False
-
- async def resume_trigger(self, trigger_config: TriggerConfig) -> bool:
- try:
- schedule_id = trigger_config.config.get('qstash_schedule_id')
- if not schedule_id:
- return False
-
- await asyncio.to_thread(
- self.qstash.schedules.resume,
- schedule_id=schedule_id
- )
-
- logger.info(f"Successfully resumed QStash schedule {schedule_id}")
- return True
-
- except Exception as e:
- logger.error(f"Error resuming QStash schedule: {e}")
- return False
-
- async def update_trigger(self, trigger_config: TriggerConfig) -> bool:
- try:
- schedule_id = trigger_config.config.get('qstash_schedule_id')
- webhook_url = f"{self.webhook_base_url}/api/triggers/qstash/webhook"
- execution_type = trigger_config.config.get('execution_type', 'agent')
-
- webhook_payload = {
- "trigger_id": trigger_config.trigger_id,
- "agent_id": trigger_config.agent_id,
- "execution_type": execution_type,
- "schedule_name": trigger_config.name,
- "cron_expression": trigger_config.config['cron_expression'],
- "event_type": "scheduled",
- "provider": "qstash"
- }
-
- if execution_type == 'agent':
- webhook_payload["agent_prompt"] = trigger_config.config['agent_prompt']
- elif execution_type == 'workflow':
- webhook_payload["workflow_id"] = trigger_config.config['workflow_id']
- webhook_payload["workflow_input"] = trigger_config.config.get('workflow_input', {})
-
- async with httpx.AsyncClient() as client:
- response = await client.post(
- f"{self.qstash_base_url}/schedules",
- headers={
- "Authorization": f"Bearer {self.qstash_token}",
- "Content-Type": "application/json"
- },
- json={
- "scheduleId": schedule_id,
- "destination": webhook_url,
- "cron": trigger_config.config['cron_expression'],
- "body": webhook_payload,
- "headers": {
- "Content-Type": "application/json",
- "X-Schedule-Provider": "qstash",
- "X-Trigger-ID": trigger_config.trigger_id,
- "X-Agent-ID": trigger_config.agent_id
- },
- "retries": 3,
- "delay": "5s"
- },
- timeout=30.0
- )
-
- if response.status_code == 200:
- logger.info(f"Successfully updated QStash schedule {schedule_id}")
- return True
- else:
- logger.error(f"Failed to update QStash schedule: {response.status_code} - {response.text}")
- return False
-
- except Exception as e:
- logger.error(f"Error updating QStash schedule: {e}")
- return False
-
- def get_webhook_url(self, trigger_id: str, base_url: str) -> Optional[str]:
- base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:3000")
- return f"{base_url}/api/triggers/qstash/webhook"
-
- async def list_schedules(self) -> list:
- try:
- schedules_data = await asyncio.to_thread(
- self.qstash.schedules.list
- )
-
- schedules = []
- for schedule in schedules_data:
- schedules.append({
- 'id': getattr(schedule, 'schedule_id', None),
- 'destination': getattr(schedule, 'destination', None),
- 'cron': getattr(schedule, 'cron', None),
- 'is_active': getattr(schedule, 'is_active', False),
- 'created_at': getattr(schedule, 'created_at', None),
- 'next_delivery': getattr(schedule, 'next_delivery', None)
- })
-
- return schedules
-
- except Exception as e:
- logger.error(f"Error listing QStash schedules: {e}")
- return []
\ No newline at end of file
diff --git a/backend/triggers/registry.py b/backend/triggers/registry.py
deleted file mode 100644
index 58b0403b..00000000
--- a/backend/triggers/registry.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import warnings
-warnings.warn(
- "triggers.registry is deprecated. Use triggers.domain.services.ProviderRegistryService instead.",
- DeprecationWarning,
- stacklevel=2
-)
-
-from typing import Dict, List, Optional
-from .core import TriggerProvider, TriggerType
-from .providers import ScheduleTriggerProvider
-
-class TriggerRegistry:
- """Registry for trigger providers."""
-
- def __init__(self):
- self._providers: Dict[TriggerType, TriggerProvider] = {}
- self._initialize_default_providers()
-
- def _initialize_default_providers(self):
- """Initialize default trigger providers."""
- self.register_provider(ScheduleTriggerProvider())
-
- def register_provider(self, provider: TriggerProvider):
- """Register a trigger provider."""
- self._providers[provider.trigger_type] = provider
-
- def get_provider(self, trigger_type: TriggerType) -> Optional[TriggerProvider]:
- """Get a trigger provider by type."""
- return self._providers.get(trigger_type)
-
- def get_all_providers(self) -> Dict[TriggerType, TriggerProvider]:
- """Get all registered providers."""
- return self._providers.copy()
-
- def get_supported_types(self) -> List[TriggerType]:
- """Get list of supported trigger types."""
- return list(self._providers.keys())
-
- def get_provider_schemas(self) -> Dict[str, Dict]:
- """Get configuration schemas for all providers."""
- schemas = {}
- for trigger_type, provider in self._providers.items():
- trigger_type_str = trigger_type.value if hasattr(trigger_type, 'value') else str(trigger_type)
- schemas[trigger_type_str] = provider.get_config_schema()
- return schemas
-
- def is_supported(self, trigger_type: TriggerType) -> bool:
- """Check if a trigger type is supported."""
- return trigger_type in self._providers
-
-trigger_registry = TriggerRegistry()
\ No newline at end of file
diff --git a/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx b/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx
index b74f0b0c..8eac67b5 100644
--- a/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx
+++ b/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx
@@ -19,6 +19,7 @@ import { useAgentVersionStore } from '../../../../../lib/stores/agent-version-st
import { cn } from '@/lib/utils';
import { AgentHeader, VersionAlert, AgentBuilderTab, ConfigurationTab } from '@/components/agents/config';
+import { UpcomingRunsDropdown } from '@/components/agents/upcoming-runs-dropdown';
interface FormData {
name: string;
@@ -268,6 +269,7 @@ export default function AgentConfigurationPageRefactored() {
setOriginalData(formData);
}}
/>
+
Prompt: {truncateString(trigger.config.agent_prompt, 40)}
+ )} + {trigger.config.execution_type === 'workflow' && trigger.config.workflow_id && ( +Workflow: {trigger.config.workflow_id}
+ )} +
@@ -118,7 +147,6 @@ export const ConfiguredTriggersList: React.FC = ({
)}
{errors.name}
+ )} ++ Choose whether to execute the agent directly or run a specific workflow. +
+{errors.workflow_id}
+ )} ++ Select the workflow to execute when triggered. +
++ Current time: {new Date().toLocaleString('en-US', { + timeZone: config.timezone, + hour12: true, + weekday: 'short', + month: 'short', + day: 'numeric', + hour: '2-digit', + minute: '2-digit' + })} +
+ )} +{errors.cron_expression}
+ )} + {config.cron_expression && !errors.cron_expression && ( ++ ✓ {getSchedulePreview()} +
+ )} +minute hour day month weekday
0 9 * * 1-5
= Weekdays at 9 AM*
for any value, */5
for every 5 units{errors.cron_expression}
- )} -minute hour day month weekday
0 9 * * 1-5
= Weekdays at 9 AM*
for any value, */5
for every 5 units- Choose whether to execute the agent directly or run a specific workflow. -
-{errors.workflow_id}
- )} -- Select the workflow to execute when triggered. -
-{errors.name}
- )} -{errors.name}
+ )} +