mirror of https://github.com/buster-so/buster.git
Merge pull request #1008 from buster-so/dallin-bus-1844-angel-stored-values-not-synced
BUS-1844: Optimize sync-searchable-values performance
Commit: e66e3d729c
@@ -68,6 +68,7 @@ describe('processSyncJob', () => {
     executeBulk: vi.fn(),
     getMetadata: vi.fn(),
     isConnected: vi.fn(),
+    getDataSourceType: vi.fn().mockReturnValue('postgres'),
   };

   beforeEach(() => {
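For context, the adapter mock in these tests gains getDataSourceType because queryDistinctColumnValues (later in this diff) now branches on the data source type to pick identifier quoting. A minimal vitest-style sketch of such a mock; everything other than the four mocked method names and the 'postgres' return value is an assumption for illustration, not taken from this repository:

import { describe, expect, it, vi } from 'vitest';

// Hypothetical standalone version of the adapter mock used in the test above.
// Only getDataSourceType's return value matters for the quoting behavior.
const mockAdapter = {
  executeBulk: vi.fn(),
  getMetadata: vi.fn(),
  isConnected: vi.fn(),
  getDataSourceType: vi.fn().mockReturnValue('postgres'),
};

describe('mockAdapter', () => {
  it('reports the configured data source type', () => {
    expect(mockAdapter.getDataSourceType()).toBe('postgres');
    // Re-mocking with 'mysql' would exercise the backtick-quoting path instead.
  });
});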
@@ -172,84 +172,201 @@ export const processSyncJob: ReturnType<
       newCount: cacheResult.newValues.length,
     });

-    // Process embeddings in batches to avoid memory issues
+    // Process embeddings in batches with concurrency to improve performance
     const EMBEDDING_BATCH_SIZE = 1000; // Process 1000 values at a time
+    const EMBEDDING_CONCURRENCY = 5; // Process up to 5 batches concurrently
     const allValuesWithEmbeddings: SearchableValue[] = [];

+    // Create batches
+    const embeddingBatches: SearchableValue[][] = [];
     for (let i = 0; i < newSearchableValues.length; i += EMBEDDING_BATCH_SIZE) {
-      const batch = newSearchableValues.slice(i, i + EMBEDDING_BATCH_SIZE);
-      const batchTexts = batch.map((v) => v.value);
-
-      logger.info('Processing embedding batch', {
-        [identifierType]: identifier,
-        batchStart: i,
-        batchSize: batch.length,
-        totalValues: newSearchableValues.length,
-      });
-
-      // Generate embeddings for this batch
-      const batchEmbeddings = await generateSearchableValueEmbeddings(batchTexts);
-
-      // Log embedding dimensions for debugging
-      if (batchEmbeddings.length > 0 && batchEmbeddings[0]) {
-        logger.info('Embedding dimensions check', {
-          [identifierType]: identifier,
-          firstEmbeddingLength: batchEmbeddings[0].length,
-          expectedDimensions: 512,
-          allEmbeddingLengths: batchEmbeddings.slice(0, 5).map((e) => e?.length), // Log first 5 for sample
-        });
-      }
-
-      // Combine values with embeddings for this batch
-      const batchWithEmbeddings = batch.map((value, index) => ({
-        ...value,
-        embedding: batchEmbeddings[index],
-        synced_at: new Date().toISOString(),
-      }));
-
-      allValuesWithEmbeddings.push(...batchWithEmbeddings);
+      embeddingBatches.push(newSearchableValues.slice(i, i + EMBEDDING_BATCH_SIZE));
     }

-    // Step 8: Upsert to Turbopuffer in batches to manage memory
+    logger.info('Processing embeddings with concurrency', {
+      [identifierType]: identifier,
+      totalBatches: embeddingBatches.length,
+      batchSize: EMBEDDING_BATCH_SIZE,
+      concurrency: EMBEDDING_CONCURRENCY,
+      totalValues: newSearchableValues.length,
+    });
+
+    // Process batches with controlled concurrency
+    for (let i = 0; i < embeddingBatches.length; i += EMBEDDING_CONCURRENCY) {
+      const concurrentBatches = embeddingBatches.slice(i, i + EMBEDDING_CONCURRENCY);
+
+      logger.info('Processing concurrent embedding batch group', {
+        [identifierType]: identifier,
+        groupStart: i,
+        groupSize: concurrentBatches.length,
+        totalGroups: Math.ceil(embeddingBatches.length / EMBEDDING_CONCURRENCY),
+      });
+
+      // Process multiple batches concurrently
+      const embeddingPromises = concurrentBatches.map(async (batch, batchIndex) => {
+        const absoluteBatchIndex = i + batchIndex;
+        const batchTexts = batch.map((v) => v.value);
+
+        logger.info('Starting embedding batch', {
+          [identifierType]: identifier,
+          batchIndex: absoluteBatchIndex,
+          batchSize: batch.length,
+          totalBatches: embeddingBatches.length,
+        });
+
+        try {
+          // Generate embeddings for this batch
+          const batchEmbeddings = await generateSearchableValueEmbeddings(batchTexts);
+
+          // Log embedding dimensions for first batch only
+          if (absoluteBatchIndex === 0 && batchEmbeddings.length > 0 && batchEmbeddings[0]) {
+            logger.info('Embedding dimensions check', {
+              [identifierType]: identifier,
+              firstEmbeddingLength: batchEmbeddings[0].length,
+              expectedDimensions: 512,
+              allEmbeddingLengths: batchEmbeddings.slice(0, 5).map((e) => e?.length),
+            });
+          }
+
+          // Combine values with embeddings for this batch
+          const batchWithEmbeddings = batch.map((value, index) => ({
+            ...value,
+            embedding: batchEmbeddings[index],
+            synced_at: new Date().toISOString(),
+          }));
+
+          logger.info('Completed embedding batch', {
+            [identifierType]: identifier,
+            batchIndex: absoluteBatchIndex,
+            processedCount: batchWithEmbeddings.length,
+          });
+
+          return batchWithEmbeddings;
+        } catch (error) {
+          logger.error('Failed to process embedding batch', {
+            [identifierType]: identifier,
+            batchIndex: absoluteBatchIndex,
+            error: error instanceof Error ? error.message : 'Unknown error',
+          });
+          throw error;
+        }
+      });
+
+      // Wait for all concurrent batches to complete
+      const results = await Promise.all(embeddingPromises);
+
+      // Flatten and add to results
+      for (const batchResult of results) {
+        allValuesWithEmbeddings.push(...batchResult);
+      }
+
+      logger.info('Completed concurrent embedding batch group', {
+        [identifierType]: identifier,
+        groupStart: i,
+        processedCount: results.reduce((sum, r) => sum + r.length, 0),
+        totalProcessed: allValuesWithEmbeddings.length,
+      });
+    }
+
+    // Step 8: Upsert to Turbopuffer in batches with concurrency
     logger.info('Upserting values to Turbopuffer', {
       [identifierType]: identifier,
       count: allValuesWithEmbeddings.length,
     });

-    // Process upserts in batches to avoid memory and API limits
+    // Process upserts in batches with concurrency to improve performance
     const UPSERT_BATCH_SIZE = 500; // Upsert 500 values at a time
+    const UPSERT_CONCURRENCY = 3; // Process up to 3 upserts concurrently (lower than embeddings to avoid overwhelming Turbopuffer)
     let totalUpserted = 0;
+    const upsertErrors: string[] = [];

+    // Create upsert batches
+    const upsertBatches: SearchableValue[][] = [];
     for (let i = 0; i < allValuesWithEmbeddings.length; i += UPSERT_BATCH_SIZE) {
-      const batch = allValuesWithEmbeddings.slice(i, i + UPSERT_BATCH_SIZE);
+      upsertBatches.push(allValuesWithEmbeddings.slice(i, i + UPSERT_BATCH_SIZE));
+    }

-      logger.info('Processing upsert batch', {
+    logger.info('Processing upserts with concurrency', {
       [identifierType]: identifier,
+      totalBatches: upsertBatches.length,
+      batchSize: UPSERT_BATCH_SIZE,
+      concurrency: UPSERT_CONCURRENCY,
+      totalValues: allValuesWithEmbeddings.length,
+    });
+
+    // Process batches with controlled concurrency
+    for (let i = 0; i < upsertBatches.length; i += UPSERT_CONCURRENCY) {
+      const concurrentBatches = upsertBatches.slice(i, i + UPSERT_CONCURRENCY);
+
+      logger.info('Processing concurrent upsert batch group', {
+        [identifierType]: identifier,
-        batchStart: i,
-        batchSize: batch.length,
-        totalValues: allValuesWithEmbeddings.length,
+        groupStart: i,
+        groupSize: concurrentBatches.length,
+        totalGroups: Math.ceil(upsertBatches.length / UPSERT_CONCURRENCY),
       });

-      const batchResult = await upsertSearchableValues({
-        dataSourceId: payload.dataSourceId,
-        values: batch,
-      });
+      // Process multiple upsert batches concurrently
+      const upsertPromises = concurrentBatches.map(async (batch, batchIndex) => {
+        const absoluteBatchIndex = i + batchIndex;

-      totalUpserted += batchResult.upserted;
-      if (batchResult.errors && batchResult.errors.length > 0) {
-        // Log and throw error for any upsert failures
-        logger.error('Upsert batch failed', {
+        logger.info('Starting upsert batch', {
           [identifierType]: identifier,
-          batchStart: i,
+          batchIndex: absoluteBatchIndex,
           batchSize: batch.length,
-          errorsInBatch: batchResult.errors.length,
-          errors: batchResult.errors,
+          totalBatches: upsertBatches.length,
         });

-        throw new Error(
-          `Failed to upsert ${batchResult.errors.length} values to Turbopuffer: ${batchResult.errors.slice(0, 3).join(', ')}${batchResult.errors.length > 3 ? '...' : ''}`
-        );
-      }
+        try {
+          const batchResult = await upsertSearchableValues({
+            dataSourceId: payload.dataSourceId,
+            values: batch,
+          });
+
+          if (batchResult.errors && batchResult.errors.length > 0) {
+            // Collect errors but don't throw immediately
+            logger.error('Upsert batch had errors', {
+              [identifierType]: identifier,
+              batchIndex: absoluteBatchIndex,
+              errorsInBatch: batchResult.errors.length,
+              errors: batchResult.errors,
+            });
+
+            upsertErrors.push(...batchResult.errors);
+          }
+
+          logger.info('Completed upsert batch', {
+            [identifierType]: identifier,
+            batchIndex: absoluteBatchIndex,
+            upserted: batchResult.upserted,
+          });
+
+          return batchResult.upserted;
+        } catch (error) {
+          logger.error('Failed to process upsert batch', {
+            [identifierType]: identifier,
+            batchIndex: absoluteBatchIndex,
+            error: error instanceof Error ? error.message : 'Unknown error',
+          });
+          throw error;
+        }
+      });
+
+      // Wait for all concurrent upserts to complete
+      const results = await Promise.all(upsertPromises);
+      totalUpserted += results.reduce((sum, count) => sum + count, 0);
+
+      logger.info('Completed concurrent upsert batch group', {
+        [identifierType]: identifier,
+        groupStart: i,
+        totalUpserted: totalUpserted,
+      });
+    }
+
+    // Check if there were any errors
+    if (upsertErrors.length > 0) {
+      throw new Error(
+        `Failed to upsert ${upsertErrors.length} values to Turbopuffer: ${upsertErrors.slice(0, 3).join(', ')}${upsertErrors.length > 3 ? '...' : ''}`
+      );
+    }

     logger.info('All upserts completed successfully', {
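Both new loops follow the same shape: chunk the work into fixed-size batches, then await groups of batches with Promise.all so at most N batches are in flight at once. A standalone sketch of that pattern; the helper names and generic signature are illustrative and are not part of this commit:

// Split an array into fixed-size chunks.
function chunk<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Run one handler per batch, keeping at most `concurrency` batches in
// flight; each group is awaited before the next group starts.
async function processBatchesWithConcurrency<T, R>(
  batches: T[][],
  concurrency: number,
  handler: (batch: T[], batchIndex: number) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  for (let i = 0; i < batches.length; i += concurrency) {
    const group = batches.slice(i, i + concurrency);
    const groupResults = await Promise.all(
      group.map((batch, offset) => handler(batch, i + offset))
    );
    results.push(...groupResults);
  }
  return results;
}

// Example: embed 1000-value batches, at most 5 at a time, mirroring the
// EMBEDDING_BATCH_SIZE / EMBEDDING_CONCURRENCY constants in the diff.
async function embedAll(
  values: string[],
  embed: (texts: string[]) => Promise<number[][]>
): Promise<number[][]> {
  const batches = chunk(values, 1000);
  const perBatch = await processBatchesWithConcurrency(batches, 5, (batch) => embed(batch));
  return perBatch.flat();
}

One consequence worth noting: within a group, Promise.all rejects on the first failed batch, so the embedding path still aborts the job on any batch error, while the upsert path catches per-batch results, accumulates upsertErrors, and throws a single aggregated error after all groups finish.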
@@ -364,17 +481,50 @@ async function queryDistinctColumnValues({
   columnName: string;
   limit?: number; // Optional - when not provided, query all distinct values
 }): Promise<string[]> {
+  // Get the data source type to determine proper identifier quoting
+  const dataSourceType = adapter.getDataSourceType();
+
+  // Determine the appropriate quote character based on data source type
+  let quoteChar = '';
+  switch (dataSourceType) {
+    case 'postgres':
+    case 'redshift':
+      quoteChar = '"';
+      break;
+    case 'mysql':
+    case 'bigquery':
+      quoteChar = '`';
+      break;
+    case 'sqlserver':
+      // SQL Server uses square brackets, but we'll handle it differently
+      break;
+    case 'snowflake':
+      // Snowflake doesn't need quotes unless identifiers have special chars or are case-sensitive
+      // For safety, we'll omit quotes for Snowflake
+      break;
+    default:
+      // Default to no quotes
+      break;
+  }
+
+  // Build the fully qualified table name
   const fullyQualifiedTable = `${databaseName}.${schemaName}.${tableName}`;
+
+  // Build the column reference with appropriate quoting
+  let columnRef = columnName;
+  if (dataSourceType === 'sqlserver') {
+    columnRef = `[${columnName}]`;
+  } else if (quoteChar) {
+    columnRef = `${quoteChar}${columnName}${quoteChar}`;
+  }

   // Build the query to get distinct non-null values
   // Using parameterized identifiers for safety
+  // Removed ORDER BY since we're joining with cached datasets
   const query = `
-    SELECT DISTINCT "${columnName}" AS value
+    SELECT DISTINCT ${columnRef} AS value
     FROM ${fullyQualifiedTable}
-    WHERE "${columnName}" IS NOT NULL
-      AND TRIM("${columnName}") != ''
-    ORDER BY "${columnName}"${
+    WHERE ${columnRef} IS NOT NULL
+      AND TRIM(${columnRef}) != ''${
     limit
       ? `
     LIMIT ${limit}`
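The quoting logic added here reduces to a small pure function over the data source type. A condensed sketch using the same dialect strings as the switch above; the helper name is illustrative and not part of the commit:

// Quote a single identifier for the given SQL dialect. Mirrors the switch
// above: double quotes for Postgres/Redshift, backticks for MySQL/BigQuery,
// square brackets for SQL Server, and no quoting for Snowflake or unknowns.
function quoteIdentifier(dataSourceType: string, name: string): string {
  switch (dataSourceType) {
    case 'postgres':
    case 'redshift':
      return `"${name}"`;
    case 'mysql':
    case 'bigquery':
      return '`' + name + '`';
    case 'sqlserver':
      return `[${name}]`;
    default:
      return name;
  }
}

// Example: quoteIdentifier('postgres', 'customer_name') yields "customer_name",
// so the generated statement looks roughly like
//   SELECT DISTINCT "customer_name" AS value
//   FROM db.schema.customers
//   WHERE "customer_name" IS NOT NULL AND TRIM("customer_name") != ''
//   LIMIT 100
const columnRef = quoteIdentifier('postgres', 'customer_name');

As in the diff, identifiers are interpolated directly into the SQL string rather than bound as parameters, so column and table names are assumed to come from trusted catalog metadata.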
@@ -385,6 +535,7 @@ async function queryDistinctColumnValues({
   logger.info('Executing distinct values query', {
     table: fullyQualifiedTable,
     column: columnName,
+    dataSourceType,
     limit: limit || 'no limit',
   });
@@ -410,6 +561,7 @@ async function queryDistinctColumnValues({
   logger.error('Failed to query distinct values', {
     table: fullyQualifiedTable,
     column: columnName,
+    dataSourceType,
     error: error instanceof Error ? error.message : 'Unknown error',
   });
   throw new Error(