CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
Project Overview
This is a Trigger.dev v3 background job processing service for the Buster AI data analysis platform. It handles long-running AI agent tasks and data source introspection operations.
Development Commands
Core Operations
# Development server with live reload
npm run dev
# Build for deployment
npm run build
# Deploy to Trigger.dev
npm run deploy
TypeScript
# Type checking (extends from ../tsconfig.base.json)
npx tsc --noEmit
Architecture
Task-Based Architecture
This service implements Trigger.dev v3 tasks for background processing:
- Task Definition: All tasks are in src/tasks/ with standard structure:
  - index.ts - Exports
  - {task-name}.ts - Task implementation with trigger.dev config
  - interfaces.ts - TypeScript types
  - README.md - Documentation
Current Tasks
Analyst Agent Task (src/tasks/analyst-agent-task/)
- Purpose: Advanced AI-powered data analysis with multi-step workflow
- Duration: 30 minutes max (1800 seconds)
- Features: Multi-state execution (initializing → searching → planning → analyzing → reviewing)
- Architecture: Integrates with Buster multi-agent system for sophisticated analysis
- Key Workflow: Takes user queries, discovers data sources, generates insights/metrics/dashboards
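The phase progression listed above can be pictured as a small state machine. The following is a minimal sketch, not the task's actual implementation: the `Phase` type and `nextPhase` helper are hypothetical names introduced here, and the terminal `completed` state is an assumption borrowed from the AgentPhaseSchema enum that appears later in this file.

```typescript
// Hypothetical sketch: phase names come from the feature list above;
// `completed` is assumed to follow `reviewing`.
const PHASES = [
  'initializing',
  'searching',
  'planning',
  'analyzing',
  'reviewing',
  'completed',
] as const;
type Phase = (typeof PHASES)[number];

// Returns the phase after `current`, or `current` itself once the run is completed.
function nextPhase(current: Phase): Phase {
  const index = PHASES.indexOf(current);
  return PHASES[Math.min(index + 1, PHASES.length - 1)] ?? current;
}
```

Keeping the order in a single array means the progression and the type stay in sync if a phase is added.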
Introspect Data Task (src/tasks/introspectData/)
- Purpose: Automated data source connection testing and schema analysis
- Duration: 5 minutes max (300 seconds)
- Data Sources: Snowflake, PostgreSQL, MySQL, BigQuery, SQL Server, Redshift, Databricks
- Process: Connection test → full introspection → resource cleanup
- Output: Success/failure status with detailed logging
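The connection test → full introspection → resource cleanup sequence might look roughly like the sketch below. It assumes a hypothetical `DataSourceLike` interface; the real @buster/data-source API is not shown in this file and may differ. The point is only that `finally` guarantees cleanup on every path.

```typescript
// Hypothetical interface: the real @buster/data-source API may differ.
interface DataSourceLike {
  testConnection(): Promise<boolean>;
  introspect(): Promise<string[]>; // e.g. discovered table names
  close(): Promise<void>;
}

interface IntrospectionOutcome {
  success: boolean;
  tables: string[];
  error?: string;
}

// Connection test, then introspection, with cleanup guaranteed by `finally`.
async function introspectWithCleanup(source: DataSourceLike): Promise<IntrospectionOutcome> {
  try {
    const connected = await source.testConnection();
    if (!connected) {
      return { success: false, tables: [], error: 'Connection test failed' };
    }
    const tables = await source.introspect();
    return { success: true, tables };
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    return { success: false, tables: [], error: message };
  } finally {
    // Runs on success, early return, and thrown errors alike.
    await source.close();
  }
}
```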
Configuration (trigger.config.ts)
- Project ID: proj_lyyhkqmzhwiskfnavddk
- Runtime: Node.js
- Global Settings: 1-hour max duration, exponential backoff retries
- Build Externals: lz4, xxhash (performance libraries)
Dependencies
- Core: @trigger.dev/sdk v3.3.17 for task orchestration
- Integration: @buster/data-source for database connectivity and introspection
- Development: TypeScript 5.8.3, Node.js types
Task Development Patterns
🚨 CRITICAL: Required Trigger.dev v3 Patterns
MUST ALWAYS USE: @trigger.dev/sdk/v3 and the task() function
NEVER USE: client.defineJob() (deprecated v2 pattern that will break)
// ✅ CORRECT v3 Pattern (ALWAYS use this)
import { task } from '@trigger.dev/sdk/v3';
export const myTask = task({
id: 'my-task',
maxDuration: 300, // seconds
run: async (payload: InputType): Promise<OutputType> => {
// Task implementation
},
});
// ❌ NEVER GENERATE - DEPRECATED v2 Pattern (will break application)
client.defineJob({
id: "job-id",
trigger: eventTrigger({ /* ... */ }),
run: async (payload, io) => { /* ... */ }
});
Essential Requirements
- MUST export every task, including subtasks
- MUST use unique task IDs within the project
- MUST import from @trigger.dev/sdk/v3
Standard Task Structure
import { task, logger } from '@trigger.dev/sdk/v3';
export const myTask = task({
id: 'my-task',
maxDuration: 300, // seconds
retry: {
maxAttempts: 3,
minTimeoutInMs: 1000,
maxTimeoutInMs: 10000,
factor: 2,
},
run: async (payload: InputType): Promise<OutputType> => {
logger.log('Task started', { taskId: 'my-task' });
try {
// Task implementation
return result;
} catch (error) {
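// `error` is `unknown` under strict TypeScript, so narrow before reading `.message`
logger.error('Task failed', { error: error instanceof Error ? error.message : String(error) });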
throw error;
}
},
});
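To get a feel for what the retry settings above imply, the sketch below computes a plain exponential backoff schedule. It assumes no randomization and the common `min * factor^n` formula capped at the maximum; the SDK's exact calculation may differ, and `backoffDelays` is a hypothetical helper, not part of @trigger.dev/sdk.

```typescript
interface RetryOptions {
  maxAttempts: number;
  minTimeoutInMs: number;
  maxTimeoutInMs: number;
  factor: number;
}

// Delay before each retry: min * factor^retryIndex, capped at max.
// Assumption: no jitter; the SDK's exact formula may differ.
function backoffDelays(options: RetryOptions): number[] {
  const retries = Math.max(options.maxAttempts - 1, 0);
  return Array.from({ length: retries }, (_, retryIndex) =>
    Math.min(options.minTimeoutInMs * options.factor ** retryIndex, options.maxTimeoutInMs),
  );
}

// Same values as the retry block above: 3 attempts means at most 2 retries.
const delays = backoffDelays({
  maxAttempts: 3,
  minTimeoutInMs: 1000,
  maxTimeoutInMs: 10000,
  factor: 2,
});
console.log(delays); // two delays: 1000 ms, then 2000 ms
```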
Schema Validation with Zod (Required Pattern)
ALL tasks MUST use Zod schemas for type-safe validation and automatic type inference:
import { schemaTask } from '@trigger.dev/sdk/v3';
import { z } from 'zod';
// Define Zod schema
export const TaskInputSchema = z.object({
name: z.string().min(1, 'Name is required'),
age: z.number().int().min(0).max(120),
email: z.string().email().optional(),
options: z.object({
enableNotifications: z.boolean().default(true),
maxRetries: z.number().int().min(0).max(5).default(3),
}).optional(),
});
// TypeScript type automatically inferred from schema
export type TaskInput = z.infer<typeof TaskInputSchema>;
export const validatedTask = schemaTask({
id: 'validated-task',
schema: TaskInputSchema,
run: async (payload) => {
// Payload is automatically validated and typed
console.log(payload.name, payload.age);
// Full TypeScript IntelliSense available
},
});
Zod Schema Patterns for Trigger Tasks
1. Use Schemas Instead of Interfaces
// ❌ DON'T: Define separate interfaces
export interface TaskInput {
name: string;
age: number;
}
// ✅ DO: Define Zod schema and infer types
export const TaskInputSchema = z.object({
name: z.string(),
age: z.number(),
});
export type TaskInput = z.infer<typeof TaskInputSchema>;
2. Complex Nested Schemas
export const DataSourceSchema = z.object({
name: z.string().min(1, 'Data source name is required'),
type: z.enum(['snowflake', 'postgresql', 'mysql', 'bigquery']),
credentials: z.record(z.unknown()), // Flexible for different credential types
});
export const AnalysisOptionsSchema = z.object({
maxSteps: z.number().int().min(1).max(50).default(15),
model: z.enum(['claude-3-sonnet', 'claude-3-opus']).default('claude-3-sonnet'),
enableStreaming: z.boolean().default(false),
});
export const TaskInputSchema = z.object({
sessionId: z.string().uuid('Must be a valid UUID'),
query: z.string().min(1, 'Query cannot be empty'),
dataSources: z.array(DataSourceSchema).optional(),
options: AnalysisOptionsSchema.optional(),
});
3. Output Schema Validation
export const TaskOutputSchema = z.object({
success: z.boolean(),
sessionId: z.string(),
result: z.object({
response: z.string(),
artifacts: z.array(z.object({
id: z.string(),
type: z.enum(['metric', 'dashboard', 'query', 'chart']),
title: z.string(),
content: z.record(z.unknown()),
})).default([]),
}).optional(),
error: z.object({
code: z.string(),
message: z.string(),
details: z.record(z.unknown()).optional(),
}).optional(),
});
export type TaskOutput = z.infer<typeof TaskOutputSchema>;
4. Enum Validation
export const DatabaseTypeSchema = z.enum([
'snowflake', 'postgresql', 'mysql', 'bigquery',
'sqlserver', 'redshift', 'databricks'
]);
export const AgentPhaseSchema = z.enum([
'initializing', 'searching', 'planning',
'analyzing', 'reviewing', 'completed', 'failed'
]);
5. Advanced Validation Rules
export const CredentialsSchema = z.object({
type: DatabaseTypeSchema,
host: z.string().optional(),
port: z.number().int().min(1).max(65535).optional(),
database: z.string().optional(),
username: z.string().optional(),
password: z.string().optional(),
}).passthrough() // Allow additional fields for different credential types
.refine(data => {
// Custom validation: BigQuery doesn't need host/port
if (data.type === 'bigquery') return true;
return Boolean(data.host && data.port);
}, 'Host and port required for non-BigQuery databases');
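The refinement rule above can also be pulled out as a plain function so it can be unit-tested without Zod. This is only a sketch: `CredentialsLike` and `hasRequiredConnectionFields` are hypothetical names that mirror the relevant fields of the schema.

```typescript
// Hypothetical shape mirroring the relevant fields of CredentialsSchema.
interface CredentialsLike {
  type: string;
  host?: string;
  port?: number;
}

// Same rule as the refine callback, returning a strict boolean.
function hasRequiredConnectionFields(data: CredentialsLike): boolean {
  // BigQuery connections do not need host/port.
  if (data.type === 'bigquery') return true;
  return Boolean(data.host && data.port);
}
```

The function can then be passed straight to `.refine()`, which keeps the schema definition short and the rule independently testable.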
Benefits of Zod Schema Approach
- Single Source of Truth - Schema defines both validation and TypeScript types
- Runtime Safety - Validates payloads before task execution, preventing runtime errors
- Better Error Messages - Descriptive validation errors with field-specific context
- Zero Duplication - No need to maintain separate interfaces and validation logic
- IDE Support - Full IntelliSense, autocomplete, and error checking
- Automatic Type Inference - TypeScript types automatically generated from schemas
File Organization Pattern
Each task should have an interfaces.ts file structured as:
// interfaces.ts
import { z } from 'zod';
// 1. Define all Zod schemas
export const InputSchema = z.object({ /* ... */ });
export const OutputSchema = z.object({ /* ... */ });
// 2. Export TypeScript types
export type Input = z.infer<typeof InputSchema>;
export type Output = z.infer<typeof OutputSchema>;
// 3. Export any helper schemas for reuse
export const CommonSchema = z.object({ /* ... */ });
Migration from Interfaces
When updating existing tasks:
- Replace interfaces with Zod schemas
- Use z.infer<typeof Schema> for types
- Update task to use schemaTask
- Add meaningful validation rules
- Test payload validation
Scheduled Tasks
import { schedules } from '@trigger.dev/sdk/v3';
export const scheduledTask = schedules.task({
id: 'scheduled-task',
cron: '0 */2 * * *', // Every 2 hours
run: async (payload) => {
// Scheduled task logic
},
});
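For readers less familiar with cron syntax, the pattern `0 */2 * * *` fires at minute 0 of every even hour. The sketch below lists the next few run times for that one pattern, assuming UTC; `nextRuns` is a hypothetical helper written for illustration and is not a general cron parser or part of the SDK.

```typescript
// Illustrative only: handles just the "0 */2 * * *" pattern, in UTC.
function nextRuns(from: Date, count: number): Date[] {
  const runs: Date[] = [];
  const cursor = new Date(from.getTime());
  cursor.setUTCMinutes(0, 0, 0); // snap back to the top of the current hour
  while (runs.length < count) {
    // Step forward one hour at a time; day rollover is handled by Date.
    cursor.setUTCHours(cursor.getUTCHours() + 1);
    if (cursor.getUTCHours() % 2 === 0) {
      runs.push(new Date(cursor.getTime()));
    }
  }
  return runs;
}

const upcoming = nextRuns(new Date('2024-01-01T03:15:00Z'), 3);
console.log(upcoming.map((run) => run.toISOString()));
```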
Task Triggering Patterns
From Backend (Outside Tasks)
import { tasks } from '@trigger.dev/sdk/v3';
import type { myTask } from '~/trigger/my-task';
// Single trigger
const handle = await tasks.trigger<typeof myTask>('my-task', payload);
// Batch trigger
const batchHandle = await tasks.batchTrigger<typeof myTask>(
'my-task',
[{ payload: data1 }, { payload: data2 }]
);
From Inside Tasks
export const parentTask = task({
id: 'parent-task',
run: async (payload) => {
// Trigger and wait for result
const result = await childTask.triggerAndWait(childPayload);
// Trigger without waiting
const handle = await childTask.trigger(childPayload);
// Batch trigger and wait
const results = await childTask.batchTriggerAndWait([
{ payload: item1 },
{ payload: item2 },
]);
},
});
Error Handling Conventions
- Always use try/catch for external operations
- Log errors with context using
logger.error()
- Return structured error responses in output
- Clean up resources in finally blocks
- Use lifecycle hooks for cleanup:
export const taskWithCleanup = task({
id: 'task-with-cleanup',
cleanup: async (payload, { ctx }) => {
// Always runs after each attempt
},
onFailure: async (payload, error, { ctx }) => {
// Runs after all retries exhausted
},
run: async (payload) => {
// Task logic
},
});
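For the "structured error responses" convention, one possible shape is a small helper that turns any thrown value into the `{ code, message }` form used by the output schema earlier in this file. This is a sketch under assumptions: `toErrorOutput` and the default `TASK_FAILED` code are hypothetical, not an established Buster convention.

```typescript
interface TaskErrorOutput {
  success: false;
  error: { code: string; message: string };
}

// Converts any thrown value into a structured error payload.
// The default code is illustrative only.
function toErrorOutput(error: unknown, code = 'TASK_FAILED'): TaskErrorOutput {
  // Thrown values are `unknown`, so narrow before reading `.message`.
  const message = error instanceof Error ? error.message : String(error);
  return { success: false, error: { code, message } };
}
```

Inside a task, this would typically be called from the `catch` block before returning or rethrowing.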
Logging Standards
import { task, logger } from '@trigger.dev/sdk/v3';
export const loggingExample = task({
id: 'logging-example',
run: async (payload: { data: Record<string, string> }) => {
logger.debug('Debug message', payload.data);
logger.log('Log message', payload.data);
logger.info('Info message', payload.data);
logger.warn('Warning message', payload.data);
logger.error('Error message', payload.data);
},
});
Metadata for Progress Tracking
import { task, metadata } from '@trigger.dev/sdk/v3';
export const progressTask = task({
id: 'progress-task',
run: async (payload) => {
// Set initial progress
metadata.set('progress', 0);
// Update progress
metadata.increment('progress', 0.5);
// Add logs
metadata.append('logs', 'Step 1 complete');
return result;
},
});
Machine Configuration
export const heavyTask = task({
id: 'heavy-task',
machine: {
preset: 'large-1x', // 4 vCPU, 8 GB RAM
},
maxDuration: 1800, // 30 minutes
run: async (payload) => {
// Compute-intensive task logic
},
});
Idempotency for Reliability
import { task, idempotencyKeys } from '@trigger.dev/sdk/v3';
export const idempotentTask = task({
id: 'idempotent-task',
run: async (payload) => {
const idempotencyKey = await idempotencyKeys.create(['user', payload.userId]);
await childTask.trigger(
payload,
{ idempotencyKey, idempotencyKeyTTL: '1h' }
);
},
});
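Conceptually, an idempotency key with a TTL means "run this at most once per key until the key expires". The in-memory sketch below illustrates that idea only; Trigger.dev handles this on its side, and `IdempotencyStore` is a hypothetical class that does not reflect its implementation.

```typescript
// Conceptual illustration only; not how Trigger.dev stores idempotency keys.
class IdempotencyStore<T> {
  private readonly entries = new Map<string, { value: T; expiresAt: number }>();

  // Runs `action` once per key within the TTL; later calls reuse the stored value.
  // `now` is passed in explicitly so the behavior is easy to test.
  run(key: string, ttlMs: number, now: number, action: () => T): T {
    const existing = this.entries.get(key);
    if (existing && existing.expiresAt > now) {
      return existing.value;
    }
    const value = action();
    this.entries.set(key, { value, expiresAt: now + ttlMs });
    return value;
  }
}
```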
TypeScript Configuration
- Extends: ../tsconfig.base.json (monorepo shared config)
- Paths: @/* maps to src/* for clean imports
- Build: Outputs to dist/ directory
- JSX: React JSX transform enabled
Integration Points
- Data Sources: Uses @buster/data-source package for database operations
- AI Agents: Integrates with Buster multi-agent system (referenced but not implemented in current tasks)
- Monorepo: Part of larger Buster platform with packages in ../packages/
Development Notes
- Tasks run in isolated environments with resource limits
- Connection cleanup is critical for database tasks
- Retry logic is configured globally but can be overridden per task
- Real-time progress tracking is supported through Trigger.dev dashboard
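As a mental model for "configured globally but overridable per task", the sketch below merges per-task retry fields over global defaults. The default values and the `resolveRetry` helper are hypothetical, and whether Trigger.dev merges field by field or replaces the whole retry object is not stated in this file, so treat this as an illustration rather than the SDK's behavior.

```typescript
interface RetryConfig {
  maxAttempts: number;
  minTimeoutInMs: number;
  maxTimeoutInMs: number;
  factor: number;
}

// Hypothetical global defaults; the real values live in trigger.config.ts.
const globalRetry: RetryConfig = {
  maxAttempts: 3,
  minTimeoutInMs: 1000,
  maxTimeoutInMs: 10000,
  factor: 2,
};

// Per-task fields win; anything omitted falls back to the global default.
function resolveRetry(taskRetry: Partial<RetryConfig> = {}): RetryConfig {
  return { ...globalRetry, ...taskRetry };
}
```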
Trigger.dev Basic Tasks (v4)
MUST use @trigger.dev/sdk (v4), NEVER client.defineJob
Basic Task
import { task } from "@trigger.dev/sdk";
export const processData = task({
id: "process-data",
retry: {
maxAttempts: 10,
factor: 1.8,
minTimeoutInMs: 500,
maxTimeoutInMs: 30_000,
randomize: false,
},
run: async (payload: { userId: string; data: any[] }) => {
// Task logic - can run for a long time, no timeouts
console.log(`Processing ${payload.data.length} items for user ${payload.userId}`);
return { processed: payload.data.length };
},
});
Schema Task (with validation)
import { schemaTask } from "@trigger.dev/sdk";
import { z } from "zod";
export const validatedTask = schemaTask({
id: "validated-task",
schema: z.object({
name: z.string(),
age: z.number(),
email: z.string().email(),
}),
run: async (payload) => {
// Payload is automatically validated and typed
return { message: `Hello ${payload.name}, age ${payload.age}` };
},
});
Scheduled Task
import { schedules } from "@trigger.dev/sdk";
export const dailyReport = schedules.task({
id: "daily-report",
cron: "0 9 * * *", // Daily at 9:00 AM UTC
// or with timezone: cron: { pattern: "0 9 * * *", timezone: "America/New_York" },
run: async (payload) => {
console.log("Scheduled run at:", payload.timestamp);
console.log("Last run was:", payload.lastTimestamp);
console.log("Next 5 runs:", payload.upcoming);
// Generate daily report logic
return { reportGenerated: true, date: payload.timestamp };
},
});
Triggering Tasks
From Backend Code
import { tasks } from "@trigger.dev/sdk";
import type { processData } from "./trigger/tasks";
// Single trigger
const handle = await tasks.trigger<typeof processData>("process-data", {
userId: "123",
data: [{ id: 1 }, { id: 2 }],
});
// Batch trigger
const batchHandle = await tasks.batchTrigger<typeof processData>("process-data", [
{ payload: { userId: "123", data: [{ id: 1 }] } },
{ payload: { userId: "456", data: [{ id: 2 }] } },
]);
From Inside Tasks (with Result handling)
export const parentTask = task({
id: "parent-task",
run: async (payload) => {
// Trigger and continue
const handle = await childTask.trigger({ data: "value" });
// Trigger and wait - returns Result object, NOT task output
const result = await childTask.triggerAndWait({ data: "value" });
if (result.ok) {
console.log("Task output:", result.output); // Actual task return value
} else {
console.error("Task failed:", result.error);
}
// Quick unwrap (throws on error)
const output = await childTask.triggerAndWait({ data: "value" }).unwrap();
// Batch trigger and wait
const results = await childTask.batchTriggerAndWait([
{ payload: { data: "item1" } },
{ payload: { data: "item2" } },
]);
// The batch result wraps the individual run results in a `runs` array
for (const run of results.runs) {
if (run.ok) {
console.log("Success:", run.output);
} else {
console.log("Failed:", run.error);
}
}
},
});
export const childTask = task({
id: "child-task",
run: async (payload: { data: string }) => {
return { processed: payload.data };
},
});
Never wrap triggerAndWait or batchTriggerAndWait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks.
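The Result handling above follows a discriminated-union pattern. The sketch below models it locally so the narrowing logic is visible in isolation; `RunResult`, `unwrapResult`, and `partitionResults` are hypothetical names and are not the SDK's own types.

```typescript
// Hypothetical local model of the Result shape: ok + output, or not ok + error.
type RunResult<T> = { ok: true; output: T } | { ok: false; error: unknown };

// Mirrors the idea of unwrapping: return the output or throw the error.
function unwrapResult<T>(result: RunResult<T>): T {
  if (result.ok) return result.output;
  throw result.error instanceof Error ? result.error : new Error(String(result.error));
}

// Splits a batch of results into successful outputs and errors.
function partitionResults<T>(results: RunResult<T>[]): { outputs: T[]; errors: unknown[] } {
  const outputs: T[] = [];
  const errors: unknown[] = [];
  for (const result of results) {
    if (result.ok) outputs.push(result.output);
    else errors.push(result.error);
  }
  return { outputs, errors };
}
```

Checking `ok` first is what lets TypeScript narrow the union so that `output` is only accessible on the success branch.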
Waits
import { task, wait } from "@trigger.dev/sdk";
export const taskWithWaits = task({
id: "task-with-waits",
run: async (payload) => {
console.log("Starting task");
// Wait for specific duration
await wait.for({ seconds: 30 });
await wait.for({ minutes: 5 });
await wait.for({ hours: 1 });
await wait.for({ days: 1 });
// Wait until specific date
await wait.until({ date: new Date("2024-12-25") });
// Wait for token (from external system)
await wait.forToken({
token: "user-approval-token",
timeoutInSeconds: 3600, // 1 hour timeout
});
console.log("All waits completed");
return { status: "completed" };
},
});
Never wrap wait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks.
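Since Promise.all is off the table for waits and triggerAndWait calls, the usual alternative is a plain sequential loop. The sketch below shows the shape of that loop with a stand-in async function; `fakeStep` is hypothetical and merely takes the place of a real wait or triggerAndWait call.

```typescript
// Stand-in for a wait or triggerAndWait call; records when it starts and ends.
async function fakeStep(label: string, log: string[]): Promise<string> {
  log.push(`start:${label}`);
  await Promise.resolve();
  log.push(`end:${label}`);
  return label;
}

// Each step finishes before the next one starts; no Promise.all involved.
async function runSequentially(labels: string[], log: string[]): Promise<string[]> {
  const results: string[] = [];
  for (const label of labels) {
    results.push(await fakeStep(label, log));
  }
  return results;
}
```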
Key Points
- Result vs Output: triggerAndWait() returns a Result object with ok, output, error properties - NOT the direct task output
- Type safety: Use import type for task references when triggering from backend
- Waits > 5 seconds: Automatically checkpointed, don't count toward compute usage
NEVER Use (v2 deprecated)
// BREAKS APPLICATION
client.defineJob({
id: "job-id",
run: async (payload, io) => {
/* ... */
},
});
Use v4 SDK (@trigger.dev/sdk), check result.ok before accessing result.output