mirror of https://github.com/buster-so/buster.git
metric download query params for report_file_id and metric_version_number
This commit is contained in:
parent
b5041606cd
commit
edd1de9f34
|
@ -12,10 +12,15 @@ const app = new Hono()
|
|||
zValidator('query', MetricDownloadQueryParamsSchema),
|
||||
async (c) => {
|
||||
const { id } = c.req.valid('param');
|
||||
// const { report_file_id } = c.req.valid('query');
|
||||
const { report_file_id, metric_version_number } = c.req.valid('query');
|
||||
const user = c.get('busterUser');
|
||||
|
||||
const response = await downloadMetricFileHandler(id, user);
|
||||
const response = await downloadMetricFileHandler(
|
||||
id,
|
||||
user,
|
||||
report_file_id,
|
||||
metric_version_number
|
||||
);
|
||||
|
||||
return c.json(response);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,9 @@ import { HTTPException } from 'hono/http-exception';
|
|||
*/
|
||||
export async function downloadMetricFileHandler(
|
||||
metricId: string,
|
||||
user: User
|
||||
user: User,
|
||||
reportFileId?: string,
|
||||
metricVersionNumber?: number
|
||||
): Promise<MetricDownloadResponse> {
|
||||
// Get user's organization
|
||||
const userOrg = await getUserOrganizationId(user.id);
|
||||
|
@ -59,9 +61,15 @@ export async function downloadMetricFileHandler(
|
|||
metricId,
|
||||
userId: user.id,
|
||||
organizationId,
|
||||
reportFileId,
|
||||
metricVersionNumber,
|
||||
},
|
||||
{
|
||||
idempotencyKey: `export-${user.id}-${metricId}`,
|
||||
idempotencyKey: metricVersionNumber
|
||||
? `export-${user.id}-${metricId}-v${metricVersionNumber}`
|
||||
: reportFileId
|
||||
? `export-${user.id}-${metricId}-${reportFileId}`
|
||||
: `export-${user.id}-${metricId}`,
|
||||
idempotencyKeyTTL: '5m', // 5 minutes TTL
|
||||
}
|
||||
);
|
||||
|
|
|
@ -1,8 +1,13 @@
|
|||
import { randomBytes } from 'node:crypto';
|
||||
import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls';
|
||||
import { createAdapter, getProviderForOrganization } from '@buster/data-source';
|
||||
import {
|
||||
createAdapter,
|
||||
getCachedMetricData,
|
||||
getProviderForOrganization,
|
||||
} from '@buster/data-source';
|
||||
import type { Credentials } from '@buster/data-source';
|
||||
import { getDataSourceCredentials, getMetricForExport } from '@buster/database/queries';
|
||||
import type { MetricDataResponse } from '@buster/server-shared/metrics';
|
||||
import { logger, schemaTask } from '@trigger.dev/sdk';
|
||||
import { convertToCSV } from './csv-helpers';
|
||||
import { ExportMetricDataInputSchema, type ExportMetricDataOutput } from './interfaces';
|
||||
|
@ -52,7 +57,10 @@ export const exportMetricData: ReturnType<
|
|||
});
|
||||
|
||||
// Step 1: Fetch metric details and validate access
|
||||
const metric = await getMetricForExport({ metricId: payload.metricId });
|
||||
const metric = await getMetricForExport({
|
||||
metricId: payload.metricId,
|
||||
versionNumber: payload.metricVersionNumber,
|
||||
});
|
||||
|
||||
// Validate organization access
|
||||
if (metric.organizationId !== payload.organizationId) {
|
||||
|
@ -99,74 +107,151 @@ export const exportMetricData: ReturnType<
|
|||
dataSourceId: metric.dataSourceId,
|
||||
});
|
||||
|
||||
// Step 2: Get data source credentials from vault
|
||||
let credentials: Credentials;
|
||||
// Step 2: Check cache if reportFileId is provided AND no specific version is requested
|
||||
// When a specific version is requested, always fetch fresh data
|
||||
let queryResult:
|
||||
| Awaited<ReturnType<Awaited<ReturnType<typeof createAdapter>>['query']>>
|
||||
| undefined;
|
||||
let cachedData: MetricDataResponse | undefined;
|
||||
|
||||
try {
|
||||
const rawCredentials = await getDataSourceCredentials({
|
||||
dataSourceId: metric.dataSourceId,
|
||||
if (payload.reportFileId && !payload.metricVersionNumber) {
|
||||
logger.log('Checking cache for metric data', {
|
||||
metricId: payload.metricId,
|
||||
reportFileId: payload.reportFileId,
|
||||
organizationId: payload.organizationId,
|
||||
});
|
||||
|
||||
// Ensure credentials have the correct type
|
||||
credentials = {
|
||||
...rawCredentials,
|
||||
type: rawCredentials.type || metric.dataSourceType,
|
||||
} as Credentials;
|
||||
} catch (error) {
|
||||
logger.error('Failed to retrieve data source credentials', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
dataSourceId: metric.dataSourceId,
|
||||
});
|
||||
try {
|
||||
const cached = await getCachedMetricData(
|
||||
payload.organizationId,
|
||||
payload.metricId,
|
||||
payload.reportFileId
|
||||
);
|
||||
|
||||
return {
|
||||
success: false,
|
||||
error: 'Failed to access data source credentials',
|
||||
errorCode: 'NOT_FOUND',
|
||||
};
|
||||
if (cached) {
|
||||
cachedData = cached;
|
||||
logger.log('Cache hit - using cached data', {
|
||||
metricId: payload.metricId,
|
||||
reportFileId: payload.reportFileId,
|
||||
rowCount: cachedData.data?.length || 0,
|
||||
});
|
||||
} else {
|
||||
logger.log('Cache miss - will fetch from data source', {
|
||||
metricId: payload.metricId,
|
||||
reportFileId: payload.reportFileId,
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn('Error checking cache, falling back to data source', {
|
||||
metricId: payload.metricId,
|
||||
reportFileId: payload.reportFileId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
} else if (payload.metricVersionNumber) {
|
||||
logger.log('Skipping cache - specific version requested', {
|
||||
metricId: payload.metricId,
|
||||
versionNumber: payload.metricVersionNumber,
|
||||
});
|
||||
}
|
||||
|
||||
// Step 3: Execute query using data source adapter
|
||||
logger.log('Executing metric query', { sql: `${metric.sql?.substring(0, 100)}...` });
|
||||
// Step 3: If no cached data, get from data source
|
||||
if (!cachedData) {
|
||||
// Get data source credentials from vault
|
||||
let credentials: Credentials;
|
||||
|
||||
const adapter = await createAdapter(credentials);
|
||||
let queryResult: Awaited<ReturnType<typeof adapter.query>> | undefined;
|
||||
try {
|
||||
const rawCredentials = await getDataSourceCredentials({
|
||||
dataSourceId: metric.dataSourceId,
|
||||
});
|
||||
|
||||
try {
|
||||
if (!metric.sql) {
|
||||
throw new Error('Metric SQL is missing');
|
||||
// Ensure credentials have the correct type
|
||||
credentials = {
|
||||
...rawCredentials,
|
||||
type: rawCredentials.type || metric.dataSourceType,
|
||||
} as Credentials;
|
||||
} catch (error) {
|
||||
logger.error('Failed to retrieve data source credentials', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
dataSourceId: metric.dataSourceId,
|
||||
});
|
||||
|
||||
return {
|
||||
success: false,
|
||||
error: 'Failed to access data source credentials',
|
||||
errorCode: 'NOT_FOUND',
|
||||
};
|
||||
}
|
||||
queryResult = await adapter.query(
|
||||
metric.sql,
|
||||
[], // No parameters for metric queries
|
||||
MAX_ROWS,
|
||||
60000 // 60 second query timeout
|
||||
);
|
||||
|
||||
logger.log('Query executed successfully', {
|
||||
rowCount: queryResult.rowCount,
|
||||
fieldCount: queryResult.fields.length,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Query execution failed', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
// Execute query using data source adapter
|
||||
logger.log('Executing metric query', { sql: `${metric.sql?.substring(0, 100)}...` });
|
||||
|
||||
return {
|
||||
success: false,
|
||||
error: `Query execution failed: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
errorCode: 'QUERY_ERROR',
|
||||
};
|
||||
} finally {
|
||||
// Always close the adapter connection
|
||||
await adapter.close().catch((err: unknown) => {
|
||||
logger.warn('Failed to close adapter connection', { error: err });
|
||||
});
|
||||
const adapter = await createAdapter(credentials);
|
||||
|
||||
try {
|
||||
if (!metric.sql) {
|
||||
throw new Error('Metric SQL is missing');
|
||||
}
|
||||
queryResult = await adapter.query(
|
||||
metric.sql,
|
||||
[], // No parameters for metric queries
|
||||
MAX_ROWS,
|
||||
60000 // 60 second query timeout
|
||||
);
|
||||
|
||||
logger.log('Query executed successfully', {
|
||||
rowCount: queryResult.rowCount,
|
||||
fieldCount: queryResult.fields.length,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Query execution failed', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
|
||||
return {
|
||||
success: false,
|
||||
error: `Query execution failed: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
errorCode: 'QUERY_ERROR',
|
||||
};
|
||||
} finally {
|
||||
// Always close the adapter connection
|
||||
await adapter.close().catch((err: unknown) => {
|
||||
logger.warn('Failed to close adapter connection', { error: err });
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: Convert to CSV
|
||||
logger.log('Converting results to CSV');
|
||||
|
||||
const csv = convertToCSV(queryResult.rows, queryResult.fields);
|
||||
let csv: string;
|
||||
let rowCount: number;
|
||||
|
||||
if (cachedData) {
|
||||
// Convert cached MetricDataResponse to CSV
|
||||
// data_metadata contains column information similar to query fields
|
||||
const fields =
|
||||
cachedData.data_metadata?.column_metadata?.map((col) => ({
|
||||
name: col.name,
|
||||
type: col.type || 'text', // Provide default type if not specified
|
||||
})) || [];
|
||||
|
||||
csv = convertToCSV(cachedData.data || [], fields);
|
||||
rowCount = cachedData.data?.length || 0;
|
||||
} else if (queryResult) {
|
||||
// Convert fresh query results to CSV
|
||||
csv = convertToCSV(queryResult.rows, queryResult.fields);
|
||||
rowCount = queryResult.rowCount;
|
||||
} else {
|
||||
// This shouldn't happen, but handle gracefully
|
||||
logger.error('No data to convert to CSV');
|
||||
return {
|
||||
success: false,
|
||||
error: 'No data available for export',
|
||||
errorCode: 'UNKNOWN',
|
||||
};
|
||||
}
|
||||
|
||||
const csvSize = Buffer.byteLength(csv, 'utf-8');
|
||||
|
||||
// Check file size
|
||||
|
@ -185,7 +270,8 @@ export const exportMetricData: ReturnType<
|
|||
|
||||
logger.log('CSV generated', {
|
||||
size: csvSize,
|
||||
rowCount: queryResult.rowCount,
|
||||
rowCount: rowCount,
|
||||
fromCache: !!cachedData,
|
||||
});
|
||||
|
||||
// Step 5: Generate unique storage key with security
|
||||
|
@ -208,7 +294,7 @@ export const exportMetricData: ReturnType<
|
|||
'metric-id': payload.metricId,
|
||||
'user-id': payload.userId,
|
||||
'organization-id': payload.organizationId,
|
||||
'row-count': String(queryResult.rowCount),
|
||||
'row-count': String(rowCount),
|
||||
'created-at': new Date().toISOString(),
|
||||
'auto-delete': 'true',
|
||||
},
|
||||
|
@ -266,7 +352,8 @@ export const exportMetricData: ReturnType<
|
|||
metricId: payload.metricId,
|
||||
processingTime,
|
||||
fileSize: csvSize,
|
||||
rowCount: queryResult.rowCount,
|
||||
rowCount: rowCount,
|
||||
fromCache: !!cachedData,
|
||||
});
|
||||
|
||||
return {
|
||||
|
@ -274,7 +361,7 @@ export const exportMetricData: ReturnType<
|
|||
downloadUrl,
|
||||
expiresAt: new Date(Date.now() + 60000).toISOString(), // 60 seconds from now
|
||||
fileSize: csvSize,
|
||||
rowCount: queryResult.rowCount,
|
||||
rowCount: rowCount,
|
||||
fileName,
|
||||
};
|
||||
} catch (error) {
|
||||
|
|
|
@ -8,6 +8,8 @@ export const ExportMetricDataInputSchema = z.object({
|
|||
metricId: z.string().uuid('Metric ID must be a valid UUID'),
|
||||
userId: z.string().uuid('User ID must be a valid UUID'),
|
||||
organizationId: z.string().uuid('Organization ID must be a valid UUID'),
|
||||
reportFileId: z.string().uuid('Report file ID must be a valid UUID').optional(),
|
||||
metricVersionNumber: z.number().optional(),
|
||||
});
|
||||
|
||||
export type ExportMetricDataInput = z.infer<typeof ExportMetricDataInputSchema>;
|
||||
|
|
|
@ -5,6 +5,7 @@ import { dataSources, metricFiles } from '../../schema';
|
|||
|
||||
export const GetMetricForExportInputSchema = z.object({
|
||||
metricId: z.string().uuid(),
|
||||
versionNumber: z.number().optional(),
|
||||
});
|
||||
|
||||
export type GetMetricForExportInput = z.infer<typeof GetMetricForExportInputSchema>;
|
||||
|
@ -34,6 +35,7 @@ export async function getMetricForExport(input: GetMetricForExportInput): Promis
|
|||
content: metricFiles.content,
|
||||
dataSourceId: metricFiles.dataSourceId,
|
||||
organizationId: metricFiles.organizationId,
|
||||
versionHistory: metricFiles.versionHistory,
|
||||
secretId: dataSources.secretId,
|
||||
dataSourceType: dataSources.type,
|
||||
})
|
||||
|
@ -52,15 +54,32 @@ export async function getMetricForExport(input: GetMetricForExportInput): Promis
|
|||
throw new Error(`Metric with ID ${validated.metricId} not found or has been deleted`);
|
||||
}
|
||||
|
||||
// Handle version-specific content if requested
|
||||
let contentToUse = result.content as Record<string, unknown>;
|
||||
|
||||
if (validated.versionNumber !== undefined && result.versionHistory) {
|
||||
const versionKey = validated.versionNumber.toString();
|
||||
const versionHistory = result.versionHistory as Record<string, any>;
|
||||
const versionData = versionHistory[versionKey];
|
||||
|
||||
if (versionData?.content) {
|
||||
contentToUse = versionData.content;
|
||||
} else {
|
||||
throw new Error(
|
||||
`Version ${validated.versionNumber} not found for metric ${validated.metricId}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Extract SQL from metric content
|
||||
// The content structure may vary, so we check multiple possible locations
|
||||
let sql: string | undefined;
|
||||
|
||||
//TODO: we need to use the metric type when we merge in the new ai sdk v5 branch date: 08/14/2025
|
||||
|
||||
if (typeof result.content === 'object' && result.content !== null) {
|
||||
if (typeof contentToUse === 'object' && contentToUse !== null) {
|
||||
// Check common locations for SQL in metric content
|
||||
const content = result.content as Record<string, unknown>;
|
||||
const content = contentToUse;
|
||||
sql =
|
||||
(typeof content.sql === 'string' ? content.sql : undefined) ||
|
||||
(typeof content.query === 'string' ? content.query : undefined) ||
|
||||
|
@ -81,7 +100,7 @@ export async function getMetricForExport(input: GetMetricForExportInput): Promis
|
|||
return {
|
||||
id: result.id,
|
||||
name: result.name,
|
||||
content: result.content as Record<string, unknown>,
|
||||
content: contentToUse,
|
||||
dataSourceId: result.dataSourceId,
|
||||
organizationId: result.organizationId,
|
||||
secretId: result.secretId,
|
||||
|
|
Loading…
Reference in New Issue