diff --git a/apps/server/src/api/v2/metric_files/get-metric-data.ts b/apps/server/src/api/v2/metric_files/get-metric-data.ts index 9528edebb..8d4e6a797 100644 --- a/apps/server/src/api/v2/metric_files/get-metric-data.ts +++ b/apps/server/src/api/v2/metric_files/get-metric-data.ts @@ -1,5 +1,5 @@ import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls'; -import { createAdapter } from '@buster/data-source'; +import { executeMetricQuery } from '@buster/data-source'; import type { Credentials } from '@buster/data-source'; import type { User } from '@buster/database'; import { @@ -19,7 +19,7 @@ import { HTTPException } from 'hono/http-exception'; * 2. Checks user has permission to view the metric file * 3. Retrieves the metric definition * 4. Parses the metric content to extract SQL - * 5. Executes the query against the data source + * 5. Executes the query against the data source using the shared utility * 6. Returns the data with metadata and pagination info * * @param metricId - The ID of the metric to retrieve data for @@ -103,132 +103,19 @@ export async function getMetricDataHandler( }); } - // Create adapter and execute query - const adapter = await createAdapter(credentials); - + // Execute query using the shared utility try { - // Add 1 to limit to check if there are more records - const queryLimitWithCheck = queryLimit + 1; - - // Execute query with timeout (60 seconds) - const queryResult = await adapter.query( - sql, - [], // No parameters for metric queries - queryLimitWithCheck, - 60000 // 60 second timeout - ); - - // Check if we have more records than the requested limit - const hasMoreRecords = queryResult.rows.length > queryLimit; - - // Trim results to requested limit if we have more - const rawData = hasMoreRecords ? queryResult.rows.slice(0, queryLimit) : queryResult.rows; - - // Convert data to match expected type (string | number | null) - const data = rawData.map((row) => { - const typedRow: Record = {}; - for (const [key, value] of Object.entries(row)) { - if (value === null || typeof value === 'string' || typeof value === 'number') { - typedRow[key] = value; - } else if (typeof value === 'boolean') { - typedRow[key] = value.toString(); - } else if (value instanceof Date) { - typedRow[key] = value.toISOString(); - } else { - // Convert other types to string (JSON objects, arrays, etc) - typedRow[key] = JSON.stringify(value); - } - } - return typedRow; + const result = await executeMetricQuery(metric.dataSourceId, sql, credentials, { + maxRows: queryLimit, + timeout: 60000, // 60 seconds + retryDelays: [1000, 3000, 6000], // 1s, 3s, 6s }); - // Build metadata from query result with required fields - const columnMetadata = queryResult.fields.map((field) => { - // Determine simple type based on field type - const simpleType = - field.type.includes('int') || - field.type.includes('float') || - field.type.includes('decimal') || - field.type.includes('numeric') || - field.type === 'number' - ? 'number' - : field.type.includes('date') || field.type.includes('time') - ? 'date' - : 'text'; - - return { - name: field.name, - // Map common database types to supported types - type: (field.type.toLowerCase().includes('varchar') - ? 'varchar' - : field.type.toLowerCase().includes('char') - ? 'char' - : field.type.toLowerCase().includes('text') - ? 'text' - : field.type.toLowerCase().includes('int') - ? 'integer' - : field.type.toLowerCase().includes('float') - ? 'float' - : field.type.toLowerCase().includes('decimal') - ? 'decimal' - : field.type.toLowerCase().includes('numeric') - ? 'numeric' - : field.type.toLowerCase().includes('bool') - ? 'bool' - : field.type.toLowerCase().includes('date') - ? 'date' - : field.type.toLowerCase().includes('time') - ? 'timestamp' - : field.type.toLowerCase().includes('json') - ? 'json' - : 'text') as - | 'text' - | 'float' - | 'integer' - | 'date' - | 'float8' - | 'timestamp' - | 'timestamptz' - | 'bool' - | 'time' - | 'boolean' - | 'json' - | 'jsonb' - | 'int8' - | 'int4' - | 'int2' - | 'decimal' - | 'char' - | 'character varying' - | 'character' - | 'varchar' - | 'number' - | 'numeric' - | 'tinytext' - | 'mediumtext' - | 'longtext' - | 'nchar' - | 'nvarchat' - | 'ntext' - | 'float4', - min_value: '', // These would need to be calculated from actual data - max_value: '', - unique_values: 0, - simple_type: simpleType as 'text' | 'number' | 'date', - }; - }); - - const dataMetadata = { - column_count: queryResult.fields.length, - column_metadata: columnMetadata, - row_count: data.length, - }; - return { - data, - data_metadata: dataMetadata, + data: result.data, + data_metadata: result.dataMetadata, metricId, - has_more_records: hasMoreRecords, + has_more_records: result.hasMoreRecords, }; } catch (error) { console.error('Query execution failed:', error); @@ -242,10 +129,5 @@ export async function getMetricDataHandler( throw new HTTPException(500, { message: 'Query execution failed', }); - } finally { - // Always close the adapter connection - await adapter.close().catch((err) => { - console.error('Failed to close adapter connection:', err); - }); } } diff --git a/packages/ai/src/tools/visualization-tools/metrics/create-metrics-tool/create-metrics-execute.ts b/packages/ai/src/tools/visualization-tools/metrics/create-metrics-tool/create-metrics-execute.ts index 7be595e27..9f4221a63 100644 --- a/packages/ai/src/tools/visualization-tools/metrics/create-metrics-tool/create-metrics-execute.ts +++ b/packages/ai/src/tools/visualization-tools/metrics/create-metrics-tool/create-metrics-execute.ts @@ -1,5 +1,6 @@ import { randomUUID } from 'node:crypto'; -import type { DataSource } from '@buster/data-source'; +import type { Credentials } from '@buster/data-source'; +import { createMetadataFromResults, executeMetricQuery } from '@buster/data-source'; import { assetPermissions, db, metricFiles, updateMessageEntries } from '@buster/database'; import { type ChartConfigProps, @@ -10,7 +11,7 @@ import { import { wrapTraced } from 'braintrust'; import * as yaml from 'yaml'; import { z } from 'zod'; -import { getDataSource } from '../../../../utils/get-data-source'; +import { getDataSourceCredentials } from '../../../../utils/get-data-source'; import { createPermissionErrorMessage, validateSqlPermissions, @@ -18,7 +19,6 @@ import { import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry'; import { trackFileAssociations } from '../../file-tracking-helper'; import { validateAndAdjustBarLineAxes } from '../helpers/bar-line-axis-validator'; -import { createMetadataFromResults } from '../helpers/metadata-from-results'; import { ensureTimeFrameQuoted } from '../helpers/time-frame-helper'; import type { CreateMetricsContext, @@ -75,29 +75,9 @@ interface ValidationResult { error?: string; message?: string; results?: Record[]; - metadata?: QueryMetadata; + metadata?: DataMetadata; } -interface QueryMetadata { - rowCount: number; - totalRowCount: number; - executionTime: number; - limited: boolean; - maxRows: number; -} - -interface ResultMetadata { - totalRowCount?: number | undefined; - limited?: boolean | undefined; - maxRows?: number | undefined; -} - -const resultMetadataSchema = z.object({ - totalRowCount: z.number().optional(), - limited: z.boolean().optional(), - maxRows: z.number().optional(), -}); - async function processMetricFile( file: { name: string; yml_content: string }, dataSourceId: string, @@ -225,11 +205,10 @@ async function validateSql( }; } - // Get a new DataSource instance - let dataSource: DataSource | null = null; - + // Get data source credentials + let credentials: Credentials; try { - dataSource = await getDataSource(dataSourceId); + credentials = await getDataSourceCredentials(dataSourceId); } catch (_error) { return { success: false, @@ -237,121 +216,39 @@ async function validateSql( }; } - // Retry configuration for SQL validation - const MAX_RETRIES = 3; - const TIMEOUT_MS = 120000; // 120 seconds (2 minutes) per attempt for Snowflake queue handling - const RETRY_DELAYS = [1000, 3000, 6000]; // 1s, 3s, 6s + // Execute query using the new utility + try { + const result = await executeMetricQuery(dataSourceId, sqlQuery, credentials, { + maxRows: 1000, // Validation limit + timeout: 120000, // 2 minutes + retryDelays: [1000, 3000, 6000], // 1s, 3s, 6s + }); - // Attempt execution with retries - for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { - try { - // Execute the SQL query using the DataSource with row limit and timeout for validation - // Use maxRows to limit results without modifying the SQL query (preserves Snowflake caching) - const result = await dataSource.execute({ - sql: sqlQuery, - options: { - maxRows: 1000, // Additional safety limit at adapter level - timeout: TIMEOUT_MS, - }, - }); + // Truncate results to 25 records for display in validation + const displayResults = result.data.slice(0, 25); - if (result.success) { - const allResults = result.rows || []; - // Truncate results to 25 records for display in validation - const results = allResults.slice(0, 25); - - // Validate metadata with Zod schema for runtime safety - const validatedMetadata = resultMetadataSchema.safeParse(result.metadata); - const parsedMetadata: ResultMetadata | undefined = validatedMetadata.success - ? validatedMetadata.data - : undefined; - - const metadata: QueryMetadata = { - rowCount: results.length, - totalRowCount: parsedMetadata?.totalRowCount ?? allResults.length, - executionTime: result.executionTime || 100, - limited: parsedMetadata?.limited ?? false, - maxRows: parsedMetadata?.maxRows ?? 5000, - }; - - let message: string; - if (allResults.length === 0) { - message = 'Query executed successfully but returned no records'; - } else if (result.metadata?.limited) { - message = `Query validated successfully. Results were limited to ${result.metadata.maxRows} rows for memory protection (query may return more rows when executed)${results.length < allResults.length ? ` - showing first 25 of ${allResults.length} fetched` : ''}`; - } else { - message = `Query validated successfully and returned ${allResults.length} records${allResults.length > 25 ? ' (showing sample of first 25)' : ''}`; - } - - return { - success: true, - message, - results, - metadata, - }; - } - - // Check if error is timeout-related - const errorMessage = result.error?.message || 'Query execution failed'; - const isTimeout = - errorMessage.toLowerCase().includes('timeout') || - errorMessage.toLowerCase().includes('timed out'); - - if (isTimeout && attempt < MAX_RETRIES) { - // Wait before retry - const delay = RETRY_DELAYS[attempt] || 6000; - console.warn( - `[create-metrics] SQL validation timeout on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`, - { - sqlPreview: `${sqlQuery.substring(0, 100)}...`, - attempt: attempt + 1, - nextDelay: delay, - } - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; // Retry - } - - // Not a timeout or no more retries - return { - success: false, - error: errorMessage, - }; - } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'SQL validation failed'; - const isTimeout = - errorMessage.toLowerCase().includes('timeout') || - errorMessage.toLowerCase().includes('timed out'); - - if (isTimeout && attempt < MAX_RETRIES) { - // Wait before retry - const delay = RETRY_DELAYS[attempt] || 6000; - console.warn( - `[create-metrics] SQL validation timeout (exception) on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`, - { - sqlPreview: `${sqlQuery.substring(0, 100)}...`, - attempt: attempt + 1, - nextDelay: delay, - error: errorMessage, - } - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; // Retry - } - - // Not a timeout or no more retries - return { - success: false, - error: errorMessage, - }; + let message: string; + if (result.data.length === 0) { + message = 'Query executed successfully but returned no records'; + } else if (result.hasMoreRecords) { + message = `Query validated successfully. Results were limited to 1000 rows for memory protection (query may return more rows when executed)${displayResults.length < result.data.length ? ` - showing first 25 of ${result.data.length} fetched` : ''}`; + } else { + message = `Query validated successfully and returned ${result.data.length} records${result.data.length > 25 ? ' (showing sample of first 25)' : ''}`; } - } - // Should not reach here, but just in case - return { - success: false, - error: 'Max retries exceeded for SQL validation', - }; + return { + success: true, + message, + results: displayResults, + metadata: result.dataMetadata, + }; + } catch (error) { + console.error('[create-metrics] SQL validation failed:', error); + return { + success: false, + error: error instanceof Error ? error.message : 'SQL validation failed', + }; + } } catch (error) { return { success: false, diff --git a/packages/ai/src/tools/visualization-tools/metrics/modify-metrics-tool/modify-metrics-execute.ts b/packages/ai/src/tools/visualization-tools/metrics/modify-metrics-tool/modify-metrics-execute.ts index 33a6af426..70d8fcd05 100644 --- a/packages/ai/src/tools/visualization-tools/metrics/modify-metrics-tool/modify-metrics-execute.ts +++ b/packages/ai/src/tools/visualization-tools/metrics/modify-metrics-tool/modify-metrics-execute.ts @@ -1,4 +1,5 @@ -import type { DataSource } from '@buster/data-source'; +import type { Credentials } from '@buster/data-source'; +import { createMetadataFromResults, executeMetricQuery } from '@buster/data-source'; import { db, metricFiles, updateMessageEntries } from '@buster/database'; import { type ChartConfigProps, @@ -10,7 +11,7 @@ import { wrapTraced } from 'braintrust'; import { eq, inArray } from 'drizzle-orm'; import * as yaml from 'yaml'; import { z } from 'zod'; -import { getDataSource } from '../../../../utils/get-data-source'; +import { getDataSourceCredentials } from '../../../../utils/get-data-source'; import { createPermissionErrorMessage, validateSqlPermissions, @@ -18,7 +19,6 @@ import { import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry'; import { trackFileAssociations } from '../../file-tracking-helper'; import { validateAndAdjustBarLineAxes } from '../helpers/bar-line-axis-validator'; -import { createMetadataFromResults } from '../helpers/metadata-from-results'; import { ensureTimeFrameQuoted } from '../helpers/time-frame-helper'; import { createModifyMetricsRawLlmMessageEntry, @@ -95,29 +95,9 @@ interface ValidationResult { error?: string; message?: string; results?: Record[]; - metadata?: QueryMetadata; + metadata?: DataMetadata; } -interface QueryMetadata { - rowCount: number; - totalRowCount: number; - executionTime: number; - limited: boolean; - maxRows: number; -} - -interface ResultMetadata { - totalRowCount?: number | undefined; - limited?: boolean | undefined; - maxRows?: number | undefined; -} - -const resultMetadataSchema = z.object({ - totalRowCount: z.number().optional(), - limited: z.boolean().optional(), - maxRows: z.number().optional(), -}); - async function validateSql( sqlQuery: string, dataSourceId: string, @@ -147,11 +127,10 @@ async function validateSql( }; } - // Get a new DataSource instance - let dataSource: DataSource | null = null; - + // Get data source credentials + let credentials: Credentials; try { - dataSource = await getDataSource(dataSourceId); + credentials = await getDataSourceCredentials(dataSourceId); } catch (_error) { return { success: false, @@ -159,128 +138,44 @@ async function validateSql( }; } - // Retry configuration for SQL validation - const MAX_RETRIES = 3; - const TIMEOUT_MS = 120000; // 120 seconds (2 minutes) per attempt for Snowflake queue handling - const RETRY_DELAYS = [1000, 3000, 6000]; // 1s, 3s, 6s + // Execute query using the new utility + try { + const result = await executeMetricQuery(dataSourceId, sqlQuery, credentials, { + maxRows: 1000, // Validation limit + timeout: 120000, // 2 minutes + retryDelays: [1000, 3000, 6000], // 1s, 3s, 6s + }); - // Attempt execution with retries - for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { - try { - // Execute the SQL query using the DataSource with row limit and timeout for validation - // Use maxRows to limit results without modifying the SQL query (preserves Snowflake caching) - const result = await dataSource.execute({ - sql: sqlQuery, - options: { - maxRows: 1000, // Additional safety limit at adapter level - timeout: TIMEOUT_MS, - }, - }); + // Truncate results to 25 records for display in validation + const displayResults = result.data.slice(0, 25); - if (result.success) { - const allResults = result.rows || []; - // Truncate results to 25 records for display in validation - const results = allResults.slice(0, 25); - - // Validate metadata with Zod schema for runtime safety - const validatedMetadata = resultMetadataSchema.safeParse(result.metadata); - const parsedMetadata: ResultMetadata | undefined = validatedMetadata.success - ? validatedMetadata.data - : undefined; - - const metadata: QueryMetadata = { - rowCount: results.length, - totalRowCount: parsedMetadata?.totalRowCount ?? allResults.length, - executionTime: result.executionTime || 100, - limited: parsedMetadata?.limited ?? false, - maxRows: parsedMetadata?.maxRows ?? 5000, - }; - - let message: string; - if (allResults.length === 0) { - message = 'Query executed successfully but returned no records'; - } else if (result.metadata?.limited) { - message = `Query validated successfully. Results were limited to ${result.metadata.maxRows} rows for memory protection (query may return more rows when executed)${results.length < allResults.length ? ` - showing first 25 of ${allResults.length} fetched` : ''}`; - } else { - message = `Query validated successfully and returned ${allResults.length} records${allResults.length > 25 ? ' (showing sample of first 25)' : ''}`; - } - - return { - success: true, - message, - results, - metadata, - }; - } - - // Check if error is timeout-related - const errorMessage = result.error?.message || 'Query execution failed'; - const isTimeout = - errorMessage.toLowerCase().includes('timeout') || - errorMessage.toLowerCase().includes('timed out'); - - if (isTimeout && attempt < MAX_RETRIES) { - // Wait before retry - const delay = RETRY_DELAYS[attempt] || 6000; - console.warn( - `[modify-metrics] SQL validation timeout on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`, - { - sqlPreview: `${sqlQuery.substring(0, 100)}...`, - attempt: attempt + 1, - nextDelay: delay, - } - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; // Retry - } - - // Not a timeout or no more retries - return { - success: false, - error: errorMessage, - }; - } catch (error) { - const errorMessage = error instanceof Error ? error.message : 'SQL validation failed'; - const isTimeout = - errorMessage.toLowerCase().includes('timeout') || - errorMessage.toLowerCase().includes('timed out'); - - if (isTimeout && attempt < MAX_RETRIES) { - // Wait before retry - const delay = RETRY_DELAYS[attempt] || 6000; - console.warn( - `[modify-metrics] SQL validation timeout (exception) on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`, - { - sqlPreview: `${sqlQuery.substring(0, 100)}...`, - attempt: attempt + 1, - nextDelay: delay, - error: errorMessage, - } - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; // Retry - } - - // Not a timeout or no more retries - return { - success: false, - error: errorMessage, - }; + let message: string; + if (result.data.length === 0) { + message = 'Query executed successfully but returned no records'; + } else if (result.hasMoreRecords) { + message = `Query validated successfully. Results were limited to 1000 rows for memory protection (query may return more rows when executed)${displayResults.length < result.data.length ? ` - showing first 25 of ${result.data.length} fetched` : ''}`; + } else { + message = `Query validated successfully and returned ${result.data.length} records${result.data.length > 25 ? ' (showing sample of first 25)' : ''}`; } - } - // Should not reach here, but just in case - return { - success: false, - error: 'Max retries exceeded for SQL validation', - }; + return { + success: true, + message, + results: displayResults, + metadata: result.dataMetadata, + }; + } catch (error) { + console.error('[modify-metrics] SQL validation failed:', error); + return { + success: false, + error: error instanceof Error ? error.message : 'SQL validation failed', + }; + } } catch (error) { return { success: false, error: error instanceof Error ? error.message : 'SQL validation failed', }; - } finally { - // Data source cleanup handled by getDataSource } } diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/helpers/modify-reports-transform-helper.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/helpers/modify-reports-transform-helper.ts index 2bcea1b22..5e1841da4 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/helpers/modify-reports-transform-helper.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/helpers/modify-reports-transform-helper.ts @@ -68,10 +68,10 @@ export function createModifyReportsReasoningEntry( // Only show elapsed time when all edits are complete (not during streaming) // Check if all edits have a final status (completed or failed), not just 'loading' - const allEditsComplete = state.edits?.every( - (edit) => edit.status === 'completed' || edit.status === 'failed' - ) ?? false; - + const allEditsComplete = + state.edits?.every((edit) => edit.status === 'completed' || edit.status === 'failed') ?? + false; + if (allEditsComplete) { secondaryTitle = formatElapsedTime(state.startTime); } diff --git a/packages/ai/src/utils/get-data-source.ts b/packages/ai/src/utils/get-data-source.ts index e36f9d63a..ad870943a 100644 --- a/packages/ai/src/utils/get-data-source.ts +++ b/packages/ai/src/utils/get-data-source.ts @@ -6,7 +6,7 @@ import { sql } from 'drizzle-orm'; /** * Get data source credentials from vault */ -async function getDataSourceCredentials(dataSourceId: string): Promise { +export async function getDataSourceCredentials(dataSourceId: string): Promise { try { // Query the vault to get the credentials const secretResult = await db.execute( diff --git a/packages/ai/src/utils/sql-permissions/sql-parser-helpers.ts b/packages/ai/src/utils/sql-permissions/sql-parser-helpers.ts index d2c793cfc..6168fd01a 100644 --- a/packages/ai/src/utils/sql-permissions/sql-parser-helpers.ts +++ b/packages/ai/src/utils/sql-permissions/sql-parser-helpers.ts @@ -2,6 +2,9 @@ import pkg from 'node-sql-parser'; const { Parser } = pkg; import type { BaseFrom, ColumnRefItem, Join, Select } from 'node-sql-parser'; import * as yaml from 'yaml'; +// Import checkQueryIsReadOnly from data-source package +export { checkQueryIsReadOnly } from '@buster/data-source'; +export type { QueryTypeCheckResult } from '@buster/data-source'; export interface ParsedTable { database?: string; @@ -25,12 +28,6 @@ interface StatementWithNext extends Record { type?: string; } -export interface QueryTypeCheckResult { - isReadOnly: boolean; - queryType?: string; - error?: string; -} - export interface WildcardValidationResult { isValid: boolean; error?: string; @@ -1638,76 +1635,3 @@ function extractColumnFromExpression( } } } - -/** - * Checks if a SQL query is read-only (SELECT statements only) - * Returns error if query contains write operations - */ -export function checkQueryIsReadOnly(sql: string, dataSourceSyntax?: string): QueryTypeCheckResult { - const dialect = getParserDialect(dataSourceSyntax); - const parser = new Parser(); - - try { - // Parse SQL into AST with the appropriate dialect - const ast = parser.astify(sql, { database: dialect }); - - // Handle single statement or array of statements - const statements = Array.isArray(ast) ? ast : [ast]; - - // Check each statement - for (const statement of statements) { - // Check if statement has a type property - if ('type' in statement && statement.type) { - const queryType = statement.type.toLowerCase(); - - // Only allow SELECT statements - if (queryType !== 'select') { - // Provide specific guidance based on the query type - let guidance = ''; - switch (queryType) { - case 'insert': - guidance = ' To read data, use SELECT statements instead of INSERT.'; - break; - case 'update': - guidance = ' To read data, use SELECT statements instead of UPDATE.'; - break; - case 'delete': - guidance = ' To read data, use SELECT statements instead of DELETE.'; - break; - case 'create': - guidance = - ' DDL operations like CREATE are not permitted. Use SELECT to query existing data.'; - break; - case 'drop': - guidance = - ' DDL operations like DROP are not permitted. Use SELECT to query existing data.'; - break; - case 'alter': - guidance = - ' DDL operations like ALTER are not permitted. Use SELECT to query existing data.'; - break; - default: - guidance = ' Please use SELECT statements to query data.'; - } - - return { - isReadOnly: false, - queryType: statement.type, - error: `Query type '${statement.type.toUpperCase()}' is not allowed. Only SELECT statements are permitted for read-only access.${guidance}`, - }; - } - } - } - - return { - isReadOnly: true, - queryType: 'select', - }; - } catch (error) { - const errorMessage = error instanceof Error ? error.message : String(error); - return { - isReadOnly: false, - error: `Failed to parse SQL query for validation: ${errorMessage}. Please ensure your SQL syntax is valid. Only SELECT statements are allowed for read-only access.`, - }; - } -} diff --git a/packages/data-source/package.json b/packages/data-source/package.json index 045063803..4fcbb3857 100644 --- a/packages/data-source/package.json +++ b/packages/data-source/package.json @@ -30,15 +30,18 @@ }, "dependencies": { "@buster/env-utils": "workspace:*", + "@buster/server-shared": "workspace:*", "@buster/typescript-config": "workspace:*", "@buster/vitest-config": "workspace:*", "@google-cloud/bigquery": "^8.1.0", "@types/pg": "^8.15.4", "mssql": "^11.0.1", "mysql2": "^3.14.1", + "node-sql-parser": "^5.3.11", "pg": "catalog:", "pg-cursor": "^2.15.3", - "snowflake-sdk": "^2.1.1" + "snowflake-sdk": "^2.1.1", + "zod": "^3.23.8" }, "devDependencies": { "@types/mssql": "^9.1.7", diff --git a/packages/data-source/src/index.ts b/packages/data-source/src/index.ts index 330c0e224..db7d247cc 100644 --- a/packages/data-source/src/index.ts +++ b/packages/data-source/src/index.ts @@ -78,3 +78,15 @@ export { batchWithRateLimit, getAllRateLimiterStats, } from './utils/rate-limiter'; + +// Metric query utilities +export { executeMetricQuery } from './utils/execute-metric-query'; +export type { + ExecuteMetricQueryOptions, + ExecuteMetricQueryResult, +} from './utils/execute-metric-query'; +export { createMetadataFromResults } from './utils/create-metadata-from-results'; + +// SQL validation utilities +export { checkQueryIsReadOnly } from './utils/sql-validation'; +export type { QueryTypeCheckResult } from './utils/sql-validation'; diff --git a/packages/ai/src/tools/visualization-tools/metrics/helpers/metadata-from-results.ts b/packages/data-source/src/utils/create-metadata-from-results.ts similarity index 98% rename from packages/ai/src/tools/visualization-tools/metrics/helpers/metadata-from-results.ts rename to packages/data-source/src/utils/create-metadata-from-results.ts index f491fc526..8f5df81c7 100644 --- a/packages/ai/src/tools/visualization-tools/metrics/helpers/metadata-from-results.ts +++ b/packages/data-source/src/utils/create-metadata-from-results.ts @@ -1,5 +1,5 @@ -import type { FieldMetadata } from '@buster/data-source'; import type { ColumnMetaData, DataMetadata } from '@buster/server-shared/metrics'; +import type { FieldMetadata } from '../adapters/base'; /** * Creates DataMetadata from query results and optional column metadata from adapters diff --git a/packages/data-source/src/utils/execute-metric-query.ts b/packages/data-source/src/utils/execute-metric-query.ts new file mode 100644 index 000000000..552efb92c --- /dev/null +++ b/packages/data-source/src/utils/execute-metric-query.ts @@ -0,0 +1,194 @@ +import type { DataMetadata } from '@buster/server-shared/metrics'; +import { z } from 'zod'; +import { DataSource } from '../data-source'; +import type { Credentials } from '../types/credentials'; +import { createMetadataFromResults } from './create-metadata-from-results'; +import { checkQueryIsReadOnly } from './sql-validation'; + +export interface ExecuteMetricQueryOptions { + maxRows?: number; + timeout?: number; + retryDelays?: number[]; + skipReadOnlyCheck?: boolean; // Allow skipping for special cases +} + +export interface ExecuteMetricQueryResult { + data: Record[]; + dataMetadata: DataMetadata; + hasMoreRecords: boolean; + executionTime?: number; +} + +const resultMetadataSchema = z.object({ + totalRowCount: z.number().optional(), + limited: z.boolean().optional(), + maxRows: z.number().optional(), +}); + +/** + * Executes a metric SQL query with retry logic and returns standardized results + * This utility is used by metric creation, modification, and data retrieval handlers + * + * @param dataSourceId - The ID of the data source to query + * @param sql - The SQL query to execute + * @param credentials - The credentials for the data source + * @param options - Query options including maxRows, timeout, and retry delays + * @returns Query results with data, metadata, and pagination info + */ +export async function executeMetricQuery( + dataSourceId: string, + sql: string, + credentials: Credentials, + options: ExecuteMetricQueryOptions = {} +): Promise { + const { + maxRows = 5000, + timeout = 120000, // 2 minutes default + retryDelays = [1000, 3000, 6000], // 1s, 3s, 6s + skipReadOnlyCheck = false, + } = options; + + // Validate query is read-only unless explicitly skipped + if (!skipReadOnlyCheck) { + const readOnlyCheck = checkQueryIsReadOnly(sql, credentials.type); + if (!readOnlyCheck.isReadOnly) { + throw new Error( + readOnlyCheck.error || + 'Only SELECT statements are allowed for metric queries. Write operations are not permitted.' + ); + } + } + + // Create DataSource instance with single data source config + const dataSource = new DataSource({ + dataSources: [ + { + name: dataSourceId, + type: credentials.type, + credentials, + }, + ], + defaultDataSource: dataSourceId, + }); + + try { + // Add 1 to limit to check if there are more records + const queryLimitWithCheck = maxRows + 1; + + // Attempt execution with retries + for (let attempt = 0; attempt <= retryDelays.length; attempt++) { + try { + const startTime = Date.now(); + + // Execute the SQL query using the DataSource + const result = await dataSource.execute({ + sql, + options: { + maxRows: queryLimitWithCheck, + timeout, + }, + }); + + const executionTime = Date.now() - startTime; + + if (result.success) { + const allResults = result.rows || []; + + // Check if we have more records than the requested limit + const hasMoreRecords = allResults.length > maxRows; + + // Trim results to requested limit if we have more + const data = hasMoreRecords ? allResults.slice(0, maxRows) : allResults; + + // Convert data to match expected type (string | number | null) + const typedData = data.map((row) => { + const typedRow: Record = {}; + for (const [key, value] of Object.entries(row)) { + if (value === null || typeof value === 'string' || typeof value === 'number') { + typedRow[key] = value; + } else if (typeof value === 'boolean') { + typedRow[key] = value.toString(); + } else if (value instanceof Date) { + typedRow[key] = value.toISOString(); + } else { + // Convert other types to string (JSON objects, arrays, etc) + typedRow[key] = JSON.stringify(value); + } + } + return typedRow; + }); + + // Create metadata from results and column information + const dataMetadata = createMetadataFromResults(typedData, result.columns); + + // Validate and parse result metadata if available + const validatedMetadata = resultMetadataSchema.safeParse(result.metadata); + const parsedMetadata = validatedMetadata.success ? validatedMetadata.data : undefined; + + return { + data: typedData, + dataMetadata, + hasMoreRecords: parsedMetadata?.limited ?? hasMoreRecords, + executionTime, + }; + } + + // Check if error is timeout-related + const errorMessage = result.error?.message || 'Query execution failed'; + const isTimeout = + errorMessage.toLowerCase().includes('timeout') || + errorMessage.toLowerCase().includes('timed out'); + + if (isTimeout && attempt < retryDelays.length) { + // Wait before retry + const delay = retryDelays[attempt] || 6000; + console.warn( + `[execute-metric-query] SQL execution timeout on attempt ${attempt + 1}/${retryDelays.length + 1}. Retrying in ${delay}ms...`, + { + sqlPreview: `${sql.substring(0, 100)}...`, + attempt: attempt + 1, + nextDelay: delay, + } + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; // Retry + } + + // Not a timeout or no more retries + throw new Error(errorMessage); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'SQL execution failed'; + const isTimeout = + errorMessage.toLowerCase().includes('timeout') || + errorMessage.toLowerCase().includes('timed out'); + + if (isTimeout && attempt < retryDelays.length) { + // Wait before retry + const delay = retryDelays[attempt] || 6000; + console.warn( + `[execute-metric-query] SQL execution timeout (exception) on attempt ${attempt + 1}/${retryDelays.length + 1}. Retrying in ${delay}ms...`, + { + sqlPreview: `${sql.substring(0, 100)}...`, + attempt: attempt + 1, + nextDelay: delay, + error: errorMessage, + } + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; // Retry + } + + // Not a timeout or no more retries + throw error; + } + } + + // Should not reach here, but just in case + throw new Error('Max retries exceeded for SQL execution'); + } finally { + // Always close the data source connection + await dataSource.close().catch((err) => { + console.error('Failed to close data source connection:', err); + }); + } +} diff --git a/packages/data-source/src/utils/sql-validation.ts b/packages/data-source/src/utils/sql-validation.ts new file mode 100644 index 000000000..3dda76d6f --- /dev/null +++ b/packages/data-source/src/utils/sql-validation.ts @@ -0,0 +1,156 @@ +import pkg from 'node-sql-parser'; +const { Parser } = pkg; + +export interface QueryTypeCheckResult { + isReadOnly: boolean; + queryType?: string; + error?: string; +} + +// Map data source syntax to node-sql-parser dialect +const DIALECT_MAPPING: Record = { + // Direct mappings + mysql: 'mysql', + postgresql: 'postgresql', + sqlite: 'sqlite', + mariadb: 'mariadb', + bigquery: 'bigquery', + snowflake: 'snowflake', + redshift: 'postgresql', // Redshift uses PostgreSQL dialect + transactsql: 'transactsql', + flinksql: 'flinksql', + hive: 'hive', + + // Alternative names + postgres: 'postgresql', + mssql: 'transactsql', + sqlserver: 'transactsql', + athena: 'postgresql', // Athena uses Presto/PostgreSQL syntax + db2: 'db2', + noql: 'mysql', // Default fallback for NoQL +}; + +function getParserDialect(dataSourceSyntax?: string): string { + if (!dataSourceSyntax) { + return 'postgresql'; + } + + const dialect = DIALECT_MAPPING[dataSourceSyntax.toLowerCase()]; + if (!dialect) { + return 'postgresql'; + } + + return dialect; +} + +/** + * Checks if a SQL query is read-only (SELECT only, no INSERT/UPDATE/DELETE/DDL) + * @param sql - The SQL query to validate + * @param dataSourceSyntax - Optional data source syntax for dialect-specific parsing + * @returns Result indicating if query is read-only with optional error message + */ +export function checkQueryIsReadOnly(sql: string, dataSourceSyntax?: string): QueryTypeCheckResult { + const dialect = getParserDialect(dataSourceSyntax); + const parser = new Parser(); + + try { + // Parse SQL into AST with the appropriate dialect + const ast = parser.astify(sql, { database: dialect }); + + // Handle single statement or array of statements + const statements = Array.isArray(ast) ? ast : [ast]; + + // Check each statement + for (const statement of statements) { + // Check if statement has a type property + if ('type' in statement && statement.type) { + const queryType = statement.type.toLowerCase(); + + // Only allow SELECT statements + if (queryType !== 'select') { + // Provide specific guidance based on the query type + let guidance = ''; + switch (queryType) { + case 'insert': + guidance = ' To read data, use SELECT statements instead of INSERT.'; + break; + case 'update': + guidance = ' To read data, use SELECT statements instead of UPDATE.'; + break; + case 'delete': + guidance = ' To read data, use SELECT statements instead of DELETE.'; + break; + case 'create': + guidance = + ' DDL operations like CREATE are not permitted. Use SELECT to query existing data.'; + break; + case 'drop': + guidance = + ' DDL operations like DROP are not permitted. Use SELECT to query existing data.'; + break; + case 'alter': + guidance = + ' DDL operations like ALTER are not permitted. Use SELECT to query existing data.'; + break; + case 'truncate': + guidance = + ' DDL operations like TRUNCATE are not permitted. Use SELECT to query existing data.'; + break; + case 'grant': + case 'revoke': + guidance = + ' Permission management statements are not allowed. Use SELECT statements to read data.'; + break; + default: + guidance = ' Please use SELECT statements to query data.'; + } + + return { + isReadOnly: false, + queryType, + error: `Query type '${queryType.toUpperCase()}' is not allowed. Only SELECT statements are permitted for read-only access.${guidance}`, + }; + } + } + } + + return { + isReadOnly: true, + queryType: 'select', + }; + } catch (error) { + // If we can't parse the SQL, err on the side of caution + const errorMessage = error instanceof Error ? error.message : 'Unknown parsing error'; + + // Check for common write operations in the raw SQL as a fallback + const sqlLower = sql.toLowerCase(); + const writeKeywords = [ + 'insert', + 'update', + 'delete', + 'create', + 'alter', + 'drop', + 'truncate', + 'grant', + 'revoke', + ]; + + for (const keyword of writeKeywords) { + // Simple regex to check for keywords at word boundaries + const regex = new RegExp(`\\b${keyword}\\b`); + if (regex.test(sqlLower)) { + return { + isReadOnly: false, + error: `Query appears to contain write operation (${keyword.toUpperCase()}). Only SELECT statements are allowed.`, + }; + } + } + + // If parsing failed, return error to be safe + return { + isReadOnly: false, + error: `Failed to parse SQL query for validation: ${errorMessage}. Please ensure your SQL syntax is valid. Only SELECT statements are allowed for read-only access.`, + }; + } +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 583edc59c..8a1075852 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -949,6 +949,9 @@ importers: '@buster/env-utils': specifier: workspace:* version: link:../env-utils + '@buster/server-shared': + specifier: workspace:* + version: link:../server-shared '@buster/typescript-config': specifier: workspace:* version: link:../typescript-config @@ -967,6 +970,9 @@ importers: mysql2: specifier: ^3.14.1 version: 3.14.1 + node-sql-parser: + specifier: ^5.3.11 + version: 5.3.11 pg: specifier: 'catalog:' version: 8.16.3 @@ -976,6 +982,9 @@ importers: snowflake-sdk: specifier: ^2.1.1 version: 2.1.1(asn1.js@5.4.1) + zod: + specifier: ^3.23.8 + version: 3.25.76 devDependencies: '@types/mssql': specifier: ^9.1.7 @@ -9600,6 +9609,10 @@ packages: resolution: {integrity: sha512-cf+iXXJ9Foz4hBIu+eNNeg207ac6XruA9I9DXEs+jCxeS9t/k9T0GZK8NZngPwkv+P26i3zNFj9jxJU2v3pJnw==} engines: {node: '>=8'} + node-sql-parser@5.3.11: + resolution: {integrity: sha512-1ZcXu2qEFZlMkkaYPpMoTkypTunYTvtnInzVjWDhjc1+FS5h/WqT1rbgP2pP42/nXBWfPVN66mEfidWSpYVs1Q==} + engines: {node: '>=8'} + normalize-path@3.0.0: resolution: {integrity: sha512-6eZs5Ls3WtCisHWp9S2GUy8dqkpGi4BVSz3GaqiE6ezub0512ESztXUwUB6C6IKbQkY2Pnb/mD4WYojCRwcwLA==} engines: {node: '>=0.10.0'} @@ -22886,6 +22899,11 @@ snapshots: '@types/pegjs': 0.10.6 big-integer: 1.6.52 + node-sql-parser@5.3.11: + dependencies: + '@types/pegjs': 0.10.6 + big-integer: 1.6.52 + normalize-path@3.0.0: {} npm-run-path@5.3.0: