buster/apps/trigger/CLAUDE.md

17 KiB

CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Project Overview

This is a Trigger.dev v3 background job processing service for the Buster AI data analysis platform. It handles long-running AI agent tasks and data source introspection operations.

Development Commands

Core Operations

# Development server with live reload
npm run dev

# Build for deployment
npm run build

# Deploy to Trigger.dev
npm run deploy

TypeScript

# Type checking (extends from ../tsconfig.base.json)
npx tsc --noEmit

Architecture

Task-Based Architecture

This service implements Trigger.dev v3 tasks for background processing:

  • Task Definition: All tasks are in src/tasks/ with standard structure:
    • index.ts - Exports
    • {task-name}.ts - Task implementation with trigger.dev config
    • interfaces.ts - TypeScript types
    • README.md - Documentation

Current Tasks

Analyst Agent Task (src/tasks/analyst-agent-task/)

  • Purpose: Advanced AI-powered data analysis with multi-step workflow
  • Duration: 30 minutes max (1800 seconds)
  • Features: Multi-state execution (initializing → searching → planning → analyzing → reviewing)
  • Architecture: Integrates with Buster multi-agent system for sophisticated analysis
  • Key Workflow: Takes user queries, discovers data sources, generates insights/metrics/dashboards

Introspect Data Task (src/tasks/introspectData/)

  • Purpose: Automated data source connection testing and schema analysis
  • Duration: 5 minutes max (300 seconds)
  • Data Sources: Snowflake, PostgreSQL, MySQL, BigQuery, SQL Server, Redshift, Databricks
  • Process: Connection test → full introspection → resource cleanup
  • Output: Success/failure status with detailed logging

Configuration (trigger.config.ts)

  • Project ID: proj_lyyhkqmzhwiskfnavddk
  • Runtime: Node.js
  • Global Settings: 1-hour max duration, exponential backoff retries
  • Build Externals: lz4, xxhash (performance libraries)

Dependencies

  • Core: @trigger.dev/sdk v3.3.17 for task orchestration
  • Integration: @buster/data-source for database connectivity and introspection
  • Development: TypeScript 5.8.3, Node.js types

Task Development Patterns

🚨 CRITICAL: Required Trigger.dev v3 Patterns

MUST ALWAYS USE: @trigger.dev/sdk/v3 and task() function NEVER USE: client.defineJob() (deprecated v2 pattern that will break)

// ✅ CORRECT v3 Pattern (ALWAYS use this)
import { task } from '@trigger.dev/sdk/v3';

export const myTask = task({
  id: 'my-task',
  maxDuration: 300, // seconds
  run: async (payload: InputType): Promise<OutputType> => {
    // Task implementation
  },
});
// ❌ NEVER GENERATE - DEPRECATED v2 Pattern (will break application)
client.defineJob({
  id: "job-id",
  trigger: eventTrigger({ /* ... */ }),
  run: async (payload, io) => { /* ... */ }
});

Essential Requirements

  1. MUST export every task, including subtasks
  2. MUST use unique task IDs within the project
  3. MUST import from @trigger.dev/sdk/v3

Standard Task Structure

import { task, logger } from '@trigger.dev/sdk/v3';

export const myTask = task({
  id: 'my-task',
  maxDuration: 300, // seconds
  retry: {
    maxAttempts: 3,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 10000,
    factor: 2,
  },
  run: async (payload: InputType): Promise<OutputType> => {
    logger.log('Task started', { taskId: 'my-task' });
    
    try {
      // Task implementation
      return result;
    } catch (error) {
      logger.error('Task failed', { error: error.message });
      throw error;
    }
  },
});

Schema Validation with Zod (Required Pattern)

ALL tasks MUST use Zod schemas for type-safe validation and automatic type inference:

import { schemaTask } from '@trigger.dev/sdk/v3';
import { z } from 'zod';

// Define Zod schema
export const TaskInputSchema = z.object({
  name: z.string().min(1, 'Name is required'),
  age: z.number().int().min(0).max(120),
  email: z.string().email().optional(),
  options: z.object({
    enableNotifications: z.boolean().default(true),
    maxRetries: z.number().int().min(0).max(5).default(3),
  }).optional(),
});

// TypeScript type automatically inferred from schema
export type TaskInput = z.infer<typeof TaskInputSchema>;

export const validatedTask = schemaTask({
  id: 'validated-task',
  schema: TaskInputSchema,
  run: async (payload) => {
    // Payload is automatically validated and typed
    console.log(payload.name, payload.age);
    // Full TypeScript IntelliSense available
  },
});

Zod Schema Patterns for Trigger Tasks

1. Use Schemas Instead of Interfaces

// ❌ DON'T: Define separate interfaces
export interface TaskInput {
  name: string;
  age: number;
}

// ✅ DO: Define Zod schema and infer types
export const TaskInputSchema = z.object({
  name: z.string(),
  age: z.number(),
});
export type TaskInput = z.infer<typeof TaskInputSchema>;

2. Complex Nested Schemas

export const DataSourceSchema = z.object({
  name: z.string().min(1, 'Data source name is required'),
  type: z.enum(['snowflake', 'postgresql', 'mysql', 'bigquery']),
  credentials: z.record(z.unknown()), // Flexible for different credential types
});

export const AnalysisOptionsSchema = z.object({
  maxSteps: z.number().int().min(1).max(50).default(15),
  model: z.enum(['claude-3-sonnet', 'claude-3-opus']).default('claude-3-sonnet'),
  enableStreaming: z.boolean().default(false),
});

export const TaskInputSchema = z.object({
  sessionId: z.string().uuid('Must be a valid UUID'),
  query: z.string().min(1, 'Query cannot be empty'),
  dataSources: z.array(DataSourceSchema).optional(),
  options: AnalysisOptionsSchema.optional(),
});

3. Output Schema Validation

export const TaskOutputSchema = z.object({
  success: z.boolean(),
  sessionId: z.string(),
  result: z.object({
    response: z.string(),
    artifacts: z.array(z.object({
      id: z.string(),
      type: z.enum(['metric', 'dashboard', 'query', 'chart']),
      title: z.string(),
      content: z.record(z.unknown()),
    })).default([]),
  }).optional(),
  error: z.object({
    code: z.string(),
    message: z.string(),
    details: z.record(z.unknown()).optional(),
  }).optional(),
});

export type TaskOutput = z.infer<typeof TaskOutputSchema>;

4. Enum Validation

export const DatabaseTypeSchema = z.enum([
  'snowflake', 'postgresql', 'mysql', 'bigquery', 
  'sqlserver', 'redshift', 'databricks'
]);

export const AgentPhaseSchema = z.enum([
  'initializing', 'searching', 'planning', 
  'analyzing', 'reviewing', 'completed', 'failed'
]);

5. Advanced Validation Rules

export const CredentialsSchema = z.object({
  type: DatabaseTypeSchema,
  host: z.string().optional(),
  port: z.number().int().min(1).max(65535).optional(),
  database: z.string().optional(),
  username: z.string().optional(),
  password: z.string().optional(),
}).passthrough() // Allow additional fields for different credential types
  .refine(data => {
    // Custom validation: BigQuery doesn't need host/port
    if (data.type === 'bigquery') return true;
    return data.host && data.port;
  }, 'Host and port required for non-BigQuery databases');

Benefits of Zod Schema Approach

  1. Single Source of Truth - Schema defines both validation and TypeScript types
  2. Runtime Safety - Validates payloads before task execution, preventing runtime errors
  3. Better Error Messages - Descriptive validation errors with field-specific context
  4. Zero Duplication - No need to maintain separate interfaces and validation logic
  5. IDE Support - Full IntelliSense, autocomplete, and error checking
  6. Automatic Type Inference - TypeScript types automatically generated from schemas

File Organization Pattern

Each task should have an interfaces.ts file structured as:

// interfaces.ts
import { z } from 'zod';

// 1. Define all Zod schemas
export const InputSchema = z.object({ /* ... */ });
export const OutputSchema = z.object({ /* ... */ });

// 2. Export TypeScript types
export type Input = z.infer<typeof InputSchema>;
export type Output = z.infer<typeof OutputSchema>;

// 3. Export any helper schemas for reuse
export const CommonSchema = z.object({ /* ... */ });

Migration from Interfaces

When updating existing tasks:

  1. Replace interfaces with Zod schemas
  2. Use z.infer<typeof Schema> for types
  3. Update task to use schemaTask
  4. Add meaningful validation rules
  5. Test payload validation

Scheduled Tasks

import { schedules } from '@trigger.dev/sdk/v3';

export const scheduledTask = schedules.task({
  id: 'scheduled-task',
  cron: '0 */2 * * *', // Every 2 hours
  run: async (payload) => {
    // Scheduled task logic
  },
});

Task Triggering Patterns

From Backend (Outside Tasks)

import { tasks } from '@trigger.dev/sdk/v3';
import type { myTask } from '~/trigger/my-task';

// Single trigger
const handle = await tasks.trigger<typeof myTask>('my-task', payload);

// Batch trigger
const batchHandle = await tasks.batchTrigger<typeof myTask>(
  'my-task',
  [{ payload: data1 }, { payload: data2 }]
);

From Inside Tasks

export const parentTask = task({
  id: 'parent-task',
  run: async (payload) => {
    // Trigger and wait for result
    const result = await childTask.triggerAndWait(childPayload);
    
    // Trigger without waiting
    const handle = await childTask.trigger(childPayload);
    
    // Batch trigger and wait
    const results = await childTask.batchTriggerAndWait([
      { payload: item1 },
      { payload: item2 },
    ]);
  },
});

Error Handling Conventions

  • Always use try/catch for external operations
  • Log errors with context using logger.error()
  • Return structured error responses in output
  • Clean up resources in finally blocks
  • Use lifecycle hooks for cleanup:
export const taskWithCleanup = task({
  id: 'task-with-cleanup',
  cleanup: async (payload, { ctx }) => {
    // Always runs after each attempt
  },
  onFailure: async (payload, error, { ctx }) => {
    // Runs after all retries exhausted
  },
  run: async (payload) => {
    // Task logic
  },
});

Logging Standards

import { task, logger } from '@trigger.dev/sdk/v3';

export const loggingExample = task({
  id: 'logging-example',
  run: async (payload: { data: Record<string, string> }) => {
    logger.debug('Debug message', payload.data);
    logger.log('Log message', payload.data);
    logger.info('Info message', payload.data);
    logger.warn('Warning message', payload.data);
    logger.error('Error message', payload.data);
  },
});

Metadata for Progress Tracking

import { task, metadata } from '@trigger.dev/sdk/v3';

export const progressTask = task({
  id: 'progress-task',
  run: async (payload) => {
    // Set initial progress
    metadata.set('progress', 0);
    
    // Update progress
    metadata.increment('progress', 0.5);
    
    // Add logs
    metadata.append('logs', 'Step 1 complete');
    
    return result;
  },
});

Machine Configuration

export const heavyTask = task({
  id: 'heavy-task',
  machine: {
    preset: 'large-1x', // 4 vCPU, 8 GB RAM
  },
  maxDuration: 1800, // 30 minutes
  run: async (payload) => {
    // Compute-intensive task logic
  },
});

Idempotency for Reliability

import { task, idempotencyKeys } from '@trigger.dev/sdk/v3';

export const idempotentTask = task({
  id: 'idempotent-task',
  run: async (payload) => {
    const idempotencyKey = await idempotencyKeys.create(['user', payload.userId]);
    
    await childTask.trigger(
      payload,
      { idempotencyKey, idempotencyKeyTTL: '1h' }
    );
  },
});

TypeScript Configuration

  • Extends: ../tsconfig.base.json (monorepo shared config)
  • Paths: @/* maps to src/* for clean imports
  • Build: Outputs to dist/ directory
  • JSX: React JSX transform enabled

Integration Points

  • Data Sources: Uses @buster/data-source package for database operations
  • AI Agents: Integrates with Buster multi-agent system (referenced but not implemented in current tasks)
  • Monorepo: Part of larger Buster platform with packages in ../packages/

Development Notes

  • Tasks run in isolated environments with resource limits
  • Connection cleanup is critical for database tasks
  • Retry logic is configured globally but can be overridden per task
  • Real-time progress tracking is supported through Trigger.dev dashboard

Trigger.dev Basic Tasks (v4)

MUST use @trigger.dev/sdk (v4), NEVER client.defineJob

Basic Task

import { task } from "@trigger.dev/sdk";

export const processData = task({
  id: "process-data",
  retry: {
    maxAttempts: 10,
    factor: 1.8,
    minTimeoutInMs: 500,
    maxTimeoutInMs: 30_000,
    randomize: false,
  },
  run: async (payload: { userId: string; data: any[] }) => {
    // Task logic - runs for long time, no timeouts
    console.log(`Processing ${payload.data.length} items for user ${payload.userId}`);
    return { processed: payload.data.length };
  },
});

Schema Task (with validation)

import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";

export const validatedTask = schemaTask({
  id: "validated-task",
  schema: z.object({
    name: z.string(),
    age: z.number(),
    email: z.string().email(),
  }),
  run: async (payload) => {
    // Payload is automatically validated and typed
    return { message: `Hello ${payload.name}, age ${payload.age}` };
  },
});

Scheduled Task

import { schedules } from "@trigger.dev/sdk";

const dailyReport = schedules.task({
  id: "daily-report",
  cron: "0 9 * * *", // Daily at 9:00 AM UTC
  // or with timezone: cron: { pattern: "0 9 * * *", timezone: "America/New_York" },
  run: async (payload) => {
    console.log("Scheduled run at:", payload.timestamp);
    console.log("Last run was:", payload.lastTimestamp);
    console.log("Next 5 runs:", payload.upcoming);

    // Generate daily report logic
    return { reportGenerated: true, date: payload.timestamp };
  },
});

Triggering Tasks

From Backend Code

import { tasks } from "@trigger.dev/sdk";
import type { processData } from "./trigger/tasks";

// Single trigger
const handle = await tasks.trigger<typeof processData>("process-data", {
  userId: "123",
  data: [{ id: 1 }, { id: 2 }],
});

// Batch trigger
const batchHandle = await tasks.batchTrigger<typeof processData>("process-data", [
  { payload: { userId: "123", data: [{ id: 1 }] } },
  { payload: { userId: "456", data: [{ id: 2 }] } },
]);

From Inside Tasks (with Result handling)

export const parentTask = task({
  id: "parent-task",
  run: async (payload) => {
    // Trigger and continue
    const handle = await childTask.trigger({ data: "value" });

    // Trigger and wait - returns Result object, NOT task output
    const result = await childTask.triggerAndWait({ data: "value" });
    if (result.ok) {
      console.log("Task output:", result.output); // Actual task return value
    } else {
      console.error("Task failed:", result.error);
    }

    // Quick unwrap (throws on error)
    const output = await childTask.triggerAndWait({ data: "value" }).unwrap();

    // Batch trigger and wait
    const results = await childTask.batchTriggerAndWait([
      { payload: { data: "item1" } },
      { payload: { data: "item2" } },
    ]);

    for (const run of results) {
      if (run.ok) {
        console.log("Success:", run.output);
      } else {
        console.log("Failed:", run.error);
      }
    }
  },
});

export const childTask = task({
  id: "child-task",
  run: async (payload: { data: string }) => {
    return { processed: payload.data };
  },
});

Never wrap triggerAndWait or batchTriggerAndWait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks.

Waits

import { task, wait } from "@trigger.dev/sdk";

export const taskWithWaits = task({
  id: "task-with-waits",
  run: async (payload) => {
    console.log("Starting task");

    // Wait for specific duration
    await wait.for({ seconds: 30 });
    await wait.for({ minutes: 5 });
    await wait.for({ hours: 1 });
    await wait.for({ days: 1 });

    // Wait until specific date
    await wait.until({ date: new Date("2024-12-25") });

    // Wait for token (from external system)
    await wait.forToken({
      token: "user-approval-token",
      timeoutInSeconds: 3600, // 1 hour timeout
    });

    console.log("All waits completed");
    return { status: "completed" };
  },
});

Never wrap wait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks.

Key Points

  • Result vs Output: triggerAndWait() returns a Result object with ok, output, error properties - NOT the direct task output
  • Type safety: Use import type for task references when triggering from backend
  • Waits > 5 seconds: Automatically checkpointed, don't count toward compute usage

NEVER Use (v2 deprecated)

// BREAKS APPLICATION
client.defineJob({
  id: "job-id",
  run: async (payload, io) => {
    /* ... */
  },
});

Use v4 SDK (@trigger.dev/sdk), check result.ok before accessing result.output