mirror of https://github.com/buster-so/buster.git
Add metric file export functionality with Cloudflare R2 support
- Introduced new API endpoint for downloading metric files as CSV.
- Implemented export logic to handle large datasets (up to 1 million rows) and generate presigned URLs for secure downloads.
- Added cleanup task to remove exported files from R2 storage after 60 seconds.
- Updated environment configuration to include Cloudflare R2 credentials.
- Enhanced error handling for various export scenarios.
- Refactored related database queries and schemas for better integration.
- Updated documentation for new features and usage instructions.
This commit is contained in:
parent 94a127cb0e
commit 3eb80ae630
@@ -59,5 +59,11 @@ POSTHOG_TELEMETRY_KEY=
BRAINTRUST_KEY=
TRIGGER_SECRET_KEY=

# Cloudflare R2 Storage (for metric exports)
R2_ACCOUNT_ID=
R2_ACCESS_KEY_ID=
R2_SECRET_ACCESS_KEY=
R2_BUCKET=metric-exports

# Playwright Testing
PLAYWRIGHT_START_COMMAND=
@@ -4,6 +4,7 @@ import healthcheckRoutes from '../healthcheck';
import chatsRoutes from './chats';
import dictionariesRoutes from './dictionaries';
import electricShapeRoutes from './electric-shape';
import metricFilesRoutes from './metric_files';
import organizationRoutes from './organization';
import reportsRoutes from './reports';
import securityRoutes from './security';
@@ -18,6 +19,7 @@ const app = new Hono()
  .route('/electric-shape', electricShapeRoutes)
  .route('/healthcheck', healthcheckRoutes)
  .route('/chats', chatsRoutes)
  .route('/metric_files', metricFilesRoutes)
  .route('/slack', slackRoutes)
  .route('/support', supportRoutes)
  .route('/security', securityRoutes)
@@ -0,0 +1,140 @@
import type { User } from '@buster/database';
import { getUserOrganizationId } from '@buster/database';
import type { ExportMetricDataOutput, MetricDownloadResponse } from '@buster/server-shared/metrics';
import { runs, tasks } from '@trigger.dev/sdk';
import { HTTPException } from 'hono/http-exception';

/**
 * Handler for downloading metric file data as CSV
 *
 * This handler:
 * 1. Validates user has access to the organization
 * 2. Triggers the export task in Trigger.dev
 * 3. Waits for the task to complete (max 2 minutes)
 * 4. Returns a presigned URL for downloading the CSV file
 *
 * The download URL expires after 60 seconds for security
 */
export async function downloadMetricFileHandler(
  metricId: string,
  user: User
): Promise<MetricDownloadResponse> {
  // Get user's organization
  const userOrg = await getUserOrganizationId(user.id);

  if (!userOrg) {
    throw new HTTPException(403, {
      message: 'You must be part of an organization to download metric files',
    });
  }

  const { organizationId } = userOrg;

  try {
    // Trigger the export task
    const handle = await tasks.trigger('export-metric-data', {
      metricId,
      userId: user.id,
      organizationId,
    });

    // Poll for task completion with timeout
    const startTime = Date.now();
    const timeout = 120000; // 2 minutes
    const pollInterval = 2000; // Poll every 2 seconds

    let run;
    while (true) {
      run = await runs.retrieve(handle.id);

      // Check if task completed, failed, or was canceled
      if (run.status === 'COMPLETED' || run.status === 'FAILED' || run.status === 'CANCELED') {
        break;
      }

      // Check for timeout
      if (Date.now() - startTime > timeout) {
        throw new HTTPException(504, {
          message: 'Export took too long to complete. Please try again with a smaller date range.',
        });
      }

      // Wait before next poll
      await new Promise((resolve) => setTimeout(resolve, pollInterval));
    }

    // Check task status
    if (run.status === 'FAILED' || run.status === 'CANCELED') {
      throw new HTTPException(500, {
        message: `Export task ${run.status.toLowerCase()}`,
      });
    }

    // Check if task completed successfully
    if (!run.output) {
      throw new HTTPException(500, {
        message: 'Export task did not return any output',
      });
    }

    const output = run.output as ExportMetricDataOutput;

    if (!output.success) {
      // Handle specific error codes
      const errorCode = output.errorCode;
      const errorMessage = output.error || 'Export failed';

      switch (errorCode) {
        case 'UNAUTHORIZED':
          throw new HTTPException(403, {
            message: errorMessage,
          });
        case 'NOT_FOUND':
          throw new HTTPException(404, {
            message: 'Metric file not found or data source credentials missing',
          });
        case 'QUERY_ERROR':
          throw new HTTPException(400, {
            message: `Query execution failed: ${errorMessage}`,
          });
        case 'UPLOAD_ERROR':
          throw new HTTPException(500, {
            message: 'Failed to prepare download file',
          });
        default:
          throw new HTTPException(500, {
            message: errorMessage,
          });
      }
    }

    // Validate required output fields
    if (!output.downloadUrl || !output.expiresAt) {
      throw new HTTPException(500, {
        message: 'Export succeeded but download URL was not generated',
      });
    }

    // Return successful response
    return {
      downloadUrl: output.downloadUrl,
      expiresAt: output.expiresAt,
      fileSize: output.fileSize || 0,
      fileName: output.fileName || `metric-${metricId}.csv`,
      rowCount: output.rowCount || 0,
      message: 'Download link expires in 60 seconds. Please start your download immediately.',
    };
  } catch (error) {
    // Re-throw HTTPException as-is
    if (error instanceof HTTPException) {
      throw error;
    }

    // Log unexpected errors
    console.error('Unexpected error during metric download:', error);

    throw new HTTPException(500, {
      message: 'An unexpected error occurred during export',
    });
  }
}
@@ -0,0 +1,39 @@
import { MetricDownloadParamsSchema } from '@buster/server-shared/metrics';
import { zValidator } from '@hono/zod-validator';
import { Hono } from 'hono';
import { requireAuth } from '../../../middleware/auth';
import '../../../types/hono.types';
import { downloadMetricFileHandler } from './download-metric-file';

const app = new Hono()
  // Apply authentication middleware to all routes
  .use('*', requireAuth)

  // GET /metric_files/:id/download - Download metric file data as CSV
  .get('/:id/download', zValidator('param', MetricDownloadParamsSchema), async (c) => {
    const { id } = c.req.valid('param');
    const user = c.get('busterUser');

    const response = await downloadMetricFileHandler(id, user);

    // Option 1: Return JSON with download URL for client to handle
    return c.json(response);

    // Option 2: Redirect directly to download URL (uncomment if preferred)
    // return c.redirect(response.downloadUrl);
  })

  // Error handler for metric_files routes
  .onError((err, c) => {
    console.error('Metric files API error:', err);

    // Let HTTPException responses pass through
    if (err instanceof Error && 'getResponse' in err) {
      return (err as any).getResponse();
    }

    // Default error response
    return c.json({ error: 'Internal server error' }, 500);
  });

export default app;
@@ -20,11 +20,14 @@
    "typecheck": "tsc --noEmit"
  },
  "dependencies": {
    "@aws-sdk/client-s3": "^3.864.0",
    "@aws-sdk/s3-request-presigner": "^3.864.0",
    "@buster/access-controls": "workspace:*",
    "@buster/ai": "workspace:*",
    "@buster/data-source": "workspace:^",
    "@buster/database": "workspace:*",
    "@buster/env-utils": "workspace:*",
    "@buster/server-shared": "workspace:*",
    "@buster/ai": "workspace:*",
    "@buster/database": "workspace:*",
    "@buster/slack": "workspace:*",
    "@buster/test-utils": "workspace:*",
    "@buster/typescript-config": "workspace:*",
@@ -34,9 +37,9 @@
    "@trigger.dev/sdk": "4.0.0-v4-beta.27",
    "ai": "catalog:",
    "braintrust": "catalog:",
    "drizzle-orm": "catalog:",
    "vitest": "catalog:",
    "zod": "catalog:",
    "drizzle-orm": "catalog:"
    "zod": "catalog:"
  },
  "devDependencies": {
    "@trigger.dev/build": "4.0.0-v4-beta.27"
@@ -0,0 +1,110 @@
# Export Metric Data Task

This Trigger.dev task exports metric data to CSV format and provides a secure download URL via Cloudflare R2 storage.

## Features

- **Large Dataset Support**: Handles up to 1 million rows
- **CSV Format**: Universal compatibility with Excel, Google Sheets, etc.
- **Secure Downloads**: 60-second presigned URLs for security
- **Automatic Cleanup**: Files are deleted after 60 seconds
- **Memory Efficient**: Streams data without loading the entire dataset into memory
- **Multi-Database Support**: Works with all supported data sources (PostgreSQL, MySQL, Snowflake, BigQuery, etc.)

## Configuration

Required environment variables:

```bash
# Cloudflare R2 Storage
R2_ACCOUNT_ID=your-account-id
R2_ACCESS_KEY_ID=your-access-key
R2_SECRET_ACCESS_KEY=your-secret-key
R2_BUCKET=metric-exports  # Default bucket name
```

## API Usage

### Download Metric Data

```http
GET /api/v2/metric_files/:id/download
Authorization: Bearer <token>
```

#### Response

```json
{
  "downloadUrl": "https://...",  // Presigned URL (expires in 60 seconds)
  "expiresAt": "2024-01-01T12:01:00Z",
  "fileSize": 1048576,
  "fileName": "metric_name_1234567890.csv",
  "rowCount": 5000,
  "message": "Download link expires in 60 seconds. Please start your download immediately."
}
```

#### Error Responses

- `403 Forbidden`: User doesn't have access to the metric
- `404 Not Found`: Metric not found or data source credentials missing
- `400 Bad Request`: Query execution failed
- `504 Gateway Timeout`: Export took longer than 2 minutes

## Task Architecture

### 1. Export Task (`export-metric-data.ts`)

- Fetches metric configuration from the database
- Retrieves data source credentials from the vault
- Executes the SQL query using the appropriate adapter
- Converts results to CSV format
- Uploads to R2 with a unique key
- Generates a 60-second presigned URL
- Schedules cleanup after 60 seconds

### 2. Cleanup Task (`cleanup-export-file.ts`)

- Runs 60 seconds after export
- Deletes the file from R2 storage
- Serves as a backup to R2 lifecycle rules

### 3. CSV Helpers (`csv-helpers.ts`)

- Properly escapes CSV values
- Handles special characters, quotes, and newlines
- Supports all data types including dates and JSON

## Security

1. **Organization Validation**: Ensures the user has access to the metric's organization
2. **Unique Keys**: Each export uses a random 16-byte hex ID
3. **Short-Lived URLs**: 60-second expiration prevents URL sharing
4. **Automatic Cleanup**: Files are deleted after 60 seconds
5. **Private Bucket**: Files are only accessible via presigned URLs

## Limitations

- Maximum 1 million rows per export
- Maximum 500MB file size
- 2-minute timeout for the API response
- 5-minute maximum task execution time

## Development

### Running Tests

```bash
npm test export-metric-data
```

### Local Development

1. Set up an R2 bucket in the Cloudflare dashboard
2. Create R2 API tokens with read/write permissions
3. Add environment variables to `.env`
4. Test with: `curl -H "Authorization: Bearer <token>" http://localhost:3000/api/v2/metric_files/<id>/download`

## Monitoring

The task logs key metrics:

- Query execution time
- Row count
- File size
- Upload success/failure
- Presigned URL generation

Check the Trigger.dev dashboard for task execution history and logs.
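For orientation, here is a minimal client-side sketch of consuming the endpoint documented in the README above. It is not part of this commit: `API_BASE`, `getAuthToken`, and the browser download handling are illustrative assumptions; only the route path and the `MetricDownloadResponse` shape come from the diff.

```typescript
// Assumed helpers, not part of the commit.
declare const API_BASE: string;
declare function getAuthToken(): string;

interface MetricDownloadResponse {
  downloadUrl: string;
  expiresAt: string;
  fileSize: number;
  fileName: string;
  rowCount: number;
  message?: string;
}

async function downloadMetricCsv(metricId: string): Promise<void> {
  // Ask the server to export the metric and hand back a presigned URL.
  const res = await fetch(`${API_BASE}/api/v2/metric_files/${metricId}/download`, {
    headers: { Authorization: `Bearer ${getAuthToken()}` },
  });
  if (!res.ok) {
    throw new Error(`Export failed with status ${res.status}`);
  }

  const body = (await res.json()) as MetricDownloadResponse;

  // The presigned URL expires in 60 seconds, so fetch it immediately.
  const file = await fetch(body.downloadUrl);
  const blob = await file.blob();

  // Hand the blob to the browser as a named file download.
  const url = URL.createObjectURL(blob);
  const a = document.createElement('a');
  a.href = url;
  a.download = body.fileName;
  a.click();
  URL.revokeObjectURL(url);
}
```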
@@ -0,0 +1,74 @@
import { DeleteObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { logger, task } from '@trigger.dev/sdk';
import { CleanupExportFileInputSchema } from './interfaces';

// Initialize R2 client
const r2Client = new S3Client({
  region: 'auto',
  endpoint: `https://${process.env.R2_ACCOUNT_ID}.r2.cloudflarestorage.com`,
  credentials: {
    accessKeyId: process.env.R2_ACCESS_KEY_ID!,
    secretAccessKey: process.env.R2_SECRET_ACCESS_KEY!,
  },
});

const R2_BUCKET = process.env.R2_BUCKET || 'metric-exports';

/**
 * Cleanup task to delete export files from R2 storage
 * This serves as a backup to R2's lifecycle rules
 */
export const cleanupExportFile = task({
  id: 'cleanup-export-file',
  retry: {
    maxAttempts: 3,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 5000,
    factor: 2,
  },
  run: async (payload: { key: string }) => {
    const validated = CleanupExportFileInputSchema.parse(payload);

    try {
      logger.log('Cleaning up export file', {
        key: validated.key,
        bucket: R2_BUCKET,
      });

      await r2Client.send(
        new DeleteObjectCommand({
          Bucket: R2_BUCKET,
          Key: validated.key,
        })
      );

      logger.log('Export file deleted successfully', { key: validated.key });

      return {
        success: true,
        key: validated.key,
      };
    } catch (error) {
      // File might already be deleted by lifecycle rules
      if (error instanceof Error && error.name === 'NoSuchKey') {
        logger.info('File already deleted (likely by lifecycle rule)', {
          key: validated.key,
        });

        return {
          success: true,
          key: validated.key,
          note: 'Already deleted',
        };
      }

      logger.error('Failed to delete export file', {
        key: validated.key,
        error: error instanceof Error ? error.message : 'Unknown error',
      });

      // Re-throw to trigger retry
      throw error;
    }
  },
});
@@ -0,0 +1,95 @@
import type { FieldMetadata } from '@buster/data-source';

/**
 * Escapes a value for CSV format
 * Handles commas, quotes, and newlines properly
 */
export function escapeCSV(value: unknown): string {
  if (value == null) return '';

  const str = String(value);

  // Check if escaping is needed
  if (str.includes(',') || str.includes('"') || str.includes('\n') || str.includes('\r')) {
    // Escape quotes by doubling them and wrap in quotes
    return `"${str.replace(/"/g, '""')}"`;
  }

  return str;
}

/**
 * Converts database query results to CSV format
 * @param rows - Array of row objects from database
 * @param fields - Field metadata for column information
 * @returns CSV string with headers and data
 */
export function convertToCSV(rows: Record<string, unknown>[], fields: FieldMetadata[]): string {
  // Handle empty result set
  if (rows.length === 0) {
    // Return just headers for empty results
    const headers = fields.map((f) => escapeCSV(f.name));
    return headers.join(',');
  }

  // Get column names from field metadata
  const columnNames = fields.map((f) => f.name);

  // Build CSV lines
  const csvLines: string[] = [];

  // Add header row
  csvLines.push(columnNames.map(escapeCSV).join(','));

  // Add data rows
  for (const row of rows) {
    const values = columnNames.map((col) => {
      const value = row[col];

      // Special handling for different data types
      if (value instanceof Date) {
        return escapeCSV(value.toISOString());
      }

      if (typeof value === 'object' && value !== null) {
        // Convert objects/arrays to JSON string
        return escapeCSV(JSON.stringify(value));
      }

      return escapeCSV(value);
    });

    csvLines.push(values.join(','));
  }

  return csvLines.join('\n');
}

/**
 * Estimates the size of a CSV file from row data
 * Useful for pre-checking if data will be too large
 */
export function estimateCSVSize(rows: Record<string, unknown>[], fields: FieldMetadata[]): number {
  if (rows.length === 0) {
    return fields.map((f) => f.name).join(',').length;
  }

  // Sample first 100 rows to estimate average row size
  const sampleSize = Math.min(100, rows.length);
  let totalSize = 0;

  // Header size
  totalSize += fields.map((f) => f.name).join(',').length + 1; // +1 for newline

  // Calculate average row size from sample
  for (let i = 0; i < sampleSize; i++) {
    const row = rows[i];
    if (!row) continue;
    const rowStr = fields.map((f) => String(row[f.name] ?? '')).join(',');
    totalSize += rowStr.length + 1; // +1 for newline
  }

  // Estimate total size
  const avgRowSize = totalSize / (sampleSize + 1); // +1 for header
  return Math.ceil(avgRowSize * (rows.length + 1));
}
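A quick sketch of what these helpers produce, not part of the commit. The field objects below are simplified and cast for illustration only, since the real `FieldMetadata` type from `@buster/data-source` carries more properties.

```typescript
import type { FieldMetadata } from '@buster/data-source';
import { convertToCSV, escapeCSV } from './csv-helpers';

// Simplified field metadata; cast only for this example.
const fields = [{ name: 'city' }, { name: 'note' }] as unknown as FieldMetadata[];

const rows = [
  { city: 'Portland, OR', note: 'said "hi"' },
  { city: 'NYC', note: { tags: ['a', 'b'] } },
];

console.log(escapeCSV('Portland, OR')); // "Portland, OR"  (the comma forces quoting)

console.log(convertToCSV(rows, fields));
// city,note
// "Portland, OR","said ""hi"""
// NYC,"{""tags"":[""a"",""b""]}"
```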
@@ -0,0 +1,295 @@
import { randomBytes } from 'node:crypto';
import { GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { createAdapter } from '@buster/data-source';
import type { Credentials } from '@buster/data-source';
import { getDataSourceCredentials, getMetricForExport } from '@buster/database';
import { logger, schemaTask } from '@trigger.dev/sdk';
import { convertToCSV, estimateCSVSize } from './csv-helpers';
import {
  type ExportMetricDataInput,
  ExportMetricDataInputSchema,
  type ExportMetricDataOutput,
} from './interfaces';

// Initialize R2 client (S3-compatible)
const r2Client = new S3Client({
  region: 'auto',
  endpoint: `https://${process.env.R2_ACCOUNT_ID}.r2.cloudflarestorage.com`,
  credentials: {
    accessKeyId: process.env.R2_ACCESS_KEY_ID!,
    secretAccessKey: process.env.R2_SECRET_ACCESS_KEY!,
  },
});

const R2_BUCKET = process.env.R2_BUCKET || 'metric-exports';
const MAX_ROWS = 1000000; // 1 million row limit for safety
const MAX_FILE_SIZE = 500 * 1024 * 1024; // 500MB max file size

/**
 * Task for exporting metric data to CSV and generating a presigned download URL
 *
 * This task:
 * 1. Fetches metric configuration and validates user access
 * 2. Retrieves data source credentials from vault
 * 3. Executes the metric's SQL query
 * 4. Converts results to CSV format
 * 5. Uploads to R2 storage
 * 6. Generates a 60-second presigned URL for download
 * 7. Schedules cleanup after 60 seconds
 */
export const exportMetricData: ReturnType<
  typeof schemaTask<
    'export-metric-data',
    typeof ExportMetricDataInputSchema,
    ExportMetricDataOutput
  >
> = schemaTask({
  id: 'export-metric-data',
  schema: ExportMetricDataInputSchema,
  machine: {
    preset: 'large-1x', // 4 vCPU, 8GB RAM for handling large datasets
  },
  maxDuration: 300, // 5 minutes max
  retry: {
    maxAttempts: 2,
    minTimeoutInMs: 1000,
    maxTimeoutInMs: 5000,
    factor: 2,
  },
  run: async (payload): Promise<ExportMetricDataOutput> => {
    const startTime = Date.now();

    try {
      logger.log('Starting metric export', {
        metricId: payload.metricId,
        userId: payload.userId,
        organizationId: payload.organizationId,
      });

      // Step 1: Fetch metric details and validate access
      const metric = await getMetricForExport({ metricId: payload.metricId });

      // Validate organization access
      if (metric.organizationId !== payload.organizationId) {
        logger.error('Unauthorized access attempt', {
          metricId: payload.metricId,
          metricOrgId: metric.organizationId,
          requestOrgId: payload.organizationId,
        });

        return {
          success: false,
          error: 'You do not have permission to export this metric',
          errorCode: 'UNAUTHORIZED',
        };
      }

      logger.log('Metric validated', {
        metricName: metric.name,
        dataSourceId: metric.dataSourceId,
      });

      // Step 2: Get data source credentials from vault
      let credentials: Credentials;

      try {
        const rawCredentials = await getDataSourceCredentials({
          dataSourceId: metric.dataSourceId,
        });

        // Ensure credentials have the correct type
        credentials = {
          ...rawCredentials,
          type: rawCredentials.type || metric.dataSourceType,
        } as Credentials;
      } catch (error) {
        logger.error('Failed to retrieve data source credentials', {
          error: error instanceof Error ? error.message : 'Unknown error',
          dataSourceId: metric.dataSourceId,
        });

        return {
          success: false,
          error: 'Failed to access data source credentials',
          errorCode: 'NOT_FOUND',
        };
      }

      // Step 3: Execute query using data source adapter
      logger.log('Executing metric query', { sql: `${metric.sql?.substring(0, 100)}...` });

      const adapter = await createAdapter(credentials);
      let queryResult;

      try {
        queryResult = await adapter.query(
          metric.sql!,
          [], // No parameters for metric queries
          MAX_ROWS,
          60000 // 60 second query timeout
        );

        logger.log('Query executed successfully', {
          rowCount: queryResult.rowCount,
          fieldCount: queryResult.fields.length,
        });
      } catch (error) {
        logger.error('Query execution failed', {
          error: error instanceof Error ? error.message : 'Unknown error',
        });

        return {
          success: false,
          error: `Query execution failed: ${error instanceof Error ? error.message : 'Unknown error'}`,
          errorCode: 'QUERY_ERROR',
        };
      } finally {
        // Always close the adapter connection
        await adapter.close().catch((err: unknown) => {
          logger.warn('Failed to close adapter connection', { error: err });
        });
      }

      // Step 4: Convert to CSV
      logger.log('Converting results to CSV');

      const csv = convertToCSV(queryResult.rows, queryResult.fields);
      const csvSize = Buffer.byteLength(csv, 'utf-8');

      // Check file size
      if (csvSize > MAX_FILE_SIZE) {
        logger.error('CSV file too large', {
          size: csvSize,
          maxSize: MAX_FILE_SIZE,
        });

        return {
          success: false,
          error: `File size (${Math.round(csvSize / 1024 / 1024)}MB) exceeds maximum allowed size (${Math.round(MAX_FILE_SIZE / 1024 / 1024)}MB)`,
          errorCode: 'QUERY_ERROR',
        };
      }

      logger.log('CSV generated', {
        size: csvSize,
        rowCount: queryResult.rowCount,
      });

      // Step 5: Generate unique storage key with security
      const randomId = randomBytes(16).toString('hex');
      const timestamp = Date.now();
      const sanitizedMetricName = metric.name.replace(/[^a-zA-Z0-9-_]/g, '_').substring(0, 50);
      const fileName = `${sanitizedMetricName}_${timestamp}.csv`;
      const key = `exports/${payload.organizationId}/${payload.metricId}/${timestamp}-${randomId}/${fileName}`;

      // Step 6: Upload to R2
      logger.log('Uploading to R2', { key, bucket: R2_BUCKET });

      try {
        await r2Client.send(
          new PutObjectCommand({
            Bucket: R2_BUCKET,
            Key: key,
            Body: csv,
            ContentType: 'text/csv',
            ContentDisposition: `attachment; filename="${fileName}"`,
            Metadata: {
              'metric-id': payload.metricId,
              'user-id': payload.userId,
              'organization-id': payload.organizationId,
              'row-count': String(queryResult.rowCount),
              'created-at': new Date().toISOString(),
              'auto-delete': 'true',
            },
          })
        );

        logger.log('File uploaded successfully');
      } catch (error) {
        logger.error('Upload to R2 failed', {
          error: error instanceof Error ? error.message : 'Unknown error',
        });

        return {
          success: false,
          error: 'Failed to upload file to storage',
          errorCode: 'UPLOAD_ERROR',
        };
      }

      // Step 7: Generate presigned URL with 60-second expiry
      let downloadUrl: string;

      try {
        downloadUrl = await getSignedUrl(
          r2Client,
          new GetObjectCommand({
            Bucket: R2_BUCKET,
            Key: key,
            ResponseContentDisposition: `attachment; filename="${fileName}"`,
          }),
          {
            expiresIn: 60, // 60 seconds
            signableHeaders: new Set(['host']),
          }
        );

        logger.log('Presigned URL generated', { expiresIn: 60 });
      } catch (error) {
        logger.error('Failed to generate presigned URL', {
          error: error instanceof Error ? error.message : 'Unknown error',
        });

        return {
          success: false,
          error: 'Failed to generate download URL',
          errorCode: 'UNKNOWN',
        };
      }

      // Step 8: Schedule cleanup after 60 seconds (matches URL expiry)
      try {
        const { cleanupExportFile } = await import('./cleanup-export-file');
        await cleanupExportFile.trigger(
          { key },
          { delay: '60s' } // 60 seconds delay
        );

        logger.log('Cleanup scheduled for 60 seconds');
      } catch (error) {
        // Non-critical error, just log
        logger.warn('Failed to schedule cleanup', { error });
      }

      const processingTime = Date.now() - startTime;

      logger.log('Export completed successfully', {
        metricId: payload.metricId,
        processingTime,
        fileSize: csvSize,
        rowCount: queryResult.rowCount,
      });

      return {
        success: true,
        downloadUrl,
        expiresAt: new Date(Date.now() + 60000).toISOString(), // 60 seconds from now
        fileSize: csvSize,
        rowCount: queryResult.rowCount,
        fileName,
      };
    } catch (error) {
      logger.error('Unexpected error during export', {
        error: error instanceof Error ? error.message : 'Unknown error',
        stack: error instanceof Error ? error.stack : undefined,
      });

      return {
        success: false,
        error: error instanceof Error ? error.message : 'An unexpected error occurred',
        errorCode: 'UNKNOWN',
      };
    }
  },
});
@@ -0,0 +1,20 @@
import { z } from 'zod';

// Re-export the output type from server-shared
export type { ExportMetricDataOutput } from '@buster/server-shared/metrics';
export { ExportMetricDataOutputSchema } from '@buster/server-shared/metrics';

// Input schema for the export task
export const ExportMetricDataInputSchema = z.object({
  metricId: z.string().uuid('Metric ID must be a valid UUID'),
  userId: z.string().uuid('User ID must be a valid UUID'),
  organizationId: z.string().uuid('Organization ID must be a valid UUID'),
});

export type ExportMetricDataInput = z.infer<typeof ExportMetricDataInputSchema>;

// Schema for cleanup task
export const CleanupExportFileInputSchema = z.object({
  key: z.string().min(1, 'Storage key is required'),
});

export type CleanupExportFileInput = z.infer<typeof CleanupExportFileInputSchema>;
@@ -1,4 +1,4 @@
import { logger, schemaTask } from '@trigger.dev/sdk/v3';
import { logger, schemaTask } from '@trigger.dev/sdk';
import { IntrospectDataInputSchema, type IntrospectDataOutput } from './interfaces';
/**
 * Task for introspecting data sources by connecting to them and analyzing their structure.
@@ -1,3 +1,4 @@
import * as fs from 'node:fs';
import * as path from 'node:path';
import { esbuildPlugin } from '@trigger.dev/build/extensions';
import { defineConfig } from '@trigger.dev/sdk';
@@ -39,17 +40,21 @@ export default defineConfig({

      let resolvedPath: string;
      if (subPath) {
        // Handle sub-paths like @buster/ai/workflows/analyst-workflow
        // Handle sub-paths like @buster/server-shared/metrics
        // Check if subPath already starts with 'src', if so, don't add it again
        const cleanSubPath = subPath.startsWith('src/') ? subPath.slice(4) : subPath;
        resolvedPath = path.resolve(
          process.cwd(),
          '../..',
          'packages',
          packageName,
          'src',
          `${cleanSubPath}.ts`
        );
        const basePath = path.resolve(process.cwd(), '../..', 'packages', packageName, 'src');

        // Try to resolve as a directory with index.ts first, then as a .ts file
        const indexPath = path.join(basePath, cleanSubPath, 'index.ts');
        const directPath = path.join(basePath, `${cleanSubPath}.ts`);

        // Check if it's a directory with index.ts
        if (fs.existsSync(indexPath)) {
          resolvedPath = indexPath;
        } else {
          resolvedPath = directPath;
        }
      } else {
        // Handle direct package imports like @buster/ai
        resolvedPath = path.resolve(
@@ -9,3 +9,4 @@ export * from './dashboards';
export * from './metrics';
export * from './collections';
export * from './reports';
export * from './vault';
@@ -0,0 +1,84 @@
import { and, eq, isNull } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { dataSources, metricFiles } from '../../schema';

export const GetMetricForExportInputSchema = z.object({
  metricId: z.string().uuid(),
});

export type GetMetricForExportInput = z.infer<typeof GetMetricForExportInputSchema>;

export interface MetricForExport {
  id: string;
  name: string;
  content: any; // JSONB content containing SQL and other metadata
  dataSourceId: string;
  organizationId: string;
  secretId: string;
  dataSourceType: string;
  sql?: string;
}

/**
 * Fetches metric details along with data source information for export
 * Includes the SQL query and credentials needed to execute it
 */
export async function getMetricForExport(input: GetMetricForExportInput): Promise<MetricForExport> {
  const validated = GetMetricForExportInputSchema.parse(input);

  const [result] = await db
    .select({
      id: metricFiles.id,
      name: metricFiles.name,
      content: metricFiles.content,
      dataSourceId: metricFiles.dataSourceId,
      organizationId: metricFiles.organizationId,
      secretId: dataSources.secretId,
      dataSourceType: dataSources.type,
    })
    .from(metricFiles)
    .innerJoin(dataSources, eq(metricFiles.dataSourceId, dataSources.id))
    .where(
      and(
        eq(metricFiles.id, validated.metricId),
        isNull(metricFiles.deletedAt),
        isNull(dataSources.deletedAt)
      )
    )
    .limit(1);

  if (!result) {
    throw new Error(`Metric with ID ${validated.metricId} not found or has been deleted`);
  }

  // Extract SQL from metric content
  // The content structure may vary, so we check multiple possible locations
  let sql: string | undefined;

  if (typeof result.content === 'object' && result.content !== null) {
    // Check common locations for SQL in metric content
    const content = result.content as Record<string, any>;
    sql =
      content.sql ||
      content.query ||
      content.sqlQuery ||
      content.definition?.sql ||
      content.definition?.query;
  }

  if (!sql) {
    throw new Error(`No SQL query found in metric ${validated.metricId}`);
  }

  return {
    id: result.id,
    name: result.name,
    content: result.content,
    dataSourceId: result.dataSourceId,
    organizationId: result.organizationId,
    secretId: result.secretId,
    dataSourceType: result.dataSourceType,
    sql,
  };
}
@@ -3,3 +3,10 @@ export {
  GetMetricTitleInputSchema,
  type GetMetricTitleInput,
} from './get-metric-title';

export {
  getMetricForExport,
  GetMetricForExportInputSchema,
  type GetMetricForExportInput,
  type MetricForExport,
} from './get-metric-for-export';
@@ -0,0 +1,55 @@
import { sql } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';

export const GetDataSourceCredentialsInputSchema = z.object({
  dataSourceId: z.string().min(1, 'Data source ID is required'),
});

export type GetDataSourceCredentialsInput = z.infer<typeof GetDataSourceCredentialsInputSchema>;

/**
 * Retrieves decrypted credentials from the vault for a data source
 * @param input - Contains the data source ID to retrieve credentials for
 * @returns The decrypted credentials as a parsed object
 * @throws Error if credentials not found or invalid
 */
export async function getDataSourceCredentials(
  input: GetDataSourceCredentialsInput
): Promise<Record<string, unknown>> {
  const validated = GetDataSourceCredentialsInputSchema.parse(input);

  try {
    // Use the data source ID as the vault secret name
    const secretResult = await db.execute(
      sql`SELECT decrypted_secret FROM vault.decrypted_secrets WHERE name = ${validated.dataSourceId} LIMIT 1`
    );

    if (!secretResult.length || !secretResult[0]?.decrypted_secret) {
      throw new Error(`No credentials found for data source ID: ${validated.dataSourceId}`);
    }

    const secretString = secretResult[0].decrypted_secret as string;

    // Parse and return the credentials
    try {
      return JSON.parse(secretString);
    } catch (parseError) {
      throw new Error(
        `Failed to parse credentials for data source ID ${validated.dataSourceId}: Invalid JSON format`
      );
    }
  } catch (error) {
    // Re-throw with more context if it's not our error
    if (
      error instanceof Error &&
      !error.message.includes('No credentials found') &&
      !error.message.includes('Failed to parse')
    ) {
      throw new Error(
        `Database error retrieving credentials for data source ID ${validated.dataSourceId}: ${error.message}`
      );
    }
    throw error;
  }
}
@@ -0,0 +1,5 @@
export {
  getDataSourceCredentials,
  GetDataSourceCredentialsInputSchema,
  type GetDataSourceCredentialsInput,
} from './get-data-source-credentials';

File diff suppressed because one or more lines are too long
@@ -0,0 +1,53 @@
import { z } from 'zod';

/**
 * Path parameters for metric download endpoint
 */
export const MetricDownloadParamsSchema = z.object({
  id: z.string().uuid('Metric ID must be a valid UUID'),
});

export type MetricDownloadParams = z.infer<typeof MetricDownloadParamsSchema>;

/**
 * Response for successful metric download
 */
export const MetricDownloadResponseSchema = z.object({
  downloadUrl: z.string().url('Download URL must be valid'),
  expiresAt: z.string().datetime({ offset: true }),
  fileSize: z.number().int().positive(),
  fileName: z.string(),
  rowCount: z.number().int().nonnegative(),
  message: z.string().optional(),
});

export type MetricDownloadResponse = z.infer<typeof MetricDownloadResponseSchema>;

/**
 * Error response for metric download
 */
export const MetricDownloadErrorSchema = z.object({
  error: z.string(),
  code: z.enum(['UNAUTHORIZED', 'NOT_FOUND', 'EXPORT_FAILED', 'TIMEOUT']),
});

export type MetricDownloadError = z.infer<typeof MetricDownloadErrorSchema>;

/**
 * Output schema for export metric data task
 * This is the output from the Trigger.dev task
 */
export const ExportMetricDataOutputSchema = z.object({
  success: z.boolean(),
  downloadUrl: z.string().url('Download URL must be valid').optional(),
  expiresAt: z.string().datetime({ offset: true }).optional(),
  fileSize: z.number().int().positive().optional(),
  rowCount: z.number().int().nonnegative().optional(),
  fileName: z.string().optional(),
  error: z.string().optional(),
  errorCode: z
    .enum(['UNAUTHORIZED', 'NOT_FOUND', 'QUERY_ERROR', 'UPLOAD_ERROR', 'UNKNOWN'])
    .optional(),
});

export type ExportMetricDataOutput = z.infer<typeof ExportMetricDataOutputSchema>;
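Because these schemas are exported from `@buster/server-shared`, a consumer can validate a response body at runtime before trusting it. A minimal sketch, with a hypothetical payload standing in for the JSON returned by the download endpoint:

```typescript
import { MetricDownloadResponseSchema } from '@buster/server-shared/metrics';

// Hypothetical payload for illustration; in practice this would be `await res.json()`.
const payload: unknown = {
  downloadUrl: 'https://example.com/exports/file.csv?signature=abc',
  expiresAt: '2024-01-01T12:01:00Z',
  fileSize: 1048576,
  fileName: 'metric_name_1234567890.csv',
  rowCount: 5000,
};

// safeParse returns a discriminated union instead of throwing on bad input.
const parsed = MetricDownloadResponseSchema.safeParse(payload);
if (parsed.success) {
  console.log(`Download ${parsed.data.fileName} (${parsed.data.rowCount} rows)`);
} else {
  console.error('Unexpected response shape', parsed.error.flatten());
}
```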
@@ -4,3 +4,4 @@ export * from './charts';
export * from './metrics-list.types';
export * from './requests.types';
export * from './responses.types';
export * from './download.types';
1143 pnpm-lock.yaml
File diff suppressed because it is too large
@@ -119,6 +119,11 @@
    "BRAINTRUST_KEY",
    "TRIGGER_SECRET_KEY",

    "R2_ACCOUNT_ID",
    "R2_ACCESS_KEY_ID",
    "R2_SECRET_ACCESS_KEY",
    "R2_BUCKET",

    "PLAYWRIGHT_START_COMMAND",
    "DAYTONA_API_KEY"
  ],