logs-write-back

This commit is contained in:
dal 2025-09-30 10:40:58 -06:00
parent c30a04b1d1
commit 2bf285ab2f
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
11 changed files with 220 additions and 48 deletions

View File

@ -1,4 +1,5 @@
import { relative, resolve } from 'node:path';
import type { deploy } from '@buster/server-shared';
import { getConfigBaseDir, loadBusterConfig, resolveConfiguration } from './config/config-loader';
import {
formatDeploymentSummary,
@ -27,6 +28,8 @@ import {
import type { CLIDeploymentResult, DeployOptions, Model, ProjectContext } from './schemas';
import { createDeploymentValidationError, isDeploymentValidationError } from './utils/errors';
type LogsConfig = deploy.LogsConfig;
/**
* Main deploy handler that orchestrates the entire deployment pipeline
* using functional composition
@ -94,7 +97,7 @@ async function processProject(
configBaseDir: string,
deploy: DeployFunction,
options: DeployOptions,
logsConfig?: any
logsConfig?: LogsConfig
): Promise<CLIDeploymentResult> {
console.info(`\nProcessing ${project.name} project...`);

View File

@ -1,4 +1,4 @@
import { createAdapter } from '@buster/data-source';
import { type Credentials, createAdapter, toCredentials } from '@buster/data-source';
import { db } from '@buster/database/connection';
import {
deleteLogsWriteBackConfig,
@ -209,11 +209,11 @@ async function handleLogsWritebackConfig(
// This handles the case where logs section is removed from buster.yml
if (!config || !config.enabled) {
const deleted = await deleteLogsWriteBackConfig(organizationId);
if (deleted) {
console.info('Logs writeback configuration removed (soft deleted)');
}
return {
configured: false,
error: deleted ? undefined : 'No existing configuration to remove',
@ -221,16 +221,12 @@ async function handleLogsWritebackConfig(
}
// Get the appropriate data source for logs writeback
let dataSource;
let dataSource: Awaited<ReturnType<typeof getDataSourceByName>> | undefined;
if (config.dataSource) {
// Use the specified data source
dataSource = await getDataSourceByName(
tx,
config.dataSource,
organizationId
);
dataSource = await getDataSourceByName(tx, config.dataSource, organizationId);
if (!dataSource) {
return {
configured: false,
@ -243,12 +239,7 @@ async function handleLogsWritebackConfig(
const [firstDataSource] = await tx
.select()
.from(dataSources)
.where(
and(
eq(dataSources.organizationId, organizationId),
isNull(dataSources.deletedAt)
)
)
.where(and(eq(dataSources.organizationId, organizationId), isNull(dataSources.deletedAt)))
.orderBy(dataSources.type) // This will prioritize alphabetically, so BigQuery, MySQL, PostgreSQL, Redshift, Snowflake, SQLServer
.limit(1);
@ -287,10 +278,24 @@ async function handleLogsWritebackConfig(
}
// Verify adapter supports logs writeback
const adapter = await createAdapter(credentials as any);
// Safely validate and convert credentials
let validatedCredentials: Credentials;
try {
validatedCredentials = toCredentials(credentials);
} catch (error) {
return {
configured: true,
database: config.database,
schema: config.schema,
tableName: config.tableName || 'buster_query_logs',
error: `Invalid credentials: ${error instanceof Error ? error.message : 'Unknown error'}`,
};
}
const adapter = await createAdapter(validatedCredentials);
try {
await adapter.initialize(credentials as any);
await adapter.initialize(validatedCredentials);
// Just verify the adapter supports insertLogRecord
if (!adapter.insertLogRecord) {

View File

@ -1,4 +1,4 @@
import { createAdapter } from '@buster/data-source';
import { type Credentials, createAdapter, toCredentials } from '@buster/data-source';
import { getDb } from '@buster/database/connection';
import { getDataSourceCredentials, getLogsWriteBackConfig } from '@buster/database/queries';
import { chats, dataSources, messages, users } from '@buster/database/schema';
@ -95,7 +95,11 @@ export const logsWriteBackTask: ReturnType<
const durationSeconds = Math.floor((updatedAt.getTime() - createdAt.getTime()) / 1000);
// Extract post-processing data
const postProcessing = messageData.postProcessingMessage as any;
// Type assertion for JSONB field which can be null or contain structured data
const postProcessing = messageData.postProcessingMessage as {
confidence_score?: string;
assumptions?: string[];
} | null;
const confidenceScore = postProcessing?.confidence_score || 'unknown';
const assumptions = postProcessing?.assumptions || [];
@ -155,10 +159,29 @@ export const logsWriteBackTask: ReturnType<
}
// Create adapter and write the log
const adapter = await createAdapter(credentials as any);
// Safely validate and convert credentials
let validatedCredentials: Credentials;
try {
validatedCredentials = toCredentials(credentials);
} catch (error) {
logger.error('Invalid credentials format', {
messageId: payload.messageId,
error: error instanceof Error ? error.message : 'Unknown error',
});
return {
success: false,
messageId: payload.messageId,
error: {
code: 'INVALID_CREDENTIALS',
message: error instanceof Error ? error.message : 'Invalid credentials format',
},
};
}
const adapter = await createAdapter(validatedCredentials);
try {
await adapter.initialize(credentials as any);
await adapter.initialize(validatedCredentials);
// Check if adapter supports log insertion
if (!adapter.insertLogRecord) {

View File

@ -243,11 +243,7 @@ export class BigQueryAdapter extends BaseAdapter {
/**
* Check if a table exists in BigQuery
*/
async tableExists(
_database: string,
schema: string,
tableName: string
): Promise<boolean> {
async tableExists(_database: string, schema: string, tableName: string): Promise<boolean> {
this.ensureConnected();
if (!this.client) {

View File

@ -180,11 +180,7 @@ export class MySQLAdapter extends BaseAdapter {
/**
* Check if a table exists in MySQL
*/
async tableExists(
database: string,
_schema: string,
tableName: string
): Promise<boolean> {
async tableExists(database: string, _schema: string, tableName: string): Promise<boolean> {
this.ensureConnected();
if (!this.connection) {

View File

@ -235,8 +235,6 @@ export class PostgreSQLAdapter extends BaseAdapter {
return this.introspector;
}
/**
* Insert a log record into the PostgreSQL table
*/

View File

@ -229,11 +229,7 @@ export class RedshiftAdapter extends BaseAdapter {
/**
* Check if a table exists in Redshift
*/
async tableExists(
database: string,
schema: string,
tableName: string
): Promise<boolean> {
async tableExists(database: string, schema: string, tableName: string): Promise<boolean> {
this.ensureConnected();
if (!this.client) {

View File

@ -363,8 +363,6 @@ export class SnowflakeAdapter extends BaseAdapter {
};
}
/**
* Insert a log record into the Snowflake table
*/

View File

@ -254,11 +254,7 @@ export class SQLServerAdapter extends BaseAdapter {
/**
* Check if a table exists in SQL Server
*/
async tableExists(
database: string,
schema: string,
tableName: string
): Promise<boolean> {
async tableExists(database: string, schema: string, tableName: string): Promise<boolean> {
this.ensureConnected();
if (!this.pool) {

View File

@ -98,6 +98,9 @@ export type {
export { checkQueryIsReadOnly } from './utils/sql-validation';
export type { QueryTypeCheckResult } from './utils/sql-validation';
// Credentials validation utilities
export { isValidCredentials, toCredentials } from './utils/validate-credentials';
// R2 cache utilities for metric data
export {
checkCacheExists,

View File

@ -0,0 +1,158 @@
import { type Credentials, DataSourceType } from '../types/credentials';
/**
 * Runtime type guard for the Credentials union.
 *
 * Provides type safety when converting an untyped payload
 * (Record<string, unknown>, e.g. decrypted vault data) into Credentials.
 */
export function isValidCredentials(obj: unknown): obj is Credentials {
  if (typeof obj !== 'object' || obj === null) {
    return false;
  }

  const candidate = obj as Record<string, unknown>;

  // Every credential variant is discriminated by a non-empty "type" string
  if (typeof candidate.type !== 'string' || candidate.type.length === 0) {
    return false;
  }

  // Dispatch to the field validator for the declared data source type
  const validators: Record<string, (record: Record<string, unknown>) => boolean> = {
    [DataSourceType.Snowflake]: validateSnowflakeCredentials,
    [DataSourceType.BigQuery]: validateBigQueryCredentials,
    [DataSourceType.PostgreSQL]: validatePostgreSQLCredentials,
    [DataSourceType.MySQL]: validateMySQLCredentials,
    [DataSourceType.SQLServer]: validateSQLServerCredentials,
    [DataSourceType.Redshift]: validateRedshiftCredentials,
  };

  const validate = validators[candidate.type];
  return validate ? validate(candidate) : false;
}
// Snowflake credentials require all five connection fields as non-empty strings.
function validateSnowflakeCredentials(obj: Record<string, unknown>): boolean {
  const required = ['account_id', 'warehouse_id', 'username', 'password', 'default_database'];
  return required.every((field) => {
    const value = obj[field];
    return typeof value === 'string' && value.length > 0;
  });
}
// BigQuery needs a project id plus at least one key source (inline key or key file path).
function validateBigQueryCredentials(obj: Record<string, unknown>): boolean {
  const hasProjectId = typeof obj.project_id === 'string' && obj.project_id.length > 0;
  const hasKeySource = Boolean(obj.service_account_key) || Boolean(obj.key_file_path);
  return hasProjectId && hasKeySource;
}
// PostgreSQL requires host, database, username, and password as non-empty strings.
function validatePostgreSQLCredentials(obj: Record<string, unknown>): boolean {
  return ['host', 'default_database', 'username', 'password'].every((field) => {
    const value = obj[field];
    return typeof value === 'string' && value.length > 0;
  });
}
// MySQL requires host, database, username, and password as non-empty strings.
function validateMySQLCredentials(obj: Record<string, unknown>): boolean {
  return ['host', 'default_database', 'username', 'password'].every((field) => {
    const value = obj[field];
    return typeof value === 'string' && value.length > 0;
  });
}
// SQL Server requires server, database, username, and password as non-empty strings.
// Note: the host field is named "server" here, unlike the other relational sources.
function validateSQLServerCredentials(obj: Record<string, unknown>): boolean {
  return ['server', 'default_database', 'username', 'password'].every((field) => {
    const value = obj[field];
    return typeof value === 'string' && value.length > 0;
  });
}
// Redshift requires host, database, username, and password as non-empty strings.
function validateRedshiftCredentials(obj: Record<string, unknown>): boolean {
  return ['host', 'default_database', 'username', 'password'].every((field) => {
    const value = obj[field];
    return typeof value === 'string' && value.length > 0;
  });
}
/**
 * Safely converts a Record<string, unknown> to Credentials with validation.
 *
 * @param obj - Untyped credential payload (e.g. decrypted from storage)
 * @returns The same object, narrowed to the Credentials union
 * @throws Error with a type-specific message listing the required fields
 *         when validation fails
 */
export function toCredentials(obj: Record<string, unknown>): Credentials {
  if (isValidCredentials(obj)) {
    return obj;
  }

  // Validation failed — build a helpful error about what's wrong.
  // Narrow the discriminant at runtime instead of using an unchecked
  // `as string | undefined` assertion: a truthy non-string "type" value
  // must not be lied about to the type checker.
  const rawType = obj?.type;
  if (!rawType) {
    throw new Error('Credentials missing required "type" field');
  }
  if (typeof rawType !== 'string') {
    throw new Error(`Unsupported data source type: ${String(rawType)}`);
  }

  // Type-specific error messages describing the required fields
  switch (rawType) {
    case DataSourceType.Snowflake:
      throw new Error(
        'Invalid Snowflake credentials: missing required fields (account_id, warehouse_id, username, password, default_database)'
      );
    case DataSourceType.BigQuery:
      throw new Error(
        'Invalid BigQuery credentials: missing required fields (project_id and either service_account_key or key_file_path)'
      );
    case DataSourceType.PostgreSQL:
      throw new Error(
        'Invalid PostgreSQL credentials: missing required fields (host, default_database, username, password)'
      );
    case DataSourceType.MySQL:
      throw new Error(
        'Invalid MySQL credentials: missing required fields (host, default_database, username, password)'
      );
    case DataSourceType.SQLServer:
      throw new Error(
        'Invalid SQL Server credentials: missing required fields (server, default_database, username, password)'
      );
    case DataSourceType.Redshift:
      throw new Error(
        'Invalid Redshift credentials: missing required fields (host, default_database, username, password)'
      );
    default:
      throw new Error(`Unsupported data source type: ${rawType}`);
  }
}