From ff47404a89e5b6fbc85803bf73563746a55942d1 Mon Sep 17 00:00:00 2001 From: Soumyadas15 Date: Tue, 1 Jul 2025 13:35:55 +0530 Subject: [PATCH] feat: agent schedules --- backend/triggers/api.py | 185 +++++++ backend/triggers/core.py | 28 + backend/triggers/providers/__init__.py | 4 +- .../triggers/providers/schedule_provider.py | 259 +++++++++ .../(dashboard)/agents/new/[agentId]/page.tsx | 2 +- .../triggers/agent-triggers-configuration.tsx | 5 +- .../triggers/one-click-integrations.tsx | 164 ++++-- .../triggers/providers/schedule-config.tsx | 520 ++++++++++++++++++ .../agents/triggers/trigger-config-dialog.tsx | 41 +- .../src/components/agents/triggers/types.ts | 6 + .../src/components/agents/triggers/utils.tsx | 6 +- 11 files changed, 1155 insertions(+), 65 deletions(-) create mode 100644 backend/triggers/providers/schedule_provider.py create mode 100644 frontend/src/components/agents/triggers/providers/schedule-config.tsx diff --git a/backend/triggers/api.py b/backend/triggers/api.py index 56014a31..be01a145 100644 --- a/backend/triggers/api.py +++ b/backend/triggers/api.py @@ -437,6 +437,191 @@ async def universal_slack_webhook(request: Request): content={"error": "Internal server error"} ) +@router.post("/qstash/webhook") +async def handle_qstash_webhook(request: Request): + try: + logger.info("QStash webhook received") + body = await request.body() + headers = dict(request.headers) + + logger.debug(f"QStash webhook body: {body[:500]}...") + logger.debug(f"QStash webhook headers: {headers}") + + try: + if body: + data = await request.json() + else: + data = {} + except Exception as e: + logger.warning(f"Failed to parse JSON body: {e}") + data = { + "raw_body": body.decode('utf-8', errors='ignore'), + "content_type": headers.get('content-type', '') + } + + trigger_id = data.get('trigger_id') + + if not trigger_id: + logger.error("No trigger_id in QStash webhook payload") + return JSONResponse( + status_code=400, + content={"error": "trigger_id is required"} + ) + + data["headers"] = headers + data["qstash_message_id"] = headers.get('upstash-message-id') + data["qstash_schedule_id"] = headers.get('upstash-schedule-id') + + logger.info(f"Processing QStash trigger event for {trigger_id}") + manager = await get_trigger_manager() + result = await manager.process_trigger_event(trigger_id, data) + + logger.info(f"QStash trigger processing result: success={result.success}, should_execute={result.should_execute_agent}, error={result.error_message}") + + if result.success and result.should_execute_agent: + executor = AgentTriggerExecutor(db) + trigger_config = await manager.get_trigger(trigger_id) + if trigger_config: + from .core import TriggerEvent, TriggerType + trigger_type = trigger_config.trigger_type + if isinstance(trigger_type, str): + trigger_type = TriggerType(trigger_type) + + trigger_event = TriggerEvent( + trigger_id=trigger_id, + agent_id=trigger_config.agent_id, + trigger_type=trigger_type, + raw_data=data + ) + + execution_result = await executor.execute_triggered_agent( + agent_id=trigger_config.agent_id, + trigger_result=result, + trigger_event=trigger_event + ) + + logger.info(f"QStash agent execution result: {execution_result}") + return JSONResponse(content={ + "message": "QStash webhook processed and agent execution started", + "trigger_id": trigger_id, + "agent_id": trigger_config.agent_id, + "thread_id": execution_result.get("thread_id"), + "agent_run_id": execution_result.get("agent_run_id") + }) + + if result.response_data: + return JSONResponse(content=result.response_data) + elif result.success: + return {"message": "QStash webhook processed successfully"} + else: + logger.warning(f"QStash webhook processing failed for {trigger_id}: {result.error_message}") + return JSONResponse( + status_code=400, + content={"error": result.error_message} + ) + + except Exception as e: + logger.error(f"Error processing QStash webhook: {e}") + import traceback + logger.error(f"Traceback: {traceback.format_exc()}") + return JSONResponse( + status_code=500, + content={"error": "Internal server error"} + ) + +@router.post("/schedule/webhook") +async def handle_schedule_webhook(request: Request): + try: + logger.info("Schedule webhook received from Pipedream") + body = await request.body() + headers = dict(request.headers) + + logger.debug(f"Schedule webhook body: {body[:500]}...") + logger.debug(f"Schedule webhook headers: {headers}") + + try: + if body: + data = await request.json() + else: + data = {} + except Exception as e: + logger.warning(f"Failed to parse JSON body: {e}") + data = { + "raw_body": body.decode('utf-8', errors='ignore'), + "content_type": headers.get('content-type', '') + } + + trigger_id = data.get('trigger_id') + agent_id = data.get('agent_id') + + if not trigger_id: + logger.error("No trigger_id in schedule webhook payload") + return JSONResponse( + status_code=400, + content={"error": "trigger_id is required"} + ) + + logger.info(f"Processing scheduled trigger event for {trigger_id}") + manager = await get_trigger_manager() + + trigger_config = await manager.get_trigger(trigger_id) + if trigger_config: + data['trigger_config'] = trigger_config.config + + result = await manager.process_trigger_event(trigger_id, data) + + logger.info(f"Schedule trigger processing result: success={result.success}, should_execute={result.should_execute_agent}, error={result.error_message}") + + if result.success and result.should_execute_agent: + executor = AgentTriggerExecutor(db) + if trigger_config: + from .core import TriggerEvent, TriggerType + trigger_type = trigger_config.trigger_type + if isinstance(trigger_type, str): + trigger_type = TriggerType(trigger_type) + + trigger_event = TriggerEvent( + trigger_id=trigger_id, + agent_id=trigger_config.agent_id, + trigger_type=trigger_type, + raw_data=data + ) + + execution_result = await executor.execute_triggered_agent( + agent_id=trigger_config.agent_id, + trigger_result=result, + trigger_event=trigger_event + ) + + logger.info(f"Scheduled agent execution result: {execution_result}") + return JSONResponse(content={ + "message": "Schedule webhook processed and agent execution started", + "trigger_id": trigger_id, + "agent_id": trigger_config.agent_id, + "thread_id": execution_result.get("thread_id"), + "agent_run_id": execution_result.get("agent_run_id") + }) + + if result.response_data: + return JSONResponse(content=result.response_data) + elif result.success: + return {"message": "Schedule webhook processed successfully"} + else: + logger.warning(f"Schedule webhook processing failed for {trigger_id}: {result.error_message}") + return JSONResponse( + status_code=400, + content={"error": result.error_message} + ) + + except Exception as e: + logger.error(f"Error processing schedule webhook: {e}") + import traceback + logger.error(f"Traceback: {traceback.format_exc()}") + return JSONResponse( + status_code=500, + content={"error": "Internal server error"} + ) + @router.post("/{trigger_id}/webhook") async def handle_webhook( trigger_id: str, diff --git a/backend/triggers/core.py b/backend/triggers/core.py index 621ee907..2fcfe614 100644 --- a/backend/triggers/core.py +++ b/backend/triggers/core.py @@ -291,6 +291,34 @@ class TriggerManager: response_template={ "agent_prompt": "GitHub {github_event} event in {github_repo} by {github_sender}" } + ), + ProviderDefinition( + provider_id="schedule", + name="Schedule", + description="Schedule agent execution using Cloudflare Workers and cron expressions", + trigger_type="schedule", + provider_class="triggers.providers.schedule_provider.ScheduleTriggerProvider", + webhook_enabled=True, + config_schema={ + "type": "object", + "properties": { + "cron_expression": { + "type": "string", + "description": "Cron expression for scheduling", + "pattern": r"^(\*|([0-9]|1[0-9]|2[0-9]|3[0-9]|4[0-9]|5[0-9])|\*\/([0-9]|1[0-9]|2[0-9]|3[0-9]|4[0-9]|5[0-9])) (\*|([0-9]|1[0-9]|2[0-3])|\*\/([0-9]|1[0-9]|2[0-3])) (\*|([1-9]|1[0-9]|2[0-9]|3[0-1])|\*\/([1-9]|1[0-9]|2[0-9]|3[0-1])) (\*|([1-9]|1[0-2])|\*\/([1-9]|1[0-2])) (\*|([0-6])|\*\/([0-6]))$" + }, + "agent_prompt": { + "type": "string", + "description": "The prompt to run the agent with when triggered" + }, + "timezone": { + "type": "string", + "description": "Timezone for schedule execution (default: UTC)", + "default": "UTC" + } + }, + "required": ["cron_expression", "agent_prompt"] + } ) ] diff --git a/backend/triggers/providers/__init__.py b/backend/triggers/providers/__init__.py index 866e2d5c..9aff8bda 100644 --- a/backend/triggers/providers/__init__.py +++ b/backend/triggers/providers/__init__.py @@ -1,7 +1,9 @@ from .telegram_provider import TelegramTriggerProvider from .slack_provider import SlackTriggerProvider +from .schedule_provider import ScheduleTriggerProvider __all__ = [ 'TelegramTriggerProvider', - 'SlackTriggerProvider' + 'SlackTriggerProvider', + 'ScheduleTriggerProvider' ] \ No newline at end of file diff --git a/backend/triggers/providers/schedule_provider.py b/backend/triggers/providers/schedule_provider.py new file mode 100644 index 00000000..42c7fcaf --- /dev/null +++ b/backend/triggers/providers/schedule_provider.py @@ -0,0 +1,259 @@ +import asyncio +import json +import os +from datetime import datetime, timezone +from typing import Dict, Any, Optional +from qstash.client import QStash +from utils.logger import logger +from ..core import TriggerProvider, TriggerType, TriggerEvent, TriggerResult, TriggerConfig, ProviderDefinition + +class ScheduleTriggerProvider(TriggerProvider): + """Schedule trigger provider using Upstash QStash.""" + + def __init__(self, provider_definition: Optional[ProviderDefinition] = None): + super().__init__(TriggerType.SCHEDULE, provider_definition) + + self.qstash_token = os.getenv("QSTASH_TOKEN") + self.webhook_base_url = os.getenv("WEBHOOK_BASE_URL", "http://localhost:8000") + + if not self.qstash_token: + logger.warning("QSTASH_TOKEN not found. QStash provider will not work without it.") + self.qstash = None + else: + self.qstash = QStash(token=self.qstash_token) + + async def validate_config(self, config: Dict[str, Any]) -> Dict[str, Any]: + """Validate schedule configuration.""" + if not self.qstash: + raise ValueError("QSTASH_TOKEN environment variable is required for QStash scheduling") + + if 'cron_expression' not in config: + raise ValueError("cron_expression is required for QStash schedule triggers") + + if 'agent_prompt' not in config: + raise ValueError("agent_prompt is required for schedule triggers") + + try: + import croniter + croniter.croniter(config['cron_expression']) + except ImportError: + raise ValueError("croniter package is required for cron expressions. Please install it with: pip install croniter") + except Exception as e: + raise ValueError(f"Invalid cron expression: {str(e)}") + + return config + + async def setup_trigger(self, trigger_config: TriggerConfig) -> bool: + """Set up scheduled trigger using QStash.""" + try: + webhook_url = f"{self.webhook_base_url}/api/triggers/qstash/webhook" + webhook_payload = { + "trigger_id": trigger_config.trigger_id, + "agent_id": trigger_config.agent_id, + "agent_prompt": trigger_config.config['agent_prompt'], + "schedule_name": trigger_config.name, + "cron_expression": trigger_config.config['cron_expression'], + "event_type": "scheduled", + "provider": "qstash" + } + schedule_id = await asyncio.to_thread( + self.qstash.schedule.create, + destination=webhook_url, + cron=trigger_config.config['cron_expression'], + body=json.dumps(webhook_payload), + headers={ + "Content-Type": "application/json", + "X-Schedule-Provider": "qstash", + "X-Trigger-ID": trigger_config.trigger_id, + "X-Agent-ID": trigger_config.agent_id + }, + retries=3, + delay="5s" + ) + trigger_config.config['qstash_schedule_id'] = schedule_id + logger.info(f"Successfully created QStash schedule {schedule_id} for trigger {trigger_config.trigger_id}") + return True + + except Exception as e: + logger.error(f"Error setting up QStash scheduled trigger {trigger_config.trigger_id}: {e}") + return False + + async def teardown_trigger(self, trigger_config: TriggerConfig) -> bool: + """Remove scheduled trigger from QStash.""" + try: + schedule_id = trigger_config.config.get('qstash_schedule_id') + if not schedule_id: + logger.warning(f"No QStash schedule ID found for trigger {trigger_config.trigger_id}") + return True + await asyncio.to_thread( + self.qstash.schedule.delete, + schedule_id=schedule_id + ) + logger.info(f"Successfully deleted QStash schedule {schedule_id}") + return True + + except Exception as e: + logger.error(f"Error removing QStash scheduled trigger {trigger_config.trigger_id}: {e}") + return False + + async def process_event(self, event: TriggerEvent) -> TriggerResult: + """Process scheduled trigger event from QStash.""" + try: + raw_data = event.raw_data + agent_prompt = raw_data.get('agent_prompt', 'Execute scheduled task') + execution_variables = { + 'scheduled_at': event.timestamp.isoformat(), + 'trigger_id': event.trigger_id, + 'agent_id': event.agent_id, + 'schedule_name': raw_data.get('schedule_name', 'Scheduled Task'), + 'execution_source': 'qstash', + 'cron_expression': raw_data.get('cron_expression'), + 'qstash_message_id': raw_data.get('messageId') + } + return TriggerResult( + success=True, + should_execute_agent=True, + agent_prompt=agent_prompt, + execution_variables=execution_variables + ) + + except Exception as e: + return TriggerResult( + success=False, + error_message=f"Error processing QStash scheduled trigger event: {str(e)}" + ) + + async def health_check(self, trigger_config: TriggerConfig) -> bool: + """Check if the QStash scheduled trigger is healthy.""" + try: + schedule_id = trigger_config.config.get('qstash_schedule_id') + if not schedule_id: + return False + + schedule = await asyncio.to_thread( + self.qstash.schedule.get, + schedule_id=schedule_id + ) + + return getattr(schedule, 'is_active', False) + + except Exception as e: + logger.error(f"Health check failed for QStash scheduled trigger {trigger_config.trigger_id}: {e}") + return False + + async def pause_trigger(self, trigger_config: TriggerConfig) -> bool: + """Pause a QStash schedule.""" + try: + schedule_id = trigger_config.config.get('qstash_schedule_id') + if not schedule_id: + return False + + await asyncio.to_thread( + self.qstash.schedules.pause, + schedule_id=schedule_id + ) + + logger.info(f"Successfully paused QStash schedule {schedule_id}") + return True + + except Exception as e: + logger.error(f"Error pausing QStash schedule: {e}") + return False + + async def resume_trigger(self, trigger_config: TriggerConfig) -> bool: + """Resume a QStash schedule.""" + try: + schedule_id = trigger_config.config.get('qstash_schedule_id') + if not schedule_id: + return False + + await asyncio.to_thread( + self.qstash.schedules.resume, + schedule_id=schedule_id + ) + + logger.info(f"Successfully resumed QStash schedule {schedule_id}") + return True + + except Exception as e: + logger.error(f"Error resuming QStash schedule: {e}") + return False + + async def update_trigger(self, trigger_config: TriggerConfig) -> bool: + """Update a QStash schedule by recreating it.""" + try: + schedule_id = trigger_config.config.get('qstash_schedule_id') + webhook_url = f"{self.webhook_base_url}/api/triggers/qstash/webhook" + + webhook_payload = { + "trigger_id": trigger_config.trigger_id, + "agent_id": trigger_config.agent_id, + "agent_prompt": trigger_config.config['agent_prompt'], + "schedule_name": trigger_config.name, + "cron_expression": trigger_config.config['cron_expression'], + "event_type": "scheduled", + "provider": "qstash" + } + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.qstash_base_url}/schedules", + headers={ + "Authorization": f"Bearer {self.qstash_token}", + "Content-Type": "application/json" + }, + json={ + "scheduleId": schedule_id, + "destination": webhook_url, + "cron": trigger_config.config['cron_expression'], + "body": webhook_payload, + "headers": { + "Content-Type": "application/json", + "X-Schedule-Provider": "qstash", + "X-Trigger-ID": trigger_config.trigger_id, + "X-Agent-ID": trigger_config.agent_id + }, + "retries": 3, + "delay": "5s" + }, + timeout=30.0 + ) + + if response.status_code == 200: + logger.info(f"Successfully updated QStash schedule {schedule_id}") + return True + else: + logger.error(f"Failed to update QStash schedule: {response.status_code} - {response.text}") + return False + + except Exception as e: + logger.error(f"Error updating QStash schedule: {e}") + return False + + def get_webhook_url(self, trigger_id: str, base_url: str) -> Optional[str]: + """Return webhook URL for QStash schedules.""" + return f"{base_url}/api/triggers/qstash/webhook" + + async def list_schedules(self) -> list: + """List all QStash schedules.""" + try: + schedules_data = await asyncio.to_thread( + self.qstash.schedules.list + ) + + schedules = [] + for schedule in schedules_data: + schedules.append({ + 'id': getattr(schedule, 'schedule_id', None), + 'destination': getattr(schedule, 'destination', None), + 'cron': getattr(schedule, 'cron', None), + 'is_active': getattr(schedule, 'is_active', False), + 'created_at': getattr(schedule, 'created_at', None), + 'next_delivery': getattr(schedule, 'next_delivery', None) + }) + + return schedules + + except Exception as e: + logger.error(f"Error listing QStash schedules: {e}") + return [] \ No newline at end of file diff --git a/frontend/src/app/(dashboard)/agents/new/[agentId]/page.tsx b/frontend/src/app/(dashboard)/agents/new/[agentId]/page.tsx index a7f401ab..d2553be3 100644 --- a/frontend/src/app/(dashboard)/agents/new/[agentId]/page.tsx +++ b/frontend/src/app/(dashboard)/agents/new/[agentId]/page.tsx @@ -34,7 +34,6 @@ export default function AgentConfigurationPage() { const updateAgentMutation = useUpdateAgent(); const { state, setOpen, setOpenMobile } = useSidebar(); - // Ref to track if initial layout has been applied (for sidebar closing) const initialLayoutAppliedRef = useRef(false); const [formData, setFormData] = useState({ @@ -395,6 +394,7 @@ export default function AgentConfigurationPage() {
Triggers + New
diff --git a/frontend/src/components/agents/triggers/agent-triggers-configuration.tsx b/frontend/src/components/agents/triggers/agent-triggers-configuration.tsx index 8eb1b945..768754db 100644 --- a/frontend/src/components/agents/triggers/agent-triggers-configuration.tsx +++ b/frontend/src/components/agents/triggers/agent-triggers-configuration.tsx @@ -89,7 +89,6 @@ export const AgentTriggersConfiguration: React.FC ['telegram', 'slack', 'webhook'].includes(provider.trigger_type) ); @@ -136,7 +134,7 @@ export const AgentTriggersConfiguration: React.FC

Manual Configuration

- For advanced users who want to configure triggers manually with custom settings. + Configure triggers manually with custom settings for advanced use cases.

{availableProviders.map((provider) => ( @@ -173,7 +171,6 @@ export const AgentTriggersConfiguration: React.FC )} - {/* Empty State */} {!isLoading && triggers.length === 0 && (
diff --git a/frontend/src/components/agents/triggers/one-click-integrations.tsx b/frontend/src/components/agents/triggers/one-click-integrations.tsx index 6d72c893..07a0d7a2 100644 --- a/frontend/src/components/agents/triggers/one-click-integrations.tsx +++ b/frontend/src/components/agents/triggers/one-click-integrations.tsx @@ -1,16 +1,25 @@ "use client"; -import React, { useEffect } from 'react'; +import React, { useEffect, useState } from 'react'; import { Button } from '@/components/ui/button'; -import { Loader2, ExternalLink, AlertCircle } from 'lucide-react'; +import { Loader2, ExternalLink, AlertCircle, Clock } from 'lucide-react'; import { SlackIcon } from '@/components/ui/icons/slack'; import { getTriggerIcon } from './utils'; +import { TriggerConfigDialog } from './trigger-config-dialog'; +import { TriggerProvider, ScheduleTriggerConfig } from './types'; +import { Dialog } from '@/components/ui/dialog'; import { useOAuthIntegrations, useInstallOAuthIntegration, useUninstallOAuthIntegration, useOAuthCallbackHandler } from '@/hooks/react-query/triggers/use-oauth-integrations'; +import { + useAgentTriggers, + useCreateTrigger, + useDeleteTrigger +} from '@/hooks/react-query/triggers'; +import { toast } from 'sonner'; interface OneClickIntegrationsProps { agentId: string; @@ -19,15 +28,13 @@ interface OneClickIntegrationsProps { const OAUTH_PROVIDERS = { slack: { name: 'Slack', - icon: + icon: , + isOAuth: true }, - discord: { - name: 'Discord', - icon: {getTriggerIcon('discord')} - }, - teams: { - name: 'Microsoft Teams', - icon: {getTriggerIcon('teams')} + schedule: { + name: 'Schedule', + icon: , + isOAuth: false } } as const; @@ -36,9 +43,14 @@ type ProviderKey = keyof typeof OAUTH_PROVIDERS; export const OneClickIntegrations: React.FC = ({ agentId }) => { + const [configuringSchedule, setConfiguringSchedule] = useState(false); + const { data: integrationStatus, isLoading, error } = useOAuthIntegrations(agentId); + const { data: triggers = [] } = useAgentTriggers(agentId); const installMutation = useInstallOAuthIntegration(); const uninstallMutation = useUninstallOAuthIntegration(); + const createTriggerMutation = useCreateTrigger(); + const deleteTriggerMutation = useDeleteTrigger(); const { handleCallback } = useOAuthCallbackHandler(); useEffect(() => { @@ -46,6 +58,11 @@ export const OneClickIntegrations: React.FC = ({ }, []); const handleInstall = async (provider: ProviderKey) => { + if (provider === 'schedule') { + setConfiguringSchedule(true); + return; + } + try { await installMutation.mutateAsync({ agent_id: agentId, @@ -56,15 +73,46 @@ export const OneClickIntegrations: React.FC = ({ } }; - const handleUninstall = async (triggerId: string) => { + const handleUninstall = async (provider: ProviderKey, triggerId?: string) => { + if (provider === 'schedule' && triggerId) { + try { + await deleteTriggerMutation.mutateAsync(triggerId); + toast.success('Schedule trigger removed successfully'); + } catch (error) { + toast.error('Failed to remove schedule trigger'); + console.error('Error removing schedule trigger:', error); + } + return; + } + try { - await uninstallMutation.mutateAsync(triggerId); + await uninstallMutation.mutateAsync(triggerId!); } catch (error) { console.error('Error uninstalling integration:', error); } }; + const handleScheduleSave = async (config: any) => { + try { + await createTriggerMutation.mutateAsync({ + agentId, + provider_id: 'schedule', + name: config.name || 'Scheduled Trigger', + description: config.description || 'Automatically scheduled trigger', + config: config.config, + }); + toast.success('Schedule trigger created successfully'); + setConfiguringSchedule(false); + } catch (error: any) { + toast.error(error.message || 'Failed to create schedule trigger'); + console.error('Error creating schedule trigger:', error); + } + }; + const getIntegrationForProvider = (provider: ProviderKey) => { + if (provider === 'schedule') { + return triggers.find(trigger => trigger.trigger_type === 'schedule'); + } return integrationStatus?.integrations.find(integration => integration.provider === provider ); @@ -74,6 +122,14 @@ export const OneClickIntegrations: React.FC = ({ return !!getIntegrationForProvider(provider); }; + const getTriggerId = (provider: ProviderKey) => { + const integration = getIntegrationForProvider(provider); + if (provider === 'schedule') { + return integration?.trigger_id; + } + return integration?.trigger_id; + }; + if (error) { return (
@@ -90,30 +146,68 @@ export const OneClickIntegrations: React.FC = ({ ); } + const scheduleProvider: TriggerProvider = { + provider_id: 'schedule', + name: 'Schedule', + description: 'Schedule agent execution using cron expressions', + trigger_type: 'schedule', + webhook_enabled: true, + config_schema: {} + }; + return ( -
- {Object.entries(OAUTH_PROVIDERS).map(([providerId, config]) => { - const provider = providerId as ProviderKey; - const integration = getIntegrationForProvider(provider); - const isInstalled = isProviderInstalled(provider); - const isLoading = installMutation.isPending || uninstallMutation.isPending; - return ( - - ); - })} +
+
+ {Object.entries(OAUTH_PROVIDERS).map(([providerId, config]) => { + const provider = providerId as ProviderKey; + const integration = getIntegrationForProvider(provider); + const isInstalled = isProviderInstalled(provider); + const isLoading = installMutation.isPending || uninstallMutation.isPending || + (provider === 'schedule' && (createTriggerMutation.isPending || deleteTriggerMutation.isPending)); + const triggerId = getTriggerId(provider); + + const buttonText = provider === 'schedule' + ? config.name + : (isInstalled ? `Disconnect ${config.name}` : `Connect ${config.name}`); + + return ( + + ); + })} +
+ + {configuringSchedule && ( + + setConfiguringSchedule(false)} + isLoading={createTriggerMutation.isPending} + /> + + )}
); }; \ No newline at end of file diff --git a/frontend/src/components/agents/triggers/providers/schedule-config.tsx b/frontend/src/components/agents/triggers/providers/schedule-config.tsx new file mode 100644 index 00000000..ac1b2bd8 --- /dev/null +++ b/frontend/src/components/agents/triggers/providers/schedule-config.tsx @@ -0,0 +1,520 @@ +"use client"; + +import React, { useState, useEffect } from 'react'; +import { Input } from '@/components/ui/input'; +import { Label } from '@/components/ui/label'; +import { Textarea } from '@/components/ui/textarea'; +import { Button } from '@/components/ui/button'; +import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/ui/select'; +import { Tabs, TabsContent, TabsList, TabsTrigger } from '@/components/ui/tabs'; +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/card'; +import { RadioGroup, RadioGroupItem } from '@/components/ui/radio-group'; +import { Calendar } from '@/components/ui/calendar'; +import { Popover, PopoverContent, PopoverTrigger } from '@/components/ui/popover'; +import { Clock, Calendar as CalendarIcon, Info, Zap, Repeat, Timer, Target } from 'lucide-react'; +import { format, startOfDay } from 'date-fns'; +import { cn } from '@/lib/utils'; +import { TriggerProvider, ScheduleTriggerConfig } from '../types'; + +interface ScheduleTriggerConfigFormProps { + provider: TriggerProvider; + config: ScheduleTriggerConfig; + onChange: (config: ScheduleTriggerConfig) => void; + errors: Record; +} + +type ScheduleType = 'quick' | 'recurring' | 'advanced' | 'one-time'; + +interface QuickPreset { + name: string; + cron: string; + description: string; + icon: React.ReactNode; + category: 'frequent' | 'daily' | 'weekly' | 'monthly'; +} + +const QUICK_PRESETS: QuickPreset[] = [ + { name: 'Every minute', cron: '* * * * *', description: 'Every minute', icon: , category: 'frequent' }, + { name: 'Every 5 minutes', cron: '*/5 * * * *', description: 'Every 5 minutes', icon: , category: 'frequent' }, + { name: 'Every 15 minutes', cron: '*/15 * * * *', description: 'Every 15 minutes', icon: , category: 'frequent' }, + { name: 'Every 30 minutes', cron: '*/30 * * * *', description: 'Every 30 minutes', icon: , category: 'frequent' }, + { name: 'Every hour', cron: '0 * * * *', description: 'At the start of every hour', icon: , category: 'frequent' }, + + { name: 'Daily at 9 AM', cron: '0 9 * * *', description: 'Every day at 9:00 AM', icon: , category: 'daily' }, + { name: 'Daily at 12 PM', cron: '0 12 * * *', description: 'Every day at 12:00 PM', icon: , category: 'daily' }, + { name: 'Daily at 6 PM', cron: '0 18 * * *', description: 'Every day at 6:00 PM', icon: , category: 'daily' }, + { name: 'Twice daily', cron: '0 9,17 * * *', description: 'Every day at 9 AM and 5 PM', icon: , category: 'daily' }, + + { name: 'Weekdays at 9 AM', cron: '0 9 * * 1-5', description: 'Monday-Friday at 9:00 AM', icon: , category: 'weekly' }, + { name: 'Monday mornings', cron: '0 9 * * 1', description: 'Every Monday at 9:00 AM', icon: , category: 'weekly' }, + { name: 'Friday evenings', cron: '0 17 * * 5', description: 'Every Friday at 5:00 PM', icon: , category: 'weekly' }, + { name: 'Weekend mornings', cron: '0 10 * * 0,6', description: 'Saturday & Sunday at 10:00 AM', icon: , category: 'weekly' }, + + { name: 'Monthly on 1st', cron: '0 9 1 * *', description: 'First day of month at 9:00 AM', icon: , category: 'monthly' }, + { name: 'Monthly on 15th', cron: '0 9 15 * *', description: '15th of month at 9:00 AM', icon: , category: 'monthly' }, + { name: 'End of month', cron: '0 9 28-31 * *', description: 'Last few days of month at 9:00 AM', icon: , category: 'monthly' }, +]; + +const TIMEZONES = [ + { value: 'UTC', label: 'UTC (Coordinated Universal Time)' }, + { value: 'America/New_York', label: 'Eastern Time (ET)' }, + { value: 'America/Chicago', label: 'Central Time (CT)' }, + { value: 'America/Denver', label: 'Mountain Time (MT)' }, + { value: 'America/Los_Angeles', label: 'Pacific Time (PT)' }, + { value: 'Europe/London', label: 'Greenwich Mean Time (GMT)' }, + { value: 'Europe/Paris', label: 'Central European Time (CET)' }, + { value: 'Europe/Berlin', label: 'Central European Time (CET)' }, + { value: 'Asia/Tokyo', label: 'Japan Standard Time (JST)' }, + { value: 'Asia/Shanghai', label: 'China Standard Time (CST)' }, + { value: 'Australia/Sydney', label: 'Australian Eastern Time (AET)' }, +]; + +const WEEKDAYS = [ + { value: '1', label: 'Monday', short: 'Mon' }, + { value: '2', label: 'Tuesday', short: 'Tue' }, + { value: '3', label: 'Wednesday', short: 'Wed' }, + { value: '4', label: 'Thursday', short: 'Thu' }, + { value: '5', label: 'Friday', short: 'Fri' }, + { value: '6', label: 'Saturday', short: 'Sat' }, + { value: '0', label: 'Sunday', short: 'Sun' }, +]; + +const MONTHS = [ + { value: '1', label: 'January' }, + { value: '2', label: 'February' }, + { value: '3', label: 'March' }, + { value: '4', label: 'April' }, + { value: '5', label: 'May' }, + { value: '6', label: 'June' }, + { value: '7', label: 'July' }, + { value: '8', label: 'August' }, + { value: '9', label: 'September' }, + { value: '10', label: 'October' }, + { value: '11', label: 'November' }, + { value: '12', label: 'December' }, +]; + +export const ScheduleTriggerConfigForm: React.FC = ({ + provider, + config, + onChange, + errors, +}) => { + const [scheduleType, setScheduleType] = useState('quick'); + const [selectedPreset, setSelectedPreset] = useState(''); + + const [recurringType, setRecurringType] = useState<'daily' | 'weekly' | 'monthly'>('daily'); + const [selectedWeekdays, setSelectedWeekdays] = useState(['1', '2', '3', '4', '5']); + const [selectedMonths, setSelectedMonths] = useState(['*']); + const [dayOfMonth, setDayOfMonth] = useState('1'); + const [scheduleTime, setScheduleTime] = useState<{ hour: string; minute: string }>({ hour: '09', minute: '00' }); + + const [selectedDate, setSelectedDate] = useState(); + const [oneTimeTime, setOneTimeTime] = useState<{ hour: string; minute: string }>({ hour: '09', minute: '00' }); + + const generateCronExpression = () => { + if (scheduleType === 'quick' && selectedPreset) { + return selectedPreset; + } + if (scheduleType === 'recurring') { + const { hour, minute } = scheduleTime; + switch (recurringType) { + case 'daily': + return `${minute} ${hour} * * *`; + case 'weekly': + const weekdayStr = selectedWeekdays.join(','); + return `${minute} ${hour} * * ${weekdayStr}`; + case 'monthly': + const monthStr = selectedMonths.includes('*') ? '*' : selectedMonths.join(','); + return `${minute} ${hour} ${dayOfMonth} ${monthStr} *`; + default: + return `${minute} ${hour} * * *`; + } + } + if (scheduleType === 'one-time' && selectedDate) { + const { hour, minute } = oneTimeTime; + const day = selectedDate.getDate(); + const month = selectedDate.getMonth() + 1; + const year = selectedDate.getFullYear(); + return `${minute} ${hour} ${day} ${month} *`; + } + return config.cron_expression || ''; + }; + + useEffect(() => { + const newCron = generateCronExpression(); + if (newCron && newCron !== config.cron_expression) { + onChange({ + ...config, + cron_expression: newCron, + }); + } + }, [scheduleType, selectedPreset, recurringType, selectedWeekdays, selectedMonths, dayOfMonth, scheduleTime, selectedDate, oneTimeTime]); + + const handlePresetSelect = (preset: QuickPreset) => { + setSelectedPreset(preset.cron); + onChange({ + ...config, + cron_expression: preset.cron, + }); + }; + + const handleAgentPromptChange = (value: string) => { + onChange({ + ...config, + agent_prompt: value, + }); + }; + + const handleTimezoneChange = (value: string) => { + onChange({ + ...config, + timezone: value, + }); + }; + + const handleWeekdayToggle = (weekday: string) => { + setSelectedWeekdays(prev => + prev.includes(weekday) + ? prev.filter(w => w !== weekday) + : [...prev, weekday].sort() + ); + }; + + const handleMonthToggle = (month: string) => { + if (month === '*') { + setSelectedMonths(['*']); + } else { + setSelectedMonths(prev => { + const filtered = prev.filter(m => m !== '*'); + return filtered.includes(month) + ? filtered.filter(m => m !== month) + : [...filtered, month].sort((a, b) => parseInt(a) - parseInt(b)); + }); + } + }; + + const groupedPresets = QUICK_PRESETS.reduce((acc, preset) => { + if (!acc[preset.category]) acc[preset.category] = []; + acc[preset.category].push(preset); + return acc; + }, {} as Record); + + return ( +
+ + + + Configure when your agent should be triggered automatically. Choose from quick presets, recurring schedules, or set up advanced cron expressions. + + + + setScheduleType(value as ScheduleType)} className="w-full"> + + + + Quick + + + + Recurring + + + + One-time + + + + Advanced + + + + +
+ {Object.entries(groupedPresets).map(([category, presets]) => ( +
+

{category} Schedules

+
+ {presets.map((preset) => ( + handlePresetSelect(preset)} + > + +
+
{preset.icon}
+
+
{preset.name}
+
{preset.description}
+
+
+
+
+ ))} +
+
+ ))} +
+
+ + +
+
+ + setRecurringType(value as any)}> +
+ + +
+
+ + +
+
+ + +
+
+
+ + {recurringType === 'weekly' && ( +
+ +
+ {WEEKDAYS.map((day) => ( + + ))} +
+
+ )} + + {recurringType === 'monthly' && ( +
+
+ + +
+
+ +
+ +
+ {MONTHS.map((month) => ( + + ))} +
+
+
+
+ )} + +
+ +
+ + : + +
+
+
+
+ + +
+
+ + + + + + + date < startOfDay(new Date())} + initialFocus + /> + + +
+ +
+ +
+ + : + +
+
+
+
+ + +
+ + onChange({ ...config, cron_expression: e.target.value })} + placeholder="0 9 * * 1-5" + className={errors.cron_expression ? 'border-destructive' : ''} + /> + {errors.cron_expression && ( +

{errors.cron_expression}

+ )} + + +
+ + Cron Format +
+
+
Format: minute hour day month weekday
+
Example: 0 9 * * 1-5 = Weekdays at 9 AM
+
Use * for any value, */5 for every 5 units
+
+
+
+
+
+
+
+
+ + + +
+ + +
+ +
+ +