diff --git a/apps/api/server/src/main.rs b/apps/api/server/src/main.rs index bdf499fab..72b93fb38 100644 --- a/apps/api/server/src/main.rs +++ b/apps/api/server/src/main.rs @@ -15,9 +15,7 @@ use middleware::{ error::{init_sentry, init_tracing_subscriber, sentry_layer}, }; use rustls::crypto::ring; -use stored_values::jobs::trigger_stale_sync_jobs; use tokio::sync::broadcast; -use tokio_cron_scheduler::{Job, JobScheduler}; use tower::ServiceBuilder; use tower_http::{compression::CompressionLayer, trace::TraceLayer}; use tracing::{error, info, warn}; @@ -58,26 +56,6 @@ async fn main() -> Result<(), anyhow::Error> { return Ok(()); } - // --- Start Stored Values Sync Job Scheduler --- - let scheduler = JobScheduler::new().await?; // Using `?` assuming main returns Result - info!("Starting stored values sync job scheduler..."); - - // Schedule to run every hour - let job = Job::new_async("*/5 * * * * *", move |uuid, mut l| { - Box::pin(async move { - info!(job_uuid = %uuid, "Running hourly stored values sync job check."); - if let Err(e) = trigger_stale_sync_jobs().await { - error!(job_uuid = %uuid, "Hourly stored values sync job failed: {}", e); - } - // Optional: You could check l.next_tick_for_job(uuid).await to see the next scheduled time. - }) - })?; - - scheduler.add(job).await?; - scheduler.start().await?; - info!("Stored values sync job scheduler started."); - // --- End Stored Values Sync Job Scheduler --- - let protected_router = Router::new().nest("/api/v1", routes::protected_router()); let public_router = Router::new().route("/health", axum::routing::get(|| async { "OK" })); diff --git a/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.test.ts b/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.test.ts index 0479809df..10ad12f7f 100644 --- a/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.test.ts +++ b/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.test.ts @@ -21,8 +21,6 @@ vi.mock('@buster/database', () => ({ })); vi.mock('@buster/search', () => ({ - checkNamespaceExists: vi.fn(), - createNamespaceIfNotExists: vi.fn(), deduplicateValues: vi.fn(), generateNamespace: vi.fn(), queryExistingKeys: vi.fn(), @@ -49,8 +47,6 @@ import { markSyncJobFailed, } from '@buster/database'; import { - checkNamespaceExists, - createNamespaceIfNotExists, deduplicateValues, generateNamespace, queryExistingKeys, @@ -85,7 +81,7 @@ describe('processSyncJob', () => { vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any); vi.mocked(markSyncJobCompleted).mockResolvedValue({ id: 'test-job-123', - status: 'completed', + status: 'success', updatedAt: new Date().toISOString(), lastSyncedAt: new Date().toISOString(), errorMessage: null, @@ -124,7 +120,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(true); vi.mocked(queryExistingKeys).mockResolvedValue(mockExistingKeys); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: [ @@ -175,7 +170,6 @@ describe('processSyncJob', () => { expect(mockAdapter.query).toHaveBeenCalledWith( expect.stringContaining('SELECT DISTINCT "company_name"') ); - expect(checkNamespaceExists).toHaveBeenCalledWith('ds-456'); expect(queryExistingKeys).toHaveBeenCalledWith({ dataSourceId: 'ds-456', query: { @@ -272,7 +266,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(true); 
vi.mocked(queryExistingKeys).mockResolvedValue(mockExistingKeys); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: [], @@ -316,7 +309,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(false); // Namespace doesn't exist vi.mocked(queryExistingKeys).mockResolvedValue([]); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: [ @@ -342,8 +334,6 @@ describe('processSyncJob', () => { const result = await runTask(mockPayload); // Verify namespace creation - expect(checkNamespaceExists).toHaveBeenCalledWith('ds-456'); - expect(createNamespaceIfNotExists).toHaveBeenCalledWith('ds-456'); expect(result.success).toBe(true); }); }); @@ -417,7 +407,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(true); vi.mocked(queryExistingKeys).mockResolvedValue([]); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: [ @@ -461,7 +450,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(true); vi.mocked(queryExistingKeys).mockResolvedValue([]); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: [ @@ -522,7 +510,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(true); vi.mocked(queryExistingKeys).mockResolvedValue([]); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: specialValues.map((value) => ({ @@ -568,7 +555,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(true); vi.mocked(queryExistingKeys).mockResolvedValue([]); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: [ @@ -627,7 +613,6 @@ describe('processSyncJob', () => { fields: [], }); vi.mocked(generateNamespace).mockReturnValue('ds-456'); - vi.mocked(checkNamespaceExists).mockResolvedValue(true); vi.mocked(queryExistingKeys).mockResolvedValue([]); vi.mocked(deduplicateValues).mockResolvedValue({ newValues: [ diff --git a/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.ts b/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.ts index 26da9a699..db206c64a 100644 --- a/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.ts +++ b/apps/trigger/src/tasks/sync-searchable-values/process-sync-job.ts @@ -7,8 +7,6 @@ import { } from '@buster/database'; import { type SearchableValue, - checkNamespaceExists, - createNamespaceIfNotExists, deduplicateValues, generateNamespace, queryExistingKeys, @@ -35,6 +33,9 @@ export const processSyncJob: ReturnType< id: 'process-sync-job', schema: SyncJobPayloadSchema, maxDuration: 300, // 5 minutes per job + machine: { + preset: 'large-1x', // 4 vCPU, 8 GB RAM for handling large datasets + }, retry: { maxAttempts: 3, factor: 2, @@ -114,15 +115,10 @@ export const processSyncJob: ReturnType< return result; } - // Step 4: Ensure Turbopuffer namespace exists + // Step 4: Query existing keys from Turbopuffer + // Note: If namespace doesn't exist, it means no data has been written yet + // and we should treat it as having no existing keys const namespace = generateNamespace(payload.dataSourceId); - const namespaceExists = await checkNamespaceExists(payload.dataSourceId); - if (!namespaceExists) 
{ - logger.info('Creating Turbopuffer namespace', { namespace }); - await createNamespaceIfNotExists(payload.dataSourceId); - } - - // Step 5: Query existing keys from Turbopuffer logger.info('Querying existing values from Turbopuffer', { namespace, database: payload.databaseName, @@ -131,22 +127,38 @@ export const processSyncJob: ReturnType< column: payload.columnName, }); - const existingKeys = await queryExistingKeys({ - dataSourceId: payload.dataSourceId, - query: { - database: payload.databaseName, - schema: payload.schemaName, - table: payload.tableName, - column: payload.columnName, - }, - }); + let existingKeys: string[] = []; + try { + existingKeys = await queryExistingKeys({ + dataSourceId: payload.dataSourceId, + query: { + database: payload.databaseName, + schema: payload.schemaName, + table: payload.tableName, + column: payload.columnName, + }, + }); + } catch (error) { + // Check if this is a namespace not found error (404) + const errorMessage = error instanceof Error ? error.message : String(error); + if (errorMessage.includes('namespace') && errorMessage.includes('was not found')) { + logger.info('Namespace does not exist yet, treating as empty', { + namespace, + jobId: payload.jobId, + }); + existingKeys = []; + } else { + // Re-throw if it's not a namespace not found error + throw error; + } + } logger.info('Retrieved existing keys', { jobId: payload.jobId, existingCount: existingKeys.length, }); - // Step 6: Prepare searchable values for deduplication + // Step 5: Prepare searchable values for deduplication const searchableValues: SearchableValue[] = distinctValues.map((value) => ({ database: payload.databaseName, schema: payload.schemaName, @@ -155,7 +167,7 @@ export const processSyncJob: ReturnType< value, })); - // Step 7: Deduplicate using DuckDB + // Step 6: Deduplicate using DuckDB logger.info('Deduplicating values', { jobId: payload.jobId, totalValues: searchableValues.length, @@ -202,42 +214,88 @@ export const processSyncJob: ReturnType< return result; } - // Step 8: Generate embeddings for new values + // Step 7: Generate embeddings for new values in batches to manage memory logger.info('Generating embeddings for new values', { jobId: payload.jobId, newCount: deduplicationResult.newCount, }); - const newValueTexts = deduplicationResult.newValues.map((v) => v.value); - const embeddings = await generateSearchableValueEmbeddings(newValueTexts); + // Process embeddings in batches to avoid memory issues + const EMBEDDING_BATCH_SIZE = 1000; // Process 1000 values at a time + const allValuesWithEmbeddings: SearchableValue[] = []; - // Step 9: Combine values with embeddings - const valuesWithEmbeddings: SearchableValue[] = deduplicationResult.newValues.map( - (value, index) => ({ + for (let i = 0; i < deduplicationResult.newValues.length; i += EMBEDDING_BATCH_SIZE) { + const batch = deduplicationResult.newValues.slice(i, i + EMBEDDING_BATCH_SIZE); + const batchTexts = batch.map((v) => v.value); + + logger.info('Processing embedding batch', { + jobId: payload.jobId, + batchStart: i, + batchSize: batch.length, + totalValues: deduplicationResult.newValues.length, + }); + + // Generate embeddings for this batch + const batchEmbeddings = await generateSearchableValueEmbeddings(batchTexts); + + // Combine values with embeddings for this batch + const batchWithEmbeddings = batch.map((value, index) => ({ ...value, - embedding: embeddings[index], + embedding: batchEmbeddings[index], synced_at: new Date().toISOString(), - }) - ); + })); - // Step 10: Upsert to Turbopuffer + 
allValuesWithEmbeddings.push(...batchWithEmbeddings); + } + + // Step 8: Upsert to Turbopuffer in batches to manage memory logger.info('Upserting values to Turbopuffer', { jobId: payload.jobId, - count: valuesWithEmbeddings.length, + count: allValuesWithEmbeddings.length, }); - const upsertResult = await upsertSearchableValues({ - dataSourceId: payload.dataSourceId, - values: valuesWithEmbeddings, - }); + // Process upserts in batches to avoid memory and API limits + const UPSERT_BATCH_SIZE = 500; // Upsert 500 values at a time + let totalUpserted = 0; - logger.info('Upsert complete', { + for (let i = 0; i < allValuesWithEmbeddings.length; i += UPSERT_BATCH_SIZE) { + const batch = allValuesWithEmbeddings.slice(i, i + UPSERT_BATCH_SIZE); + + logger.info('Processing upsert batch', { + jobId: payload.jobId, + batchStart: i, + batchSize: batch.length, + totalValues: allValuesWithEmbeddings.length, + }); + + const batchResult = await upsertSearchableValues({ + dataSourceId: payload.dataSourceId, + values: batch, + }); + + totalUpserted += batchResult.upserted; + if (batchResult.errors && batchResult.errors.length > 0) { + // Log and throw error for any upsert failures + logger.error('Upsert batch failed', { + jobId: payload.jobId, + batchStart: i, + batchSize: batch.length, + errorsInBatch: batchResult.errors.length, + errors: batchResult.errors, + }); + + throw new Error( + `Failed to upsert ${batchResult.errors.length} values to Turbopuffer: ${batchResult.errors.slice(0, 3).join(', ')}${batchResult.errors.length > 3 ? '...' : ''}` + ); + } + } + + logger.info('All upserts completed successfully', { jobId: payload.jobId, - upserted: upsertResult.upserted, - errors: upsertResult.errors, + totalUpserted: totalUpserted, }); - // Step 11: Update sync job status + // Step 9: Update sync job status const metadata = { processedCount: searchableValues.length, existingCount: deduplicationResult.existingCount, diff --git a/apps/trigger/src/tasks/sync-searchable-values/sync-searchable-values.test.ts b/apps/trigger/src/tasks/sync-searchable-values/sync-searchable-values.test.ts deleted file mode 100644 index 349e485c4..000000000 --- a/apps/trigger/src/tasks/sync-searchable-values/sync-searchable-values.test.ts +++ /dev/null @@ -1,804 +0,0 @@ -import * as databaseModule from '@buster/database'; -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import * as processSyncJobModule from './process-sync-job'; -import type { DailySyncReport, DataSourceSyncSummary, SyncJobPayload } from './types'; - -// Define ScheduledTaskPayload type locally since it's not exported from @trigger.dev/sdk -interface ScheduledTaskPayload { - timestamp: Date; - lastTimestamp?: Date; - upcoming: Date[]; -} - -// Mock @trigger.dev/sdk -vi.mock('@trigger.dev/sdk', () => ({ - logger: { - info: vi.fn(), - error: vi.fn(), - }, - schedules: { - task: vi.fn((config) => config), - }, -})); - -// Mock database module -vi.mock('@buster/database', () => ({ - getDataSourcesForSync: vi.fn(), - getSearchableColumns: vi.fn(), - batchCreateSyncJobs: vi.fn(), - markSyncJobInProgress: vi.fn(), - markSyncJobFailed: vi.fn(), -})); - -// Mock process-sync-job module -vi.mock('./process-sync-job', () => ({ - processSyncJob: { - triggerAndWait: vi.fn(), - }, -})); - -describe('sync-searchable-values', () => { - let syncSearchableValues: any; - let originalCrypto: typeof globalThis.crypto; - - beforeEach(async () => { - vi.clearAllMocks(); - - // Save original crypto - originalCrypto = global.crypto; - - // Mock crypto.randomUUID - 
const mockCrypto = { - ...global.crypto, - randomUUID: vi.fn().mockReturnValue('test-execution-id'), - }; - - // Use Object.defineProperty to override crypto - Object.defineProperty(global, 'crypto', { - value: mockCrypto, - writable: true, - configurable: true, - }); - - // Dynamically import to get fresh module after mocks - const module = await import('./sync-searchable-values'); - syncSearchableValues = module.syncSearchableValues; - }); - - afterEach(() => { - // Restore original crypto - Object.defineProperty(global, 'crypto', { - value: originalCrypto, - writable: true, - configurable: true, - }); - vi.resetModules(); - }); - - describe('Daily Sync Execution', () => { - const mockPayload: ScheduledTaskPayload = { - timestamp: new Date('2024-01-15T02:00:00Z'), - lastTimestamp: new Date('2024-01-14T02:00:00Z'), - upcoming: [], - }; - - it('should handle case with no data sources', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - getDataSourcesForSync.mockResolvedValue({ - totalCount: 0, - dataSources: [], - }); - - const result = await syncSearchableValues.run(mockPayload); - - expect(result).toMatchObject({ - executionId: 'test-execution-id', - totalDataSources: 0, - totalColumns: 0, - successfulSyncs: 0, - failedSyncs: 0, - skippedSyncs: 0, - totalValuesProcessed: 0, - dataSourceSummaries: [], - errors: [], - }); - - expect(getDataSourcesForSync).toHaveBeenCalledTimes(1); - }); - - it('should process data sources with searchable columns', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = databaseModule.getSearchableColumns as any; - const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any; - const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any; - const processSyncJob = processSyncJobModule.processSyncJob as any; - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 1, - dataSources: [ - { - id: 'ds-1', - name: 'Test Database', - columnsWithStoredValues: 3, - }, - ], - }); - - getSearchableColumns.mockResolvedValue({ - totalCount: 2, - columns: [ - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'email', - }, - ], - }); - - batchCreateSyncJobs.mockResolvedValue({ - totalCreated: 2, - created: [ - { - id: 'job-1', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - { - id: 'job-2', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'email', - }, - ], - errors: [], - }); - - markSyncJobInProgress.mockResolvedValue(undefined); - - processSyncJob.triggerAndWait.mockResolvedValue({ - ok: true, - output: { - success: true, - processedCount: 100, - error: null, - }, - }); - - const result = await syncSearchableValues.run(mockPayload); - - expect(result).toMatchObject({ - executionId: 'test-execution-id', - totalDataSources: 1, - totalColumns: 2, - successfulSyncs: 2, - failedSyncs: 0, - skippedSyncs: 0, - totalValuesProcessed: 200, - errors: [], - }); - - expect(getDataSourcesForSync).toHaveBeenCalledTimes(1); - expect(getSearchableColumns).toHaveBeenCalledWith({ dataSourceId: 'ds-1' }); - expect(batchCreateSyncJobs).toHaveBeenCalledWith({ - dataSourceId: 'ds-1', - syncType: 'daily', - columns: [ - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 
'users', - columnName: 'name', - }, - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'email', - }, - ], - }); - expect(processSyncJob.triggerAndWait).toHaveBeenCalledTimes(2); - }); - - it('should handle sync job failures', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = databaseModule.getSearchableColumns as any; - const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any; - const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any; - const markSyncJobFailed = databaseModule.markSyncJobFailed as any; - const processSyncJob = processSyncJobModule.processSyncJob as any; - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 1, - dataSources: [ - { - id: 'ds-1', - name: 'Test Database', - columnsWithStoredValues: 1, - }, - ], - }); - - getSearchableColumns.mockResolvedValue({ - totalCount: 1, - columns: [ - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - ], - }); - - batchCreateSyncJobs.mockResolvedValue({ - totalCreated: 1, - created: [ - { - id: 'job-1', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - ], - errors: [], - }); - - markSyncJobInProgress.mockResolvedValue(undefined); - markSyncJobFailed.mockResolvedValue(undefined); - - processSyncJob.triggerAndWait.mockResolvedValue({ - ok: false, - error: 'Task execution failed', - }); - - const result = await syncSearchableValues.run(mockPayload); - - expect(result).toMatchObject({ - executionId: 'test-execution-id', - totalDataSources: 1, - totalColumns: 1, - successfulSyncs: 0, - failedSyncs: 1, - skippedSyncs: 0, - totalValuesProcessed: 0, - }); - - expect(result.dataSourceSummaries[0].errors).toContain( - 'Job job-1 failed: Task execution failed' - ); - expect(markSyncJobFailed).toHaveBeenCalledWith('job-1', 'Task execution failed'); - }); - - it('should handle batch creation errors', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = databaseModule.getSearchableColumns as any; - const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any; - const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any; - const processSyncJob = processSyncJobModule.processSyncJob as any; - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 1, - dataSources: [ - { - id: 'ds-1', - name: 'Test Database', - columnsWithStoredValues: 2, - }, - ], - }); - - getSearchableColumns.mockResolvedValue({ - totalCount: 2, - columns: [ - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'products', - columnName: 'description', - }, - ], - }); - - batchCreateSyncJobs.mockResolvedValue({ - totalCreated: 1, - created: [ - { - id: 'job-1', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - ], - errors: [ - { - column: { - tableName: 'products', - columnName: 'description', - }, - error: 'Column not found', - }, - ], - }); - - markSyncJobInProgress.mockResolvedValue(undefined); - processSyncJob.triggerAndWait.mockResolvedValue({ - ok: true, - output: { - success: true, - processedCount: 100, - error: null, - }, - }); - - const result = await syncSearchableValues.run(mockPayload); - - 
expect(result.dataSourceSummaries[0]).toMatchObject({ - totalColumns: 2, - successfulSyncs: 1, - failedSyncs: 1, - errors: ['Failed to create job for products.description: Column not found'], - }); - }); - - it('should skip data sources with no searchable columns', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = databaseModule.getSearchableColumns as any; - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 1, - dataSources: [ - { - id: 'ds-1', - name: 'Empty Database', - columnsWithStoredValues: 0, - }, - ], - }); - - getSearchableColumns.mockResolvedValue({ - totalCount: 0, - columns: [], - }); - - const result = await syncSearchableValues.run(mockPayload); - - expect(result).toMatchObject({ - totalDataSources: 1, - totalColumns: 0, - successfulSyncs: 0, - failedSyncs: 0, - skippedSyncs: 0, - totalValuesProcessed: 0, - }); - - expect(result.dataSourceSummaries[0]).toMatchObject({ - dataSourceId: 'ds-1', - dataSourceName: 'Empty Database', - totalColumns: 0, - skippedSyncs: 0, - }); - }); - - it('should handle processing exceptions gracefully', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = databaseModule.getSearchableColumns as any; - const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any; - const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any; - const markSyncJobFailed = databaseModule.markSyncJobFailed as any; - const processSyncJob = processSyncJobModule.processSyncJob as any; - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 1, - dataSources: [ - { - id: 'ds-1', - name: 'Test Database', - columnsWithStoredValues: 1, - }, - ], - }); - - getSearchableColumns.mockResolvedValue({ - totalCount: 1, - columns: [ - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - ], - }); - - batchCreateSyncJobs.mockResolvedValue({ - totalCreated: 1, - created: [ - { - id: 'job-1', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - ], - errors: [], - }); - - markSyncJobInProgress.mockResolvedValue(undefined); - markSyncJobFailed.mockResolvedValue(undefined); - - // Simulate exception during processSyncJob - processSyncJob.triggerAndWait.mockRejectedValue(new Error('Network error')); - - const result = await syncSearchableValues.run(mockPayload); - - expect(result).toMatchObject({ - totalDataSources: 1, - failedSyncs: 1, - successfulSyncs: 0, - }); - - expect(result.dataSourceSummaries[0].errors).toContain('Job job-1 failed: Network error'); - expect(markSyncJobFailed).toHaveBeenCalledWith('job-1', 'Network error'); - }); - - it('should handle fatal errors during execution', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - - getDataSourcesForSync.mockRejectedValue(new Error('Database connection failed')); - - const result = await syncSearchableValues.run(mockPayload); - - expect(result).toMatchObject({ - executionId: 'test-execution-id', - totalDataSources: 0, - totalColumns: 0, - successfulSyncs: 0, - failedSyncs: 0, - skippedSyncs: 0, - totalValuesProcessed: 0, - errors: ['Fatal error during sync execution: Database connection failed'], - }); - }); - - it('should batch process sync jobs correctly', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = 
databaseModule.getSearchableColumns as any; - const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any; - const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any; - const processSyncJob = processSyncJobModule.processSyncJob as any; - - // Create 15 jobs to test batching (batch size is 10) - const mockJobs = Array.from({ length: 15 }, (_, i) => ({ - id: `job-${i + 1}`, - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: `table${i + 1}`, - columnName: 'col1', - })); - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 1, - dataSources: [ - { - id: 'ds-1', - name: 'Test Database', - columnsWithStoredValues: 15, - }, - ], - }); - - getSearchableColumns.mockResolvedValue({ - totalCount: 15, - columns: mockJobs.map((job) => ({ - databaseName: job.databaseName, - schemaName: job.schemaName, - tableName: job.tableName, - columnName: job.columnName, - })), - }); - - batchCreateSyncJobs.mockResolvedValue({ - totalCreated: 15, - created: mockJobs, - errors: [], - }); - - markSyncJobInProgress.mockResolvedValue(undefined); - - let processCallCount = 0; - processSyncJob.triggerAndWait.mockImplementation(() => { - processCallCount++; - return Promise.resolve({ - ok: true, - output: { - success: true, - processedCount: 10, - error: null, - }, - }); - }); - - const result = await syncSearchableValues.run(mockPayload); - - expect(processCallCount).toBe(15); - expect(markSyncJobInProgress).toHaveBeenCalledTimes(15); - expect(result).toMatchObject({ - totalColumns: 15, - successfulSyncs: 15, - failedSyncs: 0, - totalValuesProcessed: 150, - }); - }); - - it('should handle mixed success and failure in batch processing', async () => { - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = databaseModule.getSearchableColumns as any; - const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any; - const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any; - const markSyncJobFailed = databaseModule.markSyncJobFailed as any; - const processSyncJob = processSyncJobModule.processSyncJob as any; - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 1, - dataSources: [ - { - id: 'ds-1', - name: 'Test Database', - columnsWithStoredValues: 3, - }, - ], - }); - - getSearchableColumns.mockResolvedValue({ - totalCount: 3, - columns: [ - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'email', - }, - { - databaseName: 'testdb', - schemaName: 'public', - tableName: 'products', - columnName: 'title', - }, - ], - }); - - batchCreateSyncJobs.mockResolvedValue({ - totalCreated: 3, - created: [ - { - id: 'job-1', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - { - id: 'job-2', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'users', - columnName: 'email', - }, - { - id: 'job-3', - dataSourceId: 'ds-1', - databaseName: 'testdb', - schemaName: 'public', - tableName: 'products', - columnName: 'title', - }, - ], - errors: [], - }); - - markSyncJobInProgress.mockResolvedValue(undefined); - markSyncJobFailed.mockResolvedValue(undefined); - - // Mixed results: success, failure, exception - processSyncJob.triggerAndWait - .mockResolvedValueOnce({ - ok: true, - output: { - success: true, - processedCount: 50, - error: null, - }, - 
}) - .mockResolvedValueOnce({ - ok: true, - output: { - success: false, - processedCount: 0, - error: 'Validation failed', - }, - }) - .mockRejectedValueOnce(new Error('Unexpected error')); - - const result = await syncSearchableValues.run(mockPayload); - - expect(result).toMatchObject({ - totalColumns: 3, - successfulSyncs: 1, - failedSyncs: 2, - totalValuesProcessed: 50, - }); - - expect(markSyncJobFailed).toHaveBeenCalledTimes(2); - expect(markSyncJobFailed).toHaveBeenCalledWith('job-2', 'Validation failed'); - expect(markSyncJobFailed).toHaveBeenCalledWith('job-3', 'Unexpected error'); - }); - }); - - describe('createReport helper', () => { - it('should calculate report metrics correctly', async () => { - // Since createReport is not exported, we test it through the main function - - const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any; - const getSearchableColumns = databaseModule.getSearchableColumns as any; - const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any; - const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any; - const processSyncJob = processSyncJobModule.processSyncJob as any; - - getDataSourcesForSync.mockResolvedValue({ - totalCount: 2, - dataSources: [ - { id: 'ds-1', name: 'DB1', columnsWithStoredValues: 2 }, - { id: 'ds-2', name: 'DB2', columnsWithStoredValues: 3 }, - ], - }); - - getSearchableColumns - .mockResolvedValueOnce({ - totalCount: 2, - columns: [ - { databaseName: 'db1', schemaName: 'public', tableName: 't1', columnName: 'c1' }, - { databaseName: 'db1', schemaName: 'public', tableName: 't1', columnName: 'c2' }, - ], - }) - .mockResolvedValueOnce({ - totalCount: 3, - columns: [ - { databaseName: 'db2', schemaName: 'public', tableName: 't2', columnName: 'c1' }, - { databaseName: 'db2', schemaName: 'public', tableName: 't2', columnName: 'c2' }, - { databaseName: 'db2', schemaName: 'public', tableName: 't2', columnName: 'c3' }, - ], - }); - - batchCreateSyncJobs - .mockResolvedValueOnce({ - totalCreated: 2, - created: [ - { - id: 'j1', - dataSourceId: 'ds-1', - databaseName: 'db1', - schemaName: 'public', - tableName: 't1', - columnName: 'c1', - }, - { - id: 'j2', - dataSourceId: 'ds-1', - databaseName: 'db1', - schemaName: 'public', - tableName: 't1', - columnName: 'c2', - }, - ], - errors: [], - }) - .mockResolvedValueOnce({ - totalCreated: 3, - created: [ - { - id: 'j3', - dataSourceId: 'ds-2', - databaseName: 'db2', - schemaName: 'public', - tableName: 't2', - columnName: 'c1', - }, - { - id: 'j4', - dataSourceId: 'ds-2', - databaseName: 'db2', - schemaName: 'public', - tableName: 't2', - columnName: 'c2', - }, - { - id: 'j5', - dataSourceId: 'ds-2', - databaseName: 'db2', - schemaName: 'public', - tableName: 't2', - columnName: 'c3', - }, - ], - errors: [], - }); - - markSyncJobInProgress.mockResolvedValue(undefined); - - // DS1: 2 successes (100 values each) - // DS2: 1 success (150 values), 1 failure, 1 skip - processSyncJob.triggerAndWait - .mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 100 } }) - .mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 100 } }) - .mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 150 } }) - .mockResolvedValueOnce({ ok: false, error: 'Failed' }) - .mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 0 } }); // Skipped - - const result = await syncSearchableValues.run({ - timestamp: new Date('2024-01-15T02:00:00Z'), - lastTimestamp: new Date('2024-01-14T02:00:00Z'), - 
upcoming: [], - }); - - expect(result).toMatchObject({ - executionId: 'test-execution-id', - totalDataSources: 2, - totalColumns: 5, - successfulSyncs: 4, - failedSyncs: 1, - skippedSyncs: 0, - totalValuesProcessed: 350, - }); - - expect(result.dataSourceSummaries).toHaveLength(2); - expect(result.dataSourceSummaries[0]).toMatchObject({ - dataSourceId: 'ds-1', - totalColumns: 2, - successfulSyncs: 2, - failedSyncs: 0, - totalValuesProcessed: 200, - }); - expect(result.dataSourceSummaries[1]).toMatchObject({ - dataSourceId: 'ds-2', - totalColumns: 3, - successfulSyncs: 2, - failedSyncs: 1, - totalValuesProcessed: 150, - }); - }); - }); -}); diff --git a/apps/trigger/src/tasks/sync-searchable-values/sync-searchable-values.ts b/apps/trigger/src/tasks/sync-searchable-values/sync-searchable-values.ts index e2d9fc2a1..a190044b8 100644 --- a/apps/trigger/src/tasks/sync-searchable-values/sync-searchable-values.ts +++ b/apps/trigger/src/tasks/sync-searchable-values/sync-searchable-values.ts @@ -1,11 +1,4 @@ -import { - batchCreateSyncJobs, - getDataSourcesForSync, - getSearchableColumns, - markSyncJobFailed, - markSyncJobInProgress, -} from '@buster/database'; -import { checkNamespaceExists, createNamespaceIfNotExists } from '@buster/search'; +import { getExistingSyncJobs } from '@buster/database'; import { logger, schedules } from '@trigger.dev/sdk'; import { processSyncJob } from './process-sync-job'; import type { DailySyncReport, DataSourceSyncSummary, SyncJobPayload } from './types'; @@ -14,12 +7,11 @@ import type { DailySyncReport, DataSourceSyncSummary, SyncJobPayload } from './t * Daily scheduled task to sync searchable values from data sources * * This task runs every day at 2 AM UTC and: - * 1. Identifies data sources with searchable columns - * 2. Creates sync jobs for each column that needs syncing - * 3. Queues individual sync tasks for processing - * 4. Tracks and reports on the overall sync status + * 1. Retrieves existing sync jobs from stored_values_sync_jobs table + * 2. Triggers sync jobs for processing (fire and forget) + * 3. 
Reports on the number of jobs triggered * - * The actual sync logic is delegated to process-sync-job.ts (Ticket 8) + * The actual sync logic is handled asynchronously by process-sync-job.ts */ export const syncSearchableValues = schedules.task({ id: 'sync-searchable-values', @@ -44,60 +36,31 @@ export const syncSearchableValues = schedules.task({ }); try { - // Step 1: Get all data sources that have searchable columns - const dataSourcesResult = await getDataSourcesForSync(); - - logger.info('Found data sources for sync', { - executionId, - totalDataSources: dataSourcesResult.totalCount, - dataSources: dataSourcesResult.dataSources.map((ds) => ({ - id: ds.id, - name: ds.name, - columnsWithStoredValues: ds.columnsWithStoredValues, - })), + // Step 1: Get all existing sync jobs from the stored_values_sync_jobs table + const syncJobsResult = await getExistingSyncJobs({ + statuses: ['pending', 'success'], }); - if (dataSourcesResult.totalCount === 0) { - logger.info('No data sources found with searchable columns', { executionId }); + logger.info('Found existing sync jobs', { + executionId, + totalJobs: syncJobsResult.totalCount, + dataSourceCount: Object.keys(syncJobsResult.byDataSource).length, + }); + + if (syncJobsResult.totalCount === 0) { + logger.info('No sync jobs found to process', { executionId }); return createReport(executionId, startTime, [], errors); } - // Step 2: Ensure TurboPuffer namespaces exist for all data sources - logger.info('Checking TurboPuffer namespaces', { - executionId, - dataSourceCount: dataSourcesResult.totalCount, - }); + // Step 2: Process sync jobs grouped by data source + for (const dataSourceId in syncJobsResult.byDataSource) { + const dataSourceInfo = syncJobsResult.byDataSource[dataSourceId]; + if (!dataSourceInfo) continue; - for (const dataSource of dataSourcesResult.dataSources) { - try { - const namespaceExists = await checkNamespaceExists(dataSource.id); - if (!namespaceExists) { - logger.info('Creating TurboPuffer namespace', { - executionId, - dataSourceId: dataSource.id, - dataSourceName: dataSource.name, - }); - // Namespace will be created automatically on first write - await createNamespaceIfNotExists(dataSource.id); - } - } catch (error) { - const errorMsg = error instanceof Error ? 
error.message : 'Unknown error'; - logger.error('Failed to check/create namespace', { - executionId, - dataSourceId: dataSource.id, - dataSourceName: dataSource.name, - error: errorMsg, - }); - errors.push(`Failed to check/create namespace for ${dataSource.name}: ${errorMsg}`); - } - } - - // Step 3: Process each data source - for (const dataSource of dataSourcesResult.dataSources) { const summary: DataSourceSyncSummary = { - dataSourceId: dataSource.id, - dataSourceName: dataSource.name, - totalColumns: 0, + dataSourceId, + dataSourceName: dataSourceInfo.dataSourceName, + totalColumns: dataSourceInfo.jobCount, successfulSyncs: 0, failedSyncs: 0, skippedSyncs: 0, @@ -108,141 +71,78 @@ export const syncSearchableValues = schedules.task({ try { logger.info('Processing data source', { executionId, - dataSourceId: dataSource.id, - dataSourceName: dataSource.name, + dataSourceId, + dataSourceName: dataSourceInfo.dataSourceName, + jobCount: dataSourceInfo.jobCount, }); - // Get searchable columns for this data source - const columnsResult = await getSearchableColumns({ - dataSourceId: dataSource.id, - }); + // Step 3: Trigger sync jobs for this data source (fire and forget) + const jobs = dataSourceInfo.jobs; + let triggeredCount = 0; + let triggerFailureCount = 0; - summary.totalColumns = columnsResult.totalCount; + for (const job of jobs) { + try { + // Prepare payload for processing + const syncPayload: SyncJobPayload = { + jobId: job.id, + dataSourceId: job.dataSourceId, + databaseName: job.databaseName, + schemaName: job.schemaName, + tableName: job.tableName, + columnName: job.columnName, + maxValues: 1000, + }; - if (columnsResult.totalCount === 0) { - logger.info('No searchable columns found for data source', { - executionId, - dataSourceId: dataSource.id, - }); - summary.skippedSyncs = summary.totalColumns; - dataSourceSummaries.push(summary); - continue; + // Trigger the sync job without waiting + await processSyncJob.trigger(syncPayload); + triggeredCount++; + + logger.info('Sync job triggered', { + executionId, + jobId: job.id, + table: job.tableName, + column: job.columnName, + }); + } catch (error) { + triggerFailureCount++; + const errorMsg = error instanceof Error ? 
error.message : 'Unknown error'; + summary.errors.push( + `Failed to trigger job ${job.tableName}.${job.columnName}: ${errorMsg}` + ); + + logger.error('Failed to trigger sync job', { + executionId, + jobId: job.id, + table: job.tableName, + column: job.columnName, + error: errorMsg, + }); + } } - // Step 4: Create sync jobs for all columns - const columnsToSync = columnsResult.columns.map((col) => ({ - databaseName: col.databaseName, - schemaName: col.schemaName, - tableName: col.tableName, - columnName: col.columnName, - })); - - const batchResult = await batchCreateSyncJobs({ - dataSourceId: dataSource.id, - syncType: 'daily', - columns: columnsToSync, - }); - - logger.info('Created sync jobs', { - executionId, - dataSourceId: dataSource.id, - totalCreated: batchResult.totalCreated, - errors: batchResult.errors.length, - }); - - // Track any errors from job creation - for (const error of batchResult.errors) { - summary.errors.push( - `Failed to create job for ${error.column.tableName}.${error.column.columnName}: ${error.error}` - ); - summary.failedSyncs++; - } - - // Step 5: Process each sync job - const batchSize = 10; // Process in batches - const jobs = batchResult.created; - - for (let i = 0; i < jobs.length; i += batchSize) { - const batch = jobs.slice(i, i + batchSize); - const batchPromises = batch.map(async (job) => { - try { - // Mark job as in progress - await markSyncJobInProgress(job.id); - - // Prepare payload for processing - const syncPayload: SyncJobPayload = { - jobId: job.id, - dataSourceId: job.dataSourceId, - databaseName: job.databaseName, - schemaName: job.schemaName, - tableName: job.tableName, - columnName: job.columnName, - maxValues: 1000, - }; - - // Process the sync job (stub implementation for now) - const result = await processSyncJob.triggerAndWait(syncPayload); - - if (result.ok && result.output.success) { - summary.successfulSyncs++; - summary.totalValuesProcessed += result.output.processedCount || 0; - logger.info('Sync job completed successfully', { - executionId, - jobId: job.id, - processedCount: result.output.processedCount, - }); - } else { - summary.failedSyncs++; - const errorMsg = result.ok - ? result.output.error || 'Unknown error' - : 'Task execution failed'; - summary.errors.push(`Job ${job.id} failed: ${errorMsg}`); - - // Mark job as failed in database - await markSyncJobFailed(job.id, errorMsg); - - logger.error('Sync job failed', { - executionId, - jobId: job.id, - error: errorMsg, - }); - } - } catch (error) { - summary.failedSyncs++; - const errorMsg = error instanceof Error ? error.message : 'Unknown error'; - summary.errors.push(`Job ${job.id} failed: ${errorMsg}`); - - // Mark job as failed in database - await markSyncJobFailed(job.id, errorMsg); - - logger.error('Error processing sync job', { - executionId, - jobId: job.id, - error: errorMsg, - }); - } - }); - - // Wait for batch to complete before starting next batch - await Promise.allSettled(batchPromises); - } + summary.successfulSyncs = triggeredCount; + summary.failedSyncs = triggerFailureCount; dataSourceSummaries.push(summary); } catch (error) { const errorMsg = error instanceof Error ? 
error.message : 'Unknown error'; summary.errors.push(`Data source processing failed: ${errorMsg}`); - errors.push(`Failed to process data source ${dataSource.name}: ${errorMsg}`); + errors.push( + `Failed to process data source ${dataSourceInfo.dataSourceName}: ${errorMsg}` + ); dataSourceSummaries.push(summary); logger.error('Error processing data source', { executionId, - dataSourceId: dataSource.id, + dataSourceId, + dataSourceName: dataSourceInfo.dataSourceName, error: errorMsg, }); } } - // Step 6: Generate and return report + // Step 3: Generate and return report const report = createReport(executionId, startTime, dataSourceSummaries, errors); logger.info('Daily sync completed', { diff --git a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts index 3bbd5a4ce..64e46c50e 100644 --- a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts +++ b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts @@ -135,8 +135,9 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge streamText({ model: Sonnet4, headers: { - 'anthropic-beta':'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', - 'anthropic_beta': 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', + 'anthropic-beta': + 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', + anthropic_beta: 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', }, providerOptions: DEFAULT_ANTHROPIC_OPTIONS, tools: { diff --git a/packages/ai/src/embeddings/generate-embeddings.test.ts b/packages/ai/src/embeddings/generate-embeddings.test.ts index 5ea8dc879..7b7bd9cdd 100644 --- a/packages/ai/src/embeddings/generate-embeddings.test.ts +++ b/packages/ai/src/embeddings/generate-embeddings.test.ts @@ -17,14 +17,14 @@ import { validateEmbeddingDimensions, } from './generate-embeddings'; -// Mock the AI SDK and OpenAI provider +// Mock the AI SDK and Gateway provider vi.mock('ai', () => ({ embedMany: vi.fn(), })); -vi.mock('@ai-sdk/openai', () => ({ - createOpenAI: vi.fn(() => ({ - embedding: vi.fn(() => 'mocked-embedding-model'), +vi.mock('@ai-sdk/gateway', () => ({ + createGateway: vi.fn(() => ({ + textEmbeddingModel: vi.fn(() => 'mocked-embedding-model'), })), })); diff --git a/packages/ai/src/embeddings/generate-embeddings.ts b/packages/ai/src/embeddings/generate-embeddings.ts index 6e2ba83d4..e32da086c 100644 --- a/packages/ai/src/embeddings/generate-embeddings.ts +++ b/packages/ai/src/embeddings/generate-embeddings.ts @@ -1,9 +1,10 @@ /** * Functional embedding generation for searchable values - * Uses OpenAI text-embedding-3-small model (1536 dimensions) + * Uses openai/text-embedding-3-small model (1536 dimensions) via AI Gateway */ -import { createOpenAI } from '@ai-sdk/openai'; +import { createGateway } from '@ai-sdk/gateway'; +import type { EmbeddingModel } from 'ai'; import { embedMany } from 'ai'; import { z } from 'zod'; @@ -51,7 +52,7 @@ export const EMBEDDING_CONFIG = { BATCH_SIZE: 100, MAX_RETRIES: 3, RATE_LIMIT_DELAY: 1000, - MODEL: 'text-embedding-3-small', + MODEL: 'openai/text-embedding-3-small', DIMENSIONS: 1536, } as const; @@ -112,13 +113,14 @@ export const isRetryableError = (error: unknown): boolean => { // ============================================================================ /** - * Get OpenAI embedding model + * Get embedding model via gateway */ -const getEmbeddingModel = (modelName: string = 
EMBEDDING_CONFIG.MODEL) => { - const openai = createOpenAI({ - apiKey: process.env.OPENAI_API_KEY, +const getEmbeddingModel = (modelName: string = EMBEDDING_CONFIG.MODEL): EmbeddingModel => { + const gateway = createGateway({ + ...(process.env.AI_GATEWAY_API_KEY && { apiKey: process.env.AI_GATEWAY_API_KEY }), }); - return openai.embedding(modelName); + // Use the textEmbeddingModel method to create an embedding model + return gateway.textEmbeddingModel(modelName); }; /** diff --git a/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts b/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts index 0e764244d..7440a7f49 100644 --- a/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts +++ b/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts @@ -89,10 +89,9 @@ async function generateTodosWithLLM( const { object, textStream } = streamObject({ headers: { - 'anthropic-beta': - 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', - anthropic_beta: - 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', + 'anthropic-beta': + 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', + anthropic_beta: 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11', }, model: Sonnet4, schema: llmOutputSchema, diff --git a/packages/database/src/queries/sync-jobs/get-data-sources-for-sync.ts b/packages/database/src/queries/sync-jobs/get-data-sources-for-sync.ts deleted file mode 100644 index f9a5f0506..000000000 --- a/packages/database/src/queries/sync-jobs/get-data-sources-for-sync.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { and, eq, gt, isNull, sql } from 'drizzle-orm'; -import { z } from 'zod'; -import { db } from '../../connection'; -import { dataSources, storedValuesSyncJobs } from '../../schema'; - -// ============================================================================ -// VALIDATION SCHEMAS -// ============================================================================ - -export const DataSourceForSyncSchema = z.object({ - id: z.string().uuid(), - name: z.string(), - type: z.string(), - organizationId: z.string().uuid(), - columnsWithStoredValues: z.number().int().min(0), -}); - -export const GetDataSourcesForSyncOutputSchema = z.object({ - dataSources: z.array(DataSourceForSyncSchema), - totalCount: z.number().int().min(0), -}); - -// ============================================================================ -// TYPES -// ============================================================================ - -export type DataSourceForSync = z.infer; -export type GetDataSourcesForSyncOutput = z.infer; - -// ============================================================================ -// QUERY FUNCTION -// ============================================================================ - -/** - * Get all data sources that have sync jobs in the stored_values_sync_jobs table - * These are the data sources that need to be synced to Turbopuffer - */ -export async function getDataSourcesForSync(): Promise { - try { - // Query data sources that have sync jobs - const results = await db - .selectDistinct({ - id: dataSources.id, - name: dataSources.name, - type: dataSources.type, - organizationId: dataSources.organizationId, - }) - .from(dataSources) - .innerJoin(storedValuesSyncJobs, eq(storedValuesSyncJobs.dataSourceId, dataSources.id)) - .where( - and( - // Only active data sources - isNull(dataSources.deletedAt), - // Only 
successfully onboarded data sources - eq(dataSources.onboardingStatus, 'completed') - ) - ); - - // Count sync jobs for each data source - const dataSourcesWithCounts = await Promise.all( - results.map(async (ds) => { - const columnCount = await db - .select({ - count: sql`COUNT(DISTINCT ${storedValuesSyncJobs.columnName})`, - }) - .from(storedValuesSyncJobs) - .where(eq(storedValuesSyncJobs.dataSourceId, ds.id)); - - return { - ...ds, - columnsWithStoredValues: Number(columnCount[0]?.count || 0), - }; - }) - ); - - // Validate and return results - return GetDataSourcesForSyncOutputSchema.parse({ - dataSources: dataSourcesWithCounts, - totalCount: dataSourcesWithCounts.length, - }); - } catch (error) { - // Handle Zod validation errors - if (error instanceof z.ZodError) { - throw new Error(`Validation error in getDataSourcesForSync: ${error.message}`); - } - - // Handle database errors - if (error instanceof Error) { - throw new Error(`Database error in getDataSourcesForSync: ${error.message}`); - } - - // Unknown error - throw new Error(`Unknown error in getDataSourcesForSync: ${String(error)}`); - } -} - -/** - * Check if a specific data source needs syncing - * Useful for validating before queuing a sync job - */ -export async function dataSourceNeedsSync(dataSourceId: string): Promise { - try { - // Validate input - const validatedId = z.string().uuid().parse(dataSourceId); - - const result = await db - .select({ - id: storedValuesSyncJobs.id, - }) - .from(storedValuesSyncJobs) - .innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId)) - .where( - and( - eq(dataSources.id, validatedId), - isNull(dataSources.deletedAt), - eq(dataSources.onboardingStatus, 'completed') - ) - ) - .limit(1); - - return result.length > 0; - } catch (error) { - if (error instanceof z.ZodError) { - throw new Error(`Invalid dataSourceId: ${error.message}`); - } - throw error; - } -} diff --git a/packages/database/src/queries/sync-jobs/get-existing-jobs.ts b/packages/database/src/queries/sync-jobs/get-existing-jobs.ts new file mode 100644 index 000000000..2015f8f4a --- /dev/null +++ b/packages/database/src/queries/sync-jobs/get-existing-jobs.ts @@ -0,0 +1,243 @@ +import { and, eq, inArray, isNull, or, sql } from 'drizzle-orm'; +import { z } from 'zod'; +import { db } from '../../connection'; +import { dataSources, storedValuesSyncJobs } from '../../schema'; + +// ============================================================================ +// VALIDATION SCHEMAS +// ============================================================================ + +export const GetExistingSyncJobsInputSchema = z.object({ + statuses: z + .array(z.string()) + .optional() + .default(['pending', 'pending_manual', 'pending_initial', 'failed']), + limit: z.number().int().min(1).optional(), +}); + +export const ExistingSyncJobSchema = z.object({ + id: z.string().uuid(), + dataSourceId: z.string().uuid(), + dataSourceName: z.string(), + dataSourceType: z.string(), + organizationId: z.string().uuid(), + databaseName: z.string(), + schemaName: z.string(), + tableName: z.string(), + columnName: z.string(), + status: z.string(), + errorMessage: z.string().nullable(), + lastSyncedAt: z.string().datetime().nullable(), + createdAt: z.string().datetime(), +}); + +export const GetExistingSyncJobsOutputSchema = z.object({ + syncJobs: z.array(ExistingSyncJobSchema), + totalCount: z.number().int().min(0), + byDataSource: z.record( + z.string().uuid(), + z.object({ + dataSourceId: z.string().uuid(), + dataSourceName: z.string(), + 
dataSourceType: z.string(), + organizationId: z.string().uuid(), + jobCount: z.number().int().min(0), + jobs: z.array(ExistingSyncJobSchema), + }) + ), +}); + +// ============================================================================ +// TYPES +// ============================================================================ + +export type GetExistingSyncJobsInput = z.infer; +export type ExistingSyncJob = z.infer; +export type GetExistingSyncJobsOutput = z.infer; + +// ============================================================================ +// QUERY FUNCTIONS +// ============================================================================ + +/** + * Get existing sync jobs that need to be processed + * Returns jobs from the stored_values_sync_jobs table grouped by data source + */ +export async function getExistingSyncJobs( + input: Partial = {} +): Promise { + try { + const validated = GetExistingSyncJobsInputSchema.parse(input); + + // Build the query to get existing sync jobs with their data source info + const baseQuery = db + .select({ + id: storedValuesSyncJobs.id, + dataSourceId: storedValuesSyncJobs.dataSourceId, + dataSourceName: dataSources.name, + dataSourceType: dataSources.type, + organizationId: dataSources.organizationId, + databaseName: storedValuesSyncJobs.databaseName, + schemaName: storedValuesSyncJobs.schemaName, + tableName: storedValuesSyncJobs.tableName, + columnName: storedValuesSyncJobs.columnName, + status: storedValuesSyncJobs.status, + errorMessage: storedValuesSyncJobs.errorMessage, + lastSyncedAt: storedValuesSyncJobs.lastSyncedAt, + createdAt: storedValuesSyncJobs.createdAt, + }) + .from(storedValuesSyncJobs) + .innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId)) + .where( + and( + // Only get jobs with specified statuses + inArray(storedValuesSyncJobs.status, validated.statuses), + // Only active data sources + isNull(dataSources.deletedAt) + ) + ); + + // Add limit if specified + const query = validated.limit ? baseQuery.limit(validated.limit) : baseQuery; + + const syncJobs = await query; + + // Transform the results + const formattedJobs: ExistingSyncJob[] = syncJobs.map((job) => ({ + ...job, + lastSyncedAt: job.lastSyncedAt ? 
new Date(job.lastSyncedAt).toISOString() : null, + createdAt: new Date(job.createdAt).toISOString(), + })); + + // Group jobs by data source + const byDataSource: Record< + string, + { + dataSourceId: string; + dataSourceName: string; + dataSourceType: string; + organizationId: string; + jobCount: number; + jobs: ExistingSyncJob[]; + } + > = {}; + + for (const job of formattedJobs) { + if (!byDataSource[job.dataSourceId]) { + byDataSource[job.dataSourceId] = { + dataSourceId: job.dataSourceId, + dataSourceName: job.dataSourceName, + dataSourceType: job.dataSourceType, + organizationId: job.organizationId, + jobCount: 0, + jobs: [], + }; + } + const dataSourceInfo = byDataSource[job.dataSourceId]; + if (dataSourceInfo) { + dataSourceInfo.jobs.push(job); + dataSourceInfo.jobCount++; + } + } + + return GetExistingSyncJobsOutputSchema.parse({ + syncJobs: formattedJobs, + totalCount: formattedJobs.length, + byDataSource, + }); + } catch (error) { + if (error instanceof z.ZodError) { + throw new Error(`Validation error in getExistingSyncJobs: ${error.message}`); + } + + if (error instanceof Error) { + throw new Error(`Database error in getExistingSyncJobs: ${error.message}`); + } + + throw new Error(`Unknown error in getExistingSyncJobs: ${String(error)}`); + } +} + +/** + * Get sync jobs for a specific data source + * Useful for processing jobs per data source + */ +export async function getSyncJobsForDataSource( + dataSourceId: string, + statuses: string[] = ['pending', 'pending_manual', 'pending_initial', 'failed'] +): Promise { + try { + const validatedId = z.string().uuid().parse(dataSourceId); + + const jobs = await db + .select({ + id: storedValuesSyncJobs.id, + dataSourceId: storedValuesSyncJobs.dataSourceId, + dataSourceName: dataSources.name, + dataSourceType: dataSources.type, + organizationId: dataSources.organizationId, + databaseName: storedValuesSyncJobs.databaseName, + schemaName: storedValuesSyncJobs.schemaName, + tableName: storedValuesSyncJobs.tableName, + columnName: storedValuesSyncJobs.columnName, + status: storedValuesSyncJobs.status, + errorMessage: storedValuesSyncJobs.errorMessage, + lastSyncedAt: storedValuesSyncJobs.lastSyncedAt, + createdAt: storedValuesSyncJobs.createdAt, + }) + .from(storedValuesSyncJobs) + .innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId)) + .where( + and( + eq(storedValuesSyncJobs.dataSourceId, validatedId), + inArray(storedValuesSyncJobs.status, statuses), + isNull(dataSources.deletedAt) + ) + ); + + return jobs.map((job) => ({ + ...job, + lastSyncedAt: job.lastSyncedAt ? 
new Date(job.lastSyncedAt).toISOString() : null, + createdAt: new Date(job.createdAt).toISOString(), + })); + } catch (error) { + if (error instanceof z.ZodError) { + throw new Error(`Invalid dataSourceId: ${error.message}`); + } + + if (error instanceof Error) { + throw new Error(`Database error in getSyncJobsForDataSource: ${error.message}`); + } + + throw new Error(`Unknown error in getSyncJobsForDataSource: ${String(error)}`); + } +} + +/** + * Count pending sync jobs + * Useful for monitoring and reporting + */ +export async function countPendingSyncJobs(): Promise { + try { + const result = await db + .select({ + count: sql`COUNT(*)`, + }) + .from(storedValuesSyncJobs) + .innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId)) + .where( + and( + inArray(storedValuesSyncJobs.status, ['pending', 'pending_manual', 'pending_initial']), + isNull(dataSources.deletedAt) + ) + ); + + return Number(result[0]?.count || 0); + } catch (error) { + if (error instanceof Error) { + throw new Error(`Database error in countPendingSyncJobs: ${error.message}`); + } + + throw new Error(`Unknown error in countPendingSyncJobs: ${String(error)}`); + } +} diff --git a/packages/database/src/queries/sync-jobs/get-searchable-columns.ts b/packages/database/src/queries/sync-jobs/get-searchable-columns.ts deleted file mode 100644 index 0219e1ec1..000000000 --- a/packages/database/src/queries/sync-jobs/get-searchable-columns.ts +++ /dev/null @@ -1,231 +0,0 @@ -import { and, eq, isNull, sql } from 'drizzle-orm'; -import { z } from 'zod'; -import { db } from '../../connection'; -import { dataSources, storedValuesSyncJobs } from '../../schema'; - -// ============================================================================ -// VALIDATION SCHEMAS -// ============================================================================ - -export const GetSearchableColumnsInputSchema = z.object({ - dataSourceId: z.string().uuid(), -}); - -export const SearchableColumnSchema = z.object({ - id: z.string().uuid(), - databaseName: z.string(), - schemaName: z.string(), - tableName: z.string(), - columnName: z.string(), - status: z.string(), - errorMessage: z.string().nullable(), - lastSyncedAt: z.string().datetime().nullable(), -}); - -export const GetSearchableColumnsOutputSchema = z.object({ - columns: z.array(SearchableColumnSchema), - totalCount: z.number().int().min(0), -}); - -export const GetColumnsNeedingSyncInputSchema = z.object({ - dataSourceId: z.string().uuid(), - hoursThreshold: z.number().min(0).optional().default(24), // Default to 24 hours -}); - -export const ColumnsNeedingSyncOutputSchema = z.object({ - columns: z.array(SearchableColumnSchema), - totalCount: z.number().int().min(0), - neverSynced: z.number().int().min(0), - stale: z.number().int().min(0), -}); - -// ============================================================================ -// TYPES -// ============================================================================ - -export type GetSearchableColumnsInput = z.infer; -export type SearchableColumn = z.infer; -export type GetSearchableColumnsOutput = z.infer; -export type GetColumnsNeedingSyncInput = z.infer; -export type ColumnsNeedingSyncOutput = z.infer; - -// ============================================================================ -// QUERY FUNCTIONS -// ============================================================================ - -/** - * Get all searchable columns for a data source - * Returns columns from the stored_values_sync_jobs table - */ -export async function 
getSearchableColumns( - input: GetSearchableColumnsInput -): Promise { - try { - const validated = GetSearchableColumnsInputSchema.parse(input); - - // Query all sync jobs for the data source - const columns = await db - .select({ - id: storedValuesSyncJobs.id, - databaseName: storedValuesSyncJobs.databaseName, - schemaName: storedValuesSyncJobs.schemaName, - tableName: storedValuesSyncJobs.tableName, - columnName: storedValuesSyncJobs.columnName, - status: storedValuesSyncJobs.status, - errorMessage: storedValuesSyncJobs.errorMessage, - lastSyncedAt: storedValuesSyncJobs.lastSyncedAt, - }) - .from(storedValuesSyncJobs) - .innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId)) - .where( - and( - eq(storedValuesSyncJobs.dataSourceId, validated.dataSourceId), - isNull(dataSources.deletedAt), - eq(dataSources.onboardingStatus, 'completed') - ) - ); - - return GetSearchableColumnsOutputSchema.parse({ - columns, - totalCount: columns.length, - }); - } catch (error) { - if (error instanceof z.ZodError) { - throw new Error(`Validation error in getSearchableColumns: ${error.message}`); - } - - if (error instanceof Error) { - throw new Error(`Database error in getSearchableColumns: ${error.message}`); - } - - throw new Error(`Unknown error in getSearchableColumns: ${String(error)}`); - } -} - -/** - * Get columns that need to be synced - * Returns columns that have never been synced or are stale - */ -export async function getColumnsNeedingSync( - input: GetColumnsNeedingSyncInput -): Promise { - try { - const validated = GetColumnsNeedingSyncInputSchema.parse(input); - - // Calculate the threshold timestamp - const thresholdDate = new Date(); - thresholdDate.setHours(thresholdDate.getHours() - validated.hoursThreshold); - const thresholdTimestamp = thresholdDate.toISOString(); - - // Query all searchable columns - const allColumns = await getSearchableColumns({ - dataSourceId: validated.dataSourceId, - }); - - // Filter columns that need syncing - const columnsNeedingSync = allColumns.columns.filter((column) => { - // Never synced - if (!column.lastSyncedAt) { - return true; - } - - // Check if stale - return column.lastSyncedAt < thresholdTimestamp; - }); - - // Count never synced vs stale - const neverSynced = columnsNeedingSync.filter((c) => !c.lastSyncedAt).length; - const stale = columnsNeedingSync.filter((c) => c.lastSyncedAt).length; - - return ColumnsNeedingSyncOutputSchema.parse({ - columns: columnsNeedingSync, - totalCount: columnsNeedingSync.length, - neverSynced, - stale, - }); - } catch (error) { - if (error instanceof z.ZodError) { - throw new Error(`Validation error in getColumnsNeedingSync: ${error.message}`); - } - - throw error; - } -} - -/** - * Get column details for sync job creation - * Returns the minimum information needed to create sync jobs - */ -export async function getColumnDetailsForSync(dataSourceId: string) { - try { - const validated = z.string().uuid().parse(dataSourceId); - - const columns = await db - .select({ - databaseName: storedValuesSyncJobs.databaseName, - schemaName: storedValuesSyncJobs.schemaName, - tableName: storedValuesSyncJobs.tableName, - columnName: storedValuesSyncJobs.columnName, - columnId: storedValuesSyncJobs.id, - lastSynced: storedValuesSyncJobs.lastSyncedAt, - }) - .from(storedValuesSyncJobs) - .innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId)) - .where( - and( - eq(storedValuesSyncJobs.dataSourceId, validated), - isNull(dataSources.deletedAt), - eq(dataSources.onboardingStatus, 
'completed') - ) - ); - - return columns; - } catch (error) { - if (error instanceof z.ZodError) { - throw new Error(`Invalid dataSourceId: ${error.message}`); - } - - throw error; - } -} - -/** - * Update column sync metadata after successful sync - * Updates the stored_values_sync_jobs table - */ -export async function updateColumnSyncMetadata( - syncJobId: string, - metadata: { - status: 'syncing' | 'success' | 'failed' | 'error'; - error?: string | null; - lastSynced?: string; - } -): Promise { - try { - const validatedId = z.string().uuid().parse(syncJobId); - - const updateData: Record = { - status: metadata.status, - }; - - if (metadata.error !== undefined) { - updateData.errorMessage = metadata.error; - } - - if (metadata.lastSynced !== undefined) { - updateData.lastSyncedAt = metadata.lastSynced; - } - - await db.update(storedValuesSyncJobs).set(updateData).where(eq(storedValuesSyncJobs.id, validatedId)); - } catch (error) { - if (error instanceof z.ZodError) { - throw new Error(`Invalid syncJobId: ${error.message}`); - } - - if (error instanceof Error) { - throw new Error(`Failed to update column sync metadata: ${error.message}`); - } - - throw error; - } -} diff --git a/packages/database/src/queries/sync-jobs/index.ts b/packages/database/src/queries/sync-jobs/index.ts index 89cec6178..c05451d0d 100644 --- a/packages/database/src/queries/sync-jobs/index.ts +++ b/packages/database/src/queries/sync-jobs/index.ts @@ -7,19 +7,22 @@ */ // ============================================================================ -// DATA SOURCE QUERIES +// EXISTING SYNC JOBS QUERIES // ============================================================================ export { // Functions - getDataSourcesForSync, - dataSourceNeedsSync, + getExistingSyncJobs, + getSyncJobsForDataSource, + countPendingSyncJobs, // Types - type DataSourceForSync, - type GetDataSourcesForSyncOutput, + type GetExistingSyncJobsInput, + type ExistingSyncJob, + type GetExistingSyncJobsOutput, // Schemas - DataSourceForSyncSchema, - GetDataSourcesForSyncOutputSchema, -} from './get-data-sources-for-sync'; + GetExistingSyncJobsInputSchema, + ExistingSyncJobSchema, + GetExistingSyncJobsOutputSchema, +} from './get-existing-jobs'; // ============================================================================ // SYNC JOB CREATION @@ -69,26 +72,3 @@ export { BulkUpdateSyncJobsInputSchema, BulkUpdateSyncJobsOutputSchema, } from './update-sync-job'; - -// ============================================================================ -// SEARCHABLE COLUMNS QUERIES -// ============================================================================ -export { - // Functions - getSearchableColumns, - getColumnsNeedingSync, - getColumnDetailsForSync, - updateColumnSyncMetadata, - // Types - type GetSearchableColumnsInput, - type SearchableColumn, - type GetSearchableColumnsOutput, - type GetColumnsNeedingSyncInput, - type ColumnsNeedingSyncOutput, - // Schemas - GetSearchableColumnsInputSchema, - SearchableColumnSchema, - GetSearchableColumnsOutputSchema, - GetColumnsNeedingSyncInputSchema, - ColumnsNeedingSyncOutputSchema, -} from './get-searchable-columns'; diff --git a/packages/database/src/queries/sync-jobs/sync-jobs.test.ts b/packages/database/src/queries/sync-jobs/sync-jobs.test.ts index 5f4f311ad..2d62557bf 100644 --- a/packages/database/src/queries/sync-jobs/sync-jobs.test.ts +++ b/packages/database/src/queries/sync-jobs/sync-jobs.test.ts @@ -23,53 +23,126 @@ describe('Sync Jobs Query Helpers', () => { vi.clearAllMocks(); }); - 
describe('getDataSourcesForSync', () => { - it('should validate and return data sources with searchable columns', async () => { + describe('getExistingSyncJobs', () => { + it('should validate and return existing sync jobs grouped by data source', async () => { // This test validates the schema structure - const mockOutput: syncJobs.GetDataSourcesForSyncOutput = { - dataSources: [ + const mockOutput: syncJobs.GetExistingSyncJobsOutput = { + syncJobs: [ { id: '123e4567-e89b-12d3-a456-426614174000', - name: 'Test Data Source', - type: 'postgresql', - organizationId: '123e4567-e89b-12d3-a456-426614174001', - columnsWithStoredValues: 5, + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + dataSourceName: 'Test Data Source', + dataSourceType: 'postgresql', + organizationId: '123e4567-e89b-12d3-a456-426614174002', + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'name', + status: 'pending', + errorMessage: null, + lastSyncedAt: null, + createdAt: '2024-01-01T00:00:00.000Z', + }, + { + id: '123e4567-e89b-12d3-a456-426614174003', + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + dataSourceName: 'Test Data Source', + dataSourceType: 'postgresql', + organizationId: '123e4567-e89b-12d3-a456-426614174002', + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'email', + status: 'failed', + errorMessage: 'Connection timeout', + lastSyncedAt: '2024-01-01T00:00:00.000Z', + createdAt: '2024-01-01T00:00:00.000Z', }, ], - totalCount: 1, + totalCount: 2, + byDataSource: { + '123e4567-e89b-12d3-a456-426614174001': { + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + dataSourceName: 'Test Data Source', + dataSourceType: 'postgresql', + organizationId: '123e4567-e89b-12d3-a456-426614174002', + jobCount: 2, + jobs: [ + { + id: '123e4567-e89b-12d3-a456-426614174000', + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + dataSourceName: 'Test Data Source', + dataSourceType: 'postgresql', + organizationId: '123e4567-e89b-12d3-a456-426614174002', + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'name', + status: 'pending', + errorMessage: null, + lastSyncedAt: null, + createdAt: '2024-01-01T00:00:00.000Z', + }, + { + id: '123e4567-e89b-12d3-a456-426614174003', + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + dataSourceName: 'Test Data Source', + dataSourceType: 'postgresql', + organizationId: '123e4567-e89b-12d3-a456-426614174002', + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'email', + status: 'failed', + errorMessage: 'Connection timeout', + lastSyncedAt: '2024-01-01T00:00:00.000Z', + createdAt: '2024-01-01T00:00:00.000Z', + }, + ], + }, + }, }; // Validate output schema - const validated = syncJobs.GetDataSourcesForSyncOutputSchema.parse(mockOutput); - expect(validated.dataSources).toHaveLength(1); - expect(validated.dataSources[0]?.columnsWithStoredValues).toBe(5); + const validated = syncJobs.GetExistingSyncJobsOutputSchema.parse(mockOutput); + expect(validated.syncJobs).toHaveLength(2); + expect(validated.totalCount).toBe(2); + expect(Object.keys(validated.byDataSource)).toHaveLength(1); + expect(validated.byDataSource['123e4567-e89b-12d3-a456-426614174001']?.jobCount).toBe(2); }); - it('should reject invalid data source data', () => { + it('should reject invalid sync job data', () => { const invalidData = { - dataSources: [ + syncJobs: [ { id: 'not-a-uuid', - name: 'Test', - type: 'postgresql', - organizationId: 
'123e4567-e89b-12d3-a456-426614174001', - columnsWithStoredValues: -1, // Invalid: negative count + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + dataSourceName: 'Test', + dataSourceType: 'postgresql', + organizationId: '123e4567-e89b-12d3-a456-426614174002', + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'name', + status: 'pending', + errorMessage: null, + lastSyncedAt: null, + createdAt: '2024-01-01T00:00:00.000Z', }, ], totalCount: 1, + byDataSource: {}, }; - expect(() => syncJobs.GetDataSourcesForSyncOutputSchema.parse(invalidData)).toThrow( - z.ZodError - ); + expect(() => syncJobs.GetExistingSyncJobsOutputSchema.parse(invalidData)).toThrow(z.ZodError); }); }); describe('createSearchableValuesSyncJob', () => { - it('should validate sync job input', () => { + it('should validate sync job creation input', () => { const validInput: syncJobs.CreateSyncJobInput = { dataSourceId: '123e4567-e89b-12d3-a456-426614174000', - databaseName: 'analytics', + databaseName: 'test_db', schemaName: 'public', tableName: 'users', columnName: 'email', @@ -77,14 +150,14 @@ describe('Sync Jobs Query Helpers', () => { }; const validated = syncJobs.CreateSyncJobInputSchema.parse(validInput); + expect(validated.dataSourceId).toBe('123e4567-e89b-12d3-a456-426614174000'); expect(validated.syncType).toBe('daily'); - expect(validated.databaseName).toBe('analytics'); }); - it('should reject empty strings in required fields', () => { + it('should reject invalid creation input', () => { const invalidInput = { - dataSourceId: '123e4567-e89b-12d3-a456-426614174000', - databaseName: '', // Invalid: empty string + dataSourceId: 'not-a-uuid', + databaseName: '', schemaName: 'public', tableName: 'users', columnName: 'email', @@ -92,89 +165,135 @@ describe('Sync Jobs Query Helpers', () => { expect(() => syncJobs.CreateSyncJobInputSchema.parse(invalidInput)).toThrow(z.ZodError); }); - - it('should validate batch create input', () => { - const batchInput: syncJobs.BatchCreateSyncJobsInput = { - dataSourceId: '123e4567-e89b-12d3-a456-426614174000', - syncType: 'manual', - columns: [ - { - databaseName: 'analytics', - schemaName: 'public', - tableName: 'users', - columnName: 'email', - }, - { - databaseName: 'analytics', - schemaName: 'public', - tableName: 'users', - columnName: 'name', - }, - ], - }; - - const validated = syncJobs.BatchCreateSyncJobsInputSchema.parse(batchInput); - expect(validated.columns).toHaveLength(2); - expect(validated.syncType).toBe('manual'); - }); }); describe('updateSyncJobStatus', () => { it('should validate status update input', () => { const validInput: syncJobs.UpdateSyncJobStatusInput = { jobId: '123e4567-e89b-12d3-a456-426614174000', - status: 'completed', + status: 'success', metadata: { processedCount: 100, - existingCount: 20, - newCount: 80, + existingCount: 50, + newCount: 50, duration: 5000, - syncedAt: new Date().toISOString(), + syncedAt: '2024-01-01T00:00:00.000Z', }, }; const validated = syncJobs.UpdateSyncJobStatusInputSchema.parse(validInput); - expect(validated.status).toBe('completed'); + expect(validated.status).toBe('success'); expect(validated.metadata?.processedCount).toBe(100); }); - it('should validate all sync job statuses', () => { - const validStatuses: syncJobs.SyncJobStatus[] = [ + it('should accept valid status values', () => { + const statuses = [ 'pending', 'pending_manual', 'pending_initial', 'in_progress', - 'completed', + 'success', 'failed', 'cancelled', 'skipped', ]; - for (const status of validStatuses) { - 
const validated = syncJobs.SyncJobStatusSchema.parse(status); - expect(validated).toBe(status); + for (const status of statuses) { + const input = { + jobId: '123e4567-e89b-12d3-a456-426614174000', + status, + }; + expect(() => syncJobs.UpdateSyncJobStatusInputSchema.parse(input)).not.toThrow(); } }); - it('should reject invalid status', () => { - expect(() => syncJobs.SyncJobStatusSchema.parse('invalid_status')).toThrow(z.ZodError); - }); - - it('should validate bulk update input', () => { - const bulkInput: syncJobs.BulkUpdateSyncJobsInput = { - jobIds: ['123e4567-e89b-12d3-a456-426614174000', '123e4567-e89b-12d3-a456-426614174001'], - status: 'cancelled', - errorMessage: 'Batch operation cancelled by user', + it('should reject invalid status values', () => { + const invalidInput = { + jobId: '123e4567-e89b-12d3-a456-426614174000', + status: 'invalid_status', }; - const validated = syncJobs.BulkUpdateSyncJobsInputSchema.parse(bulkInput); - expect(validated.jobIds).toHaveLength(2); - expect(validated.status).toBe('cancelled'); + expect(() => syncJobs.UpdateSyncJobStatusInputSchema.parse(invalidInput)).toThrow(z.ZodError); + }); + }); + + describe('batchCreateSyncJobs', () => { + it('should validate batch creation input', () => { + const validInput: syncJobs.BatchCreateSyncJobsInput = { + dataSourceId: '123e4567-e89b-12d3-a456-426614174000', + syncType: 'manual', + columns: [ + { + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'name', + }, + { + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'email', + }, + ], + }; + + const validated = syncJobs.BatchCreateSyncJobsInputSchema.parse(validInput); + expect(validated.columns).toHaveLength(2); + expect(validated.syncType).toBe('manual'); }); - it('should reject empty jobIds array', () => { + it('should validate batch creation output', () => { + const validOutput: syncJobs.BatchCreateSyncJobsOutput = { + created: [ + { + id: '123e4567-e89b-12d3-a456-426614174000', + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'name', + status: 'pending', + createdAt: '2024-01-01T00:00:00.000Z', + }, + ], + totalCreated: 1, + errors: [ + { + column: { + databaseName: 'test_db', + schemaName: 'public', + tableName: 'users', + columnName: 'email', + }, + error: 'Duplicate entry', + }, + ], + }; + + const validated = syncJobs.BatchCreateSyncJobsOutputSchema.parse(validOutput); + expect(validated.totalCreated).toBe(1); + expect(validated.errors).toHaveLength(1); + }); + }); + + describe('bulkUpdateSyncJobs', () => { + it('should validate bulk update input', () => { + const validInput: syncJobs.BulkUpdateSyncJobsInput = { + jobIds: ['123e4567-e89b-12d3-a456-426614174000', '123e4567-e89b-12d3-a456-426614174001'], + status: 'cancelled', + errorMessage: 'Batch cancelled by user', + }; + + const validated = syncJobs.BulkUpdateSyncJobsInputSchema.parse(validInput); + expect(validated.jobIds).toHaveLength(2); + expect(validated.status).toBe('cancelled'); + expect(validated.errorMessage).toBe('Batch cancelled by user'); + }); + + it('should reject empty job IDs array', () => { const invalidInput = { - jobIds: [], // Invalid: empty array + jobIds: [], status: 'cancelled', }; @@ -182,188 +301,33 @@ describe('Sync Jobs Query Helpers', () => { }); }); - describe('getSearchableColumns', () => { - it('should validate searchable columns output', () => { - const mockColumn: syncJobs.SearchableColumn = { + 
describe('getSyncJobStatus', () => { + it('should validate get status input', () => { + const validInput: syncJobs.GetSyncJobStatusInput = { + jobId: '123e4567-e89b-12d3-a456-426614174000', + }; + + const validated = syncJobs.GetSyncJobStatusInputSchema.parse(validInput); + expect(validated.jobId).toBe('123e4567-e89b-12d3-a456-426614174000'); + }); + + it('should validate get status output', () => { + const validOutput: syncJobs.GetSyncJobStatusOutput = { id: '123e4567-e89b-12d3-a456-426614174000', - datasetId: '123e4567-e89b-12d3-a456-426614174001', - datasetName: 'users_dataset', - databaseName: 'analytics', + dataSourceId: '123e4567-e89b-12d3-a456-426614174001', + databaseName: 'test_db', schemaName: 'public', - tableName: 'users_dataset', + tableName: 'users', columnName: 'email', - columnType: 'varchar', - description: null, - semanticType: null, - storedValuesStatus: 'success', - storedValuesError: null, - storedValuesCount: 1000, - storedValuesLastSynced: new Date().toISOString(), + status: 'in_progress', + lastSyncedAt: null, + errorMessage: null, + createdAt: '2024-01-01T00:00:00.000Z', }; - const validated = syncJobs.SearchableColumnSchema.parse(mockColumn); - expect(validated.columnName).toBe('email'); - expect(validated.storedValuesCount).toBe(1000); - }); - - it('should validate columns needing sync output', () => { - const mockOutput: syncJobs.ColumnsNeedingSyncOutput = { - columns: [], - totalCount: 0, - neverSynced: 0, - stale: 0, - }; - - const validated = syncJobs.ColumnsNeedingSyncOutputSchema.parse(mockOutput); - expect(validated.totalCount).toBe(0); - expect(validated.neverSynced).toBe(0); - }); - - it('should validate get searchable columns output with grouping', () => { - const mockColumn: syncJobs.SearchableColumn = { - id: '123e4567-e89b-12d3-a456-426614174000', - datasetId: '123e4567-e89b-12d3-a456-426614174001', - datasetName: 'users_dataset', - databaseName: 'analytics', - schemaName: 'public', - tableName: 'users_dataset', - columnName: 'email', - columnType: 'varchar', - description: null, - semanticType: null, - storedValuesStatus: null, - storedValuesError: null, - storedValuesCount: null, - storedValuesLastSynced: null, - }; - - const mockOutput: syncJobs.GetSearchableColumnsOutput = { - columns: [mockColumn], - totalCount: 1, - byDataset: { - '123e4567-e89b-12d3-a456-426614174001': { - datasetName: 'users_dataset', - columns: [mockColumn], - count: 1, - }, - }, - }; - - const validated = syncJobs.GetSearchableColumnsOutputSchema.parse(mockOutput); - expect(validated.totalCount).toBe(1); - expect(validated.byDataset).toHaveProperty('123e4567-e89b-12d3-a456-426614174001'); - }); - }); - - describe('Schema Edge Cases', () => { - it('should handle nullable fields correctly', () => { - const column: syncJobs.SearchableColumn = { - id: '123e4567-e89b-12d3-a456-426614174000', - datasetId: '123e4567-e89b-12d3-a456-426614174001', - datasetName: 'test', - databaseName: 'db', - schemaName: 'schema', - tableName: 'table', - columnName: 'col', - columnType: 'text', - description: null, - semanticType: null, - storedValuesStatus: null, - storedValuesError: null, - storedValuesCount: null, - storedValuesLastSynced: null, - }; - - const validated = syncJobs.SearchableColumnSchema.parse(column); - expect(validated.description).toBeNull(); - expect(validated.storedValuesCount).toBeNull(); - }); - - it('should validate datetime strings', () => { - const validDatetime = '2024-01-01T12:00:00.000Z'; - const invalidDatetime = '2024-01-01 12:00:00'; // Missing T and Z - - const 
column = { - id: '123e4567-e89b-12d3-a456-426614174000', - datasetId: '123e4567-e89b-12d3-a456-426614174001', - datasetName: 'test', - databaseName: 'db', - schemaName: 'schema', - tableName: 'table', - columnName: 'col', - columnType: 'text', - description: null, - semanticType: null, - storedValuesStatus: null, - storedValuesError: null, - storedValuesCount: null, - storedValuesLastSynced: validDatetime, - }; - - // Should accept valid datetime - const validated = syncJobs.SearchableColumnSchema.parse(column); - expect(validated.storedValuesLastSynced).toBe(validDatetime); - - // Should reject invalid datetime - column.storedValuesLastSynced = invalidDatetime; - expect(() => syncJobs.SearchableColumnSchema.parse(column)).toThrow(z.ZodError); - }); - - it('should validate UUID formats', () => { - const validUuid = '123e4567-e89b-12d3-a456-426614174000'; - const invalidUuid = '123-456-789'; - - // Valid UUID - const validInput = { dataSourceId: validUuid }; - const validated = syncJobs.GetSearchableColumnsInputSchema.parse(validInput); - expect(validated.dataSourceId).toBe(validUuid); - - // Invalid UUID - const invalidInput = { dataSourceId: invalidUuid }; - expect(() => syncJobs.GetSearchableColumnsInputSchema.parse(invalidInput)).toThrow( - z.ZodError - ); - }); - }); - - describe('Type Exports', () => { - it('should export all required types', () => { - // Verify type exports exist (TypeScript will check at compile time) - const _dataSourceType: syncJobs.DataSourceForSync = { - id: '123e4567-e89b-12d3-a456-426614174000', - name: 'Test', - type: 'postgresql', - organizationId: '123e4567-e89b-12d3-a456-426614174001', - columnsWithStoredValues: 0, - }; - - const _syncJobInput: syncJobs.CreateSyncJobInput = { - dataSourceId: '123e4567-e89b-12d3-a456-426614174000', - databaseName: 'db', - schemaName: 'schema', - tableName: 'table', - columnName: 'column', - }; - - const _searchableColumn: syncJobs.SearchableColumn = { - id: '123e4567-e89b-12d3-a456-426614174000', - datasetId: '123e4567-e89b-12d3-a456-426614174001', - datasetName: 'test', - databaseName: 'db', - schemaName: 'schema', - tableName: 'table', - columnName: 'col', - columnType: 'text', - description: null, - semanticType: null, - storedValuesStatus: null, - storedValuesError: null, - storedValuesCount: null, - storedValuesLastSynced: null, - }; - - // Types are valid if this compiles - expect(true).toBe(true); + const validated = syncJobs.GetSyncJobStatusOutputSchema.parse(validOutput); + expect(validated.status).toBe('in_progress'); + expect(validated.lastSyncedAt).toBeNull(); }); }); }); diff --git a/packages/database/src/queries/sync-jobs/update-sync-job.ts b/packages/database/src/queries/sync-jobs/update-sync-job.ts index 0ef5e1a49..bfa46a32e 100644 --- a/packages/database/src/queries/sync-jobs/update-sync-job.ts +++ b/packages/database/src/queries/sync-jobs/update-sync-job.ts @@ -12,7 +12,7 @@ export const SyncJobStatusSchema = z.enum([ 'pending_manual', 'pending_initial', 'in_progress', - 'completed', + 'success', 'failed', 'cancelled', 'skipped', @@ -103,7 +103,7 @@ export async function updateSyncJobStatus( }; // Handle success case - if (validated.status === 'completed' && validated.metadata?.syncedAt) { + if (validated.status === 'success' && validated.metadata?.syncedAt) { updateData.lastSyncedAt = validated.metadata.syncedAt; updateData.errorMessage = null; // Clear any previous error } @@ -131,9 +131,7 @@ export async function updateSyncJobStatus( return UpdateSyncJobOutputSchema.parse({ ...updated, - lastSyncedAt: 
updated.lastSyncedAt - ? new Date(updated.lastSyncedAt).toISOString() - : null, + lastSyncedAt: updated.lastSyncedAt ? new Date(updated.lastSyncedAt).toISOString() : null, updatedAt: new Date().toISOString(), }); } catch (error) { @@ -182,9 +180,7 @@ export async function getSyncJobStatus( return GetSyncJobStatusOutputSchema.parse({ ...job, - lastSyncedAt: job.lastSyncedAt - ? new Date(job.lastSyncedAt).toISOString() - : null, + lastSyncedAt: job.lastSyncedAt ? new Date(job.lastSyncedAt).toISOString() : null, createdAt: new Date(job.createdAt).toISOString(), }); } catch (error) { @@ -225,7 +221,7 @@ export async function markSyncJobCompleted( ): Promise { return updateSyncJobStatus({ jobId, - status: 'completed', + status: 'success', metadata: { ...metadata, syncedAt: new Date().toISOString(), diff --git a/packages/search/src/searchable-values/client.test.ts b/packages/search/src/searchable-values/client.test.ts index 1d036e867..7952d45cf 100644 --- a/packages/search/src/searchable-values/client.test.ts +++ b/packages/search/src/searchable-values/client.test.ts @@ -144,7 +144,7 @@ describe('Turbopuffer Client', () => { expect(keys[1]).toBe('db1:public:users:email:john@example.com'); expect(mockNamespace.query).toHaveBeenCalledWith({ - top_k: 10000, + top_k: 1200, filters: ['database', 'Eq', 'db1'], include_attributes: ['database', 'schema', 'table', 'column', 'value'], }); @@ -186,7 +186,7 @@ describe('Turbopuffer Client', () => { await queryExistingKeys({ dataSourceId: mockDataSourceId, query }); expect(mockNamespace.query).toHaveBeenCalledWith({ - top_k: 10000, + top_k: 1200, filters: [ 'And', [ @@ -249,6 +249,7 @@ describe('Turbopuffer Client', () => { value: ['John', 'john@example.com'], synced_at: expect.arrayContaining([expect.any(String)]), }, + distance_metric: 'cosine_distance', }); }); diff --git a/packages/search/src/searchable-values/client.ts b/packages/search/src/searchable-values/client.ts index 3ccd7bcf5..890b64a56 100644 --- a/packages/search/src/searchable-values/client.ts +++ b/packages/search/src/searchable-values/client.ts @@ -45,7 +45,7 @@ const QueryInputSchema = z.object({ const GetAllInputSchema = z.object({ dataSourceId: DataSourceIdSchema, - limit: z.number().int().min(1).max(10000).optional().default(1000), + limit: z.number().int().min(1).max(1200).optional().default(1000), }); // ============================================================================ @@ -154,7 +154,7 @@ export const chunk = (arr: T[], size: number): T[][] => { export const createClient = (): Turbopuffer => { const apiKey = process.env.TURBOPUFFER_API_KEY; const region = process.env.TURBOPUFFER_REGION || 'aws-us-east-1'; - + if (!apiKey) { throw new TurbopufferError( 'TURBOPUFFER_API_KEY environment variable is not set', @@ -162,10 +162,10 @@ export const createClient = (): Turbopuffer => { false ); } - - return new Turbopuffer({ + + return new Turbopuffer({ apiKey, - region + region, }); }; @@ -242,6 +242,8 @@ export const checkNamespaceExists = async (dataSourceId: string): Promise @@ -252,7 +254,7 @@ export const queryExistingKeys = async ( return withRetry(async () => { const response = await ns.query({ - top_k: 10000, + top_k: 1200, // Maximum allowed by Turbopuffer ...(filter && { filters: filter }), include_attributes: ['database', 'schema', 'table', 'column', 'value'], }); @@ -285,7 +287,10 @@ const processBatch = async ( } const columns = valuesToColumns(batch); - await ns.write({ upsert_columns: columns }); + await ns.write({ + upsert_columns: columns, + distance_metric: 
'cosine_distance' + }); return { success: true, count: batch.length }; } catch (error) { @@ -461,11 +466,29 @@ export const searchSimilarValues = async ( // ============================================================================ /** - * Legacy function signatures for compatibility + * Ensure namespace exists (namespaces are auto-created on first write in TurboPuffer) + * This function verifies the namespace can be accessed and logs the status */ export const createNamespaceIfNotExists = async (dataSourceId: string): Promise => { - const exists = await checkNamespaceExists(dataSourceId); - if (!exists) { - console.info(`Namespace ${generateNamespace(dataSourceId)} will be created on first write`); + const validatedId = DataSourceIdSchema.parse(dataSourceId); + const namespaceName = generateNamespace(validatedId); + + try { + const exists = await checkNamespaceExists(validatedId); + if (exists) { + console.info(`TurboPuffer namespace ${namespaceName} already exists`); + } else { + console.info(`TurboPuffer namespace ${namespaceName} will be created on first write`); + // Note: TurboPuffer auto-creates namespaces on first write + // We cannot explicitly create empty namespaces + } + } catch (error) { + const errorMsg = error instanceof Error ? error.message : 'Unknown error'; + console.error(`Failed to check TurboPuffer namespace ${namespaceName}: ${errorMsg}`); + throw new TurbopufferError( + `Failed to verify namespace ${namespaceName}: ${errorMsg}`, + 'NAMESPACE_CHECK_FAILED', + true + ); } }; diff --git a/packages/search/src/searchable-values/deduplicate.ts b/packages/search/src/searchable-values/deduplicate.ts index 605cffaf5..e4e5c191d 100644 --- a/packages/search/src/searchable-values/deduplicate.ts +++ b/packages/search/src/searchable-values/deduplicate.ts @@ -3,7 +3,7 @@ * Uses functional composition and Zod validation */ -import * as duckdb from 'duckdb'; +import duckdb from 'duckdb'; import { z } from 'zod'; import { type DeduplicationResult, @@ -59,32 +59,74 @@ export const formatSqlInClause = (values: string[]): string => { export interface DuckDBConnection { db: duckdb.Database; conn: duckdb.Connection; + dbPath?: string; // Store path for cleanup } /** - * Create an in-memory DuckDB connection + * Create a DuckDB connection with optimized settings for large datasets + * Uses disk storage for better memory management */ -export const createConnection = (): Promise => { +export const createConnection = (useDisk = true): Promise => { return new Promise((resolve, reject) => { - const db = new duckdb.Database(':memory:', (err) => { + // Use disk storage for large datasets to avoid memory issues + // The database file will be automatically cleaned up + const dbPath = useDisk ? 
`/tmp/duckdb-dedupe-${Date.now()}.db` : ':memory:'; + + const db = new duckdb.Database(dbPath, (err) => { if (err) { reject(new Error(`Failed to create DuckDB database: ${err.message}`)); return; } const conn = db.connect(); - resolve({ db, conn }); + + // Configure DuckDB for optimal performance with large datasets + if (useDisk) { + conn.exec("SET memory_limit='2GB';", (err) => { + if (err) console.warn('Failed to set memory limit:', err); + }); + conn.exec('SET threads=4;', (err) => { + if (err) console.warn('Failed to set thread count:', err); + }); + } + + const connection: DuckDBConnection = { db, conn }; + if (useDisk && dbPath) { + connection.dbPath = dbPath; + } + resolve(connection); }); }); }; /** * Close DuckDB connection and database + * Also cleans up temporary database files if using disk storage */ export const closeConnection = (connection: DuckDBConnection): Promise => { return new Promise((resolve) => { connection.conn.close(() => { connection.db.close(() => { + // Clean up temporary database file if it exists + if (connection.dbPath && connection.dbPath !== ':memory:') { + const fs = require('node:fs'); + try { + // DuckDB creates additional WAL and temporary files + const files = [ + connection.dbPath, + `${connection.dbPath}.wal`, + `${connection.dbPath}.tmp`, + ]; + + files.forEach((file) => { + if (fs.existsSync(file)) { + fs.unlinkSync(file); + } + }); + } catch (err) { + console.warn(`Failed to clean up temporary DuckDB file: ${err}`); + } + } resolve(); }); }); @@ -112,44 +154,72 @@ export const executeQuery = (conn: duckdb.Connection, sql: string): /** * Create and populate temporary tables for deduplication + * Optimized for large datasets with increased batch sizes and indexes */ const setupTables = async ( conn: duckdb.Connection, existingKeys: string[], newKeys: string[] ): Promise => { - // Create tables + // Create tables without primary key constraint initially for faster bulk loading await executeQuery( conn, ` - CREATE TABLE existing_keys (key VARCHAR PRIMARY KEY) + CREATE TABLE existing_keys (key VARCHAR) ` ); await executeQuery( conn, ` - CREATE TABLE new_keys (key VARCHAR PRIMARY KEY) + CREATE TABLE new_keys (key VARCHAR) ` ); + // Use larger batches for better performance with disk-based storage + const BATCH_SIZE = 10000; // Increased from 1000 to 10000 + // Insert existing keys in batches if (existingKeys.length > 0) { - const batches = batchArray(existingKeys, 1000); - for (const batch of batches) { + console.info(`Inserting ${existingKeys.length} existing keys in batches of ${BATCH_SIZE}`); + const batches = batchArray(existingKeys, BATCH_SIZE); + for (let i = 0; i < batches.length; i++) { + const batch = batches[i]; + if (!batch) continue; const values = batch.map((key) => `('${escapeSqlString(key)}')`).join(','); await executeQuery(conn, `INSERT INTO existing_keys VALUES ${values}`); + + // Log progress for large datasets + if (i > 0 && i % 10 === 0) { + console.info( + `Inserted ${Math.min((i + 1) * BATCH_SIZE, existingKeys.length)}/${existingKeys.length} existing keys` + ); + } } } // Insert new keys in batches if (newKeys.length > 0) { - const batches = batchArray(newKeys, 1000); - for (const batch of batches) { + console.info(`Inserting ${newKeys.length} new keys in batches of ${BATCH_SIZE}`); + const batches = batchArray(newKeys, BATCH_SIZE); + for (let i = 0; i < batches.length; i++) { + const batch = batches[i]; + if (!batch) continue; const values = batch.map((key) => `('${escapeSqlString(key)}')`).join(','); await executeQuery(conn, 
`INSERT INTO new_keys VALUES ${values}`); + + // Log progress for large datasets + if (i > 0 && i % 10 === 0) { + console.info( + `Inserted ${Math.min((i + 1) * BATCH_SIZE, newKeys.length)}/${newKeys.length} new keys` + ); + } } } + + // Create indexes after bulk loading for better query performance + await executeQuery(conn, `CREATE INDEX idx_existing_keys ON existing_keys(key)`); + await executeQuery(conn, `CREATE INDEX idx_new_keys ON new_keys(key)`); }; /** @@ -170,6 +240,7 @@ const findUniqueNewKeys = async (conn: duckdb.Connection): Promise => /** * Core deduplication logic using DuckDB + * Automatically switches to disk-based storage for large datasets */ const performDeduplication = async ( existingKeys: string[], @@ -198,8 +269,17 @@ const performDeduplication = async ( let connection: DuckDBConnection | null = null; try { + // Determine whether to use disk based on dataset size + // Use disk for large datasets to avoid memory issues + const totalKeys = existingKeys.length + newKeys.length; + const useDisk = totalKeys > 50000; // Switch to disk for datasets over 50k keys + + if (useDisk) { + console.info(`Using disk-based DuckDB for deduplication (${totalKeys} total keys)`); + } + // Create DuckDB connection - connection = await createConnection(); + connection = await createConnection(useDisk); // Setup tables and insert data await setupTables(connection.conn, existingKeys, newKeys); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 48a972a44..1928b90ed 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -880,6 +880,9 @@ importers: '@ai-sdk/gateway': specifier: ^1.0.15 version: 1.0.15(zod@3.25.76) + '@ai-sdk/openai': + specifier: 2.0.0-beta.16 + version: 2.0.0-beta.16(zod@3.25.76) '@ai-sdk/provider': specifier: ^2.0.0 version: 2.0.0 @@ -12538,14 +12541,28 @@ snapshots: '@ai-sdk/provider-utils': 3.0.0-beta.10(zod@3.25.1) zod: 3.25.1 + '@ai-sdk/openai@2.0.0-beta.16(zod@3.25.76)': + dependencies: + '@ai-sdk/provider': 2.0.0-beta.2 + '@ai-sdk/provider-utils': 3.0.0-beta.10(zod@3.25.76) + zod: 3.25.76 + '@ai-sdk/provider-utils@3.0.0-beta.10(zod@3.25.1)': dependencies: '@ai-sdk/provider': 2.0.0-beta.2 '@standard-schema/spec': 1.0.0 - eventsource-parser: 3.0.3 + eventsource-parser: 3.0.6 zod: 3.25.1 zod-to-json-schema: 3.24.6(zod@3.25.1) + '@ai-sdk/provider-utils@3.0.0-beta.10(zod@3.25.76)': + dependencies: + '@ai-sdk/provider': 2.0.0-beta.2 + '@standard-schema/spec': 1.0.0 + eventsource-parser: 3.0.6 + zod: 3.25.76 + zod-to-json-schema: 3.24.6(zod@3.25.76) + '@ai-sdk/provider-utils@3.0.1(zod@3.25.1)': dependencies: '@ai-sdk/provider': 2.0.0 @@ -20689,7 +20706,7 @@ snapshots: eventsource@3.0.7: dependencies: - eventsource-parser: 3.0.3 + eventsource-parser: 3.0.6 evp_bytestokey@1.0.3: dependencies:
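For quick reference while reviewing, below is a minimal usage sketch of how the reworked sync-job helpers compose: count the backlog with countPendingSyncJobs, pull failed jobs grouped by data source via getExistingSyncJobs, and push them back into the queue with updateSyncJobStatus (which now uses the 'success' status in place of 'completed'). The surrounding script — the reportAndRequeueFailedJobs function, the limit of 100, and the logging — is illustrative only and not part of this change set, and the import path assumes these helpers are re-exported from '@buster/database' like the other sync-job queries.

// Illustrative sketch only (not part of the diff); assumes '@buster/database'
// re-exports the sync-job query helpers introduced above.
import { countPendingSyncJobs, getExistingSyncJobs, updateSyncJobStatus } from '@buster/database';

export async function reportAndRequeueFailedJobs(): Promise<void> {
  // Backlog of 'pending', 'pending_manual' and 'pending_initial' jobs on live data sources
  const pending = await countPendingSyncJobs();
  console.info(`${pending} searchable-value sync jobs currently pending`);

  // Fetch failed jobs; getExistingSyncJobs groups them per data source in byDataSource
  const { byDataSource } = await getExistingSyncJobs({ statuses: ['failed'], limit: 100 });

  for (const group of Object.values(byDataSource)) {
    console.info(`${group.dataSourceName}: requeueing ${group.jobCount} failed jobs`);
    for (const job of group.jobs) {
      // Move the job back into the manual-retry queue; 'pending_manual' is one of the
      // statuses accepted by UpdateSyncJobStatusInputSchema
      await updateSyncJobStatus({ jobId: job.id, status: 'pending_manual' });
    }
  }
}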