mirror of https://github.com/buster-so/buster.git
Refactor metric file handling and improve error handling
- Updated type definitions for better clarity and type safety in metric file download and export tasks. - Enhanced error handling in the metric files API to utilize HTTPException for more consistent error responses. - Added environment variable validation in export tasks to ensure necessary credentials are present before execution. - Improved type assertions for content in metric export queries to ensure proper handling of JSONB data.
This commit is contained in:
parent
a3c4238da1
commit
594f08202b
|
@ -43,7 +43,7 @@ export async function downloadMetricFileHandler(
|
||||||
const timeout = 120000; // 2 minutes
|
const timeout = 120000; // 2 minutes
|
||||||
const pollInterval = 2000; // Poll every 2 seconds
|
const pollInterval = 2000; // Poll every 2 seconds
|
||||||
|
|
||||||
let run;
|
let run: Awaited<ReturnType<typeof runs.retrieve>>;
|
||||||
while (true) {
|
while (true) {
|
||||||
run = await runs.retrieve(handle.id);
|
run = await runs.retrieve(handle.id);
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import { MetricDownloadParamsSchema } from '@buster/server-shared/metrics';
|
import { MetricDownloadParamsSchema } from '@buster/server-shared/metrics';
|
||||||
import { zValidator } from '@hono/zod-validator';
|
import { zValidator } from '@hono/zod-validator';
|
||||||
import { Hono } from 'hono';
|
import { Hono } from 'hono';
|
||||||
|
import { HTTPException } from 'hono/http-exception';
|
||||||
import { requireAuth } from '../../../middleware/auth';
|
import { requireAuth } from '../../../middleware/auth';
|
||||||
import '../../../types/hono.types';
|
import '../../../types/hono.types';
|
||||||
import { downloadMetricFileHandler } from './download-metric-file';
|
import { downloadMetricFileHandler } from './download-metric-file';
|
||||||
|
@ -28,8 +29,8 @@ const app = new Hono()
|
||||||
console.error('Metric files API error:', err);
|
console.error('Metric files API error:', err);
|
||||||
|
|
||||||
// Let HTTPException responses pass through
|
// Let HTTPException responses pass through
|
||||||
if (err instanceof Error && 'getResponse' in err) {
|
if (err instanceof HTTPException) {
|
||||||
return (err as any).getResponse();
|
return err.getResponse();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Default error response
|
// Default error response
|
||||||
|
|
|
@ -2,13 +2,24 @@ import { DeleteObjectCommand, S3Client } from '@aws-sdk/client-s3';
|
||||||
import { logger, task } from '@trigger.dev/sdk';
|
import { logger, task } from '@trigger.dev/sdk';
|
||||||
import { CleanupExportFileInputSchema } from './interfaces';
|
import { CleanupExportFileInputSchema } from './interfaces';
|
||||||
|
|
||||||
|
// Validate required environment variables
|
||||||
|
if (!process.env.R2_ACCOUNT_ID) {
|
||||||
|
throw new Error('R2_ACCOUNT_ID environment variable is missing');
|
||||||
|
}
|
||||||
|
if (!process.env.R2_ACCESS_KEY_ID) {
|
||||||
|
throw new Error('R2_ACCESS_KEY_ID environment variable is missing');
|
||||||
|
}
|
||||||
|
if (!process.env.R2_SECRET_ACCESS_KEY) {
|
||||||
|
throw new Error('R2_SECRET_ACCESS_KEY environment variable is missing');
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize R2 client
|
// Initialize R2 client
|
||||||
const r2Client = new S3Client({
|
const r2Client = new S3Client({
|
||||||
region: 'auto',
|
region: 'auto',
|
||||||
endpoint: `https://${process.env.R2_ACCOUNT_ID}.r2.cloudflarestorage.com`,
|
endpoint: `https://${process.env.R2_ACCOUNT_ID}.r2.cloudflarestorage.com`,
|
||||||
credentials: {
|
credentials: {
|
||||||
accessKeyId: process.env.R2_ACCESS_KEY_ID!,
|
accessKeyId: process.env.R2_ACCESS_KEY_ID,
|
||||||
secretAccessKey: process.env.R2_SECRET_ACCESS_KEY!,
|
secretAccessKey: process.env.R2_SECRET_ACCESS_KEY,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -12,13 +12,24 @@ import {
|
||||||
type ExportMetricDataOutput,
|
type ExportMetricDataOutput,
|
||||||
} from './interfaces';
|
} from './interfaces';
|
||||||
|
|
||||||
|
// Validate required environment variables
|
||||||
|
if (!process.env.R2_ACCOUNT_ID) {
|
||||||
|
throw new Error('R2_ACCOUNT_ID environment variable is missing');
|
||||||
|
}
|
||||||
|
if (!process.env.R2_ACCESS_KEY_ID) {
|
||||||
|
throw new Error('R2_ACCESS_KEY_ID environment variable is missing');
|
||||||
|
}
|
||||||
|
if (!process.env.R2_SECRET_ACCESS_KEY) {
|
||||||
|
throw new Error('R2_SECRET_ACCESS_KEY environment variable is missing');
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize R2 client (S3-compatible)
|
// Initialize R2 client (S3-compatible)
|
||||||
const r2Client = new S3Client({
|
const r2Client = new S3Client({
|
||||||
region: 'auto',
|
region: 'auto',
|
||||||
endpoint: `https://${process.env.R2_ACCOUNT_ID}.r2.cloudflarestorage.com`,
|
endpoint: `https://${process.env.R2_ACCOUNT_ID}.r2.cloudflarestorage.com`,
|
||||||
credentials: {
|
credentials: {
|
||||||
accessKeyId: process.env.R2_ACCESS_KEY_ID!,
|
accessKeyId: process.env.R2_ACCESS_KEY_ID,
|
||||||
secretAccessKey: process.env.R2_SECRET_ACCESS_KEY!,
|
secretAccessKey: process.env.R2_SECRET_ACCESS_KEY,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -120,11 +131,14 @@ export const exportMetricData: ReturnType<
|
||||||
logger.log('Executing metric query', { sql: `${metric.sql?.substring(0, 100)}...` });
|
logger.log('Executing metric query', { sql: `${metric.sql?.substring(0, 100)}...` });
|
||||||
|
|
||||||
const adapter = await createAdapter(credentials);
|
const adapter = await createAdapter(credentials);
|
||||||
let queryResult;
|
let queryResult: Awaited<ReturnType<typeof adapter.query>> | undefined;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
if (!metric.sql) {
|
||||||
|
throw new Error('Metric SQL is missing');
|
||||||
|
}
|
||||||
queryResult = await adapter.query(
|
queryResult = await adapter.query(
|
||||||
metric.sql!,
|
metric.sql,
|
||||||
[], // No parameters for metric queries
|
[], // No parameters for metric queries
|
||||||
MAX_ROWS,
|
MAX_ROWS,
|
||||||
60000 // 60 second query timeout
|
60000 // 60 second query timeout
|
||||||
|
|
|
@ -54,7 +54,7 @@ const StreamTokenArrayDemo = ({
|
||||||
}, [tokens, autoStream, streamDelay]);
|
}, [tokens, autoStream, streamDelay]);
|
||||||
|
|
||||||
const { throttledTokens, throttledContent, isDone, flushNow, reset } = useStreamTokenArray({
|
const { throttledTokens, throttledContent, isDone, flushNow, reset } = useStreamTokenArray({
|
||||||
tokens: (autoStream ? currentTokens : tokens).map(t => ({ token: t, delayMs: 0 })),
|
tokens: (autoStream ? currentTokens : tokens).map((t) => ({ token: t, delayMs: 0 })),
|
||||||
isStreamFinished: autoStream ? finished : isStreamFinished,
|
isStreamFinished: autoStream ? finished : isStreamFinished,
|
||||||
...hookProps
|
...hookProps
|
||||||
});
|
});
|
||||||
|
|
|
@ -12,7 +12,7 @@ export type GetMetricForExportInput = z.infer<typeof GetMetricForExportInputSche
|
||||||
export interface MetricForExport {
|
export interface MetricForExport {
|
||||||
id: string;
|
id: string;
|
||||||
name: string;
|
name: string;
|
||||||
content: any; // JSONB content containing SQL and other metadata
|
content: Record<string, unknown>; // JSONB content containing SQL and other metadata
|
||||||
dataSourceId: string;
|
dataSourceId: string;
|
||||||
organizationId: string;
|
organizationId: string;
|
||||||
secretId: string;
|
secretId: string;
|
||||||
|
@ -58,13 +58,18 @@ export async function getMetricForExport(input: GetMetricForExportInput): Promis
|
||||||
|
|
||||||
if (typeof result.content === 'object' && result.content !== null) {
|
if (typeof result.content === 'object' && result.content !== null) {
|
||||||
// Check common locations for SQL in metric content
|
// Check common locations for SQL in metric content
|
||||||
const content = result.content as Record<string, any>;
|
const content = result.content as Record<string, unknown>;
|
||||||
sql =
|
sql =
|
||||||
content.sql ||
|
(typeof content.sql === 'string' ? content.sql : undefined) ||
|
||||||
content.query ||
|
(typeof content.query === 'string' ? content.query : undefined) ||
|
||||||
content.sqlQuery ||
|
(typeof content.sqlQuery === 'string' ? content.sqlQuery : undefined) ||
|
||||||
content.definition?.sql ||
|
(typeof content.definition === 'object' && content.definition !== null
|
||||||
content.definition?.query;
|
? typeof (content.definition as Record<string, unknown>).sql === 'string'
|
||||||
|
? ((content.definition as Record<string, unknown>).sql as string)
|
||||||
|
: typeof (content.definition as Record<string, unknown>).query === 'string'
|
||||||
|
? ((content.definition as Record<string, unknown>).query as string)
|
||||||
|
: undefined
|
||||||
|
: undefined);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!sql) {
|
if (!sql) {
|
||||||
|
@ -74,7 +79,7 @@ export async function getMetricForExport(input: GetMetricForExportInput): Promis
|
||||||
return {
|
return {
|
||||||
id: result.id,
|
id: result.id,
|
||||||
name: result.name,
|
name: result.name,
|
||||||
content: result.content,
|
content: result.content as Record<string, unknown>,
|
||||||
dataSourceId: result.dataSourceId,
|
dataSourceId: result.dataSourceId,
|
||||||
organizationId: result.organizationId,
|
organizationId: result.organizationId,
|
||||||
secretId: result.secretId,
|
secretId: result.secretId,
|
||||||
|
|
Loading…
Reference in New Issue