buster/apps/trigger
Nate Kelley 87a515a95f
add supabase app to workspace
2025-09-23 11:06:12 -06:00
..
.cursor/rules Add basic task examples and update SDK version in Trigger.dev documentation 2025-08-21 13:27:31 -06:00
src adding v2 list chats endpoint 2025-09-19 15:22:11 -06:00
tests Move to scoped database imports 2025-09-18 12:36:37 -06:00
.env.example env vars and oauth scopes 2025-07-07 11:03:40 -06:00
.gitignore Mastra braintrust (#391) 2025-07-02 14:33:40 -07:00
AGENTS.md Add basic task examples and update SDK version in Trigger.dev documentation 2025-08-21 13:27:31 -06:00
CLAUDE.md CLAUDE.md and README.md updates... 2025-09-15 15:06:41 -06:00
README.md CLAUDE.md and README.md updates... 2025-09-15 15:06:41 -06:00
biome.json Mastra braintrust (#391) 2025-07-02 14:33:40 -07:00
env.d.ts env config 2025-09-02 13:20:38 -06:00
package.json add supabase app to workspace 2025-09-23 11:06:12 -06:00
trigger.config.ts exclude effect 2025-09-18 15:43:37 -06:00
tsconfig.json fix all of the bugs 2025-07-12 22:14:08 -06:00
turbo.json turbo start for trigger, server, web 2025-09-23 10:09:30 -06:00
vitest.config.ts Mastra braintrust (#391) 2025-07-02 14:33:40 -07:00

README.md

Trigger Application

This app handles all background job processing using Trigger.dev v3. It assembles packages to run long-running and scheduled tasks.

Installation

pnpm add @buster-app/trigger

Overview

@buster-app/trigger is responsible for:

  • Background job processing
  • Scheduled/cron tasks
  • Long-running AI agent workflows
  • Async processing that shouldn't block the API
  • Never interfaces directly with clients

Technology Stack

  • Framework: Trigger.dev v3
  • Runtime: Node.js
  • Validation: Zod for input validation
  • Architecture: Task-based, functional

Architecture

Apps → @buster-app/trigger → Trigger.dev v3
              ↓
         Task Functions
              ↓
          Packages
    (Reuse all package logic)

Task Organization

Directory Structure

trigger/
├── src/
│   ├── tasks/
│   │   ├── ai/
│   │   │   ├── analyst-workflow.ts
│   │   │   ├── data-processing.ts
│   │   │   └── report-generation.ts
│   │   ├── data/
│   │   │   ├── sync-data-sources.ts
│   │   │   ├── refresh-materialized-views.ts
│   │   │   └── cleanup-old-data.ts
│   │   ├── notifications/
│   │   │   ├── send-email.ts
│   │   │   ├── send-slack.ts
│   │   │   └── webhook-delivery.ts
│   │   └── scheduled/
│   │       ├── daily-reports.ts
│   │       ├── usage-metrics.ts
│   │       └── health-checks.ts
│   ├── trigger.config.ts
│   └── index.ts

Task Implementation

Basic Task Pattern

Tasks are pure functions that use packages:

import { task } from '@trigger.dev/sdk/v3';
import { z } from 'zod';
import { analystAgent } from '@buster/ai';
import { createChatMessage } from '@buster/database';

// Task input schema
const AnalystWorkflowParamsSchema = z.object({
  chatId: z.string().uuid().describe('Chat conversation ID'),
  query: z.string().describe('User query to analyze'),
  dataSourceId: z.string().uuid().describe('Data source to query'),
  userId: z.string().uuid().describe('User requesting analysis')
});

type AnalystWorkflowParams = z.infer<typeof AnalystWorkflowParamsSchema>;

// Task definition
export const analystWorkflow = task({
  id: 'analyst-workflow',
  retry: {
    maxAttempts: 3,
    minTimeout: '1s',
    maxTimeout: '10s'
  },
  run: async (params: AnalystWorkflowParams) => {
    const validated = AnalystWorkflowParamsSchema.parse(params);
    
    // Step 1: Run AI analysis
    const analysis = await analystAgent({
      query: validated.query,
      context: {
        dataSourceId: validated.dataSourceId,
        userId: validated.userId
      }
    });
    
    // Step 2: Save results to database
    await createChatMessage({
      chatId: validated.chatId,
      content: analysis.response,
      role: 'assistant',
      metadata: {
        toolCalls: analysis.toolCalls,
        usage: analysis.usage
      }
    });
    
    return {
      success: true,
      messageId: analysis.messageId
    };
  }
});

Scheduled Task Pattern

import { schedules } from '@trigger.dev/sdk/v3';
import { generateDailyReports } from '@buster/reporting';
import { getActiveOrganizations } from '@buster/database';

export const dailyReports = schedules.task({
  id: 'daily-reports',
  cron: '0 9 * * *', // 9 AM daily
  run: async () => {
    const organizations = await getActiveOrganizations();
    
    // Process each org in parallel
    const results = await Promise.all(
      organizations.map(org => 
        generateDailyReports({
          organizationId: org.id,
          date: new Date()
        })
      )
    );
    
    return {
      processed: results.length,
      successful: results.filter(r => r.success).length
    };
  }
});

Long-Running Workflows

Step-Based Workflows

import { task, wait } from '@trigger.dev/sdk/v3';

export const dataImportWorkflow = task({
  id: 'data-import-workflow',
  run: async (params: ImportParams) => {
    // Step 1: Validate data source
    const validation = await task.run('validate-source', async () => {
      return validateDataSource(params.dataSourceId);
    });
    
    if (!validation.isValid) {
      throw new Error(`Invalid data source: ${validation.error}`);
    }
    
    // Step 2: Extract data
    const extraction = await task.run('extract-data', async () => {
      return extractData(params.dataSourceId);
    });
    
    // Step 3: Transform data
    const transformation = await task.run('transform-data', async () => {
      return transformData(extraction.data);
    });
    
    // Step 4: Wait for rate limit window
    await wait.for({ seconds: 5 });
    
    // Step 5: Load data
    const loading = await task.run('load-data', async () => {
      return loadData(transformation.data);
    });
    
    return {
      recordsProcessed: loading.count,
      duration: Date.now() - startTime
    };
  }
});

Parallel Processing

export const bulkAnalysis = task({
  id: 'bulk-analysis',
  run: async (params: BulkParams) => {
    const items = await getItemsToProcess(params.batchId);
    
    // Process in chunks to avoid overwhelming the system
    const chunks = chunkArray(items, 10);
    
    const results = [];
    for (const chunk of chunks) {
      const chunkResults = await Promise.all(
        chunk.map(item => 
          task.run(`analyze-${item.id}`, () => 
            analyzeItem(item)
          )
        )
      );
      results.push(...chunkResults);
      
      // Rate limiting between chunks
      await wait.for({ seconds: 2 });
    }
    
    return {
      total: results.length,
      successful: results.filter(r => r.success).length
    };
  }
});

Error Handling

Retry Configuration

export const resilientTask = task({
  id: 'resilient-task',
  retry: {
    maxAttempts: 5,
    minTimeout: '1s',
    maxTimeout: '30s',
    factor: 2, // Exponential backoff
    randomize: true
  },
  run: async (params) => {
    try {
      return await riskyOperation(params);
    } catch (error) {
      // Log error for monitoring
      console.error('Task failed:', error);
      
      // Determine if should retry
      if (error.code === 'RATE_LIMIT') {
        // Will be retried automatically
        throw error;
      }
      
      if (error.code === 'INVALID_INPUT') {
        // Don't retry for bad input
        return {
          success: false,
          error: 'Invalid input provided'
        };
      }
      
      // Unknown error, let it retry
      throw error;
    }
  }
});

Dead Letter Queue

export const criticalTask = task({
  id: 'critical-task',
  onFailure: async ({ error, params, attempts }) => {
    // Send to dead letter queue
    await saveFailedTask({
      taskId: 'critical-task',
      params,
      error: error.message,
      attempts,
      failedAt: new Date()
    });
    
    // Alert team
    await notifyOps({
      message: `Critical task failed after ${attempts} attempts`,
      error: error.message
    });
  },
  run: async (params) => {
    // Task implementation
  }
});

Event-Driven Tasks

Webhook Handler

import { eventTrigger } from '@trigger.dev/sdk/v3';

export const handleWebhook = eventTrigger({
  id: 'handle-webhook',
  event: 'webhook.received',
  run: async (event) => {
    const { payload, headers } = event;
    
    // Verify webhook signature
    const isValid = verifyWebhookSignature(
      payload,
      headers['x-signature']
    );
    
    if (!isValid) {
      throw new Error('Invalid webhook signature');
    }
    
    // Process webhook
    await processWebhookPayload(payload);
    
    return { processed: true };
  }
});

Package Integration

Using Database Package

import { 
  createJob,
  updateJobStatus,
  getJobById 
} from '@buster/database';

export const databaseTask = task({
  id: 'database-task',
  run: async (params) => {
    // Create job record
    const job = await createJob({
      type: 'data-processing',
      status: 'running',
      metadata: params
    });
    
    try {
      // Do work
      const result = await processData(params);
      
      // Update job status
      await updateJobStatus({
        jobId: job.id,
        status: 'completed',
        result
      });
      
      return result;
    } catch (error) {
      await updateJobStatus({
        jobId: job.id,
        status: 'failed',
        error: error.message
      });
      throw error;
    }
  }
});

Using AI Package

import { dataAnalysisWorkflow } from '@buster/ai';

export const aiAnalysisTask = task({
  id: 'ai-analysis',
  run: async (params) => {
    const result = await dataAnalysisWorkflow({
      userQuery: params.query,
      context: {
        dataSourceId: params.dataSourceId,
        userId: params.userId
      },
      schema: params.schema,
      examples: params.examples
    });
    
    return {
      understanding: result.understanding,
      sql: result.sql,
      analysis: result.analysis
    };
  }
});

Testing Patterns

Task Testing

import { createTestTask } from '@trigger.dev/sdk/v3/testing';

describe('analystWorkflow', () => {
  it('should process analysis request', async () => {
    const testTask = createTestTask(analystWorkflow);
    
    const result = await testTask.run({
      chatId: 'test-chat',
      query: 'Show me sales data',
      dataSourceId: 'test-source',
      userId: 'test-user'
    });
    
    expect(result.success).toBe(true);
    expect(result.messageId).toBeDefined();
  });
  
  it('should retry on failure', async () => {
    const testTask = createTestTask(analystWorkflow);
    
    // Mock failure
    jest.spyOn(ai, 'analystAgent')
      .mockRejectedValueOnce(new Error('Temporary failure'))
      .mockResolvedValueOnce({ response: 'Success' });
    
    const result = await testTask.run({
      chatId: 'test-chat',
      query: 'Test query',
      dataSourceId: 'test-source',
      userId: 'test-user'
    });
    
    expect(result.success).toBe(true);
    expect(testTask.attempts).toBe(2);
  });
});

Configuration

Trigger Config

// trigger.config.ts
import { defineConfig } from '@trigger.dev/sdk/v3';

export default defineConfig({
  project: 'buster-trigger',
  runtime: 'node',
  logLevel: 'info',
  retries: {
    enabledInDev: true,
    default: {
      maxAttempts: 3,
      minTimeout: 1000,
      maxTimeout: 10000,
      factor: 2
    }
  },
  dirs: ['./src/tasks']
});

Best Practices

DO:

  • Use packages for all business logic
  • Validate inputs with Zod
  • Implement proper error handling
  • Use retries for transient failures
  • Log important events
  • Break large tasks into steps
  • Use parallel processing when possible
  • Clean up resources in finally blocks

DON'T:

  • Interface directly with clients
  • Store state in task functions
  • Use classes for task logic
  • Skip input validation
  • Ignore error handling
  • Create infinite loops
  • Make synchronous blocking calls
  • Access external services without packages

Monitoring

Task Metrics

export const monitoredTask = task({
  id: 'monitored-task',
  run: async (params) => {
    const startTime = Date.now();
    
    try {
      const result = await performWork(params);
      
      // Log success metric
      await logMetric({
        task: 'monitored-task',
        status: 'success',
        duration: Date.now() - startTime
      });
      
      return result;
    } catch (error) {
      // Log failure metric
      await logMetric({
        task: 'monitored-task',
        status: 'failure',
        duration: Date.now() - startTime,
        error: error.message
      });
      
      throw error;
    }
  }
});

Development

# Development
turbo dev --filter=@buster-app/trigger

# Build
turbo build --filter=@buster-app/trigger

# Test
turbo test:unit --filter=@buster-app/trigger
turbo test:integration --filter=@buster-app/trigger

# Lint
turbo lint --filter=@buster-app/trigger

Deployment

Trigger.dev handles deployment and scaling:

# Deploy to Trigger.dev
npx trigger deploy

# View logs
npx trigger logs

# Monitor tasks
npx trigger dashboard

This app should ONLY orchestrate background tasks using packages. All business logic belongs in packages.