From 48a253927111dd5a51518c3cc59e67d519132cd8 Mon Sep 17 00:00:00 2001 From: dal Date: Tue, 30 Sep 2025 08:28:33 -0600 Subject: [PATCH] feature: snowflake and postgres writeback --- .../cli/src/commands/deploy/deploy-handler.ts | 15 ++- .../deploy/deployment/transformers.ts | 13 ++- apps/server/src/api/v2/deploy/POST.ts | 110 ++++++++++-------- .../src/tasks/logs-write-back/index.ts | 2 +- .../logs-write-back/logs-write-back-task.ts | 56 +++++++-- .../message-post-processing.ts | 4 +- packages/data-source/src/adapters/base.ts | 62 ++++++---- packages/data-source/src/adapters/bigquery.ts | 20 +++- packages/data-source/src/adapters/mysql.ts | 16 ++- .../data-source/src/adapters/postgresql.ts | 68 +---------- packages/data-source/src/adapters/redshift.ts | 22 ++-- .../data-source/src/adapters/snowflake.ts | 71 +---------- .../data-source/src/adapters/sqlserver.ts | 16 ++- packages/database/drizzle/meta/_journal.json | 7 ++ .../upsert-logs-writeback-config.ts | 61 +++++----- packages/database/turbo.json | 24 +--- .../server-shared/src/datasets/schemas.ts | 12 +- packages/server-shared/src/deploy/schemas.ts | 32 +++-- .../server-shared/src/logs-writeback/index.ts | 2 +- .../src/logs-writeback/schemas.ts | 2 +- 20 files changed, 298 insertions(+), 317 deletions(-) diff --git a/apps/cli/src/commands/deploy/deploy-handler.ts b/apps/cli/src/commands/deploy/deploy-handler.ts index 832546381..9f258409b 100644 --- a/apps/cli/src/commands/deploy/deploy-handler.ts +++ b/apps/cli/src/commands/deploy/deploy-handler.ts @@ -53,10 +53,18 @@ export async function deployHandler(options: DeployOptions): Promise - processProject(project, configBaseDir, deploy, options) + processProject(project, configBaseDir, deploy, options, busterConfig.logs) ) ); @@ -85,7 +93,8 @@ async function processProject( project: ProjectContext, configBaseDir: string, deploy: DeployFunction, - options: DeployOptions + options: DeployOptions, + logsConfig?: any ): Promise { console.info(`\nProcessing ${project.name} project...`); @@ -203,7 +212,7 @@ async function processProject( docs, true, // deleteAbsentModels true, // deleteAbsentDocs - config.logs // Pass logs config from buster.yml + logsConfig // Pass logs config from buster.yml ); // 8. Create model-to-file mapping for result processing (pure) diff --git a/apps/cli/src/commands/deploy/deployment/transformers.ts b/apps/cli/src/commands/deploy/deployment/transformers.ts index ce86f392f..dbb44115b 100644 --- a/apps/cli/src/commands/deploy/deployment/transformers.ts +++ b/apps/cli/src/commands/deploy/deployment/transformers.ts @@ -23,12 +23,23 @@ export function prepareDeploymentRequest( const logsWriteback: LogsWritebackConfig | undefined = logsConfig ? { enabled: true, + dataSource: logsConfig.data_source, database: logsConfig.database, schema: logsConfig.schema, - tableName: logsConfig.table_name || 'BUSTER_QUERY_LOGS', + tableName: logsConfig.table_name || 'buster_query_logs', } : undefined; + if (logsConfig) { + console.info(' ✓ Logs writeback configuration found:', { + database: logsConfig.database, + schema: logsConfig.schema, + table_name: logsConfig.table_name || 'buster_query_logs', + }); + } else { + console.info(' ℹ No logs writeback configuration found - will remove any existing config'); + } + return { models: models.map(modelToDeployModel), docs, diff --git a/apps/server/src/api/v2/deploy/POST.ts b/apps/server/src/api/v2/deploy/POST.ts index dcddce279..7bed307ff 100644 --- a/apps/server/src/api/v2/deploy/POST.ts +++ b/apps/server/src/api/v2/deploy/POST.ts @@ -1,5 +1,4 @@ import { createAdapter } from '@buster/data-source'; -import type { SnowflakeAdapter } from '@buster/data-source'; import { db } from '@buster/database/connection'; import { deleteLogsWriteBackConfig, @@ -167,15 +166,12 @@ export async function deployHandler( // if (request.deleteAbsentDocs) { ... } // Handle logs writeback configuration - let logsWritebackResult: LogsWritebackResult | undefined; - - if (request.logsWriteback !== undefined) { - logsWritebackResult = await handleLogsWritebackConfig( - request.logsWriteback, - userOrg.organizationId, - tx - ); - } + // Always call this to handle both presence and absence of logs config + const logsWritebackResult = await handleLogsWritebackConfig( + request.logsWriteback, + userOrg.organizationId, + tx + ); return { models: modelResult, @@ -204,38 +200,65 @@ export async function deployHandler( * Handle logs writeback configuration */ async function handleLogsWritebackConfig( - config: deploy.LogsWritebackConfig, + config: deploy.LogsWritebackConfig | undefined, organizationId: string, - tx: any + tx: Parameters[0]>[0] ): Promise { try { - // If config is null or disabled, delete existing configuration + // If config is undefined, null, or disabled, delete existing configuration + // This handles the case where logs section is removed from buster.yml if (!config || !config.enabled) { const deleted = await deleteLogsWriteBackConfig(organizationId); + + if (deleted) { + console.info('Logs writeback configuration removed (soft deleted)'); + } + return { configured: false, - error: deleted ? undefined : 'No existing configuration to delete', + error: deleted ? undefined : 'No existing configuration to remove', }; } - // Get the first Snowflake data source for the organization - // TODO: In future, we might want to allow specifying which data source - const [dataSource] = await tx - .select() - .from(dataSources) - .where( - and( - eq(dataSources.organizationId, organizationId), - eq(dataSources.type, 'Snowflake'), - isNull(dataSources.deletedAt) + // Get the appropriate data source for logs writeback + let dataSource; + + if (config.dataSource) { + // Use the specified data source + dataSource = await getDataSourceByName( + tx, + config.dataSource, + organizationId + ); + + if (!dataSource) { + return { + configured: false, + error: `Data source '${config.dataSource}' not found`, + }; + } + } else { + // Get the first available data source for the organization + // Prefer Snowflake, but accept any data source type + const [firstDataSource] = await tx + .select() + .from(dataSources) + .where( + and( + eq(dataSources.organizationId, organizationId), + isNull(dataSources.deletedAt) + ) ) - ) - .limit(1); + .orderBy(dataSources.type) // This will prioritize alphabetically, so BigQuery, MySQL, PostgreSQL, Redshift, Snowflake, SQLServer + .limit(1); + + dataSource = firstDataSource; + } if (!dataSource) { return { configured: false, - error: 'No Snowflake data source found for organization', + error: 'No data source found for organization', }; } @@ -245,7 +268,7 @@ async function handleLogsWritebackConfig( dataSourceId: dataSource.id, database: config.database, schema: config.schema, - tableName: config.tableName || 'BUSTER_QUERY_LOGS', + tableName: config.tableName || 'buster_query_logs', }); // Get credentials and create adapter to check/create table @@ -258,41 +281,30 @@ async function handleLogsWritebackConfig( configured: true, database: config.database, schema: config.schema, - tableName: config.tableName || 'BUSTER_QUERY_LOGS', + tableName: config.tableName || 'buster_query_logs', error: 'Could not retrieve data source credentials', }; } - // Create adapter and check/create table - const adapter = (await createAdapter(credentials as any)) as SnowflakeAdapter; + // Verify adapter supports logs writeback + const adapter = await createAdapter(credentials as any); try { await adapter.initialize(credentials as any); - // Check if table exists - const tableExists = await adapter.tableExists( - config.database, - config.schema, - config.tableName || 'BUSTER_QUERY_LOGS' - ); - - let tableCreated = false; - if (!tableExists) { - // Create the table - await adapter.createLogsTable( - config.database, - config.schema, - config.tableName || 'BUSTER_QUERY_LOGS' - ); - tableCreated = true; + // Just verify the adapter supports insertLogRecord + if (!adapter.insertLogRecord) { + return { + configured: false, + error: 'Data source type does not support logs writeback', + }; } return { configured: true, - tableCreated, database: config.database, schema: config.schema, - tableName: config.tableName || 'BUSTER_QUERY_LOGS', + tableName: config.tableName || 'buster_query_logs', }; } finally { // Clean up adapter connection diff --git a/apps/trigger/src/tasks/logs-write-back/index.ts b/apps/trigger/src/tasks/logs-write-back/index.ts index 6c5e91a64..35b7873b8 100644 --- a/apps/trigger/src/tasks/logs-write-back/index.ts +++ b/apps/trigger/src/tasks/logs-write-back/index.ts @@ -1 +1 @@ -export * from './logs-write-back-task'; \ No newline at end of file +export * from './logs-write-back-task'; diff --git a/apps/trigger/src/tasks/logs-write-back/logs-write-back-task.ts b/apps/trigger/src/tasks/logs-write-back/logs-write-back-task.ts index 5d32a6ba8..43c0517c6 100644 --- a/apps/trigger/src/tasks/logs-write-back/logs-write-back-task.ts +++ b/apps/trigger/src/tasks/logs-write-back/logs-write-back-task.ts @@ -1,5 +1,4 @@ import { createAdapter } from '@buster/data-source'; -import type { SnowflakeAdapter } from '@buster/data-source'; import { getDb } from '@buster/database/connection'; import { getDataSourceCredentials, getLogsWriteBackConfig } from '@buster/database/queries'; import { chats, dataSources, messages, users } from '@buster/database/schema'; @@ -113,8 +112,8 @@ export const logsWriteBackTask: ReturnType< .where(eq(dataSources.id, config.dataSourceId)) .limit(1); - if (!dataSource || dataSource.type !== 'Snowflake') { - logger.error('Invalid data source', { + if (!dataSource) { + logger.error('Data source not found', { messageId: payload.messageId, dataSourceId: config.dataSourceId, }); @@ -122,12 +121,19 @@ export const logsWriteBackTask: ReturnType< success: false, messageId: payload.messageId, error: { - code: 'INVALID_DATA_SOURCE', - message: 'Data source not found or not Snowflake type', + code: 'DATA_SOURCE_NOT_FOUND', + message: `Data source with ID ${config.dataSourceId} not found`, }, }; } + // Log the data source type for debugging + logger.info('Using data source for logs writeback', { + messageId: payload.messageId, + dataSourceId: config.dataSourceId, + dataSourceType: dataSource.type, + }); + // Get credentials from vault const credentials = await getDataSourceCredentials({ dataSourceId: config.dataSourceId, @@ -148,12 +154,28 @@ export const logsWriteBackTask: ReturnType< }; } - // Create Snowflake adapter and write the log - const adapter = (await createAdapter(credentials as any)) as SnowflakeAdapter; + // Create adapter and write the log + const adapter = await createAdapter(credentials as any); try { await adapter.initialize(credentials as any); + // Check if adapter supports log insertion + if (!adapter.insertLogRecord) { + logger.error('Adapter does not support log insertion', { + messageId: payload.messageId, + dataSourceType: dataSource.type, + }); + return { + success: false, + messageId: payload.messageId, + error: { + code: 'UNSUPPORTED_OPERATION', + message: `Data source type ${dataSource.type} does not support log insertion`, + }, + }; + } + // Insert the log record await adapter.insertLogRecord(config.database, config.schema, config.tableName, { messageId: messageData.messageId, @@ -168,8 +190,9 @@ export const logsWriteBackTask: ReturnType< assumptions: assumptions, }); - logger.log('Log record successfully written to Snowflake', { + logger.log('Log record successfully written to data warehouse', { messageId: payload.messageId, + dataSourceType: dataSource.type, database: config.database, schema: config.schema, table: config.tableName, @@ -180,9 +203,24 @@ export const logsWriteBackTask: ReturnType< success: true, messageId: payload.messageId, }; + } catch (adapterError) { + logger.error('Adapter operation failed', { + messageId: payload.messageId, + dataSourceType: dataSource.type, + error: adapterError instanceof Error ? adapterError.message : 'Unknown error', + stack: adapterError instanceof Error ? adapterError.stack : undefined, + }); + throw adapterError; } finally { // Always close the adapter connection - await adapter.close(); + try { + await adapter.close(); + } catch (closeError) { + logger.warn('Failed to close adapter connection', { + messageId: payload.messageId, + error: closeError instanceof Error ? closeError.message : 'Unknown error', + }); + } } } catch (error) { const errorMessage = error instanceof Error ? error.message : 'Unknown error'; diff --git a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts index 624e2054a..af86537c8 100644 --- a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts +++ b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts @@ -23,6 +23,7 @@ import type { import { logger, schemaTask, tasks } from '@trigger.dev/sdk/v3'; import { currentSpan, initLogger, wrapTraced } from 'braintrust'; import { z } from 'zod/v4'; +import type { logsWriteBackTask } from '../logs-write-back'; import { buildWorkflowInput, fetchPreviousPostProcessingMessages, @@ -33,7 +34,6 @@ import { } from './helpers'; import { DataFetchError, MessageNotFoundError, TaskInputSchema } from './types'; import type { TaskInput, TaskOutput } from './types'; -import type { logsWriteBackTask } from '../logs-write-back'; /** * Extract only the specific fields we want to save to the database @@ -480,7 +480,7 @@ export const messagePostProcessingTask: ReturnType< try { // Check if organization has logs write-back configured const logsWriteBackConfig = await getLogsWriteBackConfig(messageContext.organizationId); - + if (logsWriteBackConfig) { logger.log('Triggering logs write-back task', { messageId: payload.messageId, diff --git a/packages/data-source/src/adapters/base.ts b/packages/data-source/src/adapters/base.ts index fceaa20ff..24f97e178 100644 --- a/packages/data-source/src/adapters/base.ts +++ b/packages/data-source/src/adapters/base.ts @@ -78,6 +78,36 @@ export interface DatabaseAdapter { * Get an introspector instance for this adapter */ introspect(): DataSourceIntrospector; + + /** + * Optional: Insert a log record into the table (for writeback functionality) + */ + insertLogRecord?( + database: string, + schema: string, + tableName: string, + record: { + messageId: string; + userEmail: string; + userName: string; + chatId: string; + chatLink: string; + requestMessage: string; + createdAt: Date; + durationSeconds: number; + confidenceScore: string; + assumptions: unknown[]; + } + ): Promise; + + /** + * Optional: Execute a write operation (INSERT, UPDATE, DELETE) + */ + executeWrite?( + sql: string, + params?: QueryParameter[], + timeout?: number + ): Promise<{ rowCount: number }>; } /** @@ -99,32 +129,14 @@ export abstract class BaseAdapter implements DatabaseAdapter { abstract getDataSourceType(): string; abstract introspect(): DataSourceIntrospector; - /** - * Optional: Check if a logs table exists (for writeback functionality) - */ - async tableExists?(database: string, schema: string, tableName: string): Promise { - throw new Error('Logs writeback not implemented for this adapter'); - } - - /** - * Optional: Create the Buster logs table (for writeback functionality) - */ - async createLogsTable?( - database: string, - schema: string, - tableName?: string - ): Promise { - throw new Error('Logs writeback not implemented for this adapter'); - } - /** * Optional: Insert a log record into the table (for writeback functionality) */ async insertLogRecord?( - database: string, - schema: string, - tableName: string, - record: { + _database: string, + _schema: string, + _tableName: string, + _record: { messageId: string; userEmail: string; userName: string; @@ -144,9 +156,9 @@ export abstract class BaseAdapter implements DatabaseAdapter { * Optional: Execute a write operation (INSERT, UPDATE, DELETE) */ async executeWrite?( - sql: string, - params?: QueryParameter[], - timeout?: number + _sql: string, + _params?: QueryParameter[], + _timeout?: number ): Promise<{ rowCount: number }> { throw new Error('Write operations not implemented for this adapter'); } diff --git a/packages/data-source/src/adapters/bigquery.ts b/packages/data-source/src/adapters/bigquery.ts index d974566d3..d41f06559 100644 --- a/packages/data-source/src/adapters/bigquery.ts +++ b/packages/data-source/src/adapters/bigquery.ts @@ -243,7 +243,11 @@ export class BigQueryAdapter extends BaseAdapter { /** * Check if a table exists in BigQuery */ - override async tableExists(database: string, schema: string, tableName: string): Promise { + async tableExists( + _database: string, + schema: string, + tableName: string + ): Promise { this.ensureConnected(); if (!this.client) { @@ -264,10 +268,10 @@ export class BigQueryAdapter extends BaseAdapter { /** * Create the Buster logs table in BigQuery */ - override async createLogsTable( - database: string, + async createLogsTable( + _database: string, schema: string, - tableName: string = 'buster_query_logs' + tableName = 'buster_query_logs' ): Promise { this.ensureConnected(); @@ -306,7 +310,7 @@ export class BigQueryAdapter extends BaseAdapter { * Insert a log record into the BigQuery table */ override async insertLogRecord( - database: string, + _database: string, schema: string, tableName: string, record: { @@ -396,7 +400,11 @@ export class BigQueryAdapter extends BaseAdapter { } try { - const options: any = { + const options: { + query: string; + timeoutMs: number; + params?: unknown[]; + } = { query: sql, timeoutMs: timeout || 60000, }; diff --git a/packages/data-source/src/adapters/mysql.ts b/packages/data-source/src/adapters/mysql.ts index 3dc7b525e..ca0109165 100644 --- a/packages/data-source/src/adapters/mysql.ts +++ b/packages/data-source/src/adapters/mysql.ts @@ -180,7 +180,11 @@ export class MySQLAdapter extends BaseAdapter { /** * Check if a table exists in MySQL */ - override async tableExists(database: string, schema: string, tableName: string): Promise { + async tableExists( + database: string, + _schema: string, + tableName: string + ): Promise { this.ensureConnected(); if (!this.connection) { @@ -196,7 +200,7 @@ export class MySQLAdapter extends BaseAdapter { `; const [rows] = await this.connection.execute(sql, [database, tableName]); - const firstRow = (rows as any[])[0] as { count?: number } | undefined; + const firstRow = (rows as Array<{ count?: number }>)[0] as { count?: number } | undefined; return !!firstRow && (firstRow.count ?? 0) > 0; } catch (error) { console.error('Error checking table existence:', error); @@ -207,10 +211,10 @@ export class MySQLAdapter extends BaseAdapter { /** * Create the Buster logs table in MySQL */ - override async createLogsTable( + async createLogsTable( database: string, - schema: string, - tableName: string = 'buster_query_logs' + _schema: string, + tableName = 'buster_query_logs' ): Promise { this.ensureConnected(); @@ -249,7 +253,7 @@ export class MySQLAdapter extends BaseAdapter { */ override async insertLogRecord( database: string, - schema: string, + _schema: string, tableName: string, record: { messageId: string; diff --git a/packages/data-source/src/adapters/postgresql.ts b/packages/data-source/src/adapters/postgresql.ts index e4fda75c9..2a3e6fcb9 100644 --- a/packages/data-source/src/adapters/postgresql.ts +++ b/packages/data-source/src/adapters/postgresql.ts @@ -235,79 +235,13 @@ export class PostgreSQLAdapter extends BaseAdapter { return this.introspector; } - /** - * Check if a table exists in PostgreSQL - */ - override async tableExists(database: string, schema: string, tableName: string): Promise { - this.ensureConnected(); - if (!this.client) { - throw new Error('PostgreSQL client not initialized'); - } - - try { - const sql = ` - SELECT COUNT(*) as count - FROM information_schema.tables - WHERE table_catalog = $1 - AND table_schema = $2 - AND table_name = $3 - `; - - const result = await this.client.query(sql, [database, schema.toLowerCase(), tableName.toLowerCase()]); - const firstRow = result.rows[0] as { count?: string } | undefined; - return !!firstRow && parseInt(firstRow.count ?? '0') > 0; - } catch (error) { - console.error('Error checking table existence:', error); - return false; - } - } - - /** - * Create the Buster logs table in PostgreSQL - */ - override async createLogsTable( - database: string, - schema: string, - tableName: string = 'buster_query_logs' - ): Promise { - this.ensureConnected(); - - if (!this.client) { - throw new Error('PostgreSQL client not initialized'); - } - - const createTableSQL = ` - CREATE TABLE IF NOT EXISTS "${schema}"."${tableName}" ( - message_id VARCHAR(255), - user_email VARCHAR(500), - user_name VARCHAR(500), - chat_id VARCHAR(255), - chat_link VARCHAR(500), - request_message TEXT, - created_at TIMESTAMPTZ, - duration_seconds INTEGER, - confidence_score VARCHAR(50), - assumptions JSONB, - inserted_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP - ) - `; - - try { - await this.client.query(createTableSQL); - console.info(`Table ${schema}.${tableName} created successfully`); - } catch (error) { - throw new Error( - `Failed to create logs table: ${error instanceof Error ? error.message : 'Unknown error'}` - ); - } - } /** * Insert a log record into the PostgreSQL table */ override async insertLogRecord( - database: string, + _database: string, schema: string, tableName: string, record: { diff --git a/packages/data-source/src/adapters/redshift.ts b/packages/data-source/src/adapters/redshift.ts index d82396c32..743e6dde3 100644 --- a/packages/data-source/src/adapters/redshift.ts +++ b/packages/data-source/src/adapters/redshift.ts @@ -229,7 +229,11 @@ export class RedshiftAdapter extends BaseAdapter { /** * Check if a table exists in Redshift */ - override async tableExists(database: string, schema: string, tableName: string): Promise { + async tableExists( + database: string, + schema: string, + tableName: string + ): Promise { this.ensureConnected(); if (!this.client) { @@ -245,9 +249,13 @@ export class RedshiftAdapter extends BaseAdapter { AND table_name = $3 `; - const result = await this.client.query(sql, [database, schema.toLowerCase(), tableName.toLowerCase()]); + const result = await this.client.query(sql, [ + database, + schema.toLowerCase(), + tableName.toLowerCase(), + ]); const firstRow = result.rows[0] as { count?: string } | undefined; - return !!firstRow && parseInt(firstRow.count ?? '0') > 0; + return !!firstRow && Number.parseInt(firstRow.count ?? '0') > 0; } catch (error) { console.error('Error checking table existence:', error); return false; @@ -257,10 +265,10 @@ export class RedshiftAdapter extends BaseAdapter { /** * Create the Buster logs table in Redshift */ - override async createLogsTable( - database: string, + async createLogsTable( + _database: string, schema: string, - tableName: string = 'buster_query_logs' + tableName = 'buster_query_logs' ): Promise { this.ensureConnected(); @@ -298,7 +306,7 @@ export class RedshiftAdapter extends BaseAdapter { * Insert a log record into the Redshift table */ override async insertLogRecord( - database: string, + _database: string, schema: string, tableName: string, record: { diff --git a/packages/data-source/src/adapters/snowflake.ts b/packages/data-source/src/adapters/snowflake.ts index 95ca3663f..c5e778b73 100644 --- a/packages/data-source/src/adapters/snowflake.ts +++ b/packages/data-source/src/adapters/snowflake.ts @@ -363,70 +363,7 @@ export class SnowflakeAdapter extends BaseAdapter { }; } - /** - * Check if a table exists in Snowflake - */ - override async tableExists(database: string, schema: string, tableName: string): Promise { - this.ensureConnected(); - if (!this.connection) { - throw new Error('Snowflake connection not initialized'); - } - - try { - const sql = ` - SELECT COUNT(*) as count - FROM "${database}".INFORMATION_SCHEMA.TABLES - WHERE TABLE_SCHEMA = '${schema.toUpperCase()}' - AND TABLE_NAME = '${tableName.toUpperCase()}' - `; - - const result = await this.query(sql); - const firstRow = result.rows[0] as { count?: number } | undefined; - return !!firstRow && (firstRow.count ?? 0) > 0; - } catch (error) { - console.error('Error checking table existence:', error); - return false; - } - } - - /** - * Create the Buster logs table in Snowflake - */ - override async createLogsTable( - database: string, - schema: string, - tableName: string = 'BUSTER_QUERY_LOGS' - ): Promise { - this.ensureConnected(); - - if (!this.connection) { - throw new Error('Snowflake connection not initialized'); - } - - const createTableSQL = ` - CREATE TABLE IF NOT EXISTS "${database}"."${schema}"."${tableName}" ( - message_id VARCHAR(255), - user_email VARCHAR(500), - user_name VARCHAR(500), - chat_id VARCHAR(255), - chat_link VARCHAR(500), - request_message TEXT, - created_at TIMESTAMP_TZ, - duration_seconds NUMBER, - confidence_score VARCHAR(50), - assumptions VARIANT, - inserted_at TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP() - ) - `; - - try { - await this.query(createTableSQL); - console.info(`Table ${database}.${schema}.${tableName} created successfully`); - } catch (error) { - throw classifyError(error); - } - } /** * Insert a log record into the Snowflake table @@ -458,7 +395,7 @@ export class SnowflakeAdapter extends BaseAdapter { const assumptionsJson = JSON.stringify(record.assumptions); const insertSQL = ` - INSERT INTO "${database}"."${schema}"."${tableName}" ( + INSERT INTO ${database}.${schema}.${tableName} ( message_id, user_email, user_name, @@ -523,7 +460,11 @@ export class SnowflakeAdapter extends BaseAdapter { sqlText: sql, binds: params as snowflake.Binds, streamResult: false, // Don't stream for write operations - complete: (err: SnowflakeError | undefined, stmt: SnowflakeStatement, rows?: any[]) => { + complete: ( + err: SnowflakeError | undefined, + _stmt: SnowflakeStatement, + rows?: unknown[] + ) => { if (err) { reject(new Error(`Snowflake write operation failed: ${err.message}`)); return; diff --git a/packages/data-source/src/adapters/sqlserver.ts b/packages/data-source/src/adapters/sqlserver.ts index dec043e61..45e56fdd0 100644 --- a/packages/data-source/src/adapters/sqlserver.ts +++ b/packages/data-source/src/adapters/sqlserver.ts @@ -254,7 +254,11 @@ export class SQLServerAdapter extends BaseAdapter { /** * Check if a table exists in SQL Server */ - override async tableExists(database: string, schema: string, tableName: string): Promise { + async tableExists( + database: string, + schema: string, + tableName: string + ): Promise { this.ensureConnected(); if (!this.pool) { @@ -286,10 +290,10 @@ export class SQLServerAdapter extends BaseAdapter { /** * Create the Buster logs table in SQL Server */ - override async createLogsTable( - database: string, + async createLogsTable( + _database: string, schema: string, - tableName: string = 'buster_query_logs' + tableName = 'buster_query_logs' ): Promise { this.ensureConnected(); @@ -333,7 +337,7 @@ export class SQLServerAdapter extends BaseAdapter { * Insert a log record into the SQL Server table */ override async insertLogRecord( - database: string, + _database: string, schema: string, tableName: string, record: { @@ -424,7 +428,7 @@ export class SQLServerAdapter extends BaseAdapter { // Set query timeout if specified (default: 60 seconds) const timeoutMs = timeout || 60000; // SQL Server uses requestTimeout property on the request config - (request as any).timeout = timeoutMs; + (request as unknown as Record).timeout = timeoutMs; // Add parameters if provided if (params && params.length > 0) { diff --git a/packages/database/drizzle/meta/_journal.json b/packages/database/drizzle/meta/_journal.json index 011c311ae..b0e2d764b 100644 --- a/packages/database/drizzle/meta/_journal.json +++ b/packages/database/drizzle/meta/_journal.json @@ -778,6 +778,13 @@ "when": 1759167570252, "tag": "0111_happy_peter_quill", "breakpoints": true + }, + { + "idx": 112, + "version": "7", + "when": 1759181434094, + "tag": "0112_write-back-logs-config", + "breakpoints": true } ] } \ No newline at end of file diff --git a/packages/database/src/queries/logs-writeback/upsert-logs-writeback-config.ts b/packages/database/src/queries/logs-writeback/upsert-logs-writeback-config.ts index f4e1b63eb..56344ff27 100644 --- a/packages/database/src/queries/logs-writeback/upsert-logs-writeback-config.ts +++ b/packages/database/src/queries/logs-writeback/upsert-logs-writeback-config.ts @@ -65,36 +65,35 @@ export async function upsertLogsWriteBackConfig( updatedAt: updated.updatedAt, deletedAt: updated.deletedAt, }; - } else { - // Create new config - const createdRows = await db - .insert(logsWriteBackConfigs) - .values({ - organizationId: params.organizationId, - dataSourceId: params.dataSourceId, - database: params.database, - schema: params.schema, - tableName: params.tableName || 'buster_query_logs', - createdAt: now, - updatedAt: now, - }) - .returning(); - - const created = createdRows[0]; - if (!created) { - throw new Error('Failed to create logs writeback configuration'); - } - - return { - id: created.id, - organizationId: created.organizationId, - dataSourceId: created.dataSourceId, - database: created.database, - schema: created.schema, - tableName: created.tableName, - createdAt: created.createdAt, - updatedAt: created.updatedAt, - deletedAt: created.deletedAt, - }; } + // Create new config + const createdRows = await db + .insert(logsWriteBackConfigs) + .values({ + organizationId: params.organizationId, + dataSourceId: params.dataSourceId, + database: params.database, + schema: params.schema, + tableName: params.tableName || 'buster_query_logs', + createdAt: now, + updatedAt: now, + }) + .returning(); + + const created = createdRows[0]; + if (!created) { + throw new Error('Failed to create logs writeback configuration'); + } + + return { + id: created.id, + organizationId: created.organizationId, + dataSourceId: created.dataSourceId, + database: created.database, + schema: created.schema, + tableName: created.tableName, + createdAt: created.createdAt, + updatedAt: created.updatedAt, + deletedAt: created.deletedAt, + }; } diff --git a/packages/database/turbo.json b/packages/database/turbo.json index cbfb93010..be649b4aa 100644 --- a/packages/database/turbo.json +++ b/packages/database/turbo.json @@ -1,24 +1,15 @@ { "$schema": "https://turborepo.com/schema.json", - "extends": [ - "//" - ], + "extends": ["//"], "tasks": { "build": { - "dependsOn": [ - "^build" - ], - "outputs": [ - "dist/**" - ] + "dependsOn": ["^build"], + "outputs": ["dist/**"] }, "db:init": { "cache": false, "persistent": false, - "dependsOn": [ - "db:seed", - "@buster-app/supabase#start" - ] + "dependsOn": ["db:seed", "@buster-app/supabase#start"] }, "db:migrate": { "cache": false, @@ -27,10 +18,7 @@ "db:seed": { "cache": false, "persistent": false, - "dependsOn": [ - "db:migrate", - "@buster-app/supabase#start" - ] + "dependsOn": ["db:migrate", "@buster-app/supabase#start"] }, "db:dump": { "cache": false, @@ -53,4 +41,4 @@ "persistent": true } } -} \ No newline at end of file +} diff --git a/packages/server-shared/src/datasets/schemas.ts b/packages/server-shared/src/datasets/schemas.ts index 68b9e9746..b0ec7e14a 100644 --- a/packages/server-shared/src/datasets/schemas.ts +++ b/packages/server-shared/src/datasets/schemas.ts @@ -1,5 +1,4 @@ import { z } from 'zod'; -import { LogsConfigSchema } from '../deploy/schemas'; // ============================================================================ // Model Schemas - Define the structure of semantic layer models @@ -152,6 +151,17 @@ export const MultiModelSchema = z.object({ // Configuration Schemas - Define buster.yml structure // ============================================================================ +// Schema for logs configuration in buster.yml +export const LogsConfigSchema = z.object({ + data_source: z + .string() + .optional() + .describe('Data source to use for logs writeback (defaults to first available)'), + database: z.string().describe('Database name for logs'), + schema: z.string().describe('Schema name for logs'), + table_name: z.string().optional().describe('Table name for logs (defaults to buster_query_logs)'), +}); + export const ProjectContextSchema = z.object({ name: z.string(), data_source: z.string(), diff --git a/packages/server-shared/src/deploy/schemas.ts b/packages/server-shared/src/deploy/schemas.ts index a4256a4fd..8adb38983 100644 --- a/packages/server-shared/src/deploy/schemas.ts +++ b/packages/server-shared/src/deploy/schemas.ts @@ -1,13 +1,13 @@ import { z } from 'zod'; -// Import DeployModelSchema from datasets to avoid duplication -import { DeployModelSchema } from '../datasets/schemas'; +// Import DeployModelSchema and LogsConfigSchema from datasets to avoid duplication +import { DeployModelSchema, LogsConfigSchema } from '../datasets/schemas'; // ============================================================================ // Unified Deploy Request/Response Schemas // ============================================================================ -// Re-export the model schema from datasets -export { DeployModelSchema }; +// Re-export the schemas from datasets +export { DeployModelSchema, LogsConfigSchema }; // Schema for deploying docs (markdown files) export const DeployDocSchema = z.object({ @@ -16,20 +16,17 @@ export const DeployDocSchema = z.object({ type: z.enum(['analyst', 'normal']).default('normal'), }); -// Schema for logs configuration in buster.yml -export const LogsConfigSchema = z.object({ - database: z.string().describe('Database name for logs'), - schema: z.string().describe('Schema name for logs'), - table_name: z.string().optional().describe('Table name for logs (defaults to BUSTER_QUERY_LOGS)'), -}); - // Schema for logs writeback configuration (API format) -export const LogsWritebackConfigSchema = z.object({ - enabled: z.boolean().describe('Whether logs writeback is enabled'), - database: z.string().describe('Snowflake database name'), - schema: z.string().describe('Snowflake schema name'), - tableName: z.string().default('BUSTER_QUERY_LOGS').describe('Table name for logs'), -}).nullable().describe('Configuration for writing logs back to Snowflake'); +export const LogsWritebackConfigSchema = z + .object({ + enabled: z.boolean().describe('Whether logs writeback is enabled'), + dataSource: z.string().optional().describe('Data source name to use for logs writeback'), + database: z.string().describe('Database name'), + schema: z.string().describe('Schema name'), + tableName: z.string().default('buster_query_logs').describe('Table name for logs'), + }) + .nullable() + .describe('Configuration for writing logs back to data warehouse'); // Unified deploy request that handles both models and docs export const UnifiedDeployRequestSchema = z.object({ @@ -91,7 +88,6 @@ export const DocDeployResultSchema = z.object({ // Schema for logs writeback deployment result export const LogsWritebackResultSchema = z.object({ configured: z.boolean().describe('Whether logs writeback was configured'), - tableCreated: z.boolean().optional().describe('Whether the table was created'), database: z.string().optional().describe('Database name'), schema: z.string().optional().describe('Schema name'), tableName: z.string().optional().describe('Table name'), diff --git a/packages/server-shared/src/logs-writeback/index.ts b/packages/server-shared/src/logs-writeback/index.ts index 498ee8a2d..fd96451e7 100644 --- a/packages/server-shared/src/logs-writeback/index.ts +++ b/packages/server-shared/src/logs-writeback/index.ts @@ -1,2 +1,2 @@ // Export all logs writeback schemas and types -export * from './schemas'; \ No newline at end of file +export * from './schemas'; diff --git a/packages/server-shared/src/logs-writeback/schemas.ts b/packages/server-shared/src/logs-writeback/schemas.ts index 731ebcccf..2017ec432 100644 --- a/packages/server-shared/src/logs-writeback/schemas.ts +++ b/packages/server-shared/src/logs-writeback/schemas.ts @@ -57,4 +57,4 @@ export const LogsWritebackTaskOutputSchema = z.object({ export type LogAssumption = z.infer; export type LogsWritebackRecord = z.infer; export type LogsWritebackTaskInput = z.infer; -export type LogsWritebackTaskOutput = z.infer; \ No newline at end of file +export type LogsWritebackTaskOutput = z.infer;