mirror of https://github.com/buster-so/buster.git
BUS-1495: Implement Snowflake adapter streaming with network-level row limiting
- Update query() method to use streamResult: true and stmt.streamRows() - Add network-level row limiting with default 5000 row cap - Process stream events (data, error, end) to build result set - Maintain backward compatibility with existing adapter interface - Update unit tests to mock streaming behavior - Fix integration test imports and property names - Preserve query caching by using original SQL unchanged Co-Authored-By: Dallin Bentley <dallinbentley98@gmail.com>
This commit is contained in:
parent
446bfc6a8e
commit
315a151a3f
|
@ -1,9 +1,20 @@
|
|||
import { afterEach, beforeEach, describe, expect } from 'vitest';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
import { DataSourceType } from '../types/credentials';
|
||||
import type { SnowflakeCredentials } from '../types/credentials';
|
||||
import { SnowflakeAdapter } from './snowflake';
|
||||
|
||||
const testWithCredentials = skipIfNoCredentials('snowflake');
|
||||
// Check if Snowflake test credentials are available
|
||||
const hasSnowflakeCredentials = !!(
|
||||
process.env.TEST_SNOWFLAKE_DATABASE &&
|
||||
process.env.TEST_SNOWFLAKE_USERNAME &&
|
||||
process.env.TEST_SNOWFLAKE_PASSWORD &&
|
||||
process.env.TEST_SNOWFLAKE_ACCOUNT_ID
|
||||
);
|
||||
|
||||
// Skip tests if credentials are not available
|
||||
const testWithCredentials = hasSnowflakeCredentials ? it : it.skip;
|
||||
|
||||
const TEST_TIMEOUT = 30000;
|
||||
|
||||
describe('Snowflake Memory Protection Tests', () => {
|
||||
let adapter: SnowflakeAdapter;
|
||||
|
@ -12,28 +23,16 @@ describe('Snowflake Memory Protection Tests', () => {
|
|||
beforeEach(() => {
|
||||
adapter = new SnowflakeAdapter();
|
||||
|
||||
// Set up credentials once
|
||||
if (
|
||||
!testConfig.snowflake.account_id ||
|
||||
!testConfig.snowflake.warehouse_id ||
|
||||
!testConfig.snowflake.username ||
|
||||
!testConfig.snowflake.password ||
|
||||
!testConfig.snowflake.default_database
|
||||
) {
|
||||
throw new Error(
|
||||
'TEST_SNOWFLAKE_ACCOUNT_ID, TEST_SNOWFLAKE_WAREHOUSE_ID, TEST_SNOWFLAKE_USERNAME, TEST_SNOWFLAKE_PASSWORD, and TEST_SNOWFLAKE_DATABASE are required for this test'
|
||||
);
|
||||
}
|
||||
|
||||
// Set up credentials from environment variables
|
||||
credentials = {
|
||||
type: DataSourceType.Snowflake,
|
||||
account_id: testConfig.snowflake.account_id,
|
||||
warehouse_id: testConfig.snowflake.warehouse_id,
|
||||
username: testConfig.snowflake.username,
|
||||
password: testConfig.snowflake.password,
|
||||
default_database: testConfig.snowflake.default_database,
|
||||
default_schema: testConfig.snowflake.default_schema,
|
||||
role: testConfig.snowflake.role,
|
||||
account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
default_database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
username: process.env.TEST_SNOWFLAKE_USERNAME!,
|
||||
password: process.env.TEST_SNOWFLAKE_PASSWORD!,
|
||||
role: process.env.TEST_SNOWFLAKE_ROLE,
|
||||
};
|
||||
});
|
||||
|
||||
|
|
|
@ -35,10 +35,10 @@ describe('SnowflakeAdapter Integration', () => {
|
|||
async () => {
|
||||
const credentials: SnowflakeCredentials = {
|
||||
type: DataSourceType.Snowflake,
|
||||
account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
default_database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
username: process.env.TEST_SNOWFLAKE_USERNAME!,
|
||||
password: process.env.TEST_SNOWFLAKE_PASSWORD!,
|
||||
role: process.env.TEST_SNOWFLAKE_ROLE,
|
||||
|
@ -56,10 +56,10 @@ describe('SnowflakeAdapter Integration', () => {
|
|||
async () => {
|
||||
const credentials: SnowflakeCredentials = {
|
||||
type: DataSourceType.Snowflake,
|
||||
account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
default_database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
username: process.env.TEST_SNOWFLAKE_USERNAME!,
|
||||
password: process.env.TEST_SNOWFLAKE_PASSWORD!,
|
||||
role: process.env.TEST_SNOWFLAKE_ROLE,
|
||||
|
@ -81,10 +81,10 @@ describe('SnowflakeAdapter Integration', () => {
|
|||
async () => {
|
||||
const credentials: SnowflakeCredentials = {
|
||||
type: DataSourceType.Snowflake,
|
||||
account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
default_database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
username: process.env.TEST_SNOWFLAKE_USERNAME!,
|
||||
password: process.env.TEST_SNOWFLAKE_PASSWORD!,
|
||||
role: process.env.TEST_SNOWFLAKE_ROLE,
|
||||
|
@ -108,10 +108,10 @@ describe('SnowflakeAdapter Integration', () => {
|
|||
async () => {
|
||||
const credentials: SnowflakeCredentials = {
|
||||
type: DataSourceType.Snowflake,
|
||||
account: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
account_id: process.env.TEST_SNOWFLAKE_ACCOUNT_ID!,
|
||||
warehouse_id: process.env.TEST_SNOWFLAKE_WAREHOUSE_ID || 'COMPUTE_WH',
|
||||
default_database: process.env.TEST_SNOWFLAKE_DATABASE!,
|
||||
default_schema: process.env.TEST_SNOWFLAKE_SCHEMA || 'PUBLIC',
|
||||
username: process.env.TEST_SNOWFLAKE_USERNAME!,
|
||||
password: process.env.TEST_SNOWFLAKE_PASSWORD!,
|
||||
role: process.env.TEST_SNOWFLAKE_ROLE,
|
||||
|
@ -133,9 +133,9 @@ describe('SnowflakeAdapter Integration', () => {
|
|||
async () => {
|
||||
const invalidCredentials: SnowflakeCredentials = {
|
||||
type: DataSourceType.Snowflake,
|
||||
account: 'invalid-account',
|
||||
warehouse: 'INVALID_WH',
|
||||
database: 'invalid-db',
|
||||
account_id: 'invalid-account',
|
||||
warehouse_id: 'INVALID_WH',
|
||||
default_database: 'invalid-db',
|
||||
username: 'invalid-user',
|
||||
password: 'invalid-pass',
|
||||
};
|
||||
|
|
|
@ -159,29 +159,42 @@ describe('SnowflakeAdapter', () => {
|
|||
|
||||
it('should execute simple query without parameters', async () => {
|
||||
const mockRows = [{ ID: 1, NAME: 'Test' }];
|
||||
mockConnection.execute.mockImplementation(({ complete }) => {
|
||||
complete(
|
||||
null,
|
||||
const mockStream = {
|
||||
on: vi.fn(),
|
||||
};
|
||||
|
||||
const mockStatement = {
|
||||
getColumns: () => [
|
||||
{
|
||||
getColumns: () => [
|
||||
{
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
{
|
||||
getName: () => 'NAME',
|
||||
getType: () => 'TEXT',
|
||||
isNullable: () => true,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 0,
|
||||
},
|
||||
],
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
mockRows
|
||||
);
|
||||
{
|
||||
getName: () => 'NAME',
|
||||
getType: () => 'TEXT',
|
||||
isNullable: () => true,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 0,
|
||||
},
|
||||
],
|
||||
streamRows: vi.fn().mockReturnValue(mockStream),
|
||||
};
|
||||
|
||||
mockConnection.execute.mockImplementation(({ complete, streamResult }) => {
|
||||
expect(streamResult).toBe(true);
|
||||
complete(null, mockStatement);
|
||||
});
|
||||
|
||||
mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => {
|
||||
if (event === 'data') {
|
||||
setTimeout(() => handler(mockRows[0]), 0);
|
||||
} else if (event === 'end') {
|
||||
setTimeout(() => handler(), 0);
|
||||
}
|
||||
return mockStream;
|
||||
});
|
||||
|
||||
const result = await adapter.query('SELECT * FROM users');
|
||||
|
@ -189,9 +202,12 @@ describe('SnowflakeAdapter', () => {
|
|||
expect(mockConnection.execute).toHaveBeenCalledWith({
|
||||
sqlText: 'SELECT * FROM users',
|
||||
binds: undefined,
|
||||
streamResult: true,
|
||||
complete: expect.any(Function),
|
||||
});
|
||||
|
||||
expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 5000 });
|
||||
|
||||
expect(result).toEqual({
|
||||
rows: mockRows,
|
||||
rowCount: 1,
|
||||
|
@ -205,58 +221,88 @@ describe('SnowflakeAdapter', () => {
|
|||
|
||||
it('should execute parameterized query', async () => {
|
||||
const mockRows = [{ ID: 1 }];
|
||||
mockConnection.execute.mockImplementation(({ complete }) => {
|
||||
complete(
|
||||
null,
|
||||
const mockStream = {
|
||||
on: vi.fn(),
|
||||
};
|
||||
|
||||
const mockStatement = {
|
||||
getColumns: () => [
|
||||
{
|
||||
getColumns: () => [
|
||||
{
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
],
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
mockRows
|
||||
);
|
||||
],
|
||||
streamRows: vi.fn().mockReturnValue(mockStream),
|
||||
};
|
||||
|
||||
mockConnection.execute.mockImplementation(({ complete, streamResult }) => {
|
||||
expect(streamResult).toBe(true);
|
||||
complete(null, mockStatement);
|
||||
});
|
||||
|
||||
mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => {
|
||||
if (event === 'data') {
|
||||
setTimeout(() => handler(mockRows[0]), 0);
|
||||
} else if (event === 'end') {
|
||||
setTimeout(() => handler(), 0);
|
||||
}
|
||||
return mockStream;
|
||||
});
|
||||
|
||||
const result = await adapter.query('SELECT * FROM users WHERE id = ?', [1]);
|
||||
|
||||
expect(result.rows).toEqual(mockRows);
|
||||
expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 5000 });
|
||||
});
|
||||
|
||||
it('should handle maxRows limit', async () => {
|
||||
const mockRows = Array.from({ length: 15 }, (_, i) => ({ ID: i + 1 }));
|
||||
const mockRows = Array.from({ length: 10 }, (_, i) => ({ ID: i + 1 }));
|
||||
const mockStream = {
|
||||
on: vi.fn(),
|
||||
};
|
||||
|
||||
mockConnection.execute.mockImplementation(({ complete }) => {
|
||||
complete(
|
||||
null,
|
||||
const mockStatement = {
|
||||
getColumns: () => [
|
||||
{
|
||||
getColumns: () => [
|
||||
{
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
],
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
mockRows
|
||||
);
|
||||
],
|
||||
streamRows: vi.fn().mockReturnValue(mockStream),
|
||||
};
|
||||
|
||||
mockConnection.execute.mockImplementation(({ complete, streamResult }) => {
|
||||
expect(streamResult).toBe(true);
|
||||
complete(null, mockStatement);
|
||||
});
|
||||
|
||||
mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => {
|
||||
if (event === 'data') {
|
||||
setTimeout(() => {
|
||||
mockRows.forEach((row) => handler(row));
|
||||
}, 0);
|
||||
} else if (event === 'end') {
|
||||
setTimeout(() => handler(), 0);
|
||||
}
|
||||
return mockStream;
|
||||
});
|
||||
|
||||
const result = await adapter.query('SELECT * FROM users', [], 10);
|
||||
|
||||
expect(mockStatement.streamRows).toHaveBeenCalledWith({ start: 0, end: 10 });
|
||||
expect(result.rows).toHaveLength(10);
|
||||
expect(result.hasMoreRows).toBe(true);
|
||||
expect(result.hasMoreRows).toBe(true); // Since we got exactly the limit
|
||||
});
|
||||
|
||||
it('should handle query errors', async () => {
|
||||
mockConnection.execute.mockImplementation(({ complete }) => {
|
||||
mockConnection.execute.mockImplementation(({ complete, streamResult }) => {
|
||||
expect(streamResult).toBe(true);
|
||||
complete(new Error('Query failed'));
|
||||
});
|
||||
|
||||
|
@ -274,29 +320,40 @@ describe('SnowflakeAdapter', () => {
|
|||
});
|
||||
|
||||
it('should handle empty result sets', async () => {
|
||||
mockConnection.execute.mockImplementation(({ complete }) => {
|
||||
complete(
|
||||
null,
|
||||
const mockStream = {
|
||||
on: vi.fn(),
|
||||
};
|
||||
|
||||
const mockStatement = {
|
||||
getColumns: () => [
|
||||
{
|
||||
getColumns: () => [
|
||||
{
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
{
|
||||
getName: () => 'NAME',
|
||||
getType: () => 'TEXT',
|
||||
isNullable: () => true,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 0,
|
||||
},
|
||||
],
|
||||
getName: () => 'ID',
|
||||
getType: () => 'NUMBER',
|
||||
isNullable: () => false,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 38,
|
||||
},
|
||||
[]
|
||||
);
|
||||
{
|
||||
getName: () => 'NAME',
|
||||
getType: () => 'TEXT',
|
||||
isNullable: () => true,
|
||||
getScale: () => 0,
|
||||
getPrecision: () => 0,
|
||||
},
|
||||
],
|
||||
streamRows: vi.fn().mockReturnValue(mockStream),
|
||||
};
|
||||
|
||||
mockConnection.execute.mockImplementation(({ complete, streamResult }) => {
|
||||
expect(streamResult).toBe(true);
|
||||
complete(null, mockStatement);
|
||||
});
|
||||
|
||||
mockStream.on.mockImplementation((event: string, handler: (data?: unknown) => void) => {
|
||||
if (event === 'end') {
|
||||
setTimeout(() => handler(), 0);
|
||||
}
|
||||
return mockStream;
|
||||
});
|
||||
|
||||
const result = await adapter.query('SELECT * FROM users WHERE 1=0');
|
||||
|
@ -304,6 +361,7 @@ describe('SnowflakeAdapter', () => {
|
|||
expect(result.rows).toEqual([]);
|
||||
expect(result.rowCount).toBe(0);
|
||||
expect(result.fields).toHaveLength(2);
|
||||
expect(result.hasMoreRows).toBe(false);
|
||||
});
|
||||
|
||||
it('should handle query timeout', async () => {
|
||||
|
|
|
@ -18,7 +18,7 @@ interface SnowflakeStatement {
|
|||
getScale(): number;
|
||||
getPrecision(): number;
|
||||
}>;
|
||||
streamRows?: () => NodeJS.ReadableStream;
|
||||
streamRows?: (options?: { start?: number; end?: number }) => NodeJS.ReadableStream;
|
||||
cancel?: (callback: (err: Error | undefined) => void) => void;
|
||||
}
|
||||
|
||||
|
@ -191,12 +191,12 @@ export class SnowflakeAdapter extends BaseAdapter {
|
|||
// Set query timeout if specified (default: 120 seconds for Snowflake queue handling)
|
||||
const timeoutMs = timeout || TIMEOUT_CONFIG.query.default;
|
||||
|
||||
// IMPORTANT: Execute the original SQL unchanged to leverage Snowflake's query caching
|
||||
// For memory protection, we'll fetch all rows but limit in memory
|
||||
// This is a compromise to preserve caching while preventing OOM on truly massive queries
|
||||
const limit = maxRows && maxRows > 0 ? maxRows : 5000;
|
||||
|
||||
const queryPromise = new Promise<{
|
||||
rows: Record<string, unknown>[];
|
||||
statement: SnowflakeStatement;
|
||||
hasMoreRows: boolean;
|
||||
}>((resolve, reject) => {
|
||||
if (!connection) {
|
||||
reject(new Error('Failed to acquire Snowflake connection'));
|
||||
|
@ -206,16 +206,40 @@ export class SnowflakeAdapter extends BaseAdapter {
|
|||
connection.execute({
|
||||
sqlText: sql, // Use original SQL unchanged for caching
|
||||
binds: params as snowflake.Binds,
|
||||
complete: (
|
||||
err: SnowflakeError | undefined,
|
||||
stmt: SnowflakeStatement,
|
||||
rows: Record<string, unknown>[] | undefined
|
||||
) => {
|
||||
streamResult: true, // Enable streaming
|
||||
complete: (err: SnowflakeError | undefined, stmt: SnowflakeStatement) => {
|
||||
if (err) {
|
||||
reject(new Error(`Snowflake query failed: ${err.message}`));
|
||||
} else {
|
||||
resolve({ rows: rows || [], statement: stmt });
|
||||
return;
|
||||
}
|
||||
|
||||
const rows: Record<string, unknown>[] = [];
|
||||
let hasMoreRows = false;
|
||||
|
||||
const stream = stmt.streamRows?.({ start: 0, end: limit });
|
||||
if (!stream) {
|
||||
reject(new Error('Snowflake streaming not supported'));
|
||||
return;
|
||||
}
|
||||
|
||||
let rowCount = 0;
|
||||
|
||||
stream
|
||||
.on('data', (row: Record<string, unknown>) => {
|
||||
rows.push(row);
|
||||
rowCount++;
|
||||
})
|
||||
.on('error', (streamErr: Error) => {
|
||||
reject(new Error(`Snowflake stream error: ${streamErr.message}`));
|
||||
})
|
||||
.on('end', () => {
|
||||
hasMoreRows = rowCount >= limit;
|
||||
resolve({
|
||||
rows,
|
||||
statement: stmt,
|
||||
hasMoreRows,
|
||||
});
|
||||
});
|
||||
},
|
||||
});
|
||||
});
|
||||
|
@ -231,20 +255,11 @@ export class SnowflakeAdapter extends BaseAdapter {
|
|||
precision: col.getPrecision() > 0 ? col.getPrecision() : 0,
|
||||
})) || [];
|
||||
|
||||
// Handle maxRows logic in memory (not in SQL)
|
||||
let finalRows = result.rows;
|
||||
let hasMoreRows = false;
|
||||
|
||||
if (maxRows && maxRows > 0 && result.rows.length > maxRows) {
|
||||
finalRows = result.rows.slice(0, maxRows);
|
||||
hasMoreRows = true;
|
||||
}
|
||||
|
||||
const queryResult = {
|
||||
rows: finalRows,
|
||||
rowCount: finalRows.length,
|
||||
rows: result.rows,
|
||||
rowCount: result.rows.length,
|
||||
fields,
|
||||
hasMoreRows,
|
||||
hasMoreRows: result.hasMoreRows,
|
||||
};
|
||||
|
||||
return queryResult;
|
||||
|
|
Loading…
Reference in New Issue