feature: snowflake and postgres writeback

This commit is contained in:
dal 2025-09-30 08:28:33 -06:00
parent 4f09efd0c8
commit 48a2539271
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
20 changed files with 298 additions and 317 deletions

View File

@ -53,10 +53,18 @@ export async function deployHandler(options: DeployOptions): Promise<CLIDeployme
? createDryRunDeployer(options.verbose) ? createDryRunDeployer(options.verbose)
: await createAuthenticatedDeployer(); : await createAuthenticatedDeployer();
// Debug: Check if logs config is present
if (busterConfig.logs) {
console.info('\nLogs writeback configured in buster.yml:');
console.info(' Database:', busterConfig.logs.database);
console.info(' Schema:', busterConfig.logs.schema);
console.info(' Table:', busterConfig.logs.table_name || 'buster_query_logs');
}
// 4. Process all projects in parallel // 4. Process all projects in parallel
const projectResults = await Promise.all( const projectResults = await Promise.all(
busterConfig.projects.map((project) => busterConfig.projects.map((project) =>
processProject(project, configBaseDir, deploy, options) processProject(project, configBaseDir, deploy, options, busterConfig.logs)
) )
); );
@ -85,7 +93,8 @@ async function processProject(
project: ProjectContext, project: ProjectContext,
configBaseDir: string, configBaseDir: string,
deploy: DeployFunction, deploy: DeployFunction,
options: DeployOptions options: DeployOptions,
logsConfig?: any
): Promise<CLIDeploymentResult> { ): Promise<CLIDeploymentResult> {
console.info(`\nProcessing ${project.name} project...`); console.info(`\nProcessing ${project.name} project...`);
@ -203,7 +212,7 @@ async function processProject(
docs, docs,
true, // deleteAbsentModels true, // deleteAbsentModels
true, // deleteAbsentDocs true, // deleteAbsentDocs
config.logs // Pass logs config from buster.yml logsConfig // Pass logs config from buster.yml
); );
// 8. Create model-to-file mapping for result processing (pure) // 8. Create model-to-file mapping for result processing (pure)

View File

@ -23,12 +23,23 @@ export function prepareDeploymentRequest(
const logsWriteback: LogsWritebackConfig | undefined = logsConfig const logsWriteback: LogsWritebackConfig | undefined = logsConfig
? { ? {
enabled: true, enabled: true,
dataSource: logsConfig.data_source,
database: logsConfig.database, database: logsConfig.database,
schema: logsConfig.schema, schema: logsConfig.schema,
tableName: logsConfig.table_name || 'BUSTER_QUERY_LOGS', tableName: logsConfig.table_name || 'buster_query_logs',
} }
: undefined; : undefined;
if (logsConfig) {
console.info(' ✓ Logs writeback configuration found:', {
database: logsConfig.database,
schema: logsConfig.schema,
table_name: logsConfig.table_name || 'buster_query_logs',
});
} else {
console.info(' No logs writeback configuration found - will remove any existing config');
}
return { return {
models: models.map(modelToDeployModel), models: models.map(modelToDeployModel),
docs, docs,

View File

@ -1,5 +1,4 @@
import { createAdapter } from '@buster/data-source'; import { createAdapter } from '@buster/data-source';
import type { SnowflakeAdapter } from '@buster/data-source';
import { db } from '@buster/database/connection'; import { db } from '@buster/database/connection';
import { import {
deleteLogsWriteBackConfig, deleteLogsWriteBackConfig,
@ -167,15 +166,12 @@ export async function deployHandler(
// if (request.deleteAbsentDocs) { ... } // if (request.deleteAbsentDocs) { ... }
// Handle logs writeback configuration // Handle logs writeback configuration
let logsWritebackResult: LogsWritebackResult | undefined; // Always call this to handle both presence and absence of logs config
const logsWritebackResult = await handleLogsWritebackConfig(
if (request.logsWriteback !== undefined) {
logsWritebackResult = await handleLogsWritebackConfig(
request.logsWriteback, request.logsWriteback,
userOrg.organizationId, userOrg.organizationId,
tx tx
); );
}
return { return {
models: modelResult, models: modelResult,
@ -204,38 +200,65 @@ export async function deployHandler(
* Handle logs writeback configuration * Handle logs writeback configuration
*/ */
async function handleLogsWritebackConfig( async function handleLogsWritebackConfig(
config: deploy.LogsWritebackConfig, config: deploy.LogsWritebackConfig | undefined,
organizationId: string, organizationId: string,
tx: any tx: Parameters<Parameters<typeof db.transaction>[0]>[0]
): Promise<LogsWritebackResult> { ): Promise<LogsWritebackResult> {
try { try {
// If config is null or disabled, delete existing configuration // If config is undefined, null, or disabled, delete existing configuration
// This handles the case where logs section is removed from buster.yml
if (!config || !config.enabled) { if (!config || !config.enabled) {
const deleted = await deleteLogsWriteBackConfig(organizationId); const deleted = await deleteLogsWriteBackConfig(organizationId);
if (deleted) {
console.info('Logs writeback configuration removed (soft deleted)');
}
return { return {
configured: false, configured: false,
error: deleted ? undefined : 'No existing configuration to delete', error: deleted ? undefined : 'No existing configuration to remove',
}; };
} }
// Get the first Snowflake data source for the organization // Get the appropriate data source for logs writeback
// TODO: In future, we might want to allow specifying which data source let dataSource;
const [dataSource] = await tx
if (config.dataSource) {
// Use the specified data source
dataSource = await getDataSourceByName(
tx,
config.dataSource,
organizationId
);
if (!dataSource) {
return {
configured: false,
error: `Data source '${config.dataSource}' not found`,
};
}
} else {
// Get the first available data source for the organization
// Prefer Snowflake, but accept any data source type
const [firstDataSource] = await tx
.select() .select()
.from(dataSources) .from(dataSources)
.where( .where(
and( and(
eq(dataSources.organizationId, organizationId), eq(dataSources.organizationId, organizationId),
eq(dataSources.type, 'Snowflake'),
isNull(dataSources.deletedAt) isNull(dataSources.deletedAt)
) )
) )
.orderBy(dataSources.type) // This will prioritize alphabetically, so BigQuery, MySQL, PostgreSQL, Redshift, Snowflake, SQLServer
.limit(1); .limit(1);
dataSource = firstDataSource;
}
if (!dataSource) { if (!dataSource) {
return { return {
configured: false, configured: false,
error: 'No Snowflake data source found for organization', error: 'No data source found for organization',
}; };
} }
@ -245,7 +268,7 @@ async function handleLogsWritebackConfig(
dataSourceId: dataSource.id, dataSourceId: dataSource.id,
database: config.database, database: config.database,
schema: config.schema, schema: config.schema,
tableName: config.tableName || 'BUSTER_QUERY_LOGS', tableName: config.tableName || 'buster_query_logs',
}); });
// Get credentials and create adapter to check/create table // Get credentials and create adapter to check/create table
@ -258,41 +281,30 @@ async function handleLogsWritebackConfig(
configured: true, configured: true,
database: config.database, database: config.database,
schema: config.schema, schema: config.schema,
tableName: config.tableName || 'BUSTER_QUERY_LOGS', tableName: config.tableName || 'buster_query_logs',
error: 'Could not retrieve data source credentials', error: 'Could not retrieve data source credentials',
}; };
} }
// Create adapter and check/create table // Verify adapter supports logs writeback
const adapter = (await createAdapter(credentials as any)) as SnowflakeAdapter; const adapter = await createAdapter(credentials as any);
try { try {
await adapter.initialize(credentials as any); await adapter.initialize(credentials as any);
// Check if table exists // Just verify the adapter supports insertLogRecord
const tableExists = await adapter.tableExists( if (!adapter.insertLogRecord) {
config.database, return {
config.schema, configured: false,
config.tableName || 'BUSTER_QUERY_LOGS' error: 'Data source type does not support logs writeback',
); };
let tableCreated = false;
if (!tableExists) {
// Create the table
await adapter.createLogsTable(
config.database,
config.schema,
config.tableName || 'BUSTER_QUERY_LOGS'
);
tableCreated = true;
} }
return { return {
configured: true, configured: true,
tableCreated,
database: config.database, database: config.database,
schema: config.schema, schema: config.schema,
tableName: config.tableName || 'BUSTER_QUERY_LOGS', tableName: config.tableName || 'buster_query_logs',
}; };
} finally { } finally {
// Clean up adapter connection // Clean up adapter connection

View File

@ -1,5 +1,4 @@
import { createAdapter } from '@buster/data-source'; import { createAdapter } from '@buster/data-source';
import type { SnowflakeAdapter } from '@buster/data-source';
import { getDb } from '@buster/database/connection'; import { getDb } from '@buster/database/connection';
import { getDataSourceCredentials, getLogsWriteBackConfig } from '@buster/database/queries'; import { getDataSourceCredentials, getLogsWriteBackConfig } from '@buster/database/queries';
import { chats, dataSources, messages, users } from '@buster/database/schema'; import { chats, dataSources, messages, users } from '@buster/database/schema';
@ -113,8 +112,8 @@ export const logsWriteBackTask: ReturnType<
.where(eq(dataSources.id, config.dataSourceId)) .where(eq(dataSources.id, config.dataSourceId))
.limit(1); .limit(1);
if (!dataSource || dataSource.type !== 'Snowflake') { if (!dataSource) {
logger.error('Invalid data source', { logger.error('Data source not found', {
messageId: payload.messageId, messageId: payload.messageId,
dataSourceId: config.dataSourceId, dataSourceId: config.dataSourceId,
}); });
@ -122,12 +121,19 @@ export const logsWriteBackTask: ReturnType<
success: false, success: false,
messageId: payload.messageId, messageId: payload.messageId,
error: { error: {
code: 'INVALID_DATA_SOURCE', code: 'DATA_SOURCE_NOT_FOUND',
message: 'Data source not found or not Snowflake type', message: `Data source with ID ${config.dataSourceId} not found`,
}, },
}; };
} }
// Log the data source type for debugging
logger.info('Using data source for logs writeback', {
messageId: payload.messageId,
dataSourceId: config.dataSourceId,
dataSourceType: dataSource.type,
});
// Get credentials from vault // Get credentials from vault
const credentials = await getDataSourceCredentials({ const credentials = await getDataSourceCredentials({
dataSourceId: config.dataSourceId, dataSourceId: config.dataSourceId,
@ -148,12 +154,28 @@ export const logsWriteBackTask: ReturnType<
}; };
} }
// Create Snowflake adapter and write the log // Create adapter and write the log
const adapter = (await createAdapter(credentials as any)) as SnowflakeAdapter; const adapter = await createAdapter(credentials as any);
try { try {
await adapter.initialize(credentials as any); await adapter.initialize(credentials as any);
// Check if adapter supports log insertion
if (!adapter.insertLogRecord) {
logger.error('Adapter does not support log insertion', {
messageId: payload.messageId,
dataSourceType: dataSource.type,
});
return {
success: false,
messageId: payload.messageId,
error: {
code: 'UNSUPPORTED_OPERATION',
message: `Data source type ${dataSource.type} does not support log insertion`,
},
};
}
// Insert the log record // Insert the log record
await adapter.insertLogRecord(config.database, config.schema, config.tableName, { await adapter.insertLogRecord(config.database, config.schema, config.tableName, {
messageId: messageData.messageId, messageId: messageData.messageId,
@ -168,8 +190,9 @@ export const logsWriteBackTask: ReturnType<
assumptions: assumptions, assumptions: assumptions,
}); });
logger.log('Log record successfully written to Snowflake', { logger.log('Log record successfully written to data warehouse', {
messageId: payload.messageId, messageId: payload.messageId,
dataSourceType: dataSource.type,
database: config.database, database: config.database,
schema: config.schema, schema: config.schema,
table: config.tableName, table: config.tableName,
@ -180,9 +203,24 @@ export const logsWriteBackTask: ReturnType<
success: true, success: true,
messageId: payload.messageId, messageId: payload.messageId,
}; };
} catch (adapterError) {
logger.error('Adapter operation failed', {
messageId: payload.messageId,
dataSourceType: dataSource.type,
error: adapterError instanceof Error ? adapterError.message : 'Unknown error',
stack: adapterError instanceof Error ? adapterError.stack : undefined,
});
throw adapterError;
} finally { } finally {
// Always close the adapter connection // Always close the adapter connection
try {
await adapter.close(); await adapter.close();
} catch (closeError) {
logger.warn('Failed to close adapter connection', {
messageId: payload.messageId,
error: closeError instanceof Error ? closeError.message : 'Unknown error',
});
}
} }
} catch (error) { } catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'; const errorMessage = error instanceof Error ? error.message : 'Unknown error';

View File

@ -23,6 +23,7 @@ import type {
import { logger, schemaTask, tasks } from '@trigger.dev/sdk/v3'; import { logger, schemaTask, tasks } from '@trigger.dev/sdk/v3';
import { currentSpan, initLogger, wrapTraced } from 'braintrust'; import { currentSpan, initLogger, wrapTraced } from 'braintrust';
import { z } from 'zod/v4'; import { z } from 'zod/v4';
import type { logsWriteBackTask } from '../logs-write-back';
import { import {
buildWorkflowInput, buildWorkflowInput,
fetchPreviousPostProcessingMessages, fetchPreviousPostProcessingMessages,
@ -33,7 +34,6 @@ import {
} from './helpers'; } from './helpers';
import { DataFetchError, MessageNotFoundError, TaskInputSchema } from './types'; import { DataFetchError, MessageNotFoundError, TaskInputSchema } from './types';
import type { TaskInput, TaskOutput } from './types'; import type { TaskInput, TaskOutput } from './types';
import type { logsWriteBackTask } from '../logs-write-back';
/** /**
* Extract only the specific fields we want to save to the database * Extract only the specific fields we want to save to the database

View File

@ -78,6 +78,36 @@ export interface DatabaseAdapter {
* Get an introspector instance for this adapter * Get an introspector instance for this adapter
*/ */
introspect(): DataSourceIntrospector; introspect(): DataSourceIntrospector;
/**
* Optional: Insert a log record into the table (for writeback functionality)
*/
insertLogRecord?(
database: string,
schema: string,
tableName: string,
record: {
messageId: string;
userEmail: string;
userName: string;
chatId: string;
chatLink: string;
requestMessage: string;
createdAt: Date;
durationSeconds: number;
confidenceScore: string;
assumptions: unknown[];
}
): Promise<void>;
/**
* Optional: Execute a write operation (INSERT, UPDATE, DELETE)
*/
executeWrite?(
sql: string,
params?: QueryParameter[],
timeout?: number
): Promise<{ rowCount: number }>;
} }
/** /**
@ -99,32 +129,14 @@ export abstract class BaseAdapter implements DatabaseAdapter {
abstract getDataSourceType(): string; abstract getDataSourceType(): string;
abstract introspect(): DataSourceIntrospector; abstract introspect(): DataSourceIntrospector;
/**
* Optional: Check if a logs table exists (for writeback functionality)
*/
async tableExists?(database: string, schema: string, tableName: string): Promise<boolean> {
throw new Error('Logs writeback not implemented for this adapter');
}
/**
* Optional: Create the Buster logs table (for writeback functionality)
*/
async createLogsTable?(
database: string,
schema: string,
tableName?: string
): Promise<void> {
throw new Error('Logs writeback not implemented for this adapter');
}
/** /**
* Optional: Insert a log record into the table (for writeback functionality) * Optional: Insert a log record into the table (for writeback functionality)
*/ */
async insertLogRecord?( async insertLogRecord?(
database: string, _database: string,
schema: string, _schema: string,
tableName: string, _tableName: string,
record: { _record: {
messageId: string; messageId: string;
userEmail: string; userEmail: string;
userName: string; userName: string;
@ -144,9 +156,9 @@ export abstract class BaseAdapter implements DatabaseAdapter {
* Optional: Execute a write operation (INSERT, UPDATE, DELETE) * Optional: Execute a write operation (INSERT, UPDATE, DELETE)
*/ */
async executeWrite?( async executeWrite?(
sql: string, _sql: string,
params?: QueryParameter[], _params?: QueryParameter[],
timeout?: number _timeout?: number
): Promise<{ rowCount: number }> { ): Promise<{ rowCount: number }> {
throw new Error('Write operations not implemented for this adapter'); throw new Error('Write operations not implemented for this adapter');
} }

View File

@ -243,7 +243,11 @@ export class BigQueryAdapter extends BaseAdapter {
/** /**
* Check if a table exists in BigQuery * Check if a table exists in BigQuery
*/ */
override async tableExists(database: string, schema: string, tableName: string): Promise<boolean> { async tableExists(
_database: string,
schema: string,
tableName: string
): Promise<boolean> {
this.ensureConnected(); this.ensureConnected();
if (!this.client) { if (!this.client) {
@ -264,10 +268,10 @@ export class BigQueryAdapter extends BaseAdapter {
/** /**
* Create the Buster logs table in BigQuery * Create the Buster logs table in BigQuery
*/ */
override async createLogsTable( async createLogsTable(
database: string, _database: string,
schema: string, schema: string,
tableName: string = 'buster_query_logs' tableName = 'buster_query_logs'
): Promise<void> { ): Promise<void> {
this.ensureConnected(); this.ensureConnected();
@ -306,7 +310,7 @@ export class BigQueryAdapter extends BaseAdapter {
* Insert a log record into the BigQuery table * Insert a log record into the BigQuery table
*/ */
override async insertLogRecord( override async insertLogRecord(
database: string, _database: string,
schema: string, schema: string,
tableName: string, tableName: string,
record: { record: {
@ -396,7 +400,11 @@ export class BigQueryAdapter extends BaseAdapter {
} }
try { try {
const options: any = { const options: {
query: string;
timeoutMs: number;
params?: unknown[];
} = {
query: sql, query: sql,
timeoutMs: timeout || 60000, timeoutMs: timeout || 60000,
}; };

View File

@ -180,7 +180,11 @@ export class MySQLAdapter extends BaseAdapter {
/** /**
* Check if a table exists in MySQL * Check if a table exists in MySQL
*/ */
override async tableExists(database: string, schema: string, tableName: string): Promise<boolean> { async tableExists(
database: string,
_schema: string,
tableName: string
): Promise<boolean> {
this.ensureConnected(); this.ensureConnected();
if (!this.connection) { if (!this.connection) {
@ -196,7 +200,7 @@ export class MySQLAdapter extends BaseAdapter {
`; `;
const [rows] = await this.connection.execute(sql, [database, tableName]); const [rows] = await this.connection.execute(sql, [database, tableName]);
const firstRow = (rows as any[])[0] as { count?: number } | undefined; const firstRow = (rows as Array<{ count?: number }>)[0] as { count?: number } | undefined;
return !!firstRow && (firstRow.count ?? 0) > 0; return !!firstRow && (firstRow.count ?? 0) > 0;
} catch (error) { } catch (error) {
console.error('Error checking table existence:', error); console.error('Error checking table existence:', error);
@ -207,10 +211,10 @@ export class MySQLAdapter extends BaseAdapter {
/** /**
* Create the Buster logs table in MySQL * Create the Buster logs table in MySQL
*/ */
override async createLogsTable( async createLogsTable(
database: string, database: string,
schema: string, _schema: string,
tableName: string = 'buster_query_logs' tableName = 'buster_query_logs'
): Promise<void> { ): Promise<void> {
this.ensureConnected(); this.ensureConnected();
@ -249,7 +253,7 @@ export class MySQLAdapter extends BaseAdapter {
*/ */
override async insertLogRecord( override async insertLogRecord(
database: string, database: string,
schema: string, _schema: string,
tableName: string, tableName: string,
record: { record: {
messageId: string; messageId: string;

View File

@ -235,79 +235,13 @@ export class PostgreSQLAdapter extends BaseAdapter {
return this.introspector; return this.introspector;
} }
/**
* Check if a table exists in PostgreSQL
*/
override async tableExists(database: string, schema: string, tableName: string): Promise<boolean> {
this.ensureConnected();
if (!this.client) {
throw new Error('PostgreSQL client not initialized');
}
try {
const sql = `
SELECT COUNT(*) as count
FROM information_schema.tables
WHERE table_catalog = $1
AND table_schema = $2
AND table_name = $3
`;
const result = await this.client.query(sql, [database, schema.toLowerCase(), tableName.toLowerCase()]);
const firstRow = result.rows[0] as { count?: string } | undefined;
return !!firstRow && parseInt(firstRow.count ?? '0') > 0;
} catch (error) {
console.error('Error checking table existence:', error);
return false;
}
}
/**
* Create the Buster logs table in PostgreSQL
*/
override async createLogsTable(
database: string,
schema: string,
tableName: string = 'buster_query_logs'
): Promise<void> {
this.ensureConnected();
if (!this.client) {
throw new Error('PostgreSQL client not initialized');
}
const createTableSQL = `
CREATE TABLE IF NOT EXISTS "${schema}"."${tableName}" (
message_id VARCHAR(255),
user_email VARCHAR(500),
user_name VARCHAR(500),
chat_id VARCHAR(255),
chat_link VARCHAR(500),
request_message TEXT,
created_at TIMESTAMPTZ,
duration_seconds INTEGER,
confidence_score VARCHAR(50),
assumptions JSONB,
inserted_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
)
`;
try {
await this.client.query(createTableSQL);
console.info(`Table ${schema}.${tableName} created successfully`);
} catch (error) {
throw new Error(
`Failed to create logs table: ${error instanceof Error ? error.message : 'Unknown error'}`
);
}
}
/** /**
* Insert a log record into the PostgreSQL table * Insert a log record into the PostgreSQL table
*/ */
override async insertLogRecord( override async insertLogRecord(
database: string, _database: string,
schema: string, schema: string,
tableName: string, tableName: string,
record: { record: {

View File

@ -229,7 +229,11 @@ export class RedshiftAdapter extends BaseAdapter {
/** /**
* Check if a table exists in Redshift * Check if a table exists in Redshift
*/ */
override async tableExists(database: string, schema: string, tableName: string): Promise<boolean> { async tableExists(
database: string,
schema: string,
tableName: string
): Promise<boolean> {
this.ensureConnected(); this.ensureConnected();
if (!this.client) { if (!this.client) {
@ -245,9 +249,13 @@ export class RedshiftAdapter extends BaseAdapter {
AND table_name = $3 AND table_name = $3
`; `;
const result = await this.client.query(sql, [database, schema.toLowerCase(), tableName.toLowerCase()]); const result = await this.client.query(sql, [
database,
schema.toLowerCase(),
tableName.toLowerCase(),
]);
const firstRow = result.rows[0] as { count?: string } | undefined; const firstRow = result.rows[0] as { count?: string } | undefined;
return !!firstRow && parseInt(firstRow.count ?? '0') > 0; return !!firstRow && Number.parseInt(firstRow.count ?? '0') > 0;
} catch (error) { } catch (error) {
console.error('Error checking table existence:', error); console.error('Error checking table existence:', error);
return false; return false;
@ -257,10 +265,10 @@ export class RedshiftAdapter extends BaseAdapter {
/** /**
* Create the Buster logs table in Redshift * Create the Buster logs table in Redshift
*/ */
override async createLogsTable( async createLogsTable(
database: string, _database: string,
schema: string, schema: string,
tableName: string = 'buster_query_logs' tableName = 'buster_query_logs'
): Promise<void> { ): Promise<void> {
this.ensureConnected(); this.ensureConnected();
@ -298,7 +306,7 @@ export class RedshiftAdapter extends BaseAdapter {
* Insert a log record into the Redshift table * Insert a log record into the Redshift table
*/ */
override async insertLogRecord( override async insertLogRecord(
database: string, _database: string,
schema: string, schema: string,
tableName: string, tableName: string,
record: { record: {

View File

@ -363,70 +363,7 @@ export class SnowflakeAdapter extends BaseAdapter {
}; };
} }
/**
* Check if a table exists in Snowflake
*/
override async tableExists(database: string, schema: string, tableName: string): Promise<boolean> {
this.ensureConnected();
if (!this.connection) {
throw new Error('Snowflake connection not initialized');
}
try {
const sql = `
SELECT COUNT(*) as count
FROM "${database}".INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = '${schema.toUpperCase()}'
AND TABLE_NAME = '${tableName.toUpperCase()}'
`;
const result = await this.query(sql);
const firstRow = result.rows[0] as { count?: number } | undefined;
return !!firstRow && (firstRow.count ?? 0) > 0;
} catch (error) {
console.error('Error checking table existence:', error);
return false;
}
}
/**
* Create the Buster logs table in Snowflake
*/
override async createLogsTable(
database: string,
schema: string,
tableName: string = 'BUSTER_QUERY_LOGS'
): Promise<void> {
this.ensureConnected();
if (!this.connection) {
throw new Error('Snowflake connection not initialized');
}
const createTableSQL = `
CREATE TABLE IF NOT EXISTS "${database}"."${schema}"."${tableName}" (
message_id VARCHAR(255),
user_email VARCHAR(500),
user_name VARCHAR(500),
chat_id VARCHAR(255),
chat_link VARCHAR(500),
request_message TEXT,
created_at TIMESTAMP_TZ,
duration_seconds NUMBER,
confidence_score VARCHAR(50),
assumptions VARIANT,
inserted_at TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP()
)
`;
try {
await this.query(createTableSQL);
console.info(`Table ${database}.${schema}.${tableName} created successfully`);
} catch (error) {
throw classifyError(error);
}
}
/** /**
* Insert a log record into the Snowflake table * Insert a log record into the Snowflake table
@ -458,7 +395,7 @@ export class SnowflakeAdapter extends BaseAdapter {
const assumptionsJson = JSON.stringify(record.assumptions); const assumptionsJson = JSON.stringify(record.assumptions);
const insertSQL = ` const insertSQL = `
INSERT INTO "${database}"."${schema}"."${tableName}" ( INSERT INTO ${database}.${schema}.${tableName} (
message_id, message_id,
user_email, user_email,
user_name, user_name,
@ -523,7 +460,11 @@ export class SnowflakeAdapter extends BaseAdapter {
sqlText: sql, sqlText: sql,
binds: params as snowflake.Binds, binds: params as snowflake.Binds,
streamResult: false, // Don't stream for write operations streamResult: false, // Don't stream for write operations
complete: (err: SnowflakeError | undefined, stmt: SnowflakeStatement, rows?: any[]) => { complete: (
err: SnowflakeError | undefined,
_stmt: SnowflakeStatement,
rows?: unknown[]
) => {
if (err) { if (err) {
reject(new Error(`Snowflake write operation failed: ${err.message}`)); reject(new Error(`Snowflake write operation failed: ${err.message}`));
return; return;

View File

@ -254,7 +254,11 @@ export class SQLServerAdapter extends BaseAdapter {
/** /**
* Check if a table exists in SQL Server * Check if a table exists in SQL Server
*/ */
override async tableExists(database: string, schema: string, tableName: string): Promise<boolean> { async tableExists(
database: string,
schema: string,
tableName: string
): Promise<boolean> {
this.ensureConnected(); this.ensureConnected();
if (!this.pool) { if (!this.pool) {
@ -286,10 +290,10 @@ export class SQLServerAdapter extends BaseAdapter {
/** /**
* Create the Buster logs table in SQL Server * Create the Buster logs table in SQL Server
*/ */
override async createLogsTable( async createLogsTable(
database: string, _database: string,
schema: string, schema: string,
tableName: string = 'buster_query_logs' tableName = 'buster_query_logs'
): Promise<void> { ): Promise<void> {
this.ensureConnected(); this.ensureConnected();
@ -333,7 +337,7 @@ export class SQLServerAdapter extends BaseAdapter {
* Insert a log record into the SQL Server table * Insert a log record into the SQL Server table
*/ */
override async insertLogRecord( override async insertLogRecord(
database: string, _database: string,
schema: string, schema: string,
tableName: string, tableName: string,
record: { record: {
@ -424,7 +428,7 @@ export class SQLServerAdapter extends BaseAdapter {
// Set query timeout if specified (default: 60 seconds) // Set query timeout if specified (default: 60 seconds)
const timeoutMs = timeout || 60000; const timeoutMs = timeout || 60000;
// SQL Server uses requestTimeout property on the request config // SQL Server uses requestTimeout property on the request config
(request as any).timeout = timeoutMs; (request as unknown as Record<string, unknown>).timeout = timeoutMs;
// Add parameters if provided // Add parameters if provided
if (params && params.length > 0) { if (params && params.length > 0) {

View File

@ -778,6 +778,13 @@
"when": 1759167570252, "when": 1759167570252,
"tag": "0111_happy_peter_quill", "tag": "0111_happy_peter_quill",
"breakpoints": true "breakpoints": true
},
{
"idx": 112,
"version": "7",
"when": 1759181434094,
"tag": "0112_write-back-logs-config",
"breakpoints": true
} }
] ]
} }

View File

@ -65,7 +65,7 @@ export async function upsertLogsWriteBackConfig(
updatedAt: updated.updatedAt, updatedAt: updated.updatedAt,
deletedAt: updated.deletedAt, deletedAt: updated.deletedAt,
}; };
} else { }
// Create new config // Create new config
const createdRows = await db const createdRows = await db
.insert(logsWriteBackConfigs) .insert(logsWriteBackConfigs)
@ -97,4 +97,3 @@ export async function upsertLogsWriteBackConfig(
deletedAt: created.deletedAt, deletedAt: created.deletedAt,
}; };
} }
}

View File

@ -1,24 +1,15 @@
{ {
"$schema": "https://turborepo.com/schema.json", "$schema": "https://turborepo.com/schema.json",
"extends": [ "extends": ["//"],
"//"
],
"tasks": { "tasks": {
"build": { "build": {
"dependsOn": [ "dependsOn": ["^build"],
"^build" "outputs": ["dist/**"]
],
"outputs": [
"dist/**"
]
}, },
"db:init": { "db:init": {
"cache": false, "cache": false,
"persistent": false, "persistent": false,
"dependsOn": [ "dependsOn": ["db:seed", "@buster-app/supabase#start"]
"db:seed",
"@buster-app/supabase#start"
]
}, },
"db:migrate": { "db:migrate": {
"cache": false, "cache": false,
@ -27,10 +18,7 @@
"db:seed": { "db:seed": {
"cache": false, "cache": false,
"persistent": false, "persistent": false,
"dependsOn": [ "dependsOn": ["db:migrate", "@buster-app/supabase#start"]
"db:migrate",
"@buster-app/supabase#start"
]
}, },
"db:dump": { "db:dump": {
"cache": false, "cache": false,

View File

@ -1,5 +1,4 @@
import { z } from 'zod'; import { z } from 'zod';
import { LogsConfigSchema } from '../deploy/schemas';
// ============================================================================ // ============================================================================
// Model Schemas - Define the structure of semantic layer models // Model Schemas - Define the structure of semantic layer models
@ -152,6 +151,17 @@ export const MultiModelSchema = z.object({
// Configuration Schemas - Define buster.yml structure // Configuration Schemas - Define buster.yml structure
// ============================================================================ // ============================================================================
// Schema for logs configuration in buster.yml
export const LogsConfigSchema = z.object({
data_source: z
.string()
.optional()
.describe('Data source to use for logs writeback (defaults to first available)'),
database: z.string().describe('Database name for logs'),
schema: z.string().describe('Schema name for logs'),
table_name: z.string().optional().describe('Table name for logs (defaults to buster_query_logs)'),
});
export const ProjectContextSchema = z.object({ export const ProjectContextSchema = z.object({
name: z.string(), name: z.string(),
data_source: z.string(), data_source: z.string(),

View File

@ -1,13 +1,13 @@
import { z } from 'zod'; import { z } from 'zod';
// Import DeployModelSchema from datasets to avoid duplication // Import DeployModelSchema and LogsConfigSchema from datasets to avoid duplication
import { DeployModelSchema } from '../datasets/schemas'; import { DeployModelSchema, LogsConfigSchema } from '../datasets/schemas';
// ============================================================================ // ============================================================================
// Unified Deploy Request/Response Schemas // Unified Deploy Request/Response Schemas
// ============================================================================ // ============================================================================
// Re-export the model schema from datasets // Re-export the schemas from datasets
export { DeployModelSchema }; export { DeployModelSchema, LogsConfigSchema };
// Schema for deploying docs (markdown files) // Schema for deploying docs (markdown files)
export const DeployDocSchema = z.object({ export const DeployDocSchema = z.object({
@ -16,20 +16,17 @@ export const DeployDocSchema = z.object({
type: z.enum(['analyst', 'normal']).default('normal'), type: z.enum(['analyst', 'normal']).default('normal'),
}); });
// Schema for logs configuration in buster.yml
export const LogsConfigSchema = z.object({
database: z.string().describe('Database name for logs'),
schema: z.string().describe('Schema name for logs'),
table_name: z.string().optional().describe('Table name for logs (defaults to BUSTER_QUERY_LOGS)'),
});
// Schema for logs writeback configuration (API format) // Schema for logs writeback configuration (API format)
export const LogsWritebackConfigSchema = z.object({ export const LogsWritebackConfigSchema = z
.object({
enabled: z.boolean().describe('Whether logs writeback is enabled'), enabled: z.boolean().describe('Whether logs writeback is enabled'),
database: z.string().describe('Snowflake database name'), dataSource: z.string().optional().describe('Data source name to use for logs writeback'),
schema: z.string().describe('Snowflake schema name'), database: z.string().describe('Database name'),
tableName: z.string().default('BUSTER_QUERY_LOGS').describe('Table name for logs'), schema: z.string().describe('Schema name'),
}).nullable().describe('Configuration for writing logs back to Snowflake'); tableName: z.string().default('buster_query_logs').describe('Table name for logs'),
})
.nullable()
.describe('Configuration for writing logs back to data warehouse');
// Unified deploy request that handles both models and docs // Unified deploy request that handles both models and docs
export const UnifiedDeployRequestSchema = z.object({ export const UnifiedDeployRequestSchema = z.object({
@ -91,7 +88,6 @@ export const DocDeployResultSchema = z.object({
// Schema for logs writeback deployment result // Schema for logs writeback deployment result
export const LogsWritebackResultSchema = z.object({ export const LogsWritebackResultSchema = z.object({
configured: z.boolean().describe('Whether logs writeback was configured'), configured: z.boolean().describe('Whether logs writeback was configured'),
tableCreated: z.boolean().optional().describe('Whether the table was created'),
database: z.string().optional().describe('Database name'), database: z.string().optional().describe('Database name'),
schema: z.string().optional().describe('Schema name'), schema: z.string().optional().describe('Schema name'),
tableName: z.string().optional().describe('Table name'), tableName: z.string().optional().describe('Table name'),