fix: optimize sync-searchable-values performance with concurrent processing

- Add concurrent embedding generation (up to 5 batches in flight at a time)
- Add concurrent Turbopuffer upserts (up to 3 batches in flight at a time)
- Fix dynamic SQL identifier quoting for different databases (Snowflake, PostgreSQL, MySQL, etc.)
- Remove unnecessary ORDER BY clause when joining with cached datasets

These optimizations significantly reduce processing time for large datasets by processing
multiple batches in parallel instead of sequentially. The batching pattern and the
identifier-quoting approach are sketched below.
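
For reference, this is the general shape of the chunked-concurrency pattern the job now uses: split the work into fixed-size batches, then await groups of batches with `Promise.all`. The helper below is a minimal sketch; `processInGroups` and `processBatch` are illustrative names, not the commit's actual code.

```typescript
// Minimal sketch of bounded-concurrency batch processing.
// `processBatch` is a hypothetical stand-in for the real embedding/upsert call.
async function processInGroups<T, R>(
  items: T[],
  batchSize: number,
  concurrency: number,
  processBatch: (batch: T[]) => Promise<R[]>
): Promise<R[]> {
  // Split the input into fixed-size batches.
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }

  const results: R[] = [];
  // Groups run sequentially; batches within a group run concurrently,
  // which bounds the number of in-flight requests to `concurrency`.
  for (let i = 0; i < batches.length; i += concurrency) {
    const group = batches.slice(i, i + concurrency);
    const groupResults = await Promise.all(group.map(processBatch));
    for (const r of groupResults) {
      results.push(...r);
    }
  }
  return results;
}
```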
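
The identifier-quoting fix is not visible in the hunk below, but the idea is dialect-aware quoting rather than hard-coding one style. A rough sketch, assuming a simplified dialect union (the names and mapping are illustrative, not the commit's implementation):

```typescript
// Illustrative only: quote a SQL identifier according to the target dialect.
type Dialect = 'postgresql' | 'mysql' | 'snowflake';

function quoteIdentifier(name: string, dialect: Dialect): string {
  switch (dialect) {
    case 'mysql':
      // MySQL quotes identifiers with backticks; escape embedded backticks by doubling.
      return '`' + name.replace(/`/g, '``') + '`';
    case 'postgresql':
    case 'snowflake':
      // PostgreSQL and Snowflake use double quotes; escape embedded quotes by doubling.
      return '"' + name.replace(/"/g, '""') + '"';
  }
}
```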

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Author: dal
Date: 2025-09-19 11:55:59 -06:00
Parent: 7b01c8ca0c
Commit: ab62bd3f20
1 changed file with 171 additions and 54 deletions


@@ -172,84 +172,201 @@ export const processSyncJob: ReturnType<
  newCount: cacheResult.newValues.length,
});

// Process embeddings in batches with concurrency to improve performance
const EMBEDDING_BATCH_SIZE = 1000; // Process 1000 values at a time
const EMBEDDING_CONCURRENCY = 5; // Process up to 5 batches concurrently
const allValuesWithEmbeddings: SearchableValue[] = [];

// Create batches
const embeddingBatches: SearchableValue[][] = [];
for (let i = 0; i < newSearchableValues.length; i += EMBEDDING_BATCH_SIZE) {
  embeddingBatches.push(newSearchableValues.slice(i, i + EMBEDDING_BATCH_SIZE));
}

logger.info('Processing embeddings with concurrency', {
  [identifierType]: identifier,
  totalBatches: embeddingBatches.length,
  batchSize: EMBEDDING_BATCH_SIZE,
  concurrency: EMBEDDING_CONCURRENCY,
  totalValues: newSearchableValues.length,
});

// Process batches with controlled concurrency
for (let i = 0; i < embeddingBatches.length; i += EMBEDDING_CONCURRENCY) {
  const concurrentBatches = embeddingBatches.slice(i, i + EMBEDDING_CONCURRENCY);

  logger.info('Processing concurrent embedding batch group', {
    [identifierType]: identifier,
    groupStart: i,
    groupSize: concurrentBatches.length,
    totalGroups: Math.ceil(embeddingBatches.length / EMBEDDING_CONCURRENCY),
  });

  // Process multiple batches concurrently
  const embeddingPromises = concurrentBatches.map(async (batch, batchIndex) => {
    const absoluteBatchIndex = i + batchIndex;
    const batchTexts = batch.map((v) => v.value);

    logger.info('Starting embedding batch', {
      [identifierType]: identifier,
      batchIndex: absoluteBatchIndex,
      batchSize: batch.length,
      totalBatches: embeddingBatches.length,
    });

    try {
      // Generate embeddings for this batch
      const batchEmbeddings = await generateSearchableValueEmbeddings(batchTexts);

      // Log embedding dimensions for first batch only
      if (absoluteBatchIndex === 0 && batchEmbeddings.length > 0 && batchEmbeddings[0]) {
        logger.info('Embedding dimensions check', {
          [identifierType]: identifier,
          firstEmbeddingLength: batchEmbeddings[0].length,
          expectedDimensions: 512,
          allEmbeddingLengths: batchEmbeddings.slice(0, 5).map((e) => e?.length),
        });
      }

      // Combine values with embeddings for this batch
      const batchWithEmbeddings = batch.map((value, index) => ({
        ...value,
        embedding: batchEmbeddings[index],
        synced_at: new Date().toISOString(),
      }));

      logger.info('Completed embedding batch', {
        [identifierType]: identifier,
        batchIndex: absoluteBatchIndex,
        processedCount: batchWithEmbeddings.length,
      });

      return batchWithEmbeddings;
    } catch (error) {
      logger.error('Failed to process embedding batch', {
        [identifierType]: identifier,
        batchIndex: absoluteBatchIndex,
        error: error instanceof Error ? error.message : 'Unknown error',
      });
      throw error;
    }
  });

  // Wait for all concurrent batches to complete
  const results = await Promise.all(embeddingPromises);

  // Flatten and add to results
  for (const batchResult of results) {
    allValuesWithEmbeddings.push(...batchResult);
  }

  logger.info('Completed concurrent embedding batch group', {
    [identifierType]: identifier,
    groupStart: i,
    processedCount: results.reduce((sum, r) => sum + r.length, 0),
    totalProcessed: allValuesWithEmbeddings.length,
  });
}
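
// Note: Promise.all rejects as soon as any batch in a group fails, so a
// single failed embedding batch aborts the whole sync job rather than
// continuing with the remaining groups.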
// Step 8: Upsert to Turbopuffer in batches with concurrency
logger.info('Upserting values to Turbopuffer', {
  [identifierType]: identifier,
  count: allValuesWithEmbeddings.length,
});

// Process upserts in batches with concurrency to improve performance
const UPSERT_BATCH_SIZE = 500; // Upsert 500 values at a time
const UPSERT_CONCURRENCY = 3; // Process up to 3 upserts concurrently (lower than embeddings to avoid overwhelming Turbopuffer)
let totalUpserted = 0;
const upsertErrors: string[] = [];

// Create upsert batches
const upsertBatches: SearchableValue[][] = [];
for (let i = 0; i < allValuesWithEmbeddings.length; i += UPSERT_BATCH_SIZE) {
  upsertBatches.push(allValuesWithEmbeddings.slice(i, i + UPSERT_BATCH_SIZE));
}

logger.info('Processing upserts with concurrency', {
  [identifierType]: identifier,
  totalBatches: upsertBatches.length,
  batchSize: UPSERT_BATCH_SIZE,
  concurrency: UPSERT_CONCURRENCY,
  totalValues: allValuesWithEmbeddings.length,
});

// Process batches with controlled concurrency
for (let i = 0; i < upsertBatches.length; i += UPSERT_CONCURRENCY) {
  const concurrentBatches = upsertBatches.slice(i, i + UPSERT_CONCURRENCY);

  logger.info('Processing concurrent upsert batch group', {
    [identifierType]: identifier,
    groupStart: i,
    groupSize: concurrentBatches.length,
    totalGroups: Math.ceil(upsertBatches.length / UPSERT_CONCURRENCY),
  });

  // Process multiple upsert batches concurrently
  const upsertPromises = concurrentBatches.map(async (batch, batchIndex) => {
    const absoluteBatchIndex = i + batchIndex;

    logger.info('Starting upsert batch', {
      [identifierType]: identifier,
      batchIndex: absoluteBatchIndex,
      batchSize: batch.length,
      totalBatches: upsertBatches.length,
    });

    try {
      const batchResult = await upsertSearchableValues({
        dataSourceId: payload.dataSourceId,
        values: batch,
      });

      if (batchResult.errors && batchResult.errors.length > 0) {
        // Collect errors but don't throw immediately
        logger.error('Upsert batch had errors', {
          [identifierType]: identifier,
          batchIndex: absoluteBatchIndex,
          errorsInBatch: batchResult.errors.length,
          errors: batchResult.errors,
        });
        upsertErrors.push(...batchResult.errors);
      }

      logger.info('Completed upsert batch', {
        [identifierType]: identifier,
        batchIndex: absoluteBatchIndex,
        upserted: batchResult.upserted,
      });

      return batchResult.upserted;
    } catch (error) {
      logger.error('Failed to process upsert batch', {
        [identifierType]: identifier,
        batchIndex: absoluteBatchIndex,
        error: error instanceof Error ? error.message : 'Unknown error',
      });
      throw error;
    }
  });

  // Wait for all concurrent upserts to complete
  const results = await Promise.all(upsertPromises);
  totalUpserted += results.reduce((sum, count) => sum + count, 0);

  logger.info('Completed concurrent upsert batch group', {
    [identifierType]: identifier,
    groupStart: i,
    totalUpserted: totalUpserted,
  });
}

// Check if there were any errors
if (upsertErrors.length > 0) {
  throw new Error(
    `Failed to upsert ${upsertErrors.length} values to Turbopuffer: ${upsertErrors.slice(0, 3).join(', ')}${upsertErrors.length > 3 ? '...' : ''}`
  );
}

logger.info('All upserts completed successfully', {