Enhance metric query execution and metadata handling

- Introduced `executeMetricQuery` utility for standardized metric SQL query execution with retry logic.
- Updated `getMetricDataHandler` and metric tool execution functions to utilize the new query utility, improving error handling and result processing.
- Added metadata generation from query results to provide detailed insights into data structure.
- Refactored SQL validation to ensure only read-only queries are executed, enhancing data integrity.
This commit is contained in:
dal 2025-08-22 09:01:28 -06:00
parent 44c8191f94
commit d1155e5978
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
12 changed files with 475 additions and 494 deletions

View File

@ -1,5 +1,5 @@
import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls';
import { createAdapter } from '@buster/data-source';
import { executeMetricQuery } from '@buster/data-source';
import type { Credentials } from '@buster/data-source';
import type { User } from '@buster/database';
import {
@ -19,7 +19,7 @@ import { HTTPException } from 'hono/http-exception';
* 2. Checks user has permission to view the metric file
* 3. Retrieves the metric definition
* 4. Parses the metric content to extract SQL
* 5. Executes the query against the data source
* 5. Executes the query against the data source using the shared utility
* 6. Returns the data with metadata and pagination info
*
* @param metricId - The ID of the metric to retrieve data for
@ -103,132 +103,19 @@ export async function getMetricDataHandler(
});
}
// Create adapter and execute query
const adapter = await createAdapter(credentials);
// Execute query using the shared utility
try {
// Add 1 to limit to check if there are more records
const queryLimitWithCheck = queryLimit + 1;
// Execute query with timeout (60 seconds)
const queryResult = await adapter.query(
sql,
[], // No parameters for metric queries
queryLimitWithCheck,
60000 // 60 second timeout
);
// Check if we have more records than the requested limit
const hasMoreRecords = queryResult.rows.length > queryLimit;
// Trim results to requested limit if we have more
const rawData = hasMoreRecords ? queryResult.rows.slice(0, queryLimit) : queryResult.rows;
// Convert data to match expected type (string | number | null)
const data = rawData.map((row) => {
const typedRow: Record<string, string | number | null> = {};
for (const [key, value] of Object.entries(row)) {
if (value === null || typeof value === 'string' || typeof value === 'number') {
typedRow[key] = value;
} else if (typeof value === 'boolean') {
typedRow[key] = value.toString();
} else if (value instanceof Date) {
typedRow[key] = value.toISOString();
} else {
// Convert other types to string (JSON objects, arrays, etc)
typedRow[key] = JSON.stringify(value);
}
}
return typedRow;
const result = await executeMetricQuery(metric.dataSourceId, sql, credentials, {
maxRows: queryLimit,
timeout: 60000, // 60 seconds
retryDelays: [1000, 3000, 6000], // 1s, 3s, 6s
});
// Build metadata from query result with required fields
const columnMetadata = queryResult.fields.map((field) => {
// Determine simple type based on field type
const simpleType =
field.type.includes('int') ||
field.type.includes('float') ||
field.type.includes('decimal') ||
field.type.includes('numeric') ||
field.type === 'number'
? 'number'
: field.type.includes('date') || field.type.includes('time')
? 'date'
: 'text';
return {
name: field.name,
// Map common database types to supported types
type: (field.type.toLowerCase().includes('varchar')
? 'varchar'
: field.type.toLowerCase().includes('char')
? 'char'
: field.type.toLowerCase().includes('text')
? 'text'
: field.type.toLowerCase().includes('int')
? 'integer'
: field.type.toLowerCase().includes('float')
? 'float'
: field.type.toLowerCase().includes('decimal')
? 'decimal'
: field.type.toLowerCase().includes('numeric')
? 'numeric'
: field.type.toLowerCase().includes('bool')
? 'bool'
: field.type.toLowerCase().includes('date')
? 'date'
: field.type.toLowerCase().includes('time')
? 'timestamp'
: field.type.toLowerCase().includes('json')
? 'json'
: 'text') as
| 'text'
| 'float'
| 'integer'
| 'date'
| 'float8'
| 'timestamp'
| 'timestamptz'
| 'bool'
| 'time'
| 'boolean'
| 'json'
| 'jsonb'
| 'int8'
| 'int4'
| 'int2'
| 'decimal'
| 'char'
| 'character varying'
| 'character'
| 'varchar'
| 'number'
| 'numeric'
| 'tinytext'
| 'mediumtext'
| 'longtext'
| 'nchar'
| 'nvarchat'
| 'ntext'
| 'float4',
min_value: '', // These would need to be calculated from actual data
max_value: '',
unique_values: 0,
simple_type: simpleType as 'text' | 'number' | 'date',
};
});
const dataMetadata = {
column_count: queryResult.fields.length,
column_metadata: columnMetadata,
row_count: data.length,
};
return {
data,
data_metadata: dataMetadata,
data: result.data,
data_metadata: result.dataMetadata,
metricId,
has_more_records: hasMoreRecords,
has_more_records: result.hasMoreRecords,
};
} catch (error) {
console.error('Query execution failed:', error);
@ -242,10 +129,5 @@ export async function getMetricDataHandler(
throw new HTTPException(500, {
message: 'Query execution failed',
});
} finally {
// Always close the adapter connection
await adapter.close().catch((err) => {
console.error('Failed to close adapter connection:', err);
});
}
}

View File

@ -1,5 +1,6 @@
import { randomUUID } from 'node:crypto';
import type { DataSource } from '@buster/data-source';
import type { Credentials } from '@buster/data-source';
import { createMetadataFromResults, executeMetricQuery } from '@buster/data-source';
import { assetPermissions, db, metricFiles, updateMessageEntries } from '@buster/database';
import {
type ChartConfigProps,
@ -10,7 +11,7 @@ import {
import { wrapTraced } from 'braintrust';
import * as yaml from 'yaml';
import { z } from 'zod';
import { getDataSource } from '../../../../utils/get-data-source';
import { getDataSourceCredentials } from '../../../../utils/get-data-source';
import {
createPermissionErrorMessage,
validateSqlPermissions,
@ -18,7 +19,6 @@ import {
import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry';
import { trackFileAssociations } from '../../file-tracking-helper';
import { validateAndAdjustBarLineAxes } from '../helpers/bar-line-axis-validator';
import { createMetadataFromResults } from '../helpers/metadata-from-results';
import { ensureTimeFrameQuoted } from '../helpers/time-frame-helper';
import type {
CreateMetricsContext,
@ -75,29 +75,9 @@ interface ValidationResult {
error?: string;
message?: string;
results?: Record<string, unknown>[];
metadata?: QueryMetadata;
metadata?: DataMetadata;
}
interface QueryMetadata {
rowCount: number;
totalRowCount: number;
executionTime: number;
limited: boolean;
maxRows: number;
}
interface ResultMetadata {
totalRowCount?: number | undefined;
limited?: boolean | undefined;
maxRows?: number | undefined;
}
const resultMetadataSchema = z.object({
totalRowCount: z.number().optional(),
limited: z.boolean().optional(),
maxRows: z.number().optional(),
});
async function processMetricFile(
file: { name: string; yml_content: string },
dataSourceId: string,
@ -225,11 +205,10 @@ async function validateSql(
};
}
// Get a new DataSource instance
let dataSource: DataSource | null = null;
// Get data source credentials
let credentials: Credentials;
try {
dataSource = await getDataSource(dataSourceId);
credentials = await getDataSourceCredentials(dataSourceId);
} catch (_error) {
return {
success: false,
@ -237,121 +216,39 @@ async function validateSql(
};
}
// Retry configuration for SQL validation
const MAX_RETRIES = 3;
const TIMEOUT_MS = 120000; // 120 seconds (2 minutes) per attempt for Snowflake queue handling
const RETRY_DELAYS = [1000, 3000, 6000]; // 1s, 3s, 6s
// Execute query using the new utility
try {
const result = await executeMetricQuery(dataSourceId, sqlQuery, credentials, {
maxRows: 1000, // Validation limit
timeout: 120000, // 2 minutes
retryDelays: [1000, 3000, 6000], // 1s, 3s, 6s
});
// Attempt execution with retries
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
// Execute the SQL query using the DataSource with row limit and timeout for validation
// Use maxRows to limit results without modifying the SQL query (preserves Snowflake caching)
const result = await dataSource.execute({
sql: sqlQuery,
options: {
maxRows: 1000, // Additional safety limit at adapter level
timeout: TIMEOUT_MS,
},
});
// Truncate results to 25 records for display in validation
const displayResults = result.data.slice(0, 25);
if (result.success) {
const allResults = result.rows || [];
// Truncate results to 25 records for display in validation
const results = allResults.slice(0, 25);
// Validate metadata with Zod schema for runtime safety
const validatedMetadata = resultMetadataSchema.safeParse(result.metadata);
const parsedMetadata: ResultMetadata | undefined = validatedMetadata.success
? validatedMetadata.data
: undefined;
const metadata: QueryMetadata = {
rowCount: results.length,
totalRowCount: parsedMetadata?.totalRowCount ?? allResults.length,
executionTime: result.executionTime || 100,
limited: parsedMetadata?.limited ?? false,
maxRows: parsedMetadata?.maxRows ?? 5000,
};
let message: string;
if (allResults.length === 0) {
message = 'Query executed successfully but returned no records';
} else if (result.metadata?.limited) {
message = `Query validated successfully. Results were limited to ${result.metadata.maxRows} rows for memory protection (query may return more rows when executed)${results.length < allResults.length ? ` - showing first 25 of ${allResults.length} fetched` : ''}`;
} else {
message = `Query validated successfully and returned ${allResults.length} records${allResults.length > 25 ? ' (showing sample of first 25)' : ''}`;
}
return {
success: true,
message,
results,
metadata,
};
}
// Check if error is timeout-related
const errorMessage = result.error?.message || 'Query execution failed';
const isTimeout =
errorMessage.toLowerCase().includes('timeout') ||
errorMessage.toLowerCase().includes('timed out');
if (isTimeout && attempt < MAX_RETRIES) {
// Wait before retry
const delay = RETRY_DELAYS[attempt] || 6000;
console.warn(
`[create-metrics] SQL validation timeout on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`,
{
sqlPreview: `${sqlQuery.substring(0, 100)}...`,
attempt: attempt + 1,
nextDelay: delay,
}
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue; // Retry
}
// Not a timeout or no more retries
return {
success: false,
error: errorMessage,
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'SQL validation failed';
const isTimeout =
errorMessage.toLowerCase().includes('timeout') ||
errorMessage.toLowerCase().includes('timed out');
if (isTimeout && attempt < MAX_RETRIES) {
// Wait before retry
const delay = RETRY_DELAYS[attempt] || 6000;
console.warn(
`[create-metrics] SQL validation timeout (exception) on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`,
{
sqlPreview: `${sqlQuery.substring(0, 100)}...`,
attempt: attempt + 1,
nextDelay: delay,
error: errorMessage,
}
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue; // Retry
}
// Not a timeout or no more retries
return {
success: false,
error: errorMessage,
};
let message: string;
if (result.data.length === 0) {
message = 'Query executed successfully but returned no records';
} else if (result.hasMoreRecords) {
message = `Query validated successfully. Results were limited to 1000 rows for memory protection (query may return more rows when executed)${displayResults.length < result.data.length ? ` - showing first 25 of ${result.data.length} fetched` : ''}`;
} else {
message = `Query validated successfully and returned ${result.data.length} records${result.data.length > 25 ? ' (showing sample of first 25)' : ''}`;
}
}
// Should not reach here, but just in case
return {
success: false,
error: 'Max retries exceeded for SQL validation',
};
return {
success: true,
message,
results: displayResults,
metadata: result.dataMetadata,
};
} catch (error) {
console.error('[create-metrics] SQL validation failed:', error);
return {
success: false,
error: error instanceof Error ? error.message : 'SQL validation failed',
};
}
} catch (error) {
return {
success: false,

View File

@ -1,4 +1,5 @@
import type { DataSource } from '@buster/data-source';
import type { Credentials } from '@buster/data-source';
import { createMetadataFromResults, executeMetricQuery } from '@buster/data-source';
import { db, metricFiles, updateMessageEntries } from '@buster/database';
import {
type ChartConfigProps,
@ -10,7 +11,7 @@ import { wrapTraced } from 'braintrust';
import { eq, inArray } from 'drizzle-orm';
import * as yaml from 'yaml';
import { z } from 'zod';
import { getDataSource } from '../../../../utils/get-data-source';
import { getDataSourceCredentials } from '../../../../utils/get-data-source';
import {
createPermissionErrorMessage,
validateSqlPermissions,
@ -18,7 +19,6 @@ import {
import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry';
import { trackFileAssociations } from '../../file-tracking-helper';
import { validateAndAdjustBarLineAxes } from '../helpers/bar-line-axis-validator';
import { createMetadataFromResults } from '../helpers/metadata-from-results';
import { ensureTimeFrameQuoted } from '../helpers/time-frame-helper';
import {
createModifyMetricsRawLlmMessageEntry,
@ -95,29 +95,9 @@ interface ValidationResult {
error?: string;
message?: string;
results?: Record<string, unknown>[];
metadata?: QueryMetadata;
metadata?: DataMetadata;
}
interface QueryMetadata {
rowCount: number;
totalRowCount: number;
executionTime: number;
limited: boolean;
maxRows: number;
}
interface ResultMetadata {
totalRowCount?: number | undefined;
limited?: boolean | undefined;
maxRows?: number | undefined;
}
const resultMetadataSchema = z.object({
totalRowCount: z.number().optional(),
limited: z.boolean().optional(),
maxRows: z.number().optional(),
});
async function validateSql(
sqlQuery: string,
dataSourceId: string,
@ -147,11 +127,10 @@ async function validateSql(
};
}
// Get a new DataSource instance
let dataSource: DataSource | null = null;
// Get data source credentials
let credentials: Credentials;
try {
dataSource = await getDataSource(dataSourceId);
credentials = await getDataSourceCredentials(dataSourceId);
} catch (_error) {
return {
success: false,
@ -159,128 +138,44 @@ async function validateSql(
};
}
// Retry configuration for SQL validation
const MAX_RETRIES = 3;
const TIMEOUT_MS = 120000; // 120 seconds (2 minutes) per attempt for Snowflake queue handling
const RETRY_DELAYS = [1000, 3000, 6000]; // 1s, 3s, 6s
// Execute query using the new utility
try {
const result = await executeMetricQuery(dataSourceId, sqlQuery, credentials, {
maxRows: 1000, // Validation limit
timeout: 120000, // 2 minutes
retryDelays: [1000, 3000, 6000], // 1s, 3s, 6s
});
// Attempt execution with retries
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
// Execute the SQL query using the DataSource with row limit and timeout for validation
// Use maxRows to limit results without modifying the SQL query (preserves Snowflake caching)
const result = await dataSource.execute({
sql: sqlQuery,
options: {
maxRows: 1000, // Additional safety limit at adapter level
timeout: TIMEOUT_MS,
},
});
// Truncate results to 25 records for display in validation
const displayResults = result.data.slice(0, 25);
if (result.success) {
const allResults = result.rows || [];
// Truncate results to 25 records for display in validation
const results = allResults.slice(0, 25);
// Validate metadata with Zod schema for runtime safety
const validatedMetadata = resultMetadataSchema.safeParse(result.metadata);
const parsedMetadata: ResultMetadata | undefined = validatedMetadata.success
? validatedMetadata.data
: undefined;
const metadata: QueryMetadata = {
rowCount: results.length,
totalRowCount: parsedMetadata?.totalRowCount ?? allResults.length,
executionTime: result.executionTime || 100,
limited: parsedMetadata?.limited ?? false,
maxRows: parsedMetadata?.maxRows ?? 5000,
};
let message: string;
if (allResults.length === 0) {
message = 'Query executed successfully but returned no records';
} else if (result.metadata?.limited) {
message = `Query validated successfully. Results were limited to ${result.metadata.maxRows} rows for memory protection (query may return more rows when executed)${results.length < allResults.length ? ` - showing first 25 of ${allResults.length} fetched` : ''}`;
} else {
message = `Query validated successfully and returned ${allResults.length} records${allResults.length > 25 ? ' (showing sample of first 25)' : ''}`;
}
return {
success: true,
message,
results,
metadata,
};
}
// Check if error is timeout-related
const errorMessage = result.error?.message || 'Query execution failed';
const isTimeout =
errorMessage.toLowerCase().includes('timeout') ||
errorMessage.toLowerCase().includes('timed out');
if (isTimeout && attempt < MAX_RETRIES) {
// Wait before retry
const delay = RETRY_DELAYS[attempt] || 6000;
console.warn(
`[modify-metrics] SQL validation timeout on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`,
{
sqlPreview: `${sqlQuery.substring(0, 100)}...`,
attempt: attempt + 1,
nextDelay: delay,
}
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue; // Retry
}
// Not a timeout or no more retries
return {
success: false,
error: errorMessage,
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'SQL validation failed';
const isTimeout =
errorMessage.toLowerCase().includes('timeout') ||
errorMessage.toLowerCase().includes('timed out');
if (isTimeout && attempt < MAX_RETRIES) {
// Wait before retry
const delay = RETRY_DELAYS[attempt] || 6000;
console.warn(
`[modify-metrics] SQL validation timeout (exception) on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`,
{
sqlPreview: `${sqlQuery.substring(0, 100)}...`,
attempt: attempt + 1,
nextDelay: delay,
error: errorMessage,
}
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue; // Retry
}
// Not a timeout or no more retries
return {
success: false,
error: errorMessage,
};
let message: string;
if (result.data.length === 0) {
message = 'Query executed successfully but returned no records';
} else if (result.hasMoreRecords) {
message = `Query validated successfully. Results were limited to 1000 rows for memory protection (query may return more rows when executed)${displayResults.length < result.data.length ? ` - showing first 25 of ${result.data.length} fetched` : ''}`;
} else {
message = `Query validated successfully and returned ${result.data.length} records${result.data.length > 25 ? ' (showing sample of first 25)' : ''}`;
}
}
// Should not reach here, but just in case
return {
success: false,
error: 'Max retries exceeded for SQL validation',
};
return {
success: true,
message,
results: displayResults,
metadata: result.dataMetadata,
};
} catch (error) {
console.error('[modify-metrics] SQL validation failed:', error);
return {
success: false,
error: error instanceof Error ? error.message : 'SQL validation failed',
};
}
} catch (error) {
return {
success: false,
error: error instanceof Error ? error.message : 'SQL validation failed',
};
} finally {
// Data source cleanup handled by getDataSource
}
}

View File

@ -68,10 +68,10 @@ export function createModifyReportsReasoningEntry(
// Only show elapsed time when all edits are complete (not during streaming)
// Check if all edits have a final status (completed or failed), not just 'loading'
const allEditsComplete = state.edits?.every(
(edit) => edit.status === 'completed' || edit.status === 'failed'
) ?? false;
const allEditsComplete =
state.edits?.every((edit) => edit.status === 'completed' || edit.status === 'failed') ??
false;
if (allEditsComplete) {
secondaryTitle = formatElapsedTime(state.startTime);
}

View File

@ -6,7 +6,7 @@ import { sql } from 'drizzle-orm';
/**
* Get data source credentials from vault
*/
async function getDataSourceCredentials(dataSourceId: string): Promise<Credentials> {
export async function getDataSourceCredentials(dataSourceId: string): Promise<Credentials> {
try {
// Query the vault to get the credentials
const secretResult = await db.execute(

View File

@ -2,6 +2,9 @@ import pkg from 'node-sql-parser';
const { Parser } = pkg;
import type { BaseFrom, ColumnRefItem, Join, Select } from 'node-sql-parser';
import * as yaml from 'yaml';
// Import checkQueryIsReadOnly from data-source package
export { checkQueryIsReadOnly } from '@buster/data-source';
export type { QueryTypeCheckResult } from '@buster/data-source';
export interface ParsedTable {
database?: string;
@ -25,12 +28,6 @@ interface StatementWithNext extends Record<string, unknown> {
type?: string;
}
export interface QueryTypeCheckResult {
isReadOnly: boolean;
queryType?: string;
error?: string;
}
export interface WildcardValidationResult {
isValid: boolean;
error?: string;
@ -1638,76 +1635,3 @@ function extractColumnFromExpression(
}
}
}
/**
* Checks if a SQL query is read-only (SELECT statements only)
* Returns error if query contains write operations
*/
export function checkQueryIsReadOnly(sql: string, dataSourceSyntax?: string): QueryTypeCheckResult {
const dialect = getParserDialect(dataSourceSyntax);
const parser = new Parser();
try {
// Parse SQL into AST with the appropriate dialect
const ast = parser.astify(sql, { database: dialect });
// Handle single statement or array of statements
const statements = Array.isArray(ast) ? ast : [ast];
// Check each statement
for (const statement of statements) {
// Check if statement has a type property
if ('type' in statement && statement.type) {
const queryType = statement.type.toLowerCase();
// Only allow SELECT statements
if (queryType !== 'select') {
// Provide specific guidance based on the query type
let guidance = '';
switch (queryType) {
case 'insert':
guidance = ' To read data, use SELECT statements instead of INSERT.';
break;
case 'update':
guidance = ' To read data, use SELECT statements instead of UPDATE.';
break;
case 'delete':
guidance = ' To read data, use SELECT statements instead of DELETE.';
break;
case 'create':
guidance =
' DDL operations like CREATE are not permitted. Use SELECT to query existing data.';
break;
case 'drop':
guidance =
' DDL operations like DROP are not permitted. Use SELECT to query existing data.';
break;
case 'alter':
guidance =
' DDL operations like ALTER are not permitted. Use SELECT to query existing data.';
break;
default:
guidance = ' Please use SELECT statements to query data.';
}
return {
isReadOnly: false,
queryType: statement.type,
error: `Query type '${statement.type.toUpperCase()}' is not allowed. Only SELECT statements are permitted for read-only access.${guidance}`,
};
}
}
}
return {
isReadOnly: true,
queryType: 'select',
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
return {
isReadOnly: false,
error: `Failed to parse SQL query for validation: ${errorMessage}. Please ensure your SQL syntax is valid. Only SELECT statements are allowed for read-only access.`,
};
}
}

View File

@ -30,15 +30,18 @@
},
"dependencies": {
"@buster/env-utils": "workspace:*",
"@buster/server-shared": "workspace:*",
"@buster/typescript-config": "workspace:*",
"@buster/vitest-config": "workspace:*",
"@google-cloud/bigquery": "^8.1.0",
"@types/pg": "^8.15.4",
"mssql": "^11.0.1",
"mysql2": "^3.14.1",
"node-sql-parser": "^5.3.11",
"pg": "catalog:",
"pg-cursor": "^2.15.3",
"snowflake-sdk": "^2.1.1"
"snowflake-sdk": "^2.1.1",
"zod": "^3.23.8"
},
"devDependencies": {
"@types/mssql": "^9.1.7",

View File

@ -78,3 +78,15 @@ export {
batchWithRateLimit,
getAllRateLimiterStats,
} from './utils/rate-limiter';
// Metric query utilities
export { executeMetricQuery } from './utils/execute-metric-query';
export type {
ExecuteMetricQueryOptions,
ExecuteMetricQueryResult,
} from './utils/execute-metric-query';
export { createMetadataFromResults } from './utils/create-metadata-from-results';
// SQL validation utilities
export { checkQueryIsReadOnly } from './utils/sql-validation';
export type { QueryTypeCheckResult } from './utils/sql-validation';

View File

@ -1,5 +1,5 @@
import type { FieldMetadata } from '@buster/data-source';
import type { ColumnMetaData, DataMetadata } from '@buster/server-shared/metrics';
import type { FieldMetadata } from '../adapters/base';
/**
* Creates DataMetadata from query results and optional column metadata from adapters

View File

@ -0,0 +1,194 @@
import type { DataMetadata } from '@buster/server-shared/metrics';
import { z } from 'zod';
import { DataSource } from '../data-source';
import type { Credentials } from '../types/credentials';
import { createMetadataFromResults } from './create-metadata-from-results';
import { checkQueryIsReadOnly } from './sql-validation';
/**
 * Options controlling how a metric SQL query is executed.
 * Defaults are applied inside `executeMetricQuery`.
 */
export interface ExecuteMetricQueryOptions {
// Maximum number of rows to return (defaults to 5000 in executeMetricQuery).
maxRows?: number;
// Per-attempt query timeout in milliseconds (defaults to 120000 = 2 minutes).
timeout?: number;
// Backoff delays (ms) between retries on timeout errors (defaults to [1000, 3000, 6000]).
retryDelays?: number[];
skipReadOnlyCheck?: boolean; // Allow skipping for special cases
}
/**
 * Standardized result shape shared by metric creation, modification,
 * and data retrieval handlers.
 */
export interface ExecuteMetricQueryResult {
// Rows with cell values normalized to string | number | null.
data: Record<string, string | number | null>[];
// Column-level metadata derived from the result set.
dataMetadata: DataMetadata;
// True when the query would return more rows than the requested limit.
hasMoreRecords: boolean;
// Wall-clock execution time of the successful attempt, in milliseconds.
executionTime?: number;
}
// Runtime validation for the adapter's optional result metadata;
// all fields are optional because adapters may omit any of them.
const resultMetadataSchema = z.object({
totalRowCount: z.number().optional(),
limited: z.boolean().optional(),
maxRows: z.number().optional(),
});
/**
* Executes a metric SQL query with retry logic and returns standardized results
* This utility is used by metric creation, modification, and data retrieval handlers
*
* @param dataSourceId - The ID of the data source to query
* @param sql - The SQL query to execute
* @param credentials - The credentials for the data source
* @param options - Query options including maxRows, timeout, and retry delays
* @returns Query results with data, metadata, and pagination info
*/
/**
 * Executes a metric SQL query with retry logic and returns standardized results.
 * This utility is used by metric creation, modification, and data retrieval handlers.
 *
 * @param dataSourceId - The ID of the data source to query
 * @param sql - The SQL query to execute
 * @param credentials - The credentials for the data source
 * @param options - Query options including maxRows, timeout, and retry delays
 * @returns Query results with data, metadata, and pagination info
 * @throws Error when the query is not read-only, fails for a non-timeout reason,
 *   or still times out after all retries are exhausted
 */
export async function executeMetricQuery(
  dataSourceId: string,
  sql: string,
  credentials: Credentials,
  options: ExecuteMetricQueryOptions = {}
): Promise<ExecuteMetricQueryResult> {
  const {
    maxRows = 5000,
    timeout = 120000, // 2 minutes default
    retryDelays = [1000, 3000, 6000], // 1s, 3s, 6s
    skipReadOnlyCheck = false,
  } = options;

  // Validate query is read-only unless explicitly skipped
  if (!skipReadOnlyCheck) {
    const readOnlyCheck = checkQueryIsReadOnly(sql, credentials.type);
    if (!readOnlyCheck.isReadOnly) {
      throw new Error(
        readOnlyCheck.error ||
          'Only SELECT statements are allowed for metric queries. Write operations are not permitted.'
      );
    }
  }

  // Create DataSource instance with single data source config
  const dataSource = new DataSource({
    dataSources: [
      {
        name: dataSourceId,
        type: credentials.type,
        credentials,
      },
    ],
    defaultDataSource: dataSourceId,
  });

  try {
    // Fetch one extra row so we can detect whether more records exist
    // beyond maxRows without issuing a separate COUNT query.
    const queryLimitWithCheck = maxRows + 1;

    // Attempt execution with retries (first try + retryDelays.length retries)
    for (let attempt = 0; attempt <= retryDelays.length; attempt++) {
      try {
        const startTime = Date.now();

        // Execute the SQL query using the DataSource
        const result = await dataSource.execute({
          sql,
          options: {
            maxRows: queryLimitWithCheck,
            timeout,
          },
        });

        const executionTime = Date.now() - startTime;

        if (result.success) {
          const allResults = result.rows || [];

          // Check if we have more records than the requested limit
          const hasMoreRecords = allResults.length > maxRows;

          // Trim results to requested limit if we have more
          const data = hasMoreRecords ? allResults.slice(0, maxRows) : allResults;

          // Convert data to match expected type (string | number | null)
          const typedData = data.map((row) => {
            const typedRow: Record<string, string | number | null> = {};
            for (const [key, value] of Object.entries(row)) {
              if (value === null || typeof value === 'string' || typeof value === 'number') {
                typedRow[key] = value;
              } else if (typeof value === 'boolean') {
                typedRow[key] = value.toString();
              } else if (value instanceof Date) {
                typedRow[key] = value.toISOString();
              } else {
                // Convert other types to string (JSON objects, arrays, etc)
                typedRow[key] = JSON.stringify(value);
              }
            }
            return typedRow;
          });

          // Create metadata from results and column information
          const dataMetadata = createMetadataFromResults(typedData, result.columns);

          // Validate and parse result metadata if available
          const validatedMetadata = resultMetadataSchema.safeParse(result.metadata);
          const parsedMetadata = validatedMetadata.success ? validatedMetadata.data : undefined;

          return {
            data: typedData,
            dataMetadata,
            // More records exist when EITHER our over-fetch detected them OR the
            // adapter reports it truncated the result set. The previous expression
            // `parsedMetadata?.limited ?? hasMoreRecords` let a present-but-false
            // `limited` flag mask the local over-fetch check (a result set with
            // exactly maxRows + 1 rows was reported as having no more records).
            hasMoreRecords: hasMoreRecords || (parsedMetadata?.limited ?? false),
            executionTime,
          };
        }

        // Execution reported failure: retry only on timeouts with budget left.
        const errorMessage = result.error?.message || 'Query execution failed';
        if (isTimeoutMessage(errorMessage) && attempt < retryDelays.length) {
          // Wait before retry
          const delay = retryDelays[attempt] || 6000;
          console.warn(
            `[execute-metric-query] SQL execution timeout on attempt ${attempt + 1}/${retryDelays.length + 1}. Retrying in ${delay}ms...`,
            {
              sqlPreview: `${sql.substring(0, 100)}...`,
              attempt: attempt + 1,
              nextDelay: delay,
            }
          );
          await new Promise((resolve) => setTimeout(resolve, delay));
          continue; // Retry
        }

        // Not a timeout or no more retries
        throw new Error(errorMessage);
      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : 'SQL execution failed';
        if (isTimeoutMessage(errorMessage) && attempt < retryDelays.length) {
          // Wait before retry
          const delay = retryDelays[attempt] || 6000;
          console.warn(
            `[execute-metric-query] SQL execution timeout (exception) on attempt ${attempt + 1}/${retryDelays.length + 1}. Retrying in ${delay}ms...`,
            {
              sqlPreview: `${sql.substring(0, 100)}...`,
              attempt: attempt + 1,
              nextDelay: delay,
              error: errorMessage,
            }
          );
          await new Promise((resolve) => setTimeout(resolve, delay));
          continue; // Retry
        }

        // Not a timeout or no more retries
        throw error;
      }
    }

    // Should not reach here, but just in case
    throw new Error('Max retries exceeded for SQL execution');
  } finally {
    // Always close the data source connection
    await dataSource.close().catch((err) => {
      console.error('Failed to close data source connection:', err);
    });
  }
}

/** Returns true when an error message looks like a query timeout. */
function isTimeoutMessage(message: string): boolean {
  const lowered = message.toLowerCase();
  return lowered.includes('timeout') || lowered.includes('timed out');
}

View File

@ -0,0 +1,156 @@
import pkg from 'node-sql-parser';
const { Parser } = pkg;
/** Outcome of checking whether a SQL statement is read-only. */
export interface QueryTypeCheckResult {
  isReadOnly: boolean;
  queryType?: string;
  error?: string;
}

// Translates data source syntax names into node-sql-parser dialect identifiers.
const DIALECT_MAPPING: Record<string, string> = {
  // Direct mappings
  mysql: 'mysql',
  postgresql: 'postgresql',
  sqlite: 'sqlite',
  mariadb: 'mariadb',
  bigquery: 'bigquery',
  snowflake: 'snowflake',
  redshift: 'postgresql', // Redshift uses PostgreSQL dialect
  transactsql: 'transactsql',
  flinksql: 'flinksql',
  hive: 'hive',
  // Alternative names
  postgres: 'postgresql',
  mssql: 'transactsql',
  sqlserver: 'transactsql',
  athena: 'postgresql', // Athena uses Presto/PostgreSQL syntax
  db2: 'db2',
  noql: 'mysql', // Default fallback for NoQL
};

/**
 * Resolves the parser dialect for a given data source syntax.
 * Falls back to 'postgresql' when the syntax is missing or unrecognized.
 */
function getParserDialect(dataSourceSyntax?: string): string {
  if (!dataSourceSyntax) {
    return 'postgresql';
  }
  return DIALECT_MAPPING[dataSourceSyntax.toLowerCase()] ?? 'postgresql';
}
/**
* Checks if a SQL query is read-only (SELECT only, no INSERT/UPDATE/DELETE/DDL)
* @param sql - The SQL query to validate
* @param dataSourceSyntax - Optional data source syntax for dialect-specific parsing
* @returns Result indicating if query is read-only with optional error message
*/
// Per-statement-type guidance appended to the rejection message.
// GRANT and REVOKE intentionally share the same guidance text.
const WRITE_TYPE_GUIDANCE: Record<string, string> = {
  insert: ' To read data, use SELECT statements instead of INSERT.',
  update: ' To read data, use SELECT statements instead of UPDATE.',
  delete: ' To read data, use SELECT statements instead of DELETE.',
  create: ' DDL operations like CREATE are not permitted. Use SELECT to query existing data.',
  drop: ' DDL operations like DROP are not permitted. Use SELECT to query existing data.',
  alter: ' DDL operations like ALTER are not permitted. Use SELECT to query existing data.',
  truncate: ' DDL operations like TRUNCATE are not permitted. Use SELECT to query existing data.',
  grant: ' Permission management statements are not allowed. Use SELECT statements to read data.',
  revoke: ' Permission management statements are not allowed. Use SELECT statements to read data.',
};

/**
 * Checks if a SQL query is read-only (SELECT only, no INSERT/UPDATE/DELETE/DDL)
 * @param sql - The SQL query to validate
 * @param dataSourceSyntax - Optional data source syntax for dialect-specific parsing
 * @returns Result indicating if query is read-only with optional error message
 */
export function checkQueryIsReadOnly(sql: string, dataSourceSyntax?: string): QueryTypeCheckResult {
  const parser = new Parser();

  try {
    // Parse SQL into AST with the appropriate dialect
    const parsed = parser.astify(sql, { database: getParserDialect(dataSourceSyntax) });

    // Normalize to an array so single statements and batches share one path
    const statements = Array.isArray(parsed) ? parsed : [parsed];

    for (const statement of statements) {
      // Statements without a type cannot be classified; skip them
      if (!('type' in statement) || !statement.type) {
        continue;
      }

      const queryType = statement.type.toLowerCase();
      if (queryType === 'select') {
        continue;
      }

      // Non-SELECT statement: reject with type-specific guidance
      const guidance =
        WRITE_TYPE_GUIDANCE[queryType] ?? ' Please use SELECT statements to query data.';
      return {
        isReadOnly: false,
        queryType,
        error: `Query type '${queryType.toUpperCase()}' is not allowed. Only SELECT statements are permitted for read-only access.${guidance}`,
      };
    }

    return {
      isReadOnly: true,
      queryType: 'select',
    };
  } catch (error) {
    // If we can't parse the SQL, err on the side of caution
    const errorMessage = error instanceof Error ? error.message : 'Unknown parsing error';

    // Fall back to scanning the raw SQL for write-operation keywords
    const sqlLower = sql.toLowerCase();
    const writeKeywords = [
      'insert',
      'update',
      'delete',
      'create',
      'alter',
      'drop',
      'truncate',
      'grant',
      'revoke',
    ];

    // Word-boundary match so e.g. "created_at" does not trigger "create"
    const matchedKeyword = writeKeywords.find((keyword) =>
      new RegExp(`\\b${keyword}\\b`).test(sqlLower)
    );
    if (matchedKeyword) {
      return {
        isReadOnly: false,
        error: `Query appears to contain write operation (${matchedKeyword.toUpperCase()}). Only SELECT statements are allowed.`,
      };
    }

    // If parsing failed, return error to be safe
    return {
      isReadOnly: false,
      error: `Failed to parse SQL query for validation: ${errorMessage}. Please ensure your SQL syntax is valid. Only SELECT statements are allowed for read-only access.`,
    };
  }
}

View File

@ -949,6 +949,9 @@ importers:
'@buster/env-utils':
specifier: workspace:*
version: link:../env-utils
'@buster/server-shared':
specifier: workspace:*
version: link:../server-shared
'@buster/typescript-config':
specifier: workspace:*
version: link:../typescript-config
@ -967,6 +970,9 @@ importers:
mysql2:
specifier: ^3.14.1
version: 3.14.1
node-sql-parser:
specifier: ^5.3.11
version: 5.3.11
pg:
specifier: 'catalog:'
version: 8.16.3
@ -976,6 +982,9 @@ importers:
snowflake-sdk:
specifier: ^2.1.1
version: 2.1.1(asn1.js@5.4.1)
zod:
specifier: ^3.23.8
version: 3.25.76
devDependencies:
'@types/mssql':
specifier: ^9.1.7
@ -9600,6 +9609,10 @@ packages:
resolution: {integrity: sha512-cf+iXXJ9Foz4hBIu+eNNeg207ac6XruA9I9DXEs+jCxeS9t/k9T0GZK8NZngPwkv+P26i3zNFj9jxJU2v3pJnw==}
engines: {node: '>=8'}
node-sql-parser@5.3.11:
resolution: {integrity: sha512-1ZcXu2qEFZlMkkaYPpMoTkypTunYTvtnInzVjWDhjc1+FS5h/WqT1rbgP2pP42/nXBWfPVN66mEfidWSpYVs1Q==}
engines: {node: '>=8'}
normalize-path@3.0.0:
resolution: {integrity: sha512-6eZs5Ls3WtCisHWp9S2GUy8dqkpGi4BVSz3GaqiE6ezub0512ESztXUwUB6C6IKbQkY2Pnb/mD4WYojCRwcwLA==}
engines: {node: '>=0.10.0'}
@ -22886,6 +22899,11 @@ snapshots:
'@types/pegjs': 0.10.6
big-integer: 1.6.52
node-sql-parser@5.3.11:
dependencies:
'@types/pegjs': 0.10.6
big-integer: 1.6.52
normalize-path@3.0.0: {}
npm-run-path@5.3.0: