diff --git a/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts b/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts index e3f30b596..f00436e73 100644 --- a/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts +++ b/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts @@ -1,9 +1,20 @@ -import { afterEach, beforeEach, describe, expect } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { DataSourceType } from '../types/credentials'; import type { SnowflakeCredentials } from '../types/credentials'; import { SnowflakeAdapter } from './snowflake'; -const testWithCredentials = skipIfNoCredentials('snowflake'); +// Check if Snowflake test credentials are available +const hasSnowflakeCredentials = !!( + process.env.TEST_SNOWFLAKE_DATABASE && + process.env.TEST_SNOWFLAKE_USERNAME && + process.env.TEST_SNOWFLAKE_PASSWORD && + process.env.TEST_SNOWFLAKE_ACCOUNT_ID +); + +// Skip tests if credentials are not available +const testWithCredentials = hasSnowflakeCredentials ? 
it : it.skip; + +const TEST_TIMEOUT = 30000; describe('Snowflake Memory Protection Tests', () => { let adapter: SnowflakeAdapter; @@ -12,28 +23,16 @@ describe('Snowflake Memory Protection Tests', () => { beforeEach(() => { adapter = new SnowflakeAdapter(); - // Set up credentials once - if ( - !testConfig.snowflake.account_id || - !testConfig.snowflake.warehouse_id || - !testConfig.snowflake.username || - !testConfig.snowflake.password || - !testConfig.snowflake.default_database - ) { - throw new Error( - 'TEST_SNOWFLAKE_ACCOUNT_ID, TEST_SNOWFLAKE_WAREHOUSE_ID, TEST_SNOWFLAKE_USERNAME, TEST_SNOWFLAKE_PASSWORD, and TEST_SNOWFLAKE_DATABASE are required for this test' - ); - } - + // Set up credentials from environment variables credentials = { type: DataSourceType.Snowflake, - account_id: testConfig.snowflake.account_id, - warehouse_id: testConfig.snowflake.warehouse_id, - username: testConfig.snowflake.username, - password: testConfig.snowflake.password, - default_database: testConfig.snowflake.default_database, - default_schema: testConfig.snowflake.default_schema, - role: testConfig.snowflake.role, + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + username: process.env.TEST_SNOWFLAKE_USERNAME!, + password: process.env.TEST_SNOWFLAKE_PASSWORD!, + role: process.env.TEST_SNOWFLAKE_ROLE, }; }); diff --git a/packages/data-source/src/adapters/snowflake.int.test.ts b/packages/data-source/src/adapters/snowflake.int.test.ts index 309f585f3..10783bd1a 100644 --- a/packages/data-source/src/adapters/snowflake.int.test.ts +++ b/packages/data-source/src/adapters/snowflake.int.test.ts @@ -35,10 +35,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: 
process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, - warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -56,10 +56,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, - warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -81,10 +81,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, - warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA 
|| 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -108,10 +108,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, - warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -133,9 +133,9 @@ describe('SnowflakeAdapter Integration', () => { async () => { const invalidCredentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: 'invalid-account', - warehouse: 'INVALID_WH', - database: 'invalid-db', + account_id: 'invalid-account', + warehouse_id: 'INVALID_WH', + default_database: 'invalid-db', username: 'invalid-user', password: 'invalid-pass', }; diff --git a/packages/data-source/src/adapters/snowflake.test.ts b/packages/data-source/src/adapters/snowflake.test.ts index 239089a9c..9f660d05d 100644 --- a/packages/data-source/src/adapters/snowflake.test.ts +++ b/packages/data-source/src/adapters/snowflake.test.ts @@ -159,29 +159,42 @@ describe('SnowflakeAdapter', () => { it('should execute simple query without parameters', async () => { const mockRows = [{ ID: 1, NAME: 'Test' }]; - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + const mockStream = { + on: vi.fn(), + }; + + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - 
getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - { - getName: () => 'NAME', - getType: () => 'TEXT', - isNullable: () => true, - getScale: () => 0, - getPrecision: () => 0, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + getPrecision: () => 38, }, - mockRows - ); + { + getName: () => 'NAME', + getType: () => 'TEXT', + isNullable: () => true, + getScale: () => 0, + getPrecision: () => 0, + }, + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'data') { + setTimeout(() => handler(mockRows[0]), 0); + } else if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users'); @@ -189,9 +202,12 @@ describe('SnowflakeAdapter', () => { expect(mockConnection.execute).toHaveBeenCalledWith({ sqlText: 'SELECT * FROM users', binds: undefined, + streamResult: true, complete: expect.any(Function), }); + expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 5000 }); + expect(result).toEqual({ rows: mockRows, rowCount: 1, @@ -205,58 +221,88 @@ describe('SnowflakeAdapter', () => { it('should execute parameterized query', async () => { const mockRows = [{ ID: 1 }]; - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + const mockStream = { + on: vi.fn(), + }; + + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + 
getPrecision: () => 38, }, - mockRows - ); + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'data') { + setTimeout(() => handler(mockRows[0]), 0); + } else if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users WHERE id = ?', [1]); expect(result.rows).toEqual(mockRows); + expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 5000 }); }); it('should handle maxRows limit', async () => { - const mockRows = Array.from({ length: 15 }, (_, i) => ({ ID: i + 1 })); + const mockRows = Array.from({ length: 10 }, (_, i) => ({ ID: i + 1 })); + const mockStream = { + on: vi.fn(), + }; - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + getPrecision: () => 38, }, - mockRows - ); + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'data') { + setTimeout(() => { + mockRows.forEach((row) => handler(row)); + }, 0); + } else if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users', [], 10); + 
expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 10 }); expect(result.rows).toHaveLength(10); - expect(result.hasMoreRows).toBe(true); + expect(result.hasMoreRows).toBe(true); // Since we got exactly the limit }); it('should handle query errors', async () => { - mockConnection.execute.mockImplementation(({ complete }) => { + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); complete(new Error('Query failed')); }); @@ -274,29 +320,40 @@ describe('SnowflakeAdapter', () => { }); it('should handle empty result sets', async () => { - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + const mockStream = { + on: vi.fn(), + }; + + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - { - getName: () => 'NAME', - getType: () => 'TEXT', - isNullable: () => true, - getScale: () => 0, - getPrecision: () => 0, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + getPrecision: () => 38, }, - [] - ); + { + getName: () => 'NAME', + getType: () => 'TEXT', + isNullable: () => true, + getScale: () => 0, + getPrecision: () => 0, + }, + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users WHERE 1=0'); @@ -304,6 +361,7 @@ describe('SnowflakeAdapter', () => { expect(result.rows).toEqual([]); expect(result.rowCount).toBe(0); expect(result.fields).toHaveLength(2); + 
expect(result.hasMoreRows).toBe(false); }); it('should handle query timeout', async () => { diff --git a/packages/data-source/src/adapters/snowflake.ts b/packages/data-source/src/adapters/snowflake.ts index 8618ef1f2..9a24e39ee 100644 --- a/packages/data-source/src/adapters/snowflake.ts +++ b/packages/data-source/src/adapters/snowflake.ts @@ -18,7 +18,7 @@ interface SnowflakeStatement { getScale(): number; getPrecision(): number; }>; - streamRows?: () => NodeJS.ReadableStream; + streamRows?: (options?: { start?: number; end?: number }) => NodeJS.ReadableStream; cancel?: (callback: (err: Error | undefined) => void) => void; } @@ -191,12 +191,12 @@ export class SnowflakeAdapter extends BaseAdapter { // Set query timeout if specified (default: 120 seconds for Snowflake queue handling) const timeoutMs = timeout || TIMEOUT_CONFIG.query.default; - // IMPORTANT: Execute the original SQL unchanged to leverage Snowflake's query caching - // For memory protection, we'll fetch all rows but limit in memory - // This is a compromise to preserve caching while preventing OOM on truly massive queries + const limit = maxRows && maxRows > 0 ? 
maxRows : 5000; + const queryPromise = new Promise<{ rows: Record<string, unknown>[]; statement: SnowflakeStatement; + hasMoreRows: boolean; }>((resolve, reject) => { if (!connection) { reject(new Error('Failed to acquire Snowflake connection')); @@ -206,16 +206,40 @@ export class SnowflakeAdapter extends BaseAdapter { connection.execute({ sqlText: sql, // Use original SQL unchanged for caching binds: params as snowflake.Binds, - complete: ( - err: SnowflakeError | undefined, - stmt: SnowflakeStatement, - rows: Record<string, unknown>[] | undefined - ) => { + streamResult: true, // Enable streaming + complete: (err: SnowflakeError | undefined, stmt: SnowflakeStatement) => { if (err) { reject(new Error(`Snowflake query failed: ${err.message}`)); - } else { - resolve({ rows: rows || [], statement: stmt }); + return; } + + const rows: Record<string, unknown>[] = []; + let hasMoreRows = false; + + const stream = stmt.streamRows?.({ start: 0, end: limit }); + if (!stream) { + reject(new Error('Snowflake streaming not supported')); + return; + } + + let rowCount = 0; + + stream + .on('data', (row: Record<string, unknown>) => { + rows.push(row); + rowCount++; + }) + .on('error', (streamErr: Error) => { + reject(new Error(`Snowflake stream error: ${streamErr.message}`)); + }) + .on('end', () => { + hasMoreRows = rowCount >= limit; + resolve({ + rows, + statement: stmt, + hasMoreRows, + }); + }); }, }); }); @@ -231,20 +255,11 @@ export class SnowflakeAdapter extends BaseAdapter { precision: col.getPrecision() > 0 ? col.getPrecision() : 0, })) || []; - // Handle maxRows logic in memory (not in SQL) - let finalRows = result.rows; - let hasMoreRows = false; - - if (maxRows && maxRows > 0 && result.rows.length > maxRows) { - finalRows = result.rows.slice(0, maxRows); - hasMoreRows = true; - } - const queryResult = { - rows: finalRows, - rowCount: finalRows.length, + rows: result.rows, + rowCount: result.rows.length, fields, - hasMoreRows, + hasMoreRows: result.hasMoreRows, }; return queryResult;