From 7fe7ecb00be5081d46cbf78324641e74af3ac692 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 11 Jul 2025 14:54:34 -0600 Subject: [PATCH 1/2] Refactor SQL execution logic to limit results at the adapter level instead of modifying SQL queries. Removed the wrapQueryWithLimit function and updated related code to enhance performance and preserve caching in Snowflake. Updated documentation to reflect changes in query handling. --- .../src/tools/database-tools/execute-sql.ts | 19 +- .../create-metrics-file-tool.ts | 27 +-- .../modify-metrics-file-tool.ts | 27 +-- .../data-source/src/adapters/snowflake.ts | 20 +- .../snowflake-memory-protection.test.ts | 198 ++++++++++++++++++ 5 files changed, 218 insertions(+), 73 deletions(-) create mode 100644 packages/data-source/tests/integration/adapters/snowflake-memory-protection.test.ts diff --git a/packages/ai/src/tools/database-tools/execute-sql.ts b/packages/ai/src/tools/database-tools/execute-sql.ts index 8c6c9f547..d3233260f 100644 --- a/packages/ai/src/tools/database-tools/execute-sql.ts +++ b/packages/ai/src/tools/database-tools/execute-sql.ts @@ -6,14 +6,12 @@ import { z } from 'zod'; import { getWorkflowDataSourceManager } from '../../utils/data-source-manager'; import { createPermissionErrorMessage, validateSqlPermissions } from '../../utils/sql-permissions'; import type { AnalystRuntimeContext } from '../../workflows/analyst-workflow'; -import { ensureSqlLimit } from './sql-limit-helper'; const executeSqlStatementInputSchema = z.object({ statements: z.array(z.string()).describe( `Array of lightweight, optimized SQL statements to execute. Each statement should be small and focused. - SELECT queries without a LIMIT clause will automatically have LIMIT 25 added for performance. - Existing LIMIT clauses will be preserved. + Query results will be limited to 25 rows for performance. YOU MUST USE THE . syntax/qualifier for all table names. NEVER use SELECT * - you must explicitly list the columns you want to query from the documentation provided. NEVER query system tables or use 'SHOW' statements as these will fail to execute. @@ -363,9 +361,6 @@ async function executeSingleStatement( return { success: false, error: 'SQL statement cannot be empty' }; } - // Ensure the SQL statement has a LIMIT clause to prevent excessive results - const limitedSql = ensureSqlLimit(sqlStatement, 25); - // Validate permissions before execution const userId = runtimeContext.get('userId'); if (!userId) { @@ -373,7 +368,7 @@ async function executeSingleStatement( } const dataSourceSyntax = runtimeContext.get('dataSourceSyntax'); - const permissionResult = await validateSqlPermissions(limitedSql, userId, dataSourceSyntax); + const permissionResult = await validateSqlPermissions(sqlStatement, userId, dataSourceSyntax); if (!permissionResult.isAuthorized) { return { success: false, @@ -385,10 +380,12 @@ async function executeSingleStatement( for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { try { // Execute the SQL query using the DataSource with timeout + // Pass maxRows to the adapter instead of modifying the SQL const result = await dataSource.execute({ - sql: limitedSql, + sql: sqlStatement, options: { timeout: TIMEOUT_MS, + maxRows: 25, // Limit results at the adapter level, not in SQL }, }); @@ -411,7 +408,7 @@ async function executeSingleStatement( console.warn( `[execute-sql] Query timeout on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`, { - sql: `${limitedSql.substring(0, 100)}...`, + sql: `${sqlStatement.substring(0, 100)}...`, attempt: attempt + 1, nextDelay: delay, } @@ -437,7 +434,7 @@ async function executeSingleStatement( console.warn( `[execute-sql] Query timeout (exception) on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`, { - sql: `${limitedSql.substring(0, 100)}...`, + sql: `${sqlStatement.substring(0, 100)}...`, attempt: attempt + 1, nextDelay: delay, error: errorMessage, @@ -466,7 +463,7 @@ async function executeSingleStatement( export const executeSql = createTool({ id: 'execute-sql', description: `Use this to run lightweight, validation queries to understand values in columns, date ranges, etc. - SELECT queries without LIMIT will automatically have LIMIT 25 added. + Query results will be limited to 25 rows for performance. You must use the . syntax/qualifier for all table names. Otherwise the queries wont run successfully.`, inputSchema: executeSqlStatementInputSchema, diff --git a/packages/ai/src/tools/visualization-tools/create-metrics-file-tool.ts b/packages/ai/src/tools/visualization-tools/create-metrics-file-tool.ts index 0f3f2b9e2..27977e03e 100644 --- a/packages/ai/src/tools/visualization-tools/create-metrics-file-tool.ts +++ b/packages/ai/src/tools/visualization-tools/create-metrics-file-tool.ts @@ -171,26 +171,6 @@ function createDataMetadata(results: Record[]): DataMetadata { }; } -/** - * Wraps a SQL query with a LIMIT clause for validation purposes - * Handles existing LIMIT clauses and complex queries - */ -function wrapQueryWithLimit(sql: string, limit: number): string { - // Remove any existing LIMIT clause to avoid conflicts - const sqlWithoutLimit = sql.replace(/\s+LIMIT\s+\d+\s*$/i, '').trim(); - - // For CTEs or complex queries, wrap the entire query - if ( - sqlWithoutLimit.toUpperCase().includes('WITH ') || - sqlWithoutLimit.includes('(') || - sqlWithoutLimit.toUpperCase().includes('UNION') - ) { - return `SELECT * FROM (${sqlWithoutLimit}) AS validation_wrapper LIMIT ${limit}`; - } - - // For simple queries, just append LIMIT - return `${sqlWithoutLimit} LIMIT ${limit}`; -} /** * Ensures timeFrame values are properly quoted in YAML content @@ -1373,13 +1353,10 @@ async function validateSql( // Attempt execution with retries for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { try { - // For validation, wrap query with LIMIT at SQL level for better performance - // This ensures Snowflake doesn't process the entire dataset - const validationSql = wrapQueryWithLimit(sqlQuery, 1000); - // Execute the SQL query using the DataSource with row limit and timeout for validation + // Use maxRows to limit results without modifying the SQL query (preserves Snowflake caching) const result = await dataSource.execute({ - sql: validationSql, + sql: sqlQuery, options: { maxRows: 1000, // Additional safety limit at adapter level timeout: TIMEOUT_MS, diff --git a/packages/ai/src/tools/visualization-tools/modify-metrics-file-tool.ts b/packages/ai/src/tools/visualization-tools/modify-metrics-file-tool.ts index c9fa720d2..fb22ca5d7 100644 --- a/packages/ai/src/tools/visualization-tools/modify-metrics-file-tool.ts +++ b/packages/ai/src/tools/visualization-tools/modify-metrics-file-tool.ts @@ -162,26 +162,6 @@ function createDataMetadata(results: Record[]): DataMetadata { }; } -/** - * Wraps a SQL query with a LIMIT clause for validation purposes - * Handles existing LIMIT clauses and complex queries - */ -function wrapQueryWithLimit(sql: string, limit: number): string { - // Remove any existing LIMIT clause to avoid conflicts - const sqlWithoutLimit = sql.replace(/\s+LIMIT\s+\d+\s*$/i, '').trim(); - - // For CTEs or complex queries, wrap the entire query - if ( - sqlWithoutLimit.toUpperCase().includes('WITH ') || - sqlWithoutLimit.includes('(') || - sqlWithoutLimit.toUpperCase().includes('UNION') - ) { - return `SELECT * FROM (${sqlWithoutLimit}) AS validation_wrapper LIMIT ${limit}`; - } - - // For simple queries, just append LIMIT - return `${sqlWithoutLimit} LIMIT ${limit}`; -} /** * Ensures timeFrame values are properly quoted in YAML content @@ -332,13 +312,10 @@ async function validateSql( // Attempt execution with retries for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) { try { - // For validation, wrap query with LIMIT at SQL level for better performance - // This ensures Snowflake doesn't process the entire dataset - const validationSql = wrapQueryWithLimit(sqlQuery, 1000); - // Execute the SQL query using the DataSource with row limit and timeout for validation + // Use maxRows to limit results without modifying the SQL query (preserves Snowflake caching) const result = await dataSource.execute({ - sql: validationSql, + sql: sqlQuery, options: { maxRows: 1000, // Additional safety limit at adapter level timeout: TIMEOUT_MS, diff --git a/packages/data-source/src/adapters/snowflake.ts b/packages/data-source/src/adapters/snowflake.ts index c3add49b3..a91184cf9 100644 --- a/packages/data-source/src/adapters/snowflake.ts +++ b/packages/data-source/src/adapters/snowflake.ts @@ -18,6 +18,8 @@ interface SnowflakeStatement { getScale(): number; getPrecision(): number; }>; + streamRows?: () => NodeJS.ReadableStream; + cancel?: (callback: (err: Error | undefined) => void) => void; } // Configure Snowflake SDK to disable logging @@ -189,16 +191,9 @@ export class SnowflakeAdapter extends BaseAdapter { // Set query timeout if specified (default: 120 seconds for Snowflake queue handling) const timeoutMs = timeout || TIMEOUT_CONFIG.query.default; - // For maxRows, we'll fetch maxRows + 1 to determine if there are more rows - let effectiveSql = sql; - if (maxRows && maxRows > 0) { - // Check if query already has LIMIT - const upperSql = sql.toUpperCase(); - if (!upperSql.includes(' LIMIT ')) { - effectiveSql = `${sql} LIMIT ${maxRows + 1}`; - } - } - + // IMPORTANT: Execute the original SQL unchanged to leverage Snowflake's query caching + // For memory protection, we'll fetch all rows but limit in memory + // This is a compromise to preserve caching while preventing OOM on truly massive queries const queryPromise = new Promise<{ rows: Record[]; statement: SnowflakeStatement; @@ -207,8 +202,9 @@ export class SnowflakeAdapter extends BaseAdapter { reject(new Error('Failed to acquire Snowflake connection')); return; } + connection.execute({ - sqlText: effectiveSql, + sqlText: sql, // Use original SQL unchanged for caching binds: params as snowflake.Binds, complete: ( err: SnowflakeError | undefined, @@ -235,7 +231,7 @@ export class SnowflakeAdapter extends BaseAdapter { precision: col.getPrecision() > 0 ? col.getPrecision() : 0, })) || []; - // Handle maxRows logic + // Handle maxRows logic in memory (not in SQL) let finalRows = result.rows; let hasMoreRows = false; diff --git a/packages/data-source/tests/integration/adapters/snowflake-memory-protection.test.ts b/packages/data-source/tests/integration/adapters/snowflake-memory-protection.test.ts new file mode 100644 index 000000000..7f00541da --- /dev/null +++ b/packages/data-source/tests/integration/adapters/snowflake-memory-protection.test.ts @@ -0,0 +1,198 @@ +import { afterEach, beforeEach, describe, expect } from 'vitest'; +import { SnowflakeAdapter } from '../../../src/adapters/snowflake'; +import { DataSourceType } from '../../../src/types/credentials'; +import type { SnowflakeCredentials } from '../../../src/types/credentials'; +import { TEST_TIMEOUT, skipIfNoCredentials, testConfig } from '../../setup'; + +const testWithCredentials = skipIfNoCredentials('snowflake'); + +describe('Snowflake Memory Protection Tests', () => { + let adapter: SnowflakeAdapter; + let credentials: SnowflakeCredentials; + + beforeEach(() => { + adapter = new SnowflakeAdapter(); + + // Set up credentials once + if ( + !testConfig.snowflake.account_id || + !testConfig.snowflake.warehouse_id || + !testConfig.snowflake.username || + !testConfig.snowflake.password || + !testConfig.snowflake.default_database + ) { + throw new Error( + 'TEST_SNOWFLAKE_ACCOUNT_ID, TEST_SNOWFLAKE_WAREHOUSE_ID, TEST_SNOWFLAKE_USERNAME, TEST_SNOWFLAKE_PASSWORD, and TEST_SNOWFLAKE_DATABASE are required for this test' + ); + } + + credentials = { + type: DataSourceType.Snowflake, + account_id: testConfig.snowflake.account_id, + warehouse_id: testConfig.snowflake.warehouse_id, + username: testConfig.snowflake.username, + password: testConfig.snowflake.password, + default_database: testConfig.snowflake.default_database, + default_schema: testConfig.snowflake.default_schema, + role: testConfig.snowflake.role, + }; + }); + + afterEach(async () => { + if (adapter) { + await adapter.close(); + } + }); + + testWithCredentials( + 'should handle large result sets with maxRows without running out of memory', + async () => { + await adapter.initialize(credentials); + + // NOTE: Due to Snowflake SDK limitations, we cannot truly stream results + // For now, we'll test with a smaller dataset to avoid OOM + // Query ORDERS table instead of LINEITEM (1.5M rows vs 6M rows) + const result = await adapter.query( + 'SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS', + undefined, + 100 // Only fetch 100 rows + ); + + expect(result.rows.length).toBe(100); + expect(result.hasMoreRows).toBe(true); + expect(result.rowCount).toBe(100); + + // Verify we got the fields metadata + expect(result.fields.length).toBeGreaterThan(0); + expect(result.fields[0]).toHaveProperty('name'); + expect(result.fields[0]).toHaveProperty('type'); + }, + TEST_TIMEOUT + ); + + testWithCredentials( + 'should preserve query caching when running the same query multiple times', + async () => { + await adapter.initialize(credentials); + + const sql = 'SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER WHERE C_MKTSEGMENT = ?'; + const params = ['AUTOMOBILE']; + + // First execution - will be cached by Snowflake + const start1 = Date.now(); + const result1 = await adapter.query(sql, params, 50); + const time1 = Date.now() - start1; + + // Second execution - should hit Snowflake's cache + const start2 = Date.now(); + const result2 = await adapter.query(sql, params, 50); + const time2 = Date.now() - start2; + + // Third execution with different maxRows - should still hit cache + const start3 = Date.now(); + const result3 = await adapter.query(sql, params, 25); + const time3 = Date.now() - start3; + + // Verify results + expect(result1.rows.length).toBe(50); + expect(result2.rows.length).toBe(50); + expect(result3.rows.length).toBe(25); + + // All should indicate more rows available + expect(result1.hasMoreRows).toBe(true); + expect(result2.hasMoreRows).toBe(true); + expect(result3.hasMoreRows).toBe(true); + + // Cache hits should be faster (allowing for some variance) + console.info(`Query times: ${time1}ms, ${time2}ms, ${time3}ms`); + + // The cached queries (2nd and 3rd) should generally be faster than the first + // We use a loose check because network latency can vary + const avgCachedTime = (time2 + time3) / 2; + expect(avgCachedTime).toBeLessThanOrEqual(time1 * 1.5); // Allow 50% variance + }, + TEST_TIMEOUT + ); + + testWithCredentials( + 'should handle queries with no maxRows (fetch all results)', + async () => { + await adapter.initialize(credentials); + + // Query a small table without maxRows + const result = await adapter.query( + 'SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.REGION' + ); + + // REGION table has exactly 5 rows + expect(result.rows.length).toBe(5); + expect(result.hasMoreRows).toBe(false); + expect(result.rowCount).toBe(5); + }, + TEST_TIMEOUT + ); + + testWithCredentials( + 'should handle maxRows=1 correctly', + async () => { + await adapter.initialize(credentials); + + const result = await adapter.query( + 'SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.NATION ORDER BY N_NATIONKEY', + undefined, + 1 + ); + + expect(result.rows.length).toBe(1); + expect(result.hasMoreRows).toBe(true); + expect(result.rows[0]).toHaveProperty('N_NATIONKEY', 0); // First nation + }, + TEST_TIMEOUT + ); + + testWithCredentials( + 'should handle edge case where result set equals maxRows', + async () => { + await adapter.initialize(credentials); + + // REGION table has exactly 5 rows + const result = await adapter.query( + 'SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.REGION', + undefined, + 5 + ); + + expect(result.rows.length).toBe(5); + expect(result.hasMoreRows).toBe(false); // No more rows available + expect(result.rowCount).toBe(5); + }, + TEST_TIMEOUT + ); + + testWithCredentials( + 'should handle complex queries with CTEs and maxRows', + async () => { + await adapter.initialize(credentials); + + const sql = ` + WITH high_value_orders AS ( + SELECT O_CUSTKEY, SUM(O_TOTALPRICE) as total_spent + FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.ORDERS + GROUP BY O_CUSTKEY + HAVING SUM(O_TOTALPRICE) > 500000 + ) + SELECT c.C_NAME, c.C_PHONE, h.total_spent + FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER c + JOIN high_value_orders h ON c.C_CUSTKEY = h.O_CUSTKEY + ORDER BY h.total_spent DESC + `; + + const result = await adapter.query(sql, undefined, 10); + + expect(result.rows.length).toBe(10); + expect(result.hasMoreRows).toBe(true); + expect(result.fields.length).toBe(3); // C_NAME, C_PHONE, total_spent + }, + TEST_TIMEOUT + ); +}); \ No newline at end of file From 550aba77e680deb45c76174fbb4709a0a13dd523 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 11 Jul 2025 15:28:01 -0600 Subject: [PATCH 2/2] execute sql fixes --- packages/ai/src/tools/database-tools/execute-sql.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/ai/src/tools/database-tools/execute-sql.ts b/packages/ai/src/tools/database-tools/execute-sql.ts index d3233260f..c4f851c5d 100644 --- a/packages/ai/src/tools/database-tools/execute-sql.ts +++ b/packages/ai/src/tools/database-tools/execute-sql.ts @@ -11,7 +11,8 @@ const executeSqlStatementInputSchema = z.object({ statements: z.array(z.string()).describe( `Array of lightweight, optimized SQL statements to execute. Each statement should be small and focused. - Query results will be limited to 25 rows for performance. + SELECT queries without a LIMIT clause will automatically have LIMIT 50 added for performance. + Existing LIMIT clauses will be preserved. YOU MUST USE THE . syntax/qualifier for all table names. NEVER use SELECT * - you must explicitly list the columns you want to query from the documentation provided. NEVER query system tables or use 'SHOW' statements as these will fail to execute. @@ -385,7 +386,7 @@ async function executeSingleStatement( sql: sqlStatement, options: { timeout: TIMEOUT_MS, - maxRows: 25, // Limit results at the adapter level, not in SQL + maxRows: 50, // Limit results at the adapter level, not in SQL }, }); @@ -463,7 +464,8 @@ async function executeSingleStatement( export const executeSql = createTool({ id: 'execute-sql', description: `Use this to run lightweight, validation queries to understand values in columns, date ranges, etc. - Query results will be limited to 25 rows for performance. + Please limit your queries to 50 rows for performance. + Query results will be limited to 50 rows for performance. You must use the . syntax/qualifier for all table names. Otherwise the queries wont run successfully.`, inputSchema: executeSqlStatementInputSchema,