From 414eb23949310ba95bcd28c338a8bb216633e801 Mon Sep 17 00:00:00 2001 From: Saumya Date: Tue, 15 Jul 2025 11:18:01 +0530 Subject: [PATCH] show upcoming run in agent page --- backend/pipedream/domain/services.py | 2 +- backend/triggers/api.py | 290 ++++-- backend/triggers/core.py | 2 +- .../providers/schedule_provider.py | 158 ++- backend/triggers/providers/__init__.py | 5 - .../triggers/providers/schedule_provider.py | 294 ------ backend/triggers/registry.py | 51 - .../agents/config/[agentId]/page.tsx | 98 +- .../agents/agent-tools-configuration.tsx | 10 - .../components/agents/config/agent-header.tsx | 1 + .../triggers/agent-triggers-configuration.tsx | 55 +- .../triggers/configured-triggers-list.tsx | 32 +- .../triggers/one-click-integrations.tsx | 6 +- .../triggers/providers/schedule-config.tsx | 921 +++++++++++------- .../agents/triggers/trigger-config-dialog.tsx | 100 +- .../src/components/agents/triggers/types.ts | 2 +- .../agents/upcoming-runs-dropdown.tsx | 184 ++++ .../agents/use-agent-upcoming-runs.ts | 37 + .../triggers/use-agent-triggers.ts | 10 +- 19 files changed, 1324 insertions(+), 934 deletions(-) delete mode 100644 backend/triggers/providers/__init__.py delete mode 100644 backend/triggers/providers/schedule_provider.py delete mode 100644 backend/triggers/registry.py create mode 100644 frontend/src/components/agents/upcoming-runs-dropdown.tsx create mode 100644 frontend/src/hooks/react-query/agents/use-agent-upcoming-runs.ts diff --git a/backend/pipedream/domain/services.py b/backend/pipedream/domain/services.py index 08714416..12e0c57b 100644 --- a/backend/pipedream/domain/services.py +++ b/backend/pipedream/domain/services.py @@ -12,7 +12,7 @@ class ExternalUserIdGeneratorService(ABC): class MCPQualifiedNameService(ABC): - + @abstractmethod def generate(self, app_slug: AppSlug) -> str: pass diff --git a/backend/triggers/api.py b/backend/triggers/api.py index ce98033d..f1a94eae 100644 --- a/backend/triggers/api.py +++ b/backend/triggers/api.py @@ -3,7 +3,9 @@ from fastapi.responses import JSONResponse from typing import List, Optional, Dict, Any from pydantic import BaseModel import os -from datetime import datetime +from datetime import datetime, timezone +import croniter +import pytz from .support.factory import TriggerModuleFactory from .support.exceptions import TriggerError, ConfigurationError, ProviderError @@ -53,6 +55,7 @@ class TriggerResponse(BaseModel): webhook_url: Optional[str] created_at: str updated_at: str + config: Dict[str, Any] class ProviderResponse(BaseModel): @@ -65,10 +68,29 @@ class ProviderResponse(BaseModel): config_schema: Dict[str, Any] +class UpcomingRun(BaseModel): + trigger_id: str + trigger_name: str + trigger_type: str + next_run_time: str + next_run_time_local: str + timezone: str + cron_expression: str + execution_type: str + agent_prompt: Optional[str] = None + workflow_id: Optional[str] = None + is_active: bool + human_readable: str + + +class UpcomingRunsResponse(BaseModel): + upcoming_runs: List[UpcomingRun] + total_count: int + + def initialize(database: DBConnection): global db, trigger_service, execution_service, provider_service db = database - # Initialize workflows API with DB connection set_workflows_db_connection(database) @@ -170,7 +192,8 @@ async def get_agent_triggers( is_active=trigger.is_active, webhook_url=webhook_url, created_at=trigger.metadata.created_at.isoformat(), - updated_at=trigger.metadata.updated_at.isoformat() + updated_at=trigger.metadata.updated_at.isoformat(), + config=trigger.config.config )) return responses @@ -179,6 +202,159 @@ async def get_agent_triggers( raise HTTPException(status_code=500, detail="Internal server error") +@router.get("/agents/{agent_id}/upcoming-runs", response_model=UpcomingRunsResponse) +async def get_agent_upcoming_runs( + agent_id: str, + limit: int = Query(10, ge=1, le=50), + user_id: str = Depends(get_current_user_id_from_jwt) +): + if not await is_enabled("agent_triggers"): + raise HTTPException(status_code=403, detail="Agent triggers are not enabled") + + await verify_agent_access(agent_id, user_id) + + try: + trigger_svc, _, _ = await get_services() + triggers = await trigger_svc.get_agent_triggers(agent_id) + schedule_triggers = [ + trigger for trigger in triggers + if trigger.is_active and trigger.trigger_type.value == "schedule" + ] + + upcoming_runs = [] + now = datetime.now(timezone.utc) + for trigger in schedule_triggers: + config = trigger.config.config + cron_expression = config.get('cron_expression') + user_timezone = config.get('timezone', 'UTC') + + if not cron_expression: + continue + + try: + next_run = _get_next_run_time(cron_expression, user_timezone) + if not next_run: + continue + + local_tz = pytz.timezone(user_timezone) + next_run_local = next_run.astimezone(local_tz) + + human_readable = _get_human_readable_schedule(cron_expression, user_timezone) + + upcoming_runs.append(UpcomingRun( + trigger_id=trigger.trigger_id, + trigger_name=trigger.config.name, + trigger_type=trigger.trigger_type.value, + next_run_time=next_run.isoformat(), + next_run_time_local=next_run_local.isoformat(), + timezone=user_timezone, + cron_expression=cron_expression, + execution_type=config.get('execution_type', 'agent'), + agent_prompt=config.get('agent_prompt'), + workflow_id=config.get('workflow_id'), + is_active=trigger.is_active, + human_readable=human_readable + )) + + except Exception as e: + logger.warning(f"Error calculating next run for trigger {trigger.trigger_id}: {e}") + continue + + upcoming_runs.sort(key=lambda x: x.next_run_time) + upcoming_runs = upcoming_runs[:limit] + + return UpcomingRunsResponse( + upcoming_runs=upcoming_runs, + total_count=len(upcoming_runs) + ) + + except Exception as e: + logger.error(f"Error getting upcoming runs: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +def _get_next_run_time(cron_expression: str, user_timezone: str) -> Optional[datetime]: + try: + tz = pytz.timezone(user_timezone) + now_local = datetime.now(tz) + + cron = croniter.croniter(cron_expression, now_local) + + next_run_local = cron.get_next(datetime) + next_run_utc = next_run_local.astimezone(timezone.utc) + + return next_run_utc + + except Exception as e: + logger.error(f"Error calculating next run time: {e}") + return None + + +def _get_human_readable_schedule(cron_expression: str, user_timezone: str) -> str: + try: + patterns = { + '*/5 * * * *': 'Every 5 minutes', + '*/10 * * * *': 'Every 10 minutes', + '*/15 * * * *': 'Every 15 minutes', + '*/30 * * * *': 'Every 30 minutes', + '0 * * * *': 'Every hour', + '0 */2 * * *': 'Every 2 hours', + '0 */4 * * *': 'Every 4 hours', + '0 */6 * * *': 'Every 6 hours', + '0 */12 * * *': 'Every 12 hours', + '0 0 * * *': 'Daily at midnight', + '0 9 * * *': 'Daily at 9:00 AM', + '0 12 * * *': 'Daily at 12:00 PM', + '0 18 * * *': 'Daily at 6:00 PM', + '0 9 * * 1-5': 'Weekdays at 9:00 AM', + '0 9 * * 1': 'Every Monday at 9:00 AM', + '0 9 * * 2': 'Every Tuesday at 9:00 AM', + '0 9 * * 3': 'Every Wednesday at 9:00 AM', + '0 9 * * 4': 'Every Thursday at 9:00 AM', + '0 9 * * 5': 'Every Friday at 9:00 AM', + '0 9 * * 6': 'Every Saturday at 9:00 AM', + '0 9 * * 0': 'Every Sunday at 9:00 AM', + '0 9 1 * *': 'Monthly on the 1st at 9:00 AM', + '0 9 15 * *': 'Monthly on the 15th at 9:00 AM', + '0 9,17 * * *': 'Daily at 9:00 AM and 5:00 PM', + '0 10 * * 0,6': 'Weekends at 10:00 AM', + } + + if cron_expression in patterns: + description = patterns[cron_expression] + if user_timezone != 'UTC': + description += f" ({user_timezone})" + return description + + parts = cron_expression.split() + if len(parts) != 5: + return f"Custom schedule: {cron_expression}" + + minute, hour, day, month, weekday = parts + + if minute.isdigit() and hour == '*' and day == '*' and month == '*' and weekday == '*': + return f"Every hour at :{minute.zfill(2)}" + + if minute.isdigit() and hour.isdigit() and day == '*' and month == '*' and weekday == '*': + time_str = f"{hour.zfill(2)}:{minute.zfill(2)}" + description = f"Daily at {time_str}" + if user_timezone != 'UTC': + description += f" ({user_timezone})" + return description + + if minute.isdigit() and hour.isdigit() and day == '*' and month == '*' and weekday == '1-5': + time_str = f"{hour.zfill(2)}:{minute.zfill(2)}" + description = f"Weekdays at {time_str}" + if user_timezone != 'UTC': + description += f" ({user_timezone})" + return description + + return f"Custom schedule: {cron_expression}" + + except Exception: + return f"Custom schedule: {cron_expression}" + + @router.post("/agents/{agent_id}/triggers", response_model=TriggerResponse) async def create_agent_trigger( agent_id: str, @@ -217,7 +393,8 @@ async def create_agent_trigger( is_active=trigger.is_active, webhook_url=webhook_url, created_at=trigger.metadata.created_at.isoformat(), - updated_at=trigger.metadata.updated_at.isoformat() + updated_at=trigger.metadata.updated_at.isoformat(), + config=trigger.config.config ) except (ValueError, ConfigurationError, ProviderError) as e: @@ -261,7 +438,8 @@ async def get_trigger( is_active=trigger.is_active, webhook_url=webhook_url, created_at=trigger.metadata.created_at.isoformat(), - updated_at=trigger.metadata.updated_at.isoformat() + updated_at=trigger.metadata.updated_at.isoformat(), + config=trigger.config.config ) except Exception as e: logger.error(f"Error getting trigger: {e}") @@ -311,7 +489,8 @@ async def update_trigger( is_active=updated_trigger.is_active, webhook_url=webhook_url, created_at=updated_trigger.metadata.created_at.isoformat(), - updated_at=updated_trigger.metadata.updated_at.isoformat() + updated_at=updated_trigger.metadata.updated_at.isoformat(), + config=updated_trigger.config.config ) except (ValueError, ConfigurationError, ProviderError) as e: @@ -331,7 +510,6 @@ async def delete_trigger( try: trigger_svc, _, _ = await get_services() - trigger = await trigger_svc.get_trigger(trigger_id) if not trigger: raise HTTPException(status_code=404, detail="Trigger not found") @@ -339,7 +517,6 @@ async def delete_trigger( await verify_agent_access(trigger.agent_id, user_id) success = await trigger_svc.delete_trigger(trigger_id) - if not success: raise HTTPException(status_code=404, detail="Trigger not found") @@ -360,7 +537,6 @@ async def trigger_webhook( try: trigger_svc, execution_svc, _ = await get_services() - try: raw_data = await request.json() except: @@ -368,9 +544,6 @@ async def trigger_webhook( result = await trigger_svc.process_trigger_event(trigger_id, raw_data) - logger.info(f"Webhook trigger result: success={result.success}, should_execute_agent={result.should_execute_agent}, should_execute_workflow={result.should_execute_workflow}") - logger.info(f"Trigger result details: workflow_id={result.workflow_id}, agent_prompt={result.agent_prompt}") - if not result.success: return JSONResponse( status_code=400, @@ -427,96 +600,3 @@ async def trigger_webhook( status_code=500, content={"success": False, "error": "Internal server error"} ) - - -@router.post("/qstash/webhook") -async def qstash_webhook(request: Request): - if not await is_enabled("agent_triggers"): - raise HTTPException(status_code=403, detail="Agent triggers are not enabled") - - try: - headers = dict(request.headers) - logger.info(f"QStash webhook received with headers: {headers}") - - try: - raw_data = await request.json() - except: - raw_data = {} - - logger.info(f"QStash webhook payload: {raw_data}") - trigger_id = raw_data.get('trigger_id') - if not trigger_id: - logger.error("No trigger_id found in QStash webhook payload") - return JSONResponse( - status_code=400, - content={"error": "trigger_id is required in webhook payload"} - ) - - raw_data.update({ - "webhook_source": "qstash", - "webhook_headers": headers, - "webhook_timestamp": datetime.now(timezone.utc).isoformat() - }) - - trigger_svc, execution_svc, _ = await get_services() - - result = await trigger_svc.process_trigger_event(trigger_id, raw_data) - - logger.info(f"QStash trigger result: success={result.success}, should_execute_agent={result.should_execute_agent}, should_execute_workflow={result.should_execute_workflow}") - - if not result.success: - return JSONResponse( - status_code=400, - content={"success": False, "error": result.error_message} - ) - - if result.should_execute_agent or result.should_execute_workflow: - trigger = await trigger_svc.get_trigger(trigger_id) - if trigger: - logger.info(f"Executing QStash trigger for agent {trigger.agent_id}") - - from .domain.entities import TriggerEvent - event = TriggerEvent( - trigger_id=trigger_id, - agent_id=trigger.agent_id, - trigger_type=trigger.trigger_type, - raw_data=raw_data - ) - - execution_result = await execution_svc.execute_trigger_result( - agent_id=trigger.agent_id, - trigger_result=result, - trigger_event=event - ) - - logger.info(f"QStash execution result: {execution_result}") - - return JSONResponse(content={ - "success": True, - "message": "QStash trigger processed and execution started", - "execution": execution_result, - "trigger_id": trigger_id, - "agent_id": trigger.agent_id - }) - else: - logger.warning(f"QStash trigger {trigger_id} not found for execution") - return JSONResponse( - status_code=404, - content={"error": f"Trigger {trigger_id} not found"} - ) - - logger.info(f"QStash webhook processed but no execution needed") - return JSONResponse(content={ - "success": True, - "message": "QStash trigger processed successfully (no execution needed)", - "trigger_id": trigger_id - }) - - except Exception as e: - logger.error(f"Error processing QStash webhook: {e}") - import traceback - logger.error(f"QStash webhook error traceback: {traceback.format_exc()}") - return JSONResponse( - status_code=500, - content={"error": "Internal server error"} - ) diff --git a/backend/triggers/core.py b/backend/triggers/core.py index 4fa42d36..b3dd7036 100644 --- a/backend/triggers/core.py +++ b/backend/triggers/core.py @@ -208,7 +208,7 @@ class TriggerManager: name="Schedule", description="Schedule agent or workflow execution using Cloudflare Workers and cron expressions", trigger_type="schedule", - provider_class="triggers.providers.schedule_provider.ScheduleTriggerProvider", + provider_class="triggers.infrastructure.providers.schedule_provider.ScheduleTriggerProvider", webhook_enabled=True, config_schema={ "type": "object", diff --git a/backend/triggers/infrastructure/providers/schedule_provider.py b/backend/triggers/infrastructure/providers/schedule_provider.py index 50616bb1..d0a2add0 100644 --- a/backend/triggers/infrastructure/providers/schedule_provider.py +++ b/backend/triggers/infrastructure/providers/schedule_provider.py @@ -3,6 +3,7 @@ import json import os from datetime import datetime, timezone from typing import Dict, Any, Optional +import pytz from qstash.client import QStash from utils.logger import logger from ...domain.entities import TriggerProvider, TriggerEvent, TriggerResult, Trigger @@ -41,6 +42,14 @@ class ScheduleTriggerProvider(TriggerProvider): if 'workflow_id' not in config: raise ValueError("workflow_id is required for workflow execution") + # Validate timezone if provided + user_timezone = config.get('timezone', 'UTC') + if user_timezone != 'UTC': + try: + pytz.timezone(user_timezone) + except pytz.UnknownTimeZoneError: + raise ValueError(f"Invalid timezone: {user_timezone}") + try: import croniter croniter.croniter(config['cron_expression']) @@ -51,6 +60,64 @@ class ScheduleTriggerProvider(TriggerProvider): return config + def _convert_cron_to_utc(self, cron_expression: str, user_timezone: str) -> str: + try: + import croniter + parts = cron_expression.split() + if len(parts) != 5: + logger.warning(f"Invalid cron expression format: {cron_expression}") + return cron_expression + + minute, hour, day, month, weekday = parts + + if minute.startswith('*/') and hour == '*': + return cron_expression + + if hour == '*' or minute == '*': + return cron_expression + + try: + user_tz = pytz.timezone(user_timezone) + utc_tz = pytz.UTC + from datetime import datetime as dt + now = dt.now(user_tz) + + if hour.isdigit() and minute.isdigit(): + user_time = user_tz.localize(dt(now.year, now.month, now.day, int(hour), int(minute))) + utc_time = user_time.astimezone(utc_tz) + utc_minute = str(utc_time.minute) + utc_hour = str(utc_time.hour) + + return f"{utc_minute} {utc_hour} {day} {month} {weekday}" + + elif ',' in hour and minute.isdigit(): + hours = hour.split(',') + utc_hours = [] + for h in hours: + if h.isdigit(): + user_time = user_tz.localize(dt(now.year, now.month, now.day, int(h), int(minute))) + utc_time = user_time.astimezone(utc_tz) + utc_hours.append(str(utc_time.hour)) + + if utc_hours: + utc_minute = str(utc_time.minute) + return f"{utc_minute} {','.join(utc_hours)} {day} {month} {weekday}" + + elif '-' in hour and minute.isdigit(): + pass + + except Exception as e: + logger.warning(f"Failed to convert timezone for cron expression {cron_expression}: {e}") + + return cron_expression + + except ImportError: + logger.warning("croniter not available for cron expression validation") + return cron_expression + except Exception as e: + logger.error(f"Error converting cron expression to UTC: {e}") + return cron_expression + async def setup_trigger(self, trigger: Trigger) -> bool: if not self._qstash: logger.error("QStash client not available") @@ -58,31 +125,45 @@ class ScheduleTriggerProvider(TriggerProvider): try: webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook" - cron_expression = trigger.config.config['cron_expression'] + execution_type = trigger.config.config.get('execution_type', 'agent') + user_timezone = trigger.config.config.get('timezone', 'UTC') + + if user_timezone != 'UTC': + cron_expression = self._convert_cron_to_utc(cron_expression, user_timezone) + logger.info(f"Converted cron expression from {user_timezone} to UTC: {trigger.config.config['cron_expression']} -> {cron_expression}") payload = { "trigger_id": trigger.trigger_id, "agent_id": trigger.agent_id, - "execution_type": trigger.config.config.get('execution_type', 'agent'), + "execution_type": execution_type, "agent_prompt": trigger.config.config.get('agent_prompt'), "workflow_id": trigger.config.config.get('workflow_id'), "workflow_input": trigger.config.config.get('workflow_input', {}), "timestamp": datetime.now(timezone.utc).isoformat() } + headers = { + "Content-Type": "application/json", + "X-Trigger-Source": "schedule" + } + + if config.ENV_MODE == EnvMode.STAGING: + vercel_bypass_key = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "") + if vercel_bypass_key: + headers["X-Vercel-Protection-Bypass"] = vercel_bypass_key + schedule_id = await asyncio.to_thread( self._qstash.schedule.create, destination=webhook_url, cron=cron_expression, body=json.dumps(payload), - headers={ - "Content-Type": "application/json", - "X-Trigger-Source": "schedule" - } + headers=headers, + retries=3, + delay="5s" ) - - logger.info(f"Created QStash schedule {schedule_id} for trigger {trigger.trigger_id}") + trigger.config.config['qstash_schedule_id'] = schedule_id + logger.info(f"Created QStash schedule {schedule_id} for trigger {trigger.trigger_id} with cron: {cron_expression}") return True except Exception as e: @@ -91,9 +172,20 @@ class ScheduleTriggerProvider(TriggerProvider): async def teardown_trigger(self, trigger: Trigger) -> bool: if not self._qstash: + logger.warning("QStash client not available, skipping teardown") return True try: + schedule_id = trigger.config.config.get('qstash_schedule_id') + if schedule_id: + try: + await asyncio.to_thread(self._qstash.schedule.delete, schedule_id) + logger.info(f"Deleted QStash schedule {schedule_id} for trigger {trigger.trigger_id}") + return True + except Exception as e: + logger.warning(f"Failed to delete QStash schedule {schedule_id} by ID: {e}") + + logger.info(f"Attempting to find and delete QStash schedule for trigger {trigger.trigger_id} by webhook URL") schedules = await asyncio.to_thread(self._qstash.schedule.list) webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook" @@ -102,8 +194,9 @@ class ScheduleTriggerProvider(TriggerProvider): if schedule.get('destination') == webhook_url: await asyncio.to_thread(self._qstash.schedule.delete, schedule['scheduleId']) logger.info(f"Deleted QStash schedule {schedule['scheduleId']} for trigger {trigger.trigger_id}") - break + return True + logger.warning(f"No QStash schedule found for trigger {trigger.trigger_id}") return True except Exception as e: @@ -156,16 +249,30 @@ class ScheduleTriggerProvider(TriggerProvider): async def health_check(self, trigger: Trigger) -> bool: if not self._qstash: + logger.warning("QStash client not available for health check") return False try: + schedule_id = trigger.config.config.get('qstash_schedule_id') + if schedule_id: + try: + schedule = await asyncio.to_thread(self._qstash.schedule.get, schedule_id) + is_healthy = schedule is not None + logger.info(f"Health check for trigger {trigger.trigger_id} using schedule ID {schedule_id}: {'healthy' if is_healthy else 'unhealthy'}") + return is_healthy + except Exception as e: + logger.warning(f"Failed to check health for QStash schedule {schedule_id} by ID: {e}") + + logger.info(f"Attempting health check for trigger {trigger.trigger_id} by webhook URL") webhook_url = f"{self._webhook_base_url}/api/triggers/{trigger.trigger_id}/webhook" schedules = await asyncio.to_thread(self._qstash.schedule.list) for schedule in schedules: if schedule.get('destination') == webhook_url: + logger.info(f"Health check for trigger {trigger.trigger_id}: healthy (found schedule)") return True + logger.warning(f"Health check for trigger {trigger.trigger_id}: no schedule found") return False except Exception as e: @@ -179,20 +286,43 @@ class ScheduleTriggerProvider(TriggerProvider): return await self.setup_trigger(trigger) async def update_trigger(self, trigger: Trigger) -> bool: - await self.teardown_trigger(trigger) - if trigger.is_active: - return await self.setup_trigger(trigger) - return True + if not self._qstash: + logger.warning("QStash client not available for trigger update") + return True + + try: + logger.info(f"Updating QStash schedule for trigger {trigger.trigger_id}") + teardown_success = await self.teardown_trigger(trigger) + if not teardown_success: + logger.warning(f"Failed to teardown existing schedule for trigger {trigger.trigger_id}, proceeding with setup") + + if trigger.is_active: + setup_success = await self.setup_trigger(trigger) + if setup_success: + logger.info(f"Successfully updated QStash schedule for trigger {trigger.trigger_id}") + else: + logger.error(f"Failed to setup updated schedule for trigger {trigger.trigger_id}") + return setup_success + else: + logger.info(f"Trigger {trigger.trigger_id} is inactive, skipping schedule setup") + return True + + except Exception as e: + logger.error(f"Error updating QStash schedule for trigger {trigger.trigger_id}: {e}") + return False def get_webhook_url(self, trigger_id: str, base_url: str) -> Optional[str]: return f"{base_url}/api/triggers/{trigger_id}/webhook" async def list_schedules(self) -> list: if not self._qstash: + logger.warning("QStash client not available for listing schedules") return [] try: - return await asyncio.to_thread(self._qstash.schedule.list) + schedules = await asyncio.to_thread(self._qstash.schedule.list) + logger.info(f"Successfully retrieved {len(schedules)} schedules from QStash") + return schedules except Exception as e: logger.error(f"Failed to list QStash schedules: {e}") return [] \ No newline at end of file diff --git a/backend/triggers/providers/__init__.py b/backend/triggers/providers/__init__.py deleted file mode 100644 index e64e0a04..00000000 --- a/backend/triggers/providers/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .schedule_provider import ScheduleTriggerProvider - -__all__ = [ - 'ScheduleTriggerProvider' -] \ No newline at end of file diff --git a/backend/triggers/providers/schedule_provider.py b/backend/triggers/providers/schedule_provider.py deleted file mode 100644 index 3e9bbbf6..00000000 --- a/backend/triggers/providers/schedule_provider.py +++ /dev/null @@ -1,294 +0,0 @@ -import asyncio -import json -import os -from datetime import datetime, timezone -from typing import Dict, Any, Optional -from qstash.client import QStash -from utils.logger import logger -from ..core import TriggerProvider, TriggerType, TriggerEvent, TriggerResult, TriggerConfig, ProviderDefinition -from utils.config import config, EnvMode - -class ScheduleTriggerProvider(TriggerProvider): - def __init__(self, provider_definition: Optional[ProviderDefinition] = None): - super().__init__(TriggerType.SCHEDULE, provider_definition) - - self.qstash_token = os.getenv("QSTASH_TOKEN") - self.webhook_base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:3000") - - if not self.qstash_token: - logger.warning("QSTASH_TOKEN not found. QStash provider will not work without it.") - self.qstash = None - else: - self.qstash = QStash(token=self.qstash_token) - - async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]: - if not self.qstash: - raise ValueError("QSTASH_TOKEN environment variable is required for QStash scheduling") - - if 'cron_expression' not in config: - raise ValueError("cron_expression is required for QStash schedule triggers") - - execution_type = config.get('execution_type', 'agent') - if execution_type not in ['agent', 'workflow']: - raise ValueError("execution_type must be either 'agent' or 'workflow'") - - if execution_type == 'agent': - if 'agent_prompt' not in config: - raise ValueError("agent_prompt is required for agent execution") - elif execution_type == 'workflow': - if 'workflow_id' not in config: - raise ValueError("workflow_id is required for workflow execution") - - try: - import croniter - croniter.croniter(config['cron_expression']) - except ImportError: - raise ValueError("croniter package is required for cron expressions. Please install it with: pip install croniter") - except Exception as e: - raise ValueError(f"Invalid cron expression: {str(e)}") - - return config - - async def setup_trigger(self, trigger_config: TriggerConfig) -> bool: - if config.ENV_MODE == EnvMode.STAGING: - vercel_bypass_key = os.getenv("VERCEL_PROTECTION_BYPASS_KEY", "") - else: - vercel_bypass_key = "" - try: - webhook_url = f"{self.webhook_base_url}/api/triggers/qstash/webhook" - execution_type = trigger_config.config.get('execution_type', 'agent') - - webhook_payload = { - "trigger_id": trigger_config.trigger_id, - "agent_id": trigger_config.agent_id, - "execution_type": execution_type, - "schedule_name": trigger_config.name, - "cron_expression": trigger_config.config['cron_expression'], - "event_type": "scheduled", - "provider": "qstash" - } - - if execution_type == 'agent': - webhook_payload["agent_prompt"] = trigger_config.config['agent_prompt'] - elif execution_type == 'workflow': - webhook_payload["workflow_id"] = trigger_config.config['workflow_id'] - webhook_payload["workflow_input"] = trigger_config.config.get('workflow_input', {}) - schedule_id = await asyncio.to_thread( - self.qstash.schedule.create, - destination=webhook_url, - cron=trigger_config.config['cron_expression'], - body=json.dumps(webhook_payload), - headers={ - "Content-Type": "application/json", - "X-Schedule-Provider": "qstash", - "X-Trigger-ID": trigger_config.trigger_id, - "X-Agent-ID": trigger_config.agent_id, - "X-Vercel-Protection-Bypass": vercel_bypass_key - }, - retries=3, - delay="5s" - ) - trigger_config.config['qstash_schedule_id'] = schedule_id - logger.info(f"Successfully created QStash schedule {schedule_id} for trigger {trigger_config.trigger_id}") - return True - - except Exception as e: - logger.error(f"Error setting up QStash scheduled trigger {trigger_config.trigger_id}: {e}") - return False - - async def teardown_trigger(self, trigger_config: TriggerConfig) -> bool: - try: - schedule_id = trigger_config.config.get('qstash_schedule_id') - if not schedule_id: - logger.warning(f"No QStash schedule ID found for trigger {trigger_config.trigger_id}") - return True - await asyncio.to_thread( - self.qstash.schedule.delete, - schedule_id=schedule_id - ) - logger.info(f"Successfully deleted QStash schedule {schedule_id}") - return True - - except Exception as e: - logger.error(f"Error removing QStash scheduled trigger {trigger_config.trigger_id}: {e}") - return False - - async def process_event(self, event: TriggerEvent) -> TriggerResult: - try: - raw_data = event.raw_data - execution_type = raw_data.get('execution_type', 'agent') - - execution_variables = { - 'scheduled_at': event.timestamp.isoformat(), - 'trigger_id': event.trigger_id, - 'agent_id': event.agent_id, - 'schedule_name': raw_data.get('schedule_name', 'Scheduled Task'), - 'execution_source': 'qstash', - 'execution_type': execution_type, - 'cron_expression': raw_data.get('cron_expression'), - 'qstash_message_id': raw_data.get('messageId') - } - - if execution_type == 'workflow': - workflow_id = raw_data.get('workflow_id') - workflow_input = raw_data.get('workflow_input', {}) - - return TriggerResult( - success=True, - should_execute_workflow=True, - workflow_id=workflow_id, - workflow_input=workflow_input, - execution_variables=execution_variables - ) - else: - agent_prompt = raw_data.get('agent_prompt', 'Execute scheduled task') - - return TriggerResult( - success=True, - should_execute_agent=True, - agent_prompt=agent_prompt, - execution_variables=execution_variables - ) - - except Exception as e: - return TriggerResult( - success=False, - error_message=f"Error processing QStash scheduled trigger event: {str(e)}" - ) - - async def health_check(self, trigger_config: TriggerConfig) -> bool: - try: - schedule_id = trigger_config.config.get('qstash_schedule_id') - if not schedule_id: - return False - - schedule = await asyncio.to_thread( - self.qstash.schedule.get, - schedule_id=schedule_id - ) - - return getattr(schedule, 'is_active', False) - - except Exception as e: - logger.error(f"Health check failed for QStash scheduled trigger {trigger_config.trigger_id}: {e}") - return False - - async def pause_trigger(self, trigger_config: TriggerConfig) -> bool: - try: - schedule_id = trigger_config.config.get('qstash_schedule_id') - if not schedule_id: - return False - - await asyncio.to_thread( - self.qstash.schedules.pause, - schedule_id=schedule_id - ) - - logger.info(f"Successfully paused QStash schedule {schedule_id}") - return True - - except Exception as e: - logger.error(f"Error pausing QStash schedule: {e}") - return False - - async def resume_trigger(self, trigger_config: TriggerConfig) -> bool: - try: - schedule_id = trigger_config.config.get('qstash_schedule_id') - if not schedule_id: - return False - - await asyncio.to_thread( - self.qstash.schedules.resume, - schedule_id=schedule_id - ) - - logger.info(f"Successfully resumed QStash schedule {schedule_id}") - return True - - except Exception as e: - logger.error(f"Error resuming QStash schedule: {e}") - return False - - async def update_trigger(self, trigger_config: TriggerConfig) -> bool: - try: - schedule_id = trigger_config.config.get('qstash_schedule_id') - webhook_url = f"{self.webhook_base_url}/api/triggers/qstash/webhook" - execution_type = trigger_config.config.get('execution_type', 'agent') - - webhook_payload = { - "trigger_id": trigger_config.trigger_id, - "agent_id": trigger_config.agent_id, - "execution_type": execution_type, - "schedule_name": trigger_config.name, - "cron_expression": trigger_config.config['cron_expression'], - "event_type": "scheduled", - "provider": "qstash" - } - - if execution_type == 'agent': - webhook_payload["agent_prompt"] = trigger_config.config['agent_prompt'] - elif execution_type == 'workflow': - webhook_payload["workflow_id"] = trigger_config.config['workflow_id'] - webhook_payload["workflow_input"] = trigger_config.config.get('workflow_input', {}) - - async with httpx.AsyncClient() as client: - response = await client.post( - f"{self.qstash_base_url}/schedules", - headers={ - "Authorization": f"Bearer {self.qstash_token}", - "Content-Type": "application/json" - }, - json={ - "scheduleId": schedule_id, - "destination": webhook_url, - "cron": trigger_config.config['cron_expression'], - "body": webhook_payload, - "headers": { - "Content-Type": "application/json", - "X-Schedule-Provider": "qstash", - "X-Trigger-ID": trigger_config.trigger_id, - "X-Agent-ID": trigger_config.agent_id - }, - "retries": 3, - "delay": "5s" - }, - timeout=30.0 - ) - - if response.status_code == 200: - logger.info(f"Successfully updated QStash schedule {schedule_id}") - return True - else: - logger.error(f"Failed to update QStash schedule: {response.status_code} - {response.text}") - return False - - except Exception as e: - logger.error(f"Error updating QStash schedule: {e}") - return False - - def get_webhook_url(self, trigger_id: str, base_url: str) -> Optional[str]: - base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:3000") - return f"{base_url}/api/triggers/qstash/webhook" - - async def list_schedules(self) -> list: - try: - schedules_data = await asyncio.to_thread( - self.qstash.schedules.list - ) - - schedules = [] - for schedule in schedules_data: - schedules.append({ - 'id': getattr(schedule, 'schedule_id', None), - 'destination': getattr(schedule, 'destination', None), - 'cron': getattr(schedule, 'cron', None), - 'is_active': getattr(schedule, 'is_active', False), - 'created_at': getattr(schedule, 'created_at', None), - 'next_delivery': getattr(schedule, 'next_delivery', None) - }) - - return schedules - - except Exception as e: - logger.error(f"Error listing QStash schedules: {e}") - return [] \ No newline at end of file diff --git a/backend/triggers/registry.py b/backend/triggers/registry.py deleted file mode 100644 index 58b0403b..00000000 --- a/backend/triggers/registry.py +++ /dev/null @@ -1,51 +0,0 @@ -import warnings -warnings.warn( - "triggers.registry is deprecated. Use triggers.domain.services.ProviderRegistryService instead.", - DeprecationWarning, - stacklevel=2 -) - -from typing import Dict, List, Optional -from .core import TriggerProvider, TriggerType -from .providers import ScheduleTriggerProvider - -class TriggerRegistry: - """Registry for trigger providers.""" - - def __init__(self): - self._providers: Dict[TriggerType, TriggerProvider] = {} - self._initialize_default_providers() - - def _initialize_default_providers(self): - """Initialize default trigger providers.""" - self.register_provider(ScheduleTriggerProvider()) - - def register_provider(self, provider: TriggerProvider): - """Register a trigger provider.""" - self._providers[provider.trigger_type] = provider - - def get_provider(self, trigger_type: TriggerType) -> Optional[TriggerProvider]: - """Get a trigger provider by type.""" - return self._providers.get(trigger_type) - - def get_all_providers(self) -> Dict[TriggerType, TriggerProvider]: - """Get all registered providers.""" - return self._providers.copy() - - def get_supported_types(self) -> List[TriggerType]: - """Get list of supported trigger types.""" - return list(self._providers.keys()) - - def get_provider_schemas(self) -> Dict[str, Dict]: - """Get configuration schemas for all providers.""" - schemas = {} - for trigger_type, provider in self._providers.items(): - trigger_type_str = trigger_type.value if hasattr(trigger_type, 'value') else str(trigger_type) - schemas[trigger_type_str] = provider.get_config_schema() - return schemas - - def is_supported(self, trigger_type: TriggerType) -> bool: - """Check if a trigger type is supported.""" - return trigger_type in self._providers - -trigger_registry = TriggerRegistry() \ No newline at end of file diff --git a/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx b/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx index b74f0b0c..8eac67b5 100644 --- a/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx +++ b/frontend/src/app/(dashboard)/agents/config/[agentId]/page.tsx @@ -19,6 +19,7 @@ import { useAgentVersionStore } from '../../../../../lib/stores/agent-version-st import { cn } from '@/lib/utils'; import { AgentHeader, VersionAlert, AgentBuilderTab, ConfigurationTab } from '@/components/agents/config'; +import { UpcomingRunsDropdown } from '@/components/agents/upcoming-runs-dropdown'; interface FormData { name: string; @@ -268,6 +269,7 @@ export default function AgentConfigurationPageRefactored() { setOriginalData(formData); }} /> +
{hasUnsavedChanges && !isViewingOldVersion && ( @@ -341,7 +343,101 @@ export default function AgentConfigurationPageRefactored() {
-
+
+
+
+
+
+ + { + setOriginalData(formData); + }} + /> + +
+
+ {hasUnsavedChanges && !isViewingOldVersion && ( + + )} +
+
+ + {isViewingOldVersion && ( + + )} + + +
+
+ +
+ + + + + + + + + +
diff --git a/frontend/src/components/agents/agent-tools-configuration.tsx b/frontend/src/components/agents/agent-tools-configuration.tsx index 9667b33e..ba1ed2b6 100644 --- a/frontend/src/components/agents/agent-tools-configuration.tsx +++ b/frontend/src/components/agents/agent-tools-configuration.tsx @@ -49,17 +49,7 @@ export const AgentToolsConfiguration = ({ tools, onToolsChange }: AgentToolsConf {getSelectedToolsCount()} selected
-
- - setSearchQuery(e.target.value)} - className="pl-10" - /> -
-
{getFilteredTools().map(([toolName, toolInfo]) => ( diff --git a/frontend/src/components/agents/config/agent-header.tsx b/frontend/src/components/agents/config/agent-header.tsx index 21be0b19..017ea4fa 100644 --- a/frontend/src/components/agents/config/agent-header.tsx +++ b/frontend/src/components/agents/config/agent-header.tsx @@ -58,6 +58,7 @@ export function AgentHeader({ />
+ (null); const { data: triggers = [], isLoading, error } = useAgentTriggers(agentId); + const { data: providers = [] } = useTriggerProviders(); const createTriggerMutation = useCreateTrigger(); const updateTriggerMutation = useUpdateTrigger(); const deleteTriggerMutation = useDeleteTrigger(); @@ -37,19 +37,28 @@ export const AgentTriggersConfiguration: React.FC { setEditingTrigger(trigger); - setConfiguringProvider({ - provider_id: trigger.provider_id, - name: trigger.trigger_type, - description: '', - trigger_type: trigger.trigger_type, - webhook_enabled: !!trigger.webhook_url, - config_schema: {} - }); + + const provider = providers.find(p => p.provider_id === trigger.provider_id); + if (provider) { + setConfiguringProvider(provider); + } else { + setConfiguringProvider({ + provider_id: trigger.provider_id, + name: trigger.trigger_type, + description: '', + trigger_type: trigger.trigger_type, + webhook_enabled: !!trigger.webhook_url, + config_schema: {} + }); + } }; const handleRemoveTrigger = async (trigger: TriggerConfiguration) => { try { - await deleteTriggerMutation.mutateAsync(trigger.trigger_id); + await deleteTriggerMutation.mutateAsync({ + triggerId: trigger.trigger_id, + agentId: trigger.agent_id + }); toast.success('Trigger deleted successfully'); } catch (error) { toast.error('Failed to delete trigger'); @@ -123,23 +132,13 @@ export const AgentTriggersConfiguration: React.FC {triggers.length > 0 && ( -
-
-

- - Configured Triggers -

-
-
- -
-
+ )} {!isLoading && triggers.length === 0 && ( diff --git a/frontend/src/components/agents/triggers/configured-triggers-list.tsx b/frontend/src/components/agents/triggers/configured-triggers-list.tsx index e9137223..d498f047 100644 --- a/frontend/src/components/agents/triggers/configured-triggers-list.tsx +++ b/frontend/src/components/agents/triggers/configured-triggers-list.tsx @@ -38,6 +38,26 @@ const copyToClipboard = async (text: string) => { } }; +const getCronDescription = (cron: string): string => { + const cronDescriptions: Record = { + '0 9 * * *': 'Daily at 9:00 AM', + '0 18 * * *': 'Daily at 6:00 PM', + '0 9 * * 1-5': 'Weekdays at 9:00 AM', + '0 10 * * 1-5': 'Weekdays at 10:00 AM', + '0 9 * * 1': 'Every Monday at 9:00 AM', + '0 9 1 * *': 'Monthly on the 1st at 9:00 AM', + '0 9 1 1 *': 'Yearly on Jan 1st at 9:00 AM', + '0 */2 * * *': 'Every 2 hours', + '*/30 * * * *': 'Every 30 minutes', + '0 0 * * *': 'Daily at midnight', + '0 12 * * *': 'Daily at noon', + '0 9 * * 0': 'Every Sunday at 9:00 AM', + '0 9 * * 6': 'Every Saturday at 9:00 AM', + }; + + return cronDescriptions[cron] || cron; +}; + export const ConfiguredTriggersList: React.FC = ({ triggers, onEdit, @@ -76,7 +96,16 @@ export const ConfiguredTriggersList: React.FC = ({ {truncateString(trigger.description, 50)}

)} - + {trigger.trigger_type === 'schedule' && trigger.config && ( +
+ {trigger.config.execution_type === 'agent' && trigger.config.agent_prompt && ( +

Prompt: {truncateString(trigger.config.agent_prompt, 40)}

+ )} + {trigger.config.execution_type === 'workflow' && trigger.config.workflow_id && ( +

Workflow: {trigger.config.workflow_id}

+ )} +
+ )} {trigger.webhook_url && (
@@ -118,7 +147,6 @@ export const ConfiguredTriggersList: React.FC = ({ )}
-
diff --git a/frontend/src/components/agents/triggers/one-click-integrations.tsx b/frontend/src/components/agents/triggers/one-click-integrations.tsx index 7fa39d39..068ad993 100644 --- a/frontend/src/components/agents/triggers/one-click-integrations.tsx +++ b/frontend/src/components/agents/triggers/one-click-integrations.tsx @@ -66,7 +66,10 @@ export const OneClickIntegrations: React.FC = ({ const handleUninstall = async (provider: ProviderKey, triggerId?: string) => { if (provider === 'schedule' && triggerId) { try { - await deleteTriggerMutation.mutateAsync(triggerId); + await deleteTriggerMutation.mutateAsync({ + triggerId, + agentId + }); toast.success('Schedule trigger removed successfully'); } catch (error) { toast.error('Failed to remove schedule trigger'); @@ -120,7 +123,6 @@ export const OneClickIntegrations: React.FC = ({ const scheduleProvider: TriggerProvider = { provider_id: 'schedule', name: 'Schedule', - description: 'Schedule agent execution using cron expressions', trigger_type: 'schedule', webhook_enabled: true, config_schema: {} diff --git a/frontend/src/components/agents/triggers/providers/schedule-config.tsx b/frontend/src/components/agents/triggers/providers/schedule-config.tsx index c6f56ea4..10ffb56d 100644 --- a/frontend/src/components/agents/triggers/providers/schedule-config.tsx +++ b/frontend/src/components/agents/triggers/providers/schedule-config.tsx @@ -11,6 +11,7 @@ import { Card, CardContent, CardDescription, CardHeader } from '@/components/ui/ import { RadioGroup, RadioGroupItem } from '@/components/ui/radio-group'; import { Calendar } from '@/components/ui/calendar'; import { Popover, PopoverContent, PopoverTrigger } from '@/components/ui/popover'; +import { Switch } from '@/components/ui/switch'; import { Clock, Calendar as CalendarIcon, Info, Zap, Repeat, Timer, Target } from 'lucide-react'; import { format, startOfDay } from 'date-fns'; import { cn } from '@/lib/utils'; @@ -23,6 +24,12 @@ interface ScheduleTriggerConfigFormProps { onChange: (config: ScheduleTriggerConfig) => void; errors: Record; agentId: string; + name: string; + description: string; + onNameChange: (name: string) => void; + onDescriptionChange: (description: string) => void; + isActive: boolean; + onActiveChange: (active: boolean) => void; } type ScheduleType = 'quick' | 'recurring' | 'advanced' | 'one-time'; @@ -102,6 +109,12 @@ export const ScheduleTriggerConfigForm: React.FC onChange, errors, agentId, + name, + description, + onNameChange, + onDescriptionChange, + isActive, + onActiveChange, }) => { const { data: workflows = [], isLoading: isLoadingWorkflows } = useAgentWorkflows(agentId); const [scheduleType, setScheduleType] = useState('quick'); @@ -116,7 +129,34 @@ export const ScheduleTriggerConfigForm: React.FC const [selectedDate, setSelectedDate] = useState(); const [oneTimeTime, setOneTimeTime] = useState<{ hour: string; minute: string }>({ hour: '09', minute: '00' }); + useEffect(() => { + if (!config.timezone) { + try { + const detectedTimezone = Intl.DateTimeFormat().resolvedOptions().timeZone; + onChange({ + ...config, + timezone: detectedTimezone, + }); + } catch (error) { + onChange({ + ...config, + timezone: 'UTC', + }); + } + } + }, []); + useEffect(() => { + if (config.cron_expression) { + const preset = QUICK_PRESETS.find(p => p.cron === config.cron_expression); + if (preset) { + setScheduleType('quick'); + setSelectedPreset(config.cron_expression); + } else { + setScheduleType('advanced'); + } + } + }, [config.cron_expression]); const generateCronExpression = () => { if (scheduleType === 'quick' && selectedPreset) { @@ -179,6 +219,29 @@ export const ScheduleTriggerConfigForm: React.FC }); }; + const getSchedulePreview = () => { + if (!config.cron_expression) return null; + + try { + const descriptions: Record = { + '0 9 * * *': 'Every day at 9:00 AM', + '0 18 * * *': 'Every day at 6:00 PM', + '0 9 * * 1-5': 'Weekdays at 9:00 AM', + '0 10 * * 1-5': 'Weekdays at 10:00 AM', + '0 9 * * 1': 'Every Monday at 9:00 AM', + '0 9 1 * *': 'Monthly on the 1st at 9:00 AM', + '0 */2 * * *': 'Every 2 hours', + '*/30 * * * *': 'Every 30 minutes', + '0 0 * * *': 'Every day at midnight', + '0 12 * * *': 'Every day at noon', + }; + + return descriptions[config.cron_expression] || config.cron_expression; + } catch { + return config.cron_expression; + } + }; + const handleExecutionTypeChange = (value: 'agent' | 'workflow') => { const newConfig = { ...config, @@ -206,8 +269,6 @@ export const ScheduleTriggerConfigForm: React.FC }); }; - - const handleWeekdayToggle = (weekday: string) => { setSelectedWeekdays(prev => prev.includes(weekday) @@ -238,392 +299,508 @@ export const ScheduleTriggerConfigForm: React.FC return (
- + Configure when your agent should be triggered automatically. Choose from quick presets, recurring schedules, or set up advanced cron expressions. - - setScheduleType(value as ScheduleType)} className="w-full"> - - - - Quick - - - - Recurring - - - - One-time - - - - Advanced - - + +
+
+
+

+ + Trigger Details +

+
+
+ + onNameChange(e.target.value)} + placeholder="Enter a name for this trigger" + className={errors.name ? 'border-destructive' : ''} + /> + {errors.name && ( +

{errors.name}

+ )} +
+ +
+ +