From 315a151a3f5159311115016c63efdd6a1716702c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 23 Jul 2025 13:56:40 +0000 Subject: [PATCH 1/4] BUS-1495: Implement Snowflake adapter streaming with network-level row limiting - Update query() method to use streamResult: true and stmt.streamRows() - Add network-level row limiting with default 5000 row cap - Process stream events (data, error, end) to build result set - Maintain backward compatibility with existing adapter interface - Update unit tests to mock streaming behavior - Fix integration test imports and property names - Preserve query caching by using original SQL unchanged Co-Authored-By: Dallin Bentley --- .../snowflake-memory-protection.int.test.ts | 43 ++-- .../src/adapters/snowflake.int.test.ts | 38 ++-- .../src/adapters/snowflake.test.ts | 204 +++++++++++------- .../data-source/src/adapters/snowflake.ts | 61 ++++-- 4 files changed, 209 insertions(+), 137 deletions(-) diff --git a/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts b/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts index e3f30b596..f00436e73 100644 --- a/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts +++ b/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts @@ -1,9 +1,20 @@ -import { afterEach, beforeEach, describe, expect } from 'vitest'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { DataSourceType } from '../types/credentials'; import type { SnowflakeCredentials } from '../types/credentials'; import { SnowflakeAdapter } from './snowflake'; -const testWithCredentials = skipIfNoCredentials('snowflake'); +// Check if Snowflake test credentials are available +const hasSnowflakeCredentials = !!( + process.env.TEST_SNOWFLAKE_DATABASE && + process.env.TEST_SNOWFLAKE_USERNAME && + process.env.TEST_SNOWFLAKE_PASSWORD && + process.env.TEST_SNOWFLAKE_ACCOUNT_ID +); + +// Skip tests if credentials are not available +const testWithCredentials = hasSnowflakeCredentials ? 
it : it.skip; + +const TEST_TIMEOUT = 30000; describe('Snowflake Memory Protection Tests', () => { let adapter: SnowflakeAdapter; @@ -12,28 +23,16 @@ describe('Snowflake Memory Protection Tests', () => { beforeEach(() => { adapter = new SnowflakeAdapter(); - // Set up credentials once - if ( - !testConfig.snowflake.account_id || - !testConfig.snowflake.warehouse_id || - !testConfig.snowflake.username || - !testConfig.snowflake.password || - !testConfig.snowflake.default_database - ) { - throw new Error( - 'TEST_SNOWFLAKE_ACCOUNT_ID, TEST_SNOWFLAKE_WAREHOUSE_ID, TEST_SNOWFLAKE_USERNAME, TEST_SNOWFLAKE_PASSWORD, and TEST_SNOWFLAKE_DATABASE are required for this test' - ); - } - + // Set up credentials from environment variables credentials = { type: DataSourceType.Snowflake, - account_id: testConfig.snowflake.account_id, - warehouse_id: testConfig.snowflake.warehouse_id, - username: testConfig.snowflake.username, - password: testConfig.snowflake.password, - default_database: testConfig.snowflake.default_database, - default_schema: testConfig.snowflake.default_schema, - role: testConfig.snowflake.role, + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + username: process.env.TEST_SNOWFLAKE_USERNAME!, + password: process.env.TEST_SNOWFLAKE_PASSWORD!, + role: process.env.TEST_SNOWFLAKE_ROLE, }; }); diff --git a/packages/data-source/src/adapters/snowflake.int.test.ts b/packages/data-source/src/adapters/snowflake.int.test.ts index 309f585f3..10783bd1a 100644 --- a/packages/data-source/src/adapters/snowflake.int.test.ts +++ b/packages/data-source/src/adapters/snowflake.int.test.ts @@ -35,10 +35,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, - warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -56,10 +56,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, - warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -81,10 +81,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, 
- warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -108,10 +108,10 @@ describe('SnowflakeAdapter Integration', () => { async () => { const credentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, - warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', - database: process.env.TEST_SNOWFLAKE_DATABASE!, - schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', + account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!, + warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH', + default_database: process.env.TEST_SNOWFLAKE_DATABASE!, + default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC', username: process.env.TEST_SNOWFLAKE_USERNAME!, password: process.env.TEST_SNOWFLAKE_PASSWORD!, role: process.env.TEST_SNOWFLAKE_ROLE, @@ -133,9 +133,9 @@ describe('SnowflakeAdapter Integration', () => { async () => { const invalidCredentials: SnowflakeCredentials = { type: DataSourceType.Snowflake, - account: 'invalid-account', - warehouse: 'INVALID_WH', - database: 'invalid-db', + account_id: 'invalid-account', + warehouse_id: 'INVALID_WH', + default_database: 'invalid-db', username: 'invalid-user', password: 'invalid-pass', }; diff --git a/packages/data-source/src/adapters/snowflake.test.ts b/packages/data-source/src/adapters/snowflake.test.ts index 239089a9c..9f660d05d 100644 --- a/packages/data-source/src/adapters/snowflake.test.ts +++ b/packages/data-source/src/adapters/snowflake.test.ts @@ -159,29 +159,42 @@ describe('SnowflakeAdapter', () => { it('should execute simple query without parameters', async () => { const mockRows = [{ ID: 1, NAME: 'Test' }]; - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + const mockStream = { + on: vi.fn(), + }; + + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - { - getName: () => 'NAME', - getType: () => 'TEXT', - isNullable: () => true, - getScale: () => 0, - getPrecision: () => 0, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + getPrecision: () => 38, }, - mockRows - ); + { + getName: () => 'NAME', + getType: () => 'TEXT', + isNullable: () => true, + getScale: () => 0, + getPrecision: () => 0, + }, + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'data') { + setTimeout(() => handler(mockRows[0]), 0); + } else if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users'); @@ -189,9 +202,12 @@ describe('SnowflakeAdapter', () => { 
expect(mockConnection.execute).toHaveBeenCalledWith({ sqlText: 'SELECT * FROM users', binds: undefined, + streamResult: true, complete: expect.any(Function), }); + expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 5000 }); + expect(result).toEqual({ rows: mockRows, rowCount: 1, @@ -205,58 +221,88 @@ describe('SnowflakeAdapter', () => { it('should execute parameterized query', async () => { const mockRows = [{ ID: 1 }]; - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + const mockStream = { + on: vi.fn(), + }; + + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + getPrecision: () => 38, }, - mockRows - ); + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'data') { + setTimeout(() => handler(mockRows[0]), 0); + } else if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users WHERE id = ?', [1]); expect(result.rows).toEqual(mockRows); + expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 5000 }); }); it('should handle maxRows limit', async () => { - const mockRows = Array.from({ length: 15 }, (_, i) => ({ ID: i + 1 })); + const mockRows = Array.from({ length: 10 }, (_, i) => ({ ID: i + 1 })); + const mockStream = { + on: vi.fn(), + }; - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + getPrecision: () => 38, }, - mockRows - ); + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'data') { + setTimeout(() => { + mockRows.forEach((row) => handler(row)); + }, 0); + } else if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users', [], 10); + expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 10 }); expect(result.rows).toHaveLength(10); - expect(result.hasMoreRows).toBe(true); + expect(result.hasMoreRows).toBe(true); // Since we got exactly the limit }); it('should handle query errors', async () => { - mockConnection.execute.mockImplementation(({ complete }) => { + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); complete(new Error('Query failed')); }); @@ -274,29 +320,40 @@ describe('SnowflakeAdapter', () => { }); it('should handle empty result sets', async () => { - mockConnection.execute.mockImplementation(({ complete }) => { - complete( - null, + 
const mockStream = { + on: vi.fn(), + }; + + const mockStatement = { + getColumns: () => [ { - getColumns: () => [ - { - getName: () => 'ID', - getType: () => 'NUMBER', - isNullable: () => false, - getScale: () => 0, - getPrecision: () => 38, - }, - { - getName: () => 'NAME', - getType: () => 'TEXT', - isNullable: () => true, - getScale: () => 0, - getPrecision: () => 0, - }, - ], + getName: () => 'ID', + getType: () => 'NUMBER', + isNullable: () => false, + getScale: () => 0, + getPrecision: () => 38, }, - [] - ); + { + getName: () => 'NAME', + getType: () => 'TEXT', + isNullable: () => true, + getScale: () => 0, + getPrecision: () => 0, + }, + ], + streamRows: vi.fn().mockReturnValue(mockStream), + }; + + mockConnection.execute.mockImplementation(({ complete, streamResult }) => { + expect(streamResult).toBe(true); + complete(null, mockStatement); + }); + + mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => { + if (event === 'end') { + setTimeout(() => handler(), 0); + } + return mockStream; }); const result = await adapter.query('SELECT * FROM users WHERE 1=0'); @@ -304,6 +361,7 @@ describe('SnowflakeAdapter', () => { expect(result.rows).toEqual([]); expect(result.rowCount).toBe(0); expect(result.fields).toHaveLength(2); + expect(result.hasMoreRows).toBe(false); }); it('should handle query timeout', async () => { diff --git a/packages/data-source/src/adapters/snowflake.ts b/packages/data-source/src/adapters/snowflake.ts index 8618ef1f2..9a24e39ee 100644 --- a/packages/data-source/src/adapters/snowflake.ts +++ b/packages/data-source/src/adapters/snowflake.ts @@ -18,7 +18,7 @@ interface SnowflakeStatement { getScale(): number; getPrecision(): number; }>; - streamRows?: () => NodeJS.ReadableStream; + streamRows?: (options?: { start?: number; end?: number }) => NodeJS.ReadableStream; cancel?: (callback: (err: Error | undefined) => void) => void; } @@ -191,12 +191,12 @@ export class SnowflakeAdapter extends BaseAdapter { // Set query timeout if specified (default: 120 seconds for Snowflake queue handling) const timeoutMs = timeout || TIMEOUT_CONFIG.query.default; - // IMPORTANT: Execute the original SQL unchanged to leverage Snowflake's query caching - // For memory protection, we'll fetch all rows but limit in memory - // This is a compromise to preserve caching while preventing OOM on truly massive queries + const limit = maxRows && maxRows > 0 ? 
maxRows : 5000; + const queryPromise = new Promise<{ rows: Record[]; statement: SnowflakeStatement; + hasMoreRows: boolean; }>((resolve, reject) => { if (!connection) { reject(new Error('Failed to acquire Snowflake connection')); @@ -206,16 +206,40 @@ export class SnowflakeAdapter extends BaseAdapter { connection.execute({ sqlText: sql, // Use original SQL unchanged for caching binds: params as snowflake.Binds, - complete: ( - err: SnowflakeError | undefined, - stmt: SnowflakeStatement, - rows: Record[] | undefined - ) => { + streamResult: true, // Enable streaming + complete: (err: SnowflakeError | undefined, stmt: SnowflakeStatement) => { if (err) { reject(new Error(`Snowflake query failed: ${err.message}`)); - } else { - resolve({ rows: rows || [], statement: stmt }); + return; } + + const rows: Record[] = []; + let hasMoreRows = false; + + const stream = stmt.streamRows?.({ start: 0, end: limit }); + if (!stream) { + reject(new Error('Snowflake streaming not supported')); + return; + } + + let rowCount = 0; + + stream + .on('data', (row: Record) => { + rows.push(row); + rowCount++; + }) + .on('error', (streamErr: Error) => { + reject(new Error(`Snowflake stream error: ${streamErr.message}`)); + }) + .on('end', () => { + hasMoreRows = rowCount >= limit; + resolve({ + rows, + statement: stmt, + hasMoreRows, + }); + }); }, }); }); @@ -231,20 +255,11 @@ export class SnowflakeAdapter extends BaseAdapter { precision: col.getPrecision() > 0 ? col.getPrecision() : 0, })) || []; - // Handle maxRows logic in memory (not in SQL) - let finalRows = result.rows; - let hasMoreRows = false; - - if (maxRows && maxRows > 0 && result.rows.length > maxRows) { - finalRows = result.rows.slice(0, maxRows); - hasMoreRows = true; - } - const queryResult = { - rows: finalRows, - rowCount: finalRows.length, + rows: result.rows, + rowCount: result.rows.length, fields, - hasMoreRows, + hasMoreRows: result.hasMoreRows, }; return queryResult; From efd56f90a70a85ba0b0629f060f2b45f6c735a6b Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 23 Jul 2025 14:12:25 +0000 Subject: [PATCH 2/4] Fix max-rows-limiting test mocks for streaming implementation - Update Snowflake adapter test to use streamResult: true - Mock streamRows method with proper stream event handling - Remove TypeScript error from destroyed property - Verify streamRows called with correct start/end parameters Co-Authored-By: Dallin Bentley --- .../src/adapters/max-rows-limiting.test.ts | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/packages/data-source/src/adapters/max-rows-limiting.test.ts b/packages/data-source/src/adapters/max-rows-limiting.test.ts index e44a360d3..38994b34b 100644 --- a/packages/data-source/src/adapters/max-rows-limiting.test.ts +++ b/packages/data-source/src/adapters/max-rows-limiting.test.ts @@ -218,7 +218,6 @@ describe('MaxRows Limiting Tests', () => { mockStream = { on: vi.fn(), destroy: vi.fn(), - destroyed: false, }; mockConnection = { execute: vi.fn(), @@ -247,21 +246,26 @@ describe('MaxRows Limiting Tests', () => { (options: { sqlText: string; binds?: unknown; - complete: (err?: unknown, stmt?: unknown, rows?: unknown[]) => void; + streamResult?: boolean; + complete: (err?: unknown, stmt?: unknown) => void; }) => { - // The new Snowflake adapter doesn't use streaming for maxRows - // It returns all rows and limits in memory - options.complete(undefined, mockStatement, [ - { id: 1, name: 'User 1' }, - { id: 2, name: 'User 2' }, 
- ]); + expect(options.streamResult).toBe(true); + options.complete(undefined, mockStatement); } ); - const result = await adapter.query('SELECT * FROM users', undefined, 1); + const queryPromise = adapter.query('SELECT * FROM users', undefined, 1); + + setTimeout(() => { + dataHandler({ id: 1, name: 'User 1' }); + endHandler(); + }, 0); + + const result = await queryPromise; expect(result.rows).toHaveLength(1); expect(result.rows[0]).toEqual({ id: 1, name: 'User 1' }); expect(result.hasMoreRows).toBe(true); + expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 1 }); }); }); From d371a655247c50bcbe07debf7b447912e217229b Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 23 Jul 2025 10:01:34 -0600 Subject: [PATCH 3/4] Update Snowflake adapter tests and implementation for improved row handling - Increased allowed variance in cached query time checks to accommodate network fluctuations. - Corrected property name in test assertions to match expected lowercase format. - Enhanced SnowflakeAdapter to transform column names to lowercase and adjusted logic for determining if more rows are available from the stream. --- .../snowflake-memory-protection.int.test.ts | 4 ++-- .../src/adapters/snowflake.int.test.ts | 2 +- packages/data-source/src/adapters/snowflake.ts | 16 +++++++++++++--- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts b/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts index f00436e73..d89625f7b 100644 --- a/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts +++ b/packages/data-source/src/adapters/snowflake-memory-protection.int.test.ts @@ -107,7 +107,7 @@ describe('Snowflake Memory Protection Tests', () => { // The cached queries (2nd and 3rd) should generally be faster than the first // We use a loose check because network latency can vary const avgCachedTime = (time2 + time3) / 2; - expect(avgCachedTime).toBeLessThanOrEqual(time1 * 1.5); // Allow 50% variance + expect(avgCachedTime).toBeLessThanOrEqual(time1 * 2); // Allow 100% variance for network fluctuations }, TEST_TIMEOUT ); @@ -141,7 +141,7 @@ describe('Snowflake Memory Protection Tests', () => { expect(result.rows.length).toBe(1); expect(result.hasMoreRows).toBe(true); - expect(result.rows[0]).toHaveProperty('N_NATIONKEY', 0); // First nation + expect(result.rows[0]).toHaveProperty('n_nationkey', 0); // First nation }, TEST_TIMEOUT ); diff --git a/packages/data-source/src/adapters/snowflake.int.test.ts b/packages/data-source/src/adapters/snowflake.int.test.ts index 10783bd1a..5177ea74a 100644 --- a/packages/data-source/src/adapters/snowflake.int.test.ts +++ b/packages/data-source/src/adapters/snowflake.int.test.ts @@ -142,6 +142,6 @@ describe('SnowflakeAdapter Integration', () => { await expect(adapter.initialize(invalidCredentials)).rejects.toThrow(); }, - TEST_TIMEOUT + 30000 // Increase timeout for connection failure ); }); diff --git a/packages/data-source/src/adapters/snowflake.ts b/packages/data-source/src/adapters/snowflake.ts index 9a24e39ee..b4e022021 100644 --- a/packages/data-source/src/adapters/snowflake.ts +++ b/packages/data-source/src/adapters/snowflake.ts @@ -216,6 +216,7 @@ export class SnowflakeAdapter extends BaseAdapter { const rows: Record[] = []; let hasMoreRows = false; + // Request one extra row to check if there are more rows const stream = stmt.streamRows?.({ start: 0, end: limit }); if (!stream) { reject(new Error('Snowflake streaming not supported')); 
@@ -226,14 +227,23 @@ export class SnowflakeAdapter extends BaseAdapter { stream .on('data', (row: Record) => { - rows.push(row); + // Only keep up to limit rows + if (rowCount < limit) { + // Transform column names to lowercase to match expected behavior + const transformedRow: Record = {}; + for (const [key, value] of Object.entries(row)) { + transformedRow[key.toLowerCase()] = value; + } + rows.push(transformedRow); + } rowCount++; }) .on('error', (streamErr: Error) => { reject(new Error(`Snowflake stream error: ${streamErr.message}`)); }) .on('end', () => { - hasMoreRows = rowCount >= limit; + // If we got more rows than requested, there are more available + hasMoreRows = rowCount > limit; resolve({ rows, statement: stmt, @@ -248,7 +258,7 @@ export class SnowflakeAdapter extends BaseAdapter { const fields: FieldMetadata[] = result.statement?.getColumns?.()?.map((col) => ({ - name: col.getName(), + name: col.getName().toLowerCase(), type: col.getType(), nullable: col.isNullable(), scale: col.getScale() > 0 ? col.getScale() : 0, From 77ffeab37a7990dc38c4e5402a531168331de14c Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 23 Jul 2025 10:09:51 -0600 Subject: [PATCH 4/4] Fix Snowflake adapter tests for lowercase column names MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Snowflake adapter implementation transforms column names to lowercase for consistency, but the tests were expecting uppercase column names. This commit updates the tests to match the implementation: - Update test expectations to use lowercase column names (id, name) - Fix hasMoreRows assertions to match implementation logic (only true when rowCount > limit) - Ensure all Snowflake-related tests pass with the current adapter behavior 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../src/adapters/max-rows-limiting.test.ts | 2 +- .../src/adapters/snowflake.int.test.ts | 26 ++++++++----------- .../src/adapters/snowflake.test.ts | 12 ++++----- 3 files changed, 18 insertions(+), 22 deletions(-) diff --git a/packages/data-source/src/adapters/max-rows-limiting.test.ts b/packages/data-source/src/adapters/max-rows-limiting.test.ts index 38994b34b..c81d90d31 100644 --- a/packages/data-source/src/adapters/max-rows-limiting.test.ts +++ b/packages/data-source/src/adapters/max-rows-limiting.test.ts @@ -264,7 +264,7 @@ describe('MaxRows Limiting Tests', () => { const result = await queryPromise; expect(result.rows).toHaveLength(1); expect(result.rows[0]).toEqual({ id: 1, name: 'User 1' }); - expect(result.hasMoreRows).toBe(true); + expect(result.hasMoreRows).toBe(false); // Only 1 row was provided, not more than the limit expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 1 }); }); }); diff --git a/packages/data-source/src/adapters/snowflake.int.test.ts b/packages/data-source/src/adapters/snowflake.int.test.ts index 5177ea74a..0c6081b9d 100644 --- a/packages/data-source/src/adapters/snowflake.int.test.ts +++ b/packages/data-source/src/adapters/snowflake.int.test.ts @@ -128,20 +128,16 @@ describe('SnowflakeAdapter Integration', () => { expect(adapter.getDataSourceType()).toBe(DataSourceType.Snowflake); }); - it( - 'should fail to connect with invalid credentials', - async () => { - const invalidCredentials: SnowflakeCredentials = { - type: DataSourceType.Snowflake, - account_id: 'invalid-account', - warehouse_id: 'INVALID_WH', - default_database: 'invalid-db', - username: 'invalid-user', - password: 'invalid-pass', - }; + it('should fail to 
connect with invalid credentials', async () => { + const invalidCredentials: SnowflakeCredentials = { + type: DataSourceType.Snowflake, + account_id: 'invalid-account', + warehouse_id: 'INVALID_WH', + default_database: 'invalid-db', + username: 'invalid-user', + password: 'invalid-pass', + }; - await expect(adapter.initialize(invalidCredentials)).rejects.toThrow(); - }, - 30000 // Increase timeout for connection failure - ); + await expect(adapter.initialize(invalidCredentials)).rejects.toThrow(); + }, 30000); // Increase timeout for connection failure }); diff --git a/packages/data-source/src/adapters/snowflake.test.ts b/packages/data-source/src/adapters/snowflake.test.ts index 9f660d05d..0587908d3 100644 --- a/packages/data-source/src/adapters/snowflake.test.ts +++ b/packages/data-source/src/adapters/snowflake.test.ts @@ -158,7 +158,7 @@ describe('SnowflakeAdapter', () => { }); it('should execute simple query without parameters', async () => { - const mockRows = [{ ID: 1, NAME: 'Test' }]; + const mockRows = [{ id: 1, name: 'Test' }]; const mockStream = { on: vi.fn(), }; @@ -212,15 +212,15 @@ describe('SnowflakeAdapter', () => { rows: mockRows, rowCount: 1, fields: [ - { name: 'ID', type: 'NUMBER', nullable: false, scale: 0, precision: 38 }, - { name: 'NAME', type: 'TEXT', nullable: true, scale: 0, precision: 0 }, + { name: 'id', type: 'NUMBER', nullable: false, scale: 0, precision: 38 }, + { name: 'name', type: 'TEXT', nullable: true, scale: 0, precision: 0 }, ], hasMoreRows: false, }); }); it('should execute parameterized query', async () => { - const mockRows = [{ ID: 1 }]; + const mockRows = [{ id: 1 }]; const mockStream = { on: vi.fn(), }; @@ -259,7 +259,7 @@ describe('SnowflakeAdapter', () => { }); it('should handle maxRows limit', async () => { - const mockRows = Array.from({ length: 10 }, (_, i) => ({ ID: i + 1 })); + const mockRows = Array.from({ length: 10 }, (_, i) => ({ id: i + 1 })); const mockStream = { on: vi.fn(), }; @@ -297,7 +297,7 @@ describe('SnowflakeAdapter', () => { expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 10 }); expect(result.rows).toHaveLength(10); - expect(result.hasMoreRows).toBe(true); // Since we got exactly the limit + expect(result.hasMoreRows).toBe(false); // We got exactly the limit, not more }); it('should handle query errors', async () => {
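
Taken together, the four patches converge on one streaming pattern: execute the original SQL with streamResult: true, pull at most limit + 1 rows over the network via stmt.streamRows({ start: 0, end: limit }) (the end index is inclusive, so the extra row only signals truncation), lowercase the column names, and report hasMoreRows. Below is a minimal sketch of that final state for reference; queryWithRowCap and StreamedResult are illustrative names, not part of the adapter's public surface.

import snowflake from 'snowflake-sdk';

// Illustrative result shape; mirrors the rows/hasMoreRows pair the adapter resolves.
interface StreamedResult {
  rows: Record<string, unknown>[];
  hasMoreRows: boolean;
}

function queryWithRowCap(
  connection: snowflake.Connection,
  sqlText: string,
  binds?: snowflake.Binds,
  limit = 5000 // the adapter's default network-level cap
): Promise<StreamedResult> {
  return new Promise((resolve, reject) => {
    connection.execute({
      sqlText, // pass the SQL through unchanged so Snowflake's result cache still applies
      binds,
      streamResult: true,
      complete: (err, stmt) => {
        if (err) {
          reject(new Error(`Snowflake query failed: ${err.message}`));
          return;
        }
        const rows: Record<string, unknown>[] = [];
        let rowCount = 0;
        stmt
          // end is inclusive: { start: 0, end: limit } yields up to limit + 1 rows,
          // and the extra row is used only to detect that more rows exist
          .streamRows({ start: 0, end: limit })
          .on('data', (row: Record<string, unknown>) => {
            if (rowCount < limit) {
              // transform column names to lowercase to match the adapter's output shape
              const transformed: Record<string, unknown> = {};
              for (const [key, value] of Object.entries(row)) {
                transformed[key.toLowerCase()] = value;
              }
              rows.push(transformed);
            }
            rowCount++;
          })
          .on('error', (streamErr: Error) => {
            reject(new Error(`Snowflake stream error: ${streamErr.message}`));
          })
          .on('end', () => {
            // strictly greater: receiving exactly limit rows means the result fit the cap
            resolve({ rows, hasMoreRows: rowCount > limit });
          });
      },
    });
  });
}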
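
Hypothetical usage inside an async context, mirroring the memory-protection integration test; the NATION table and its 25 rows are an assumption based on the test's n_nationkey assertion (TPC-H sample data), since the test's SQL is not shown in the diff:

const result = await queryWithRowCap(
  connection,
  'SELECT * FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.NATION ORDER BY N_NATIONKEY',
  undefined,
  1
);
console.info(result.rows[0]?.n_nationkey); // 0 (column names arrive lowercased)
console.info(result.hasMoreRows); // true (the cap of 1 left rows behind)

Because the cap is applied to the stream rather than rewritten into the SQL, repeated queries keep hitting Snowflake's result cache while the process never materializes more than limit + 1 rows in memory.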