Sync stored values jobs to Turbopuffer

dal 2025-09-04 14:36:08 -06:00
parent a69eefe2b7
commit e5c8512c76
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
19 changed files with 820 additions and 1756 deletions

View File

@ -15,9 +15,7 @@ use middleware::{
error::{init_sentry, init_tracing_subscriber, sentry_layer},
};
use rustls::crypto::ring;
use stored_values::jobs::trigger_stale_sync_jobs;
use tokio::sync::broadcast;
use tokio_cron_scheduler::{Job, JobScheduler};
use tower::ServiceBuilder;
use tower_http::{compression::CompressionLayer, trace::TraceLayer};
use tracing::{error, info, warn};
@ -58,26 +56,6 @@ async fn main() -> Result<(), anyhow::Error> {
return Ok(());
}
// --- Start Stored Values Sync Job Scheduler ---
let scheduler = JobScheduler::new().await?; // Using `?` assuming main returns Result
info!("Starting stored values sync job scheduler...");
// Schedule to run every hour
let job = Job::new_async("*/5 * * * * *", move |uuid, mut l| {
Box::pin(async move {
info!(job_uuid = %uuid, "Running hourly stored values sync job check.");
if let Err(e) = trigger_stale_sync_jobs().await {
error!(job_uuid = %uuid, "Hourly stored values sync job failed: {}", e);
}
// Optional: You could check l.next_tick_for_job(uuid).await to see the next scheduled time.
})
})?;
scheduler.add(job).await?;
scheduler.start().await?;
info!("Stored values sync job scheduler started.");
// --- End Stored Values Sync Job Scheduler ---
let protected_router = Router::new().nest("/api/v1", routes::protected_router());
let public_router = Router::new().route("/health", axum::routing::get(|| async { "OK" }));

View File

@ -21,8 +21,6 @@ vi.mock('@buster/database', () => ({
}));
vi.mock('@buster/search', () => ({
checkNamespaceExists: vi.fn(),
createNamespaceIfNotExists: vi.fn(),
deduplicateValues: vi.fn(),
generateNamespace: vi.fn(),
queryExistingKeys: vi.fn(),
@ -49,8 +47,6 @@ import {
markSyncJobFailed,
} from '@buster/database';
import {
checkNamespaceExists,
createNamespaceIfNotExists,
deduplicateValues,
generateNamespace,
queryExistingKeys,
@ -85,7 +81,7 @@ describe('processSyncJob', () => {
vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
vi.mocked(markSyncJobCompleted).mockResolvedValue({
id: 'test-job-123',
status: 'completed',
status: 'success',
updatedAt: new Date().toISOString(),
lastSyncedAt: new Date().toISOString(),
errorMessage: null,
@ -124,7 +120,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(true);
vi.mocked(queryExistingKeys).mockResolvedValue(mockExistingKeys);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: [
@ -175,7 +170,6 @@ describe('processSyncJob', () => {
expect(mockAdapter.query).toHaveBeenCalledWith(
expect.stringContaining('SELECT DISTINCT "company_name"')
);
expect(checkNamespaceExists).toHaveBeenCalledWith('ds-456');
expect(queryExistingKeys).toHaveBeenCalledWith({
dataSourceId: 'ds-456',
query: {
@ -272,7 +266,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(true);
vi.mocked(queryExistingKeys).mockResolvedValue(mockExistingKeys);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: [],
@ -316,7 +309,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(false); // Namespace doesn't exist
vi.mocked(queryExistingKeys).mockResolvedValue([]);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: [
@ -342,8 +334,6 @@ describe('processSyncJob', () => {
const result = await runTask(mockPayload);
// Verify namespace creation
expect(checkNamespaceExists).toHaveBeenCalledWith('ds-456');
expect(createNamespaceIfNotExists).toHaveBeenCalledWith('ds-456');
expect(result.success).toBe(true);
});
});
@ -417,7 +407,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(true);
vi.mocked(queryExistingKeys).mockResolvedValue([]);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: [
@ -461,7 +450,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(true);
vi.mocked(queryExistingKeys).mockResolvedValue([]);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: [
@ -522,7 +510,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(true);
vi.mocked(queryExistingKeys).mockResolvedValue([]);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: specialValues.map((value) => ({
@ -568,7 +555,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(true);
vi.mocked(queryExistingKeys).mockResolvedValue([]);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: [
@ -627,7 +613,6 @@ describe('processSyncJob', () => {
fields: [],
});
vi.mocked(generateNamespace).mockReturnValue('ds-456');
vi.mocked(checkNamespaceExists).mockResolvedValue(true);
vi.mocked(queryExistingKeys).mockResolvedValue([]);
vi.mocked(deduplicateValues).mockResolvedValue({
newValues: [

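// Hypothetical test sketch (not part of this commit): covers the new 404 path where a
// missing Turbopuffer namespace is treated as having no existing keys. It assumes the
// suite's existing beforeEach/adapter mocks; the error text mirrors the substring check
// in process-sync-job.ts.
it('should treat a missing namespace as having no existing keys', async () => {
  // Reject with a message containing both "namespace" and "was not found".
  vi.mocked(queryExistingKeys).mockRejectedValue(
    new Error("namespace 'ds-456' was not found")
  );
  // With no existing keys, deduplication simply reports nothing to skip.
  vi.mocked(deduplicateValues).mockResolvedValue({
    newValues: [],
    newCount: 0,
    existingCount: 0,
  } as any);
  const result = await runTask(mockPayload);
  expect(result.success).toBe(true);
});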
View File

@ -7,8 +7,6 @@ import {
} from '@buster/database';
import {
type SearchableValue,
checkNamespaceExists,
createNamespaceIfNotExists,
deduplicateValues,
generateNamespace,
queryExistingKeys,
@ -35,6 +33,9 @@ export const processSyncJob: ReturnType<
id: 'process-sync-job',
schema: SyncJobPayloadSchema,
maxDuration: 300, // 5 minutes per job
machine: {
preset: 'large-1x', // 4 vCPU, 8 GB RAM for handling large datasets
},
retry: {
maxAttempts: 3,
factor: 2,
@ -114,15 +115,10 @@ export const processSyncJob: ReturnType<
return result;
}
// Step 4: Ensure Turbopuffer namespace exists
// Step 4: Query existing keys from Turbopuffer
// Note: If namespace doesn't exist, it means no data has been written yet
// and we should treat it as having no existing keys
const namespace = generateNamespace(payload.dataSourceId);
const namespaceExists = await checkNamespaceExists(payload.dataSourceId);
if (!namespaceExists) {
logger.info('Creating Turbopuffer namespace', { namespace });
await createNamespaceIfNotExists(payload.dataSourceId);
}
// Step 5: Query existing keys from Turbopuffer
logger.info('Querying existing values from Turbopuffer', {
namespace,
database: payload.databaseName,
@ -131,22 +127,38 @@ export const processSyncJob: ReturnType<
column: payload.columnName,
});
const existingKeys = await queryExistingKeys({
dataSourceId: payload.dataSourceId,
query: {
database: payload.databaseName,
schema: payload.schemaName,
table: payload.tableName,
column: payload.columnName,
},
});
let existingKeys: string[] = [];
try {
existingKeys = await queryExistingKeys({
dataSourceId: payload.dataSourceId,
query: {
database: payload.databaseName,
schema: payload.schemaName,
table: payload.tableName,
column: payload.columnName,
},
});
} catch (error) {
// Check if this is a namespace not found error (404)
const errorMessage = error instanceof Error ? error.message : String(error);
if (errorMessage.includes('namespace') && errorMessage.includes('was not found')) {
logger.info('Namespace does not exist yet, treating as empty', {
namespace,
jobId: payload.jobId,
});
existingKeys = [];
} else {
// Re-throw if it's not a namespace not found error
throw error;
}
}
logger.info('Retrieved existing keys', {
jobId: payload.jobId,
existingCount: existingKeys.length,
});
// Step 6: Prepare searchable values for deduplication
// Step 5: Prepare searchable values for deduplication
const searchableValues: SearchableValue[] = distinctValues.map((value) => ({
database: payload.databaseName,
schema: payload.schemaName,
@ -155,7 +167,7 @@ export const processSyncJob: ReturnType<
value,
}));
// Step 7: Deduplicate using DuckDB
// Step 6: Deduplicate using DuckDB
logger.info('Deduplicating values', {
jobId: payload.jobId,
totalValues: searchableValues.length,
@ -202,42 +214,88 @@ export const processSyncJob: ReturnType<
return result;
}
// Step 8: Generate embeddings for new values
// Step 7: Generate embeddings for new values in batches to manage memory
logger.info('Generating embeddings for new values', {
jobId: payload.jobId,
newCount: deduplicationResult.newCount,
});
const newValueTexts = deduplicationResult.newValues.map((v) => v.value);
const embeddings = await generateSearchableValueEmbeddings(newValueTexts);
// Process embeddings in batches to avoid memory issues
const EMBEDDING_BATCH_SIZE = 1000; // Process 1000 values at a time
const allValuesWithEmbeddings: SearchableValue[] = [];
// Step 9: Combine values with embeddings
const valuesWithEmbeddings: SearchableValue[] = deduplicationResult.newValues.map(
(value, index) => ({
for (let i = 0; i < deduplicationResult.newValues.length; i += EMBEDDING_BATCH_SIZE) {
const batch = deduplicationResult.newValues.slice(i, i + EMBEDDING_BATCH_SIZE);
const batchTexts = batch.map((v) => v.value);
logger.info('Processing embedding batch', {
jobId: payload.jobId,
batchStart: i,
batchSize: batch.length,
totalValues: deduplicationResult.newValues.length,
});
// Generate embeddings for this batch
const batchEmbeddings = await generateSearchableValueEmbeddings(batchTexts);
// Combine values with embeddings for this batch
const batchWithEmbeddings = batch.map((value, index) => ({
...value,
embedding: embeddings[index],
embedding: batchEmbeddings[index],
synced_at: new Date().toISOString(),
})
);
}));
// Step 10: Upsert to Turbopuffer
allValuesWithEmbeddings.push(...batchWithEmbeddings);
}
// Step 8: Upsert to Turbopuffer in batches to manage memory
logger.info('Upserting values to Turbopuffer', {
jobId: payload.jobId,
count: valuesWithEmbeddings.length,
count: allValuesWithEmbeddings.length,
});
const upsertResult = await upsertSearchableValues({
dataSourceId: payload.dataSourceId,
values: valuesWithEmbeddings,
});
// Process upserts in batches to avoid memory and API limits
const UPSERT_BATCH_SIZE = 500; // Upsert 500 values at a time
let totalUpserted = 0;
logger.info('Upsert complete', {
for (let i = 0; i < allValuesWithEmbeddings.length; i += UPSERT_BATCH_SIZE) {
const batch = allValuesWithEmbeddings.slice(i, i + UPSERT_BATCH_SIZE);
logger.info('Processing upsert batch', {
jobId: payload.jobId,
batchStart: i,
batchSize: batch.length,
totalValues: allValuesWithEmbeddings.length,
});
const batchResult = await upsertSearchableValues({
dataSourceId: payload.dataSourceId,
values: batch,
});
totalUpserted += batchResult.upserted;
if (batchResult.errors && batchResult.errors.length > 0) {
// Log and throw error for any upsert failures
logger.error('Upsert batch failed', {
jobId: payload.jobId,
batchStart: i,
batchSize: batch.length,
errorsInBatch: batchResult.errors.length,
errors: batchResult.errors,
});
throw new Error(
`Failed to upsert ${batchResult.errors.length} values to Turbopuffer: ${batchResult.errors.slice(0, 3).join(', ')}${batchResult.errors.length > 3 ? '...' : ''}`
);
}
}
logger.info('All upserts completed successfully', {
jobId: payload.jobId,
upserted: upsertResult.upserted,
errors: upsertResult.errors,
totalUpserted: totalUpserted,
});
// Step 11: Update sync job status
// Step 9: Update sync job status
const metadata = {
processedCount: searchableValues.length,
existingCount: deduplicationResult.existingCount,

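// A minimal sketch (hypothetical helper, not part of this commit) of the slice-and-await
// pattern used twice above for EMBEDDING_BATCH_SIZE and UPSERT_BATCH_SIZE: take a
// fixed-size chunk, await it, then move on, so memory and API payloads stay bounded.
async function processInBatches<T, R>(
  items: T[],
  batchSize: number,
  handler: (batch: T[], startIndex: number) => Promise<R[]>
): Promise<R[]> {
  const results: R[] = [];
  for (let i = 0; i < items.length; i += batchSize) {
    // Each batch is awaited before the next starts; a thrown error aborts the whole run,
    // matching how the upsert loop above surfaces batch failures.
    const batch = items.slice(i, i + batchSize);
    results.push(...(await handler(batch, i)));
  }
  return results;
}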
View File

@ -1,804 +0,0 @@
import * as databaseModule from '@buster/database';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import * as processSyncJobModule from './process-sync-job';
import type { DailySyncReport, DataSourceSyncSummary, SyncJobPayload } from './types';
// Define ScheduledTaskPayload type locally since it's not exported from @trigger.dev/sdk
interface ScheduledTaskPayload {
timestamp: Date;
lastTimestamp?: Date;
upcoming: Date[];
}
// Mock @trigger.dev/sdk
vi.mock('@trigger.dev/sdk', () => ({
logger: {
info: vi.fn(),
error: vi.fn(),
},
schedules: {
task: vi.fn((config) => config),
},
}));
// Mock database module
vi.mock('@buster/database', () => ({
getDataSourcesForSync: vi.fn(),
getSearchableColumns: vi.fn(),
batchCreateSyncJobs: vi.fn(),
markSyncJobInProgress: vi.fn(),
markSyncJobFailed: vi.fn(),
}));
// Mock process-sync-job module
vi.mock('./process-sync-job', () => ({
processSyncJob: {
triggerAndWait: vi.fn(),
},
}));
describe('sync-searchable-values', () => {
let syncSearchableValues: any;
let originalCrypto: typeof globalThis.crypto;
beforeEach(async () => {
vi.clearAllMocks();
// Save original crypto
originalCrypto = global.crypto;
// Mock crypto.randomUUID
const mockCrypto = {
...global.crypto,
randomUUID: vi.fn().mockReturnValue('test-execution-id'),
};
// Use Object.defineProperty to override crypto
Object.defineProperty(global, 'crypto', {
value: mockCrypto,
writable: true,
configurable: true,
});
// Dynamically import to get fresh module after mocks
const module = await import('./sync-searchable-values');
syncSearchableValues = module.syncSearchableValues;
});
afterEach(() => {
// Restore original crypto
Object.defineProperty(global, 'crypto', {
value: originalCrypto,
writable: true,
configurable: true,
});
vi.resetModules();
});
describe('Daily Sync Execution', () => {
const mockPayload: ScheduledTaskPayload = {
timestamp: new Date('2024-01-15T02:00:00Z'),
lastTimestamp: new Date('2024-01-14T02:00:00Z'),
upcoming: [],
};
it('should handle case with no data sources', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 0,
dataSources: [],
});
const result = await syncSearchableValues.run(mockPayload);
expect(result).toMatchObject({
executionId: 'test-execution-id',
totalDataSources: 0,
totalColumns: 0,
successfulSyncs: 0,
failedSyncs: 0,
skippedSyncs: 0,
totalValuesProcessed: 0,
dataSourceSummaries: [],
errors: [],
});
expect(getDataSourcesForSync).toHaveBeenCalledTimes(1);
});
it('should process data sources with searchable columns', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any;
const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any;
const processSyncJob = processSyncJobModule.processSyncJob as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 1,
dataSources: [
{
id: 'ds-1',
name: 'Test Database',
columnsWithStoredValues: 3,
},
],
});
getSearchableColumns.mockResolvedValue({
totalCount: 2,
columns: [
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
],
});
batchCreateSyncJobs.mockResolvedValue({
totalCreated: 2,
created: [
{
id: 'job-1',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
{
id: 'job-2',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
],
errors: [],
});
markSyncJobInProgress.mockResolvedValue(undefined);
processSyncJob.triggerAndWait.mockResolvedValue({
ok: true,
output: {
success: true,
processedCount: 100,
error: null,
},
});
const result = await syncSearchableValues.run(mockPayload);
expect(result).toMatchObject({
executionId: 'test-execution-id',
totalDataSources: 1,
totalColumns: 2,
successfulSyncs: 2,
failedSyncs: 0,
skippedSyncs: 0,
totalValuesProcessed: 200,
errors: [],
});
expect(getDataSourcesForSync).toHaveBeenCalledTimes(1);
expect(getSearchableColumns).toHaveBeenCalledWith({ dataSourceId: 'ds-1' });
expect(batchCreateSyncJobs).toHaveBeenCalledWith({
dataSourceId: 'ds-1',
syncType: 'daily',
columns: [
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
],
});
expect(processSyncJob.triggerAndWait).toHaveBeenCalledTimes(2);
});
it('should handle sync job failures', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any;
const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any;
const markSyncJobFailed = databaseModule.markSyncJobFailed as any;
const processSyncJob = processSyncJobModule.processSyncJob as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 1,
dataSources: [
{
id: 'ds-1',
name: 'Test Database',
columnsWithStoredValues: 1,
},
],
});
getSearchableColumns.mockResolvedValue({
totalCount: 1,
columns: [
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
],
});
batchCreateSyncJobs.mockResolvedValue({
totalCreated: 1,
created: [
{
id: 'job-1',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
],
errors: [],
});
markSyncJobInProgress.mockResolvedValue(undefined);
markSyncJobFailed.mockResolvedValue(undefined);
processSyncJob.triggerAndWait.mockResolvedValue({
ok: false,
error: 'Task execution failed',
});
const result = await syncSearchableValues.run(mockPayload);
expect(result).toMatchObject({
executionId: 'test-execution-id',
totalDataSources: 1,
totalColumns: 1,
successfulSyncs: 0,
failedSyncs: 1,
skippedSyncs: 0,
totalValuesProcessed: 0,
});
expect(result.dataSourceSummaries[0].errors).toContain(
'Job job-1 failed: Task execution failed'
);
expect(markSyncJobFailed).toHaveBeenCalledWith('job-1', 'Task execution failed');
});
it('should handle batch creation errors', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any;
const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any;
const processSyncJob = processSyncJobModule.processSyncJob as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 1,
dataSources: [
{
id: 'ds-1',
name: 'Test Database',
columnsWithStoredValues: 2,
},
],
});
getSearchableColumns.mockResolvedValue({
totalCount: 2,
columns: [
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'products',
columnName: 'description',
},
],
});
batchCreateSyncJobs.mockResolvedValue({
totalCreated: 1,
created: [
{
id: 'job-1',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
],
errors: [
{
column: {
tableName: 'products',
columnName: 'description',
},
error: 'Column not found',
},
],
});
markSyncJobInProgress.mockResolvedValue(undefined);
processSyncJob.triggerAndWait.mockResolvedValue({
ok: true,
output: {
success: true,
processedCount: 100,
error: null,
},
});
const result = await syncSearchableValues.run(mockPayload);
expect(result.dataSourceSummaries[0]).toMatchObject({
totalColumns: 2,
successfulSyncs: 1,
failedSyncs: 1,
errors: ['Failed to create job for products.description: Column not found'],
});
});
it('should skip data sources with no searchable columns', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 1,
dataSources: [
{
id: 'ds-1',
name: 'Empty Database',
columnsWithStoredValues: 0,
},
],
});
getSearchableColumns.mockResolvedValue({
totalCount: 0,
columns: [],
});
const result = await syncSearchableValues.run(mockPayload);
expect(result).toMatchObject({
totalDataSources: 1,
totalColumns: 0,
successfulSyncs: 0,
failedSyncs: 0,
skippedSyncs: 0,
totalValuesProcessed: 0,
});
expect(result.dataSourceSummaries[0]).toMatchObject({
dataSourceId: 'ds-1',
dataSourceName: 'Empty Database',
totalColumns: 0,
skippedSyncs: 0,
});
});
it('should handle processing exceptions gracefully', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any;
const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any;
const markSyncJobFailed = databaseModule.markSyncJobFailed as any;
const processSyncJob = processSyncJobModule.processSyncJob as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 1,
dataSources: [
{
id: 'ds-1',
name: 'Test Database',
columnsWithStoredValues: 1,
},
],
});
getSearchableColumns.mockResolvedValue({
totalCount: 1,
columns: [
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
],
});
batchCreateSyncJobs.mockResolvedValue({
totalCreated: 1,
created: [
{
id: 'job-1',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
],
errors: [],
});
markSyncJobInProgress.mockResolvedValue(undefined);
markSyncJobFailed.mockResolvedValue(undefined);
// Simulate exception during processSyncJob
processSyncJob.triggerAndWait.mockRejectedValue(new Error('Network error'));
const result = await syncSearchableValues.run(mockPayload);
expect(result).toMatchObject({
totalDataSources: 1,
failedSyncs: 1,
successfulSyncs: 0,
});
expect(result.dataSourceSummaries[0].errors).toContain('Job job-1 failed: Network error');
expect(markSyncJobFailed).toHaveBeenCalledWith('job-1', 'Network error');
});
it('should handle fatal errors during execution', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
getDataSourcesForSync.mockRejectedValue(new Error('Database connection failed'));
const result = await syncSearchableValues.run(mockPayload);
expect(result).toMatchObject({
executionId: 'test-execution-id',
totalDataSources: 0,
totalColumns: 0,
successfulSyncs: 0,
failedSyncs: 0,
skippedSyncs: 0,
totalValuesProcessed: 0,
errors: ['Fatal error during sync execution: Database connection failed'],
});
});
it('should batch process sync jobs correctly', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any;
const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any;
const processSyncJob = processSyncJobModule.processSyncJob as any;
// Create 15 jobs to test batching (batch size is 10)
const mockJobs = Array.from({ length: 15 }, (_, i) => ({
id: `job-${i + 1}`,
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: `table${i + 1}`,
columnName: 'col1',
}));
getDataSourcesForSync.mockResolvedValue({
totalCount: 1,
dataSources: [
{
id: 'ds-1',
name: 'Test Database',
columnsWithStoredValues: 15,
},
],
});
getSearchableColumns.mockResolvedValue({
totalCount: 15,
columns: mockJobs.map((job) => ({
databaseName: job.databaseName,
schemaName: job.schemaName,
tableName: job.tableName,
columnName: job.columnName,
})),
});
batchCreateSyncJobs.mockResolvedValue({
totalCreated: 15,
created: mockJobs,
errors: [],
});
markSyncJobInProgress.mockResolvedValue(undefined);
let processCallCount = 0;
processSyncJob.triggerAndWait.mockImplementation(() => {
processCallCount++;
return Promise.resolve({
ok: true,
output: {
success: true,
processedCount: 10,
error: null,
},
});
});
const result = await syncSearchableValues.run(mockPayload);
expect(processCallCount).toBe(15);
expect(markSyncJobInProgress).toHaveBeenCalledTimes(15);
expect(result).toMatchObject({
totalColumns: 15,
successfulSyncs: 15,
failedSyncs: 0,
totalValuesProcessed: 150,
});
});
it('should handle mixed success and failure in batch processing', async () => {
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any;
const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any;
const markSyncJobFailed = databaseModule.markSyncJobFailed as any;
const processSyncJob = processSyncJobModule.processSyncJob as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 1,
dataSources: [
{
id: 'ds-1',
name: 'Test Database',
columnsWithStoredValues: 3,
},
],
});
getSearchableColumns.mockResolvedValue({
totalCount: 3,
columns: [
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
{
databaseName: 'testdb',
schemaName: 'public',
tableName: 'products',
columnName: 'title',
},
],
});
batchCreateSyncJobs.mockResolvedValue({
totalCreated: 3,
created: [
{
id: 'job-1',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
{
id: 'job-2',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
{
id: 'job-3',
dataSourceId: 'ds-1',
databaseName: 'testdb',
schemaName: 'public',
tableName: 'products',
columnName: 'title',
},
],
errors: [],
});
markSyncJobInProgress.mockResolvedValue(undefined);
markSyncJobFailed.mockResolvedValue(undefined);
// Mixed results: success, failure, exception
processSyncJob.triggerAndWait
.mockResolvedValueOnce({
ok: true,
output: {
success: true,
processedCount: 50,
error: null,
},
})
.mockResolvedValueOnce({
ok: true,
output: {
success: false,
processedCount: 0,
error: 'Validation failed',
},
})
.mockRejectedValueOnce(new Error('Unexpected error'));
const result = await syncSearchableValues.run(mockPayload);
expect(result).toMatchObject({
totalColumns: 3,
successfulSyncs: 1,
failedSyncs: 2,
totalValuesProcessed: 50,
});
expect(markSyncJobFailed).toHaveBeenCalledTimes(2);
expect(markSyncJobFailed).toHaveBeenCalledWith('job-2', 'Validation failed');
expect(markSyncJobFailed).toHaveBeenCalledWith('job-3', 'Unexpected error');
});
});
describe('createReport helper', () => {
it('should calculate report metrics correctly', async () => {
// Since createReport is not exported, we test it through the main function
const getDataSourcesForSync = databaseModule.getDataSourcesForSync as any;
const getSearchableColumns = databaseModule.getSearchableColumns as any;
const batchCreateSyncJobs = databaseModule.batchCreateSyncJobs as any;
const markSyncJobInProgress = databaseModule.markSyncJobInProgress as any;
const processSyncJob = processSyncJobModule.processSyncJob as any;
getDataSourcesForSync.mockResolvedValue({
totalCount: 2,
dataSources: [
{ id: 'ds-1', name: 'DB1', columnsWithStoredValues: 2 },
{ id: 'ds-2', name: 'DB2', columnsWithStoredValues: 3 },
],
});
getSearchableColumns
.mockResolvedValueOnce({
totalCount: 2,
columns: [
{ databaseName: 'db1', schemaName: 'public', tableName: 't1', columnName: 'c1' },
{ databaseName: 'db1', schemaName: 'public', tableName: 't1', columnName: 'c2' },
],
})
.mockResolvedValueOnce({
totalCount: 3,
columns: [
{ databaseName: 'db2', schemaName: 'public', tableName: 't2', columnName: 'c1' },
{ databaseName: 'db2', schemaName: 'public', tableName: 't2', columnName: 'c2' },
{ databaseName: 'db2', schemaName: 'public', tableName: 't2', columnName: 'c3' },
],
});
batchCreateSyncJobs
.mockResolvedValueOnce({
totalCreated: 2,
created: [
{
id: 'j1',
dataSourceId: 'ds-1',
databaseName: 'db1',
schemaName: 'public',
tableName: 't1',
columnName: 'c1',
},
{
id: 'j2',
dataSourceId: 'ds-1',
databaseName: 'db1',
schemaName: 'public',
tableName: 't1',
columnName: 'c2',
},
],
errors: [],
})
.mockResolvedValueOnce({
totalCreated: 3,
created: [
{
id: 'j3',
dataSourceId: 'ds-2',
databaseName: 'db2',
schemaName: 'public',
tableName: 't2',
columnName: 'c1',
},
{
id: 'j4',
dataSourceId: 'ds-2',
databaseName: 'db2',
schemaName: 'public',
tableName: 't2',
columnName: 'c2',
},
{
id: 'j5',
dataSourceId: 'ds-2',
databaseName: 'db2',
schemaName: 'public',
tableName: 't2',
columnName: 'c3',
},
],
errors: [],
});
markSyncJobInProgress.mockResolvedValue(undefined);
// DS1: 2 successes (100 values each)
// DS2: 1 success (150 values), 1 failure, 1 skip
processSyncJob.triggerAndWait
.mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 100 } })
.mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 100 } })
.mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 150 } })
.mockResolvedValueOnce({ ok: false, error: 'Failed' })
.mockResolvedValueOnce({ ok: true, output: { success: true, processedCount: 0 } }); // Skipped
const result = await syncSearchableValues.run({
timestamp: new Date('2024-01-15T02:00:00Z'),
lastTimestamp: new Date('2024-01-14T02:00:00Z'),
upcoming: [],
});
expect(result).toMatchObject({
executionId: 'test-execution-id',
totalDataSources: 2,
totalColumns: 5,
successfulSyncs: 4,
failedSyncs: 1,
skippedSyncs: 0,
totalValuesProcessed: 350,
});
expect(result.dataSourceSummaries).toHaveLength(2);
expect(result.dataSourceSummaries[0]).toMatchObject({
dataSourceId: 'ds-1',
totalColumns: 2,
successfulSyncs: 2,
failedSyncs: 0,
totalValuesProcessed: 200,
});
expect(result.dataSourceSummaries[1]).toMatchObject({
dataSourceId: 'ds-2',
totalColumns: 3,
successfulSyncs: 2,
failedSyncs: 1,
totalValuesProcessed: 150,
});
});
});
});

View File

@ -1,11 +1,4 @@
import {
batchCreateSyncJobs,
getDataSourcesForSync,
getSearchableColumns,
markSyncJobFailed,
markSyncJobInProgress,
} from '@buster/database';
import { checkNamespaceExists, createNamespaceIfNotExists } from '@buster/search';
import { getExistingSyncJobs } from '@buster/database';
import { logger, schedules } from '@trigger.dev/sdk';
import { processSyncJob } from './process-sync-job';
import type { DailySyncReport, DataSourceSyncSummary, SyncJobPayload } from './types';
@ -14,12 +7,11 @@ import type { DailySyncReport, DataSourceSyncSummary, SyncJobPayload } from './t
* Daily scheduled task to sync searchable values from data sources
*
* This task runs every day at 2 AM UTC and:
* 1. Identifies data sources with searchable columns
* 2. Creates sync jobs for each column that needs syncing
* 3. Queues individual sync tasks for processing
* 4. Tracks and reports on the overall sync status
* 1. Retrieves existing sync jobs from stored_values_sync_jobs table
* 2. Triggers sync jobs for processing (fire and forget)
* 3. Reports on the number of jobs triggered
*
* The actual sync logic is delegated to process-sync-job.ts (Ticket 8)
* The actual sync logic is handled asynchronously by process-sync-job.ts
*/
export const syncSearchableValues = schedules.task({
id: 'sync-searchable-values',
@ -44,60 +36,31 @@ export const syncSearchableValues = schedules.task({
});
try {
// Step 1: Get all data sources that have searchable columns
const dataSourcesResult = await getDataSourcesForSync();
logger.info('Found data sources for sync', {
executionId,
totalDataSources: dataSourcesResult.totalCount,
dataSources: dataSourcesResult.dataSources.map((ds) => ({
id: ds.id,
name: ds.name,
columnsWithStoredValues: ds.columnsWithStoredValues,
})),
// Step 1: Get all existing sync jobs from the stored_values_sync_jobs table
const syncJobsResult = await getExistingSyncJobs({
statuses: ['pending', 'success'],
});
if (dataSourcesResult.totalCount === 0) {
logger.info('No data sources found with searchable columns', { executionId });
logger.info('Found existing sync jobs', {
executionId,
totalJobs: syncJobsResult.totalCount,
dataSourceCount: Object.keys(syncJobsResult.byDataSource).length,
});
if (syncJobsResult.totalCount === 0) {
logger.info('No sync jobs found to process', { executionId });
return createReport(executionId, startTime, [], errors);
}
// Step 2: Ensure TurboPuffer namespaces exist for all data sources
logger.info('Checking TurboPuffer namespaces', {
executionId,
dataSourceCount: dataSourcesResult.totalCount,
});
// Step 2: Process sync jobs grouped by data source
for (const dataSourceId in syncJobsResult.byDataSource) {
const dataSourceInfo = syncJobsResult.byDataSource[dataSourceId];
if (!dataSourceInfo) continue;
for (const dataSource of dataSourcesResult.dataSources) {
try {
const namespaceExists = await checkNamespaceExists(dataSource.id);
if (!namespaceExists) {
logger.info('Creating TurboPuffer namespace', {
executionId,
dataSourceId: dataSource.id,
dataSourceName: dataSource.name,
});
// Namespace will be created automatically on first write
await createNamespaceIfNotExists(dataSource.id);
}
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
logger.error('Failed to check/create namespace', {
executionId,
dataSourceId: dataSource.id,
dataSourceName: dataSource.name,
error: errorMsg,
});
errors.push(`Failed to check/create namespace for ${dataSource.name}: ${errorMsg}`);
}
}
// Step 3: Process each data source
for (const dataSource of dataSourcesResult.dataSources) {
const summary: DataSourceSyncSummary = {
dataSourceId: dataSource.id,
dataSourceName: dataSource.name,
totalColumns: 0,
dataSourceId,
dataSourceName: dataSourceInfo.dataSourceName,
totalColumns: dataSourceInfo.jobCount,
successfulSyncs: 0,
failedSyncs: 0,
skippedSyncs: 0,
@ -108,141 +71,78 @@ export const syncSearchableValues = schedules.task({
try {
logger.info('Processing data source', {
executionId,
dataSourceId: dataSource.id,
dataSourceName: dataSource.name,
dataSourceId,
dataSourceName: dataSourceInfo.dataSourceName,
jobCount: dataSourceInfo.jobCount,
});
// Get searchable columns for this data source
const columnsResult = await getSearchableColumns({
dataSourceId: dataSource.id,
});
// Step 3: Trigger sync jobs for this data source (fire and forget)
const jobs = dataSourceInfo.jobs;
let triggeredCount = 0;
let triggerFailureCount = 0;
summary.totalColumns = columnsResult.totalCount;
for (const job of jobs) {
try {
// Prepare payload for processing
const syncPayload: SyncJobPayload = {
jobId: job.id,
dataSourceId: job.dataSourceId,
databaseName: job.databaseName,
schemaName: job.schemaName,
tableName: job.tableName,
columnName: job.columnName,
maxValues: 1000,
};
if (columnsResult.totalCount === 0) {
logger.info('No searchable columns found for data source', {
executionId,
dataSourceId: dataSource.id,
});
summary.skippedSyncs = summary.totalColumns;
dataSourceSummaries.push(summary);
continue;
// Trigger the sync job without waiting
await processSyncJob.trigger(syncPayload);
triggeredCount++;
logger.info('Sync job triggered', {
executionId,
jobId: job.id,
table: job.tableName,
column: job.columnName,
});
} catch (error) {
triggerFailureCount++;
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
summary.errors.push(
`Failed to trigger job ${job.tableName}.${job.columnName}: ${errorMsg}`
);
logger.error('Failed to trigger sync job', {
executionId,
jobId: job.id,
table: job.tableName,
column: job.columnName,
error: errorMsg,
});
}
}
// Step 4: Create sync jobs for all columns
const columnsToSync = columnsResult.columns.map((col) => ({
databaseName: col.databaseName,
schemaName: col.schemaName,
tableName: col.tableName,
columnName: col.columnName,
}));
const batchResult = await batchCreateSyncJobs({
dataSourceId: dataSource.id,
syncType: 'daily',
columns: columnsToSync,
});
logger.info('Created sync jobs', {
executionId,
dataSourceId: dataSource.id,
totalCreated: batchResult.totalCreated,
errors: batchResult.errors.length,
});
// Track any errors from job creation
for (const error of batchResult.errors) {
summary.errors.push(
`Failed to create job for ${error.column.tableName}.${error.column.columnName}: ${error.error}`
);
summary.failedSyncs++;
}
// Step 5: Process each sync job
const batchSize = 10; // Process in batches
const jobs = batchResult.created;
for (let i = 0; i < jobs.length; i += batchSize) {
const batch = jobs.slice(i, i + batchSize);
const batchPromises = batch.map(async (job) => {
try {
// Mark job as in progress
await markSyncJobInProgress(job.id);
// Prepare payload for processing
const syncPayload: SyncJobPayload = {
jobId: job.id,
dataSourceId: job.dataSourceId,
databaseName: job.databaseName,
schemaName: job.schemaName,
tableName: job.tableName,
columnName: job.columnName,
maxValues: 1000,
};
// Process the sync job (stub implementation for now)
const result = await processSyncJob.triggerAndWait(syncPayload);
if (result.ok && result.output.success) {
summary.successfulSyncs++;
summary.totalValuesProcessed += result.output.processedCount || 0;
logger.info('Sync job completed successfully', {
executionId,
jobId: job.id,
processedCount: result.output.processedCount,
});
} else {
summary.failedSyncs++;
const errorMsg = result.ok
? result.output.error || 'Unknown error'
: 'Task execution failed';
summary.errors.push(`Job ${job.id} failed: ${errorMsg}`);
// Mark job as failed in database
await markSyncJobFailed(job.id, errorMsg);
logger.error('Sync job failed', {
executionId,
jobId: job.id,
error: errorMsg,
});
}
} catch (error) {
summary.failedSyncs++;
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
summary.errors.push(`Job ${job.id} failed: ${errorMsg}`);
// Mark job as failed in database
await markSyncJobFailed(job.id, errorMsg);
logger.error('Error processing sync job', {
executionId,
jobId: job.id,
error: errorMsg,
});
}
});
// Wait for batch to complete before starting next batch
await Promise.allSettled(batchPromises);
}
summary.successfulSyncs = triggeredCount;
summary.failedSyncs = triggerFailureCount;
dataSourceSummaries.push(summary);
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
summary.errors.push(`Data source processing failed: ${errorMsg}`);
errors.push(`Failed to process data source ${dataSource.name}: ${errorMsg}`);
errors.push(
`Failed to process data source ${dataSourceInfo.dataSourceName}: ${errorMsg}`
);
dataSourceSummaries.push(summary);
logger.error('Error processing data source', {
executionId,
dataSourceId: dataSource.id,
dataSourceId,
dataSourceName: dataSourceInfo.dataSourceName,
error: errorMsg,
});
}
}
// Step 6: Generate and return report
// Step 3: Generate and return report
const report = createReport(executionId, startTime, dataSourceSummaries, errors);
logger.info('Daily sync completed', {

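// A minimal sketch of the behavioural change in this file (payload values are
// illustrative, taken from the test fixtures): the old flow blocked on each job with
// triggerAndWait and inspected its output, the new flow only enqueues the run and lets
// process-sync-job update the job's status itself.
const examplePayload: SyncJobPayload = {
  jobId: 'job-1',
  dataSourceId: 'ds-1',
  databaseName: 'testdb',
  schemaName: 'public',
  tableName: 'users',
  columnName: 'name',
  maxValues: 1000,
};
// Before: const result = await processSyncJob.triggerAndWait(examplePayload);
//         if (result.ok && result.output.success) { ... }
// After (fire and forget):
await processSyncJob.trigger(examplePayload);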
View File

@ -135,8 +135,9 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge
streamText({
model: Sonnet4,
headers: {
'anthropic-beta':'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
'anthropic_beta': 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
'anthropic-beta':
'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
anthropic_beta: 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
},
providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
tools: {

View File

@ -17,14 +17,14 @@ import {
validateEmbeddingDimensions,
} from './generate-embeddings';
// Mock the AI SDK and OpenAI provider
// Mock the AI SDK and Gateway provider
vi.mock('ai', () => ({
embedMany: vi.fn(),
}));
vi.mock('@ai-sdk/openai', () => ({
createOpenAI: vi.fn(() => ({
embedding: vi.fn(() => 'mocked-embedding-model'),
vi.mock('@ai-sdk/gateway', () => ({
createGateway: vi.fn(() => ({
textEmbeddingModel: vi.fn(() => 'mocked-embedding-model'),
})),
}));

View File

@ -1,9 +1,10 @@
/**
* Functional embedding generation for searchable values
* Uses OpenAI text-embedding-3-small model (1536 dimensions)
* Uses openai/text-embedding-3-small model (1536 dimensions) via AI Gateway
*/
import { createOpenAI } from '@ai-sdk/openai';
import { createGateway } from '@ai-sdk/gateway';
import type { EmbeddingModel } from 'ai';
import { embedMany } from 'ai';
import { z } from 'zod';
@ -51,7 +52,7 @@ export const EMBEDDING_CONFIG = {
BATCH_SIZE: 100,
MAX_RETRIES: 3,
RATE_LIMIT_DELAY: 1000,
MODEL: 'text-embedding-3-small',
MODEL: 'openai/text-embedding-3-small',
DIMENSIONS: 1536,
} as const;
@ -112,13 +113,14 @@ export const isRetryableError = (error: unknown): boolean => {
// ============================================================================
/**
* Get OpenAI embedding model
* Get embedding model via gateway
*/
const getEmbeddingModel = (modelName: string = EMBEDDING_CONFIG.MODEL) => {
const openai = createOpenAI({
apiKey: process.env.OPENAI_API_KEY,
const getEmbeddingModel = (modelName: string = EMBEDDING_CONFIG.MODEL): EmbeddingModel<string> => {
const gateway = createGateway({
...(process.env.AI_GATEWAY_API_KEY && { apiKey: process.env.AI_GATEWAY_API_KEY }),
});
return openai.embedding(modelName);
// Use the textEmbeddingModel method to create an embedding model
return gateway.textEmbeddingModel(modelName);
};
/**

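// A hedged usage sketch of the gateway-backed model above: embedMany from the AI SDK
// takes the model plus the raw strings and returns one vector per value (example values
// are illustrative; assumes AI_GATEWAY_API_KEY is configured).
import { createGateway } from '@ai-sdk/gateway';
import { embedMany } from 'ai';
const gateway = createGateway({
  ...(process.env.AI_GATEWAY_API_KEY && { apiKey: process.env.AI_GATEWAY_API_KEY }),
});
const model = gateway.textEmbeddingModel('openai/text-embedding-3-small');
const { embeddings } = await embedMany({
  model,
  values: ['Acme Corp', 'Globex Corporation'],
});
// With this model each embedding should have 1536 dimensions, matching EMBEDDING_CONFIG.DIMENSIONS.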
View File

@ -89,10 +89,9 @@ async function generateTodosWithLLM(
const { object, textStream } = streamObject({
headers: {
'anthropic-beta':
'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
anthropic_beta:
'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
'anthropic-beta':
'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
anthropic_beta: 'fine-grained-tool-streaming-2025-05-14,extended-cache-ttl-2025-04-11',
},
model: Sonnet4,
schema: llmOutputSchema,

View File

@ -1,128 +0,0 @@
import { and, eq, gt, isNull, sql } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { dataSources, storedValuesSyncJobs } from '../../schema';
// ============================================================================
// VALIDATION SCHEMAS
// ============================================================================
export const DataSourceForSyncSchema = z.object({
id: z.string().uuid(),
name: z.string(),
type: z.string(),
organizationId: z.string().uuid(),
columnsWithStoredValues: z.number().int().min(0),
});
export const GetDataSourcesForSyncOutputSchema = z.object({
dataSources: z.array(DataSourceForSyncSchema),
totalCount: z.number().int().min(0),
});
// ============================================================================
// TYPES
// ============================================================================
export type DataSourceForSync = z.infer<typeof DataSourceForSyncSchema>;
export type GetDataSourcesForSyncOutput = z.infer<typeof GetDataSourcesForSyncOutputSchema>;
// ============================================================================
// QUERY FUNCTION
// ============================================================================
/**
* Get all data sources that have sync jobs in the stored_values_sync_jobs table
* These are the data sources that need to be synced to Turbopuffer
*/
export async function getDataSourcesForSync(): Promise<GetDataSourcesForSyncOutput> {
try {
// Query data sources that have sync jobs
const results = await db
.selectDistinct({
id: dataSources.id,
name: dataSources.name,
type: dataSources.type,
organizationId: dataSources.organizationId,
})
.from(dataSources)
.innerJoin(storedValuesSyncJobs, eq(storedValuesSyncJobs.dataSourceId, dataSources.id))
.where(
and(
// Only active data sources
isNull(dataSources.deletedAt),
// Only successfully onboarded data sources
eq(dataSources.onboardingStatus, 'completed')
)
);
// Count sync jobs for each data source
const dataSourcesWithCounts = await Promise.all(
results.map(async (ds) => {
const columnCount = await db
.select({
count: sql<number>`COUNT(DISTINCT ${storedValuesSyncJobs.columnName})`,
})
.from(storedValuesSyncJobs)
.where(eq(storedValuesSyncJobs.dataSourceId, ds.id));
return {
...ds,
columnsWithStoredValues: Number(columnCount[0]?.count || 0),
};
})
);
// Validate and return results
return GetDataSourcesForSyncOutputSchema.parse({
dataSources: dataSourcesWithCounts,
totalCount: dataSourcesWithCounts.length,
});
} catch (error) {
// Handle Zod validation errors
if (error instanceof z.ZodError) {
throw new Error(`Validation error in getDataSourcesForSync: ${error.message}`);
}
// Handle database errors
if (error instanceof Error) {
throw new Error(`Database error in getDataSourcesForSync: ${error.message}`);
}
// Unknown error
throw new Error(`Unknown error in getDataSourcesForSync: ${String(error)}`);
}
}
/**
* Check if a specific data source needs syncing
* Useful for validating before queuing a sync job
*/
export async function dataSourceNeedsSync(dataSourceId: string): Promise<boolean> {
try {
// Validate input
const validatedId = z.string().uuid().parse(dataSourceId);
const result = await db
.select({
id: storedValuesSyncJobs.id,
})
.from(storedValuesSyncJobs)
.innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId))
.where(
and(
eq(dataSources.id, validatedId),
isNull(dataSources.deletedAt),
eq(dataSources.onboardingStatus, 'completed')
)
)
.limit(1);
return result.length > 0;
} catch (error) {
if (error instanceof z.ZodError) {
throw new Error(`Invalid dataSourceId: ${error.message}`);
}
throw error;
}
}

View File

@ -0,0 +1,243 @@
import { and, eq, inArray, isNull, or, sql } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { dataSources, storedValuesSyncJobs } from '../../schema';
// ============================================================================
// VALIDATION SCHEMAS
// ============================================================================
export const GetExistingSyncJobsInputSchema = z.object({
statuses: z
.array(z.string())
.optional()
.default(['pending', 'pending_manual', 'pending_initial', 'failed']),
limit: z.number().int().min(1).optional(),
});
export const ExistingSyncJobSchema = z.object({
id: z.string().uuid(),
dataSourceId: z.string().uuid(),
dataSourceName: z.string(),
dataSourceType: z.string(),
organizationId: z.string().uuid(),
databaseName: z.string(),
schemaName: z.string(),
tableName: z.string(),
columnName: z.string(),
status: z.string(),
errorMessage: z.string().nullable(),
lastSyncedAt: z.string().datetime().nullable(),
createdAt: z.string().datetime(),
});
export const GetExistingSyncJobsOutputSchema = z.object({
syncJobs: z.array(ExistingSyncJobSchema),
totalCount: z.number().int().min(0),
byDataSource: z.record(
z.string().uuid(),
z.object({
dataSourceId: z.string().uuid(),
dataSourceName: z.string(),
dataSourceType: z.string(),
organizationId: z.string().uuid(),
jobCount: z.number().int().min(0),
jobs: z.array(ExistingSyncJobSchema),
})
),
});
// ============================================================================
// TYPES
// ============================================================================
export type GetExistingSyncJobsInput = z.infer<typeof GetExistingSyncJobsInputSchema>;
export type ExistingSyncJob = z.infer<typeof ExistingSyncJobSchema>;
export type GetExistingSyncJobsOutput = z.infer<typeof GetExistingSyncJobsOutputSchema>;
// ============================================================================
// QUERY FUNCTIONS
// ============================================================================
/**
* Get existing sync jobs that need to be processed
* Returns jobs from the stored_values_sync_jobs table grouped by data source
*/
export async function getExistingSyncJobs(
input: Partial<GetExistingSyncJobsInput> = {}
): Promise<GetExistingSyncJobsOutput> {
try {
const validated = GetExistingSyncJobsInputSchema.parse(input);
// Build the query to get existing sync jobs with their data source info
const baseQuery = db
.select({
id: storedValuesSyncJobs.id,
dataSourceId: storedValuesSyncJobs.dataSourceId,
dataSourceName: dataSources.name,
dataSourceType: dataSources.type,
organizationId: dataSources.organizationId,
databaseName: storedValuesSyncJobs.databaseName,
schemaName: storedValuesSyncJobs.schemaName,
tableName: storedValuesSyncJobs.tableName,
columnName: storedValuesSyncJobs.columnName,
status: storedValuesSyncJobs.status,
errorMessage: storedValuesSyncJobs.errorMessage,
lastSyncedAt: storedValuesSyncJobs.lastSyncedAt,
createdAt: storedValuesSyncJobs.createdAt,
})
.from(storedValuesSyncJobs)
.innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId))
.where(
and(
// Only get jobs with specified statuses
inArray(storedValuesSyncJobs.status, validated.statuses),
// Only active data sources
isNull(dataSources.deletedAt)
)
);
// Add limit if specified
const query = validated.limit ? baseQuery.limit(validated.limit) : baseQuery;
const syncJobs = await query;
// Transform the results
const formattedJobs: ExistingSyncJob[] = syncJobs.map((job) => ({
...job,
lastSyncedAt: job.lastSyncedAt ? new Date(job.lastSyncedAt).toISOString() : null,
createdAt: new Date(job.createdAt).toISOString(),
}));
// Group jobs by data source
const byDataSource: Record<
string,
{
dataSourceId: string;
dataSourceName: string;
dataSourceType: string;
organizationId: string;
jobCount: number;
jobs: ExistingSyncJob[];
}
> = {};
for (const job of formattedJobs) {
if (!byDataSource[job.dataSourceId]) {
byDataSource[job.dataSourceId] = {
dataSourceId: job.dataSourceId,
dataSourceName: job.dataSourceName,
dataSourceType: job.dataSourceType,
organizationId: job.organizationId,
jobCount: 0,
jobs: [],
};
}
const dataSourceInfo = byDataSource[job.dataSourceId];
if (dataSourceInfo) {
dataSourceInfo.jobs.push(job);
dataSourceInfo.jobCount++;
}
}
return GetExistingSyncJobsOutputSchema.parse({
syncJobs: formattedJobs,
totalCount: formattedJobs.length,
byDataSource,
});
} catch (error) {
if (error instanceof z.ZodError) {
throw new Error(`Validation error in getExistingSyncJobs: ${error.message}`);
}
if (error instanceof Error) {
throw new Error(`Database error in getExistingSyncJobs: ${error.message}`);
}
throw new Error(`Unknown error in getExistingSyncJobs: ${String(error)}`);
}
}
/**
* Get sync jobs for a specific data source
* Useful for processing jobs per data source
*/
export async function getSyncJobsForDataSource(
dataSourceId: string,
statuses: string[] = ['pending', 'pending_manual', 'pending_initial', 'failed']
): Promise<ExistingSyncJob[]> {
try {
const validatedId = z.string().uuid().parse(dataSourceId);
const jobs = await db
.select({
id: storedValuesSyncJobs.id,
dataSourceId: storedValuesSyncJobs.dataSourceId,
dataSourceName: dataSources.name,
dataSourceType: dataSources.type,
organizationId: dataSources.organizationId,
databaseName: storedValuesSyncJobs.databaseName,
schemaName: storedValuesSyncJobs.schemaName,
tableName: storedValuesSyncJobs.tableName,
columnName: storedValuesSyncJobs.columnName,
status: storedValuesSyncJobs.status,
errorMessage: storedValuesSyncJobs.errorMessage,
lastSyncedAt: storedValuesSyncJobs.lastSyncedAt,
createdAt: storedValuesSyncJobs.createdAt,
})
.from(storedValuesSyncJobs)
.innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId))
.where(
and(
eq(storedValuesSyncJobs.dataSourceId, validatedId),
inArray(storedValuesSyncJobs.status, statuses),
isNull(dataSources.deletedAt)
)
);
return jobs.map((job) => ({
...job,
lastSyncedAt: job.lastSyncedAt ? new Date(job.lastSyncedAt).toISOString() : null,
createdAt: new Date(job.createdAt).toISOString(),
}));
} catch (error) {
if (error instanceof z.ZodError) {
throw new Error(`Invalid dataSourceId: ${error.message}`);
}
if (error instanceof Error) {
throw new Error(`Database error in getSyncJobsForDataSource: ${error.message}`);
}
throw new Error(`Unknown error in getSyncJobsForDataSource: ${String(error)}`);
}
}
/**
* Count pending sync jobs
* Useful for monitoring and reporting
*/
export async function countPendingSyncJobs(): Promise<number> {
try {
const result = await db
.select({
count: sql<number>`COUNT(*)`,
})
.from(storedValuesSyncJobs)
.innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId))
.where(
and(
inArray(storedValuesSyncJobs.status, ['pending', 'pending_manual', 'pending_initial']),
isNull(dataSources.deletedAt)
)
);
return Number(result[0]?.count || 0);
} catch (error) {
if (error instanceof Error) {
throw new Error(`Database error in countPendingSyncJobs: ${error.message}`);
}
throw new Error(`Unknown error in countPendingSyncJobs: ${String(error)}`);
}
}
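// A hedged usage sketch of the grouped output above, mirroring how the scheduler walks
// byDataSource (the status filter and limit are illustrative).
const pending = await getExistingSyncJobs({ statuses: ['pending'], limit: 100 });
for (const dataSourceId of Object.keys(pending.byDataSource)) {
  const group = pending.byDataSource[dataSourceId];
  if (!group) continue;
  console.info(`${group.dataSourceName}: ${group.jobCount} job(s) to sync`);
}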

View File

@ -1,231 +0,0 @@
import { and, eq, isNull, sql } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { dataSources, storedValuesSyncJobs } from '../../schema';
// ============================================================================
// VALIDATION SCHEMAS
// ============================================================================
export const GetSearchableColumnsInputSchema = z.object({
dataSourceId: z.string().uuid(),
});
export const SearchableColumnSchema = z.object({
id: z.string().uuid(),
databaseName: z.string(),
schemaName: z.string(),
tableName: z.string(),
columnName: z.string(),
status: z.string(),
errorMessage: z.string().nullable(),
lastSyncedAt: z.string().datetime().nullable(),
});
export const GetSearchableColumnsOutputSchema = z.object({
columns: z.array(SearchableColumnSchema),
totalCount: z.number().int().min(0),
});
export const GetColumnsNeedingSyncInputSchema = z.object({
dataSourceId: z.string().uuid(),
hoursThreshold: z.number().min(0).optional().default(24), // Default to 24 hours
});
export const ColumnsNeedingSyncOutputSchema = z.object({
columns: z.array(SearchableColumnSchema),
totalCount: z.number().int().min(0),
neverSynced: z.number().int().min(0),
stale: z.number().int().min(0),
});
// ============================================================================
// TYPES
// ============================================================================
export type GetSearchableColumnsInput = z.infer<typeof GetSearchableColumnsInputSchema>;
export type SearchableColumn = z.infer<typeof SearchableColumnSchema>;
export type GetSearchableColumnsOutput = z.infer<typeof GetSearchableColumnsOutputSchema>;
export type GetColumnsNeedingSyncInput = z.infer<typeof GetColumnsNeedingSyncInputSchema>;
export type ColumnsNeedingSyncOutput = z.infer<typeof ColumnsNeedingSyncOutputSchema>;
// ============================================================================
// QUERY FUNCTIONS
// ============================================================================
/**
* Get all searchable columns for a data source
* Returns columns from the stored_values_sync_jobs table
*/
export async function getSearchableColumns(
input: GetSearchableColumnsInput
): Promise<GetSearchableColumnsOutput> {
try {
const validated = GetSearchableColumnsInputSchema.parse(input);
// Query all sync jobs for the data source
const columns = await db
.select({
id: storedValuesSyncJobs.id,
databaseName: storedValuesSyncJobs.databaseName,
schemaName: storedValuesSyncJobs.schemaName,
tableName: storedValuesSyncJobs.tableName,
columnName: storedValuesSyncJobs.columnName,
status: storedValuesSyncJobs.status,
errorMessage: storedValuesSyncJobs.errorMessage,
lastSyncedAt: storedValuesSyncJobs.lastSyncedAt,
})
.from(storedValuesSyncJobs)
.innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId))
.where(
and(
eq(storedValuesSyncJobs.dataSourceId, validated.dataSourceId),
isNull(dataSources.deletedAt),
eq(dataSources.onboardingStatus, 'completed')
)
);
return GetSearchableColumnsOutputSchema.parse({
columns,
totalCount: columns.length,
});
} catch (error) {
if (error instanceof z.ZodError) {
throw new Error(`Validation error in getSearchableColumns: ${error.message}`);
}
if (error instanceof Error) {
throw new Error(`Database error in getSearchableColumns: ${error.message}`);
}
throw new Error(`Unknown error in getSearchableColumns: ${String(error)}`);
}
}
/**
* Get columns that need to be synced
* Returns columns that have never been synced or are stale
*/
export async function getColumnsNeedingSync(
input: GetColumnsNeedingSyncInput
): Promise<ColumnsNeedingSyncOutput> {
try {
const validated = GetColumnsNeedingSyncInputSchema.parse(input);
// Calculate the threshold timestamp
const thresholdDate = new Date();
thresholdDate.setHours(thresholdDate.getHours() - validated.hoursThreshold);
const thresholdTimestamp = thresholdDate.toISOString();
// Query all searchable columns
const allColumns = await getSearchableColumns({
dataSourceId: validated.dataSourceId,
});
// Filter columns that need syncing
const columnsNeedingSync = allColumns.columns.filter((column) => {
// Never synced
if (!column.lastSyncedAt) {
return true;
}
// Check if stale
return column.lastSyncedAt < thresholdTimestamp;
});
// Count never synced vs stale
const neverSynced = columnsNeedingSync.filter((c) => !c.lastSyncedAt).length;
const stale = columnsNeedingSync.filter((c) => c.lastSyncedAt).length;
return ColumnsNeedingSyncOutputSchema.parse({
columns: columnsNeedingSync,
totalCount: columnsNeedingSync.length,
neverSynced,
stale,
});
} catch (error) {
if (error instanceof z.ZodError) {
throw new Error(`Validation error in getColumnsNeedingSync: ${error.message}`);
}
throw error;
}
}
/**
* Get column details for sync job creation
* Returns the minimum information needed to create sync jobs
*/
export async function getColumnDetailsForSync(dataSourceId: string) {
try {
const validated = z.string().uuid().parse(dataSourceId);
const columns = await db
.select({
databaseName: storedValuesSyncJobs.databaseName,
schemaName: storedValuesSyncJobs.schemaName,
tableName: storedValuesSyncJobs.tableName,
columnName: storedValuesSyncJobs.columnName,
columnId: storedValuesSyncJobs.id,
lastSynced: storedValuesSyncJobs.lastSyncedAt,
})
.from(storedValuesSyncJobs)
.innerJoin(dataSources, eq(dataSources.id, storedValuesSyncJobs.dataSourceId))
.where(
and(
eq(storedValuesSyncJobs.dataSourceId, validated),
isNull(dataSources.deletedAt),
eq(dataSources.onboardingStatus, 'completed')
)
);
return columns;
} catch (error) {
if (error instanceof z.ZodError) {
throw new Error(`Invalid dataSourceId: ${error.message}`);
}
throw error;
}
}
/**
 * Update column sync metadata after a sync attempt (success or failure)
 * Writes the status, error message, and last-synced timestamp to the stored_values_sync_jobs table
*/
export async function updateColumnSyncMetadata(
syncJobId: string,
metadata: {
status: 'syncing' | 'success' | 'failed' | 'error';
error?: string | null;
lastSynced?: string;
}
): Promise<void> {
try {
const validatedId = z.string().uuid().parse(syncJobId);
const updateData: Record<string, unknown> = {
status: metadata.status,
};
if (metadata.error !== undefined) {
updateData.errorMessage = metadata.error;
}
if (metadata.lastSynced !== undefined) {
updateData.lastSyncedAt = metadata.lastSynced;
}
await db.update(storedValuesSyncJobs).set(updateData).where(eq(storedValuesSyncJobs.id, validatedId));
} catch (error) {
if (error instanceof z.ZodError) {
throw new Error(`Invalid syncJobId: ${error.message}`);
}
if (error instanceof Error) {
throw new Error(`Failed to update column sync metadata: ${error.message}`);
}
throw error;
}
}
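A minimal usage sketch of the two helpers above. The queueStaleColumns wrapper and its logging are hypothetical, and it assumes SearchableColumn exposes the sync job id as `id`, as the select above suggests:

async function queueStaleColumns(dataSourceId: string): Promise<number> {
  // Find columns that have never synced or whose last sync is older than 24 hours
  const { columns, neverSynced, stale } = await getColumnsNeedingSync({
    dataSourceId,
    hoursThreshold: 24,
  });
  console.info(`${columns.length} columns need sync (${neverSynced} never synced, ${stale} stale)`);
  // Mark each column as syncing before handing it off to a worker
  for (const column of columns) {
    await updateColumnSyncMetadata(column.id, { status: 'syncing' });
  }
  return columns.length;
}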

View File

@ -7,19 +7,22 @@
*/
// ============================================================================
// DATA SOURCE QUERIES
// EXISTING SYNC JOBS QUERIES
// ============================================================================
export {
// Functions
getDataSourcesForSync,
dataSourceNeedsSync,
getExistingSyncJobs,
getSyncJobsForDataSource,
countPendingSyncJobs,
// Types
type DataSourceForSync,
type GetDataSourcesForSyncOutput,
type GetExistingSyncJobsInput,
type ExistingSyncJob,
type GetExistingSyncJobsOutput,
// Schemas
DataSourceForSyncSchema,
GetDataSourcesForSyncOutputSchema,
} from './get-data-sources-for-sync';
GetExistingSyncJobsInputSchema,
ExistingSyncJobSchema,
GetExistingSyncJobsOutputSchema,
} from './get-existing-jobs';
// ============================================================================
// SYNC JOB CREATION
@ -69,26 +72,3 @@ export {
BulkUpdateSyncJobsInputSchema,
BulkUpdateSyncJobsOutputSchema,
} from './update-sync-job';
// ============================================================================
// SEARCHABLE COLUMNS QUERIES
// ============================================================================
export {
// Functions
getSearchableColumns,
getColumnsNeedingSync,
getColumnDetailsForSync,
updateColumnSyncMetadata,
// Types
type GetSearchableColumnsInput,
type SearchableColumn,
type GetSearchableColumnsOutput,
type GetColumnsNeedingSyncInput,
type ColumnsNeedingSyncOutput,
// Schemas
GetSearchableColumnsInputSchema,
SearchableColumnSchema,
GetSearchableColumnsOutputSchema,
GetColumnsNeedingSyncInputSchema,
ColumnsNeedingSyncOutputSchema,
} from './get-searchable-columns';

View File

@ -23,53 +23,126 @@ describe('Sync Jobs Query Helpers', () => {
vi.clearAllMocks();
});
describe('getDataSourcesForSync', () => {
it('should validate and return data sources with searchable columns', async () => {
describe('getExistingSyncJobs', () => {
it('should validate and return existing sync jobs grouped by data source', async () => {
// This test validates the schema structure
const mockOutput: syncJobs.GetDataSourcesForSyncOutput = {
dataSources: [
const mockOutput: syncJobs.GetExistingSyncJobsOutput = {
syncJobs: [
{
id: '123e4567-e89b-12d3-a456-426614174000',
name: 'Test Data Source',
type: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174001',
columnsWithStoredValues: 5,
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
dataSourceName: 'Test Data Source',
dataSourceType: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174002',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
status: 'pending',
errorMessage: null,
lastSyncedAt: null,
createdAt: '2024-01-01T00:00:00.000Z',
},
{
id: '123e4567-e89b-12d3-a456-426614174003',
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
dataSourceName: 'Test Data Source',
dataSourceType: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174002',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
status: 'failed',
errorMessage: 'Connection timeout',
lastSyncedAt: '2024-01-01T00:00:00.000Z',
createdAt: '2024-01-01T00:00:00.000Z',
},
],
totalCount: 1,
totalCount: 2,
byDataSource: {
'123e4567-e89b-12d3-a456-426614174001': {
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
dataSourceName: 'Test Data Source',
dataSourceType: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174002',
jobCount: 2,
jobs: [
{
id: '123e4567-e89b-12d3-a456-426614174000',
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
dataSourceName: 'Test Data Source',
dataSourceType: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174002',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
status: 'pending',
errorMessage: null,
lastSyncedAt: null,
createdAt: '2024-01-01T00:00:00.000Z',
},
{
id: '123e4567-e89b-12d3-a456-426614174003',
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
dataSourceName: 'Test Data Source',
dataSourceType: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174002',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
status: 'failed',
errorMessage: 'Connection timeout',
lastSyncedAt: '2024-01-01T00:00:00.000Z',
createdAt: '2024-01-01T00:00:00.000Z',
},
],
},
},
};
// Validate output schema
const validated = syncJobs.GetDataSourcesForSyncOutputSchema.parse(mockOutput);
expect(validated.dataSources).toHaveLength(1);
expect(validated.dataSources[0]?.columnsWithStoredValues).toBe(5);
const validated = syncJobs.GetExistingSyncJobsOutputSchema.parse(mockOutput);
expect(validated.syncJobs).toHaveLength(2);
expect(validated.totalCount).toBe(2);
expect(Object.keys(validated.byDataSource)).toHaveLength(1);
expect(validated.byDataSource['123e4567-e89b-12d3-a456-426614174001']?.jobCount).toBe(2);
});
it('should reject invalid data source data', () => {
it('should reject invalid sync job data', () => {
const invalidData = {
dataSources: [
syncJobs: [
{
id: 'not-a-uuid',
name: 'Test',
type: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174001',
columnsWithStoredValues: -1, // Invalid: negative count
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
dataSourceName: 'Test',
dataSourceType: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174002',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
status: 'pending',
errorMessage: null,
lastSyncedAt: null,
createdAt: '2024-01-01T00:00:00.000Z',
},
],
totalCount: 1,
byDataSource: {},
};
expect(() => syncJobs.GetDataSourcesForSyncOutputSchema.parse(invalidData)).toThrow(
z.ZodError
);
expect(() => syncJobs.GetExistingSyncJobsOutputSchema.parse(invalidData)).toThrow(z.ZodError);
});
});
describe('createSearchableValuesSyncJob', () => {
it('should validate sync job input', () => {
it('should validate sync job creation input', () => {
const validInput: syncJobs.CreateSyncJobInput = {
dataSourceId: '123e4567-e89b-12d3-a456-426614174000',
databaseName: 'analytics',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
@ -77,14 +150,14 @@ describe('Sync Jobs Query Helpers', () => {
};
const validated = syncJobs.CreateSyncJobInputSchema.parse(validInput);
expect(validated.dataSourceId).toBe('123e4567-e89b-12d3-a456-426614174000');
expect(validated.syncType).toBe('daily');
expect(validated.databaseName).toBe('analytics');
});
it('should reject empty strings in required fields', () => {
it('should reject invalid creation input', () => {
const invalidInput = {
dataSourceId: '123e4567-e89b-12d3-a456-426614174000',
databaseName: '', // Invalid: empty string
dataSourceId: 'not-a-uuid',
databaseName: '',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
@ -92,89 +165,135 @@ describe('Sync Jobs Query Helpers', () => {
expect(() => syncJobs.CreateSyncJobInputSchema.parse(invalidInput)).toThrow(z.ZodError);
});
it('should validate batch create input', () => {
const batchInput: syncJobs.BatchCreateSyncJobsInput = {
dataSourceId: '123e4567-e89b-12d3-a456-426614174000',
syncType: 'manual',
columns: [
{
databaseName: 'analytics',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
{
databaseName: 'analytics',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
],
};
const validated = syncJobs.BatchCreateSyncJobsInputSchema.parse(batchInput);
expect(validated.columns).toHaveLength(2);
expect(validated.syncType).toBe('manual');
});
});
describe('updateSyncJobStatus', () => {
it('should validate status update input', () => {
const validInput: syncJobs.UpdateSyncJobStatusInput = {
jobId: '123e4567-e89b-12d3-a456-426614174000',
status: 'completed',
status: 'success',
metadata: {
processedCount: 100,
existingCount: 20,
newCount: 80,
existingCount: 50,
newCount: 50,
duration: 5000,
syncedAt: new Date().toISOString(),
syncedAt: '2024-01-01T00:00:00.000Z',
},
};
const validated = syncJobs.UpdateSyncJobStatusInputSchema.parse(validInput);
expect(validated.status).toBe('completed');
expect(validated.status).toBe('success');
expect(validated.metadata?.processedCount).toBe(100);
});
it('should validate all sync job statuses', () => {
const validStatuses: syncJobs.SyncJobStatus[] = [
it('should accept valid status values', () => {
const statuses = [
'pending',
'pending_manual',
'pending_initial',
'in_progress',
'completed',
'success',
'failed',
'cancelled',
'skipped',
];
for (const status of validStatuses) {
const validated = syncJobs.SyncJobStatusSchema.parse(status);
expect(validated).toBe(status);
for (const status of statuses) {
const input = {
jobId: '123e4567-e89b-12d3-a456-426614174000',
status,
};
expect(() => syncJobs.UpdateSyncJobStatusInputSchema.parse(input)).not.toThrow();
}
});
it('should reject invalid status', () => {
expect(() => syncJobs.SyncJobStatusSchema.parse('invalid_status')).toThrow(z.ZodError);
});
it('should validate bulk update input', () => {
const bulkInput: syncJobs.BulkUpdateSyncJobsInput = {
jobIds: ['123e4567-e89b-12d3-a456-426614174000', '123e4567-e89b-12d3-a456-426614174001'],
status: 'cancelled',
errorMessage: 'Batch operation cancelled by user',
it('should reject invalid status values', () => {
const invalidInput = {
jobId: '123e4567-e89b-12d3-a456-426614174000',
status: 'invalid_status',
};
const validated = syncJobs.BulkUpdateSyncJobsInputSchema.parse(bulkInput);
expect(validated.jobIds).toHaveLength(2);
expect(validated.status).toBe('cancelled');
expect(() => syncJobs.UpdateSyncJobStatusInputSchema.parse(invalidInput)).toThrow(z.ZodError);
});
});
describe('batchCreateSyncJobs', () => {
it('should validate batch creation input', () => {
const validInput: syncJobs.BatchCreateSyncJobsInput = {
dataSourceId: '123e4567-e89b-12d3-a456-426614174000',
syncType: 'manual',
columns: [
{
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
},
{
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
],
};
const validated = syncJobs.BatchCreateSyncJobsInputSchema.parse(validInput);
expect(validated.columns).toHaveLength(2);
expect(validated.syncType).toBe('manual');
});
it('should reject empty jobIds array', () => {
it('should validate batch creation output', () => {
const validOutput: syncJobs.BatchCreateSyncJobsOutput = {
created: [
{
id: '123e4567-e89b-12d3-a456-426614174000',
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'name',
status: 'pending',
createdAt: '2024-01-01T00:00:00.000Z',
},
],
totalCreated: 1,
errors: [
{
column: {
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users',
columnName: 'email',
},
error: 'Duplicate entry',
},
],
};
const validated = syncJobs.BatchCreateSyncJobsOutputSchema.parse(validOutput);
expect(validated.totalCreated).toBe(1);
expect(validated.errors).toHaveLength(1);
});
});
describe('bulkUpdateSyncJobs', () => {
it('should validate bulk update input', () => {
const validInput: syncJobs.BulkUpdateSyncJobsInput = {
jobIds: ['123e4567-e89b-12d3-a456-426614174000', '123e4567-e89b-12d3-a456-426614174001'],
status: 'cancelled',
errorMessage: 'Batch cancelled by user',
};
const validated = syncJobs.BulkUpdateSyncJobsInputSchema.parse(validInput);
expect(validated.jobIds).toHaveLength(2);
expect(validated.status).toBe('cancelled');
expect(validated.errorMessage).toBe('Batch cancelled by user');
});
it('should reject empty job IDs array', () => {
const invalidInput = {
jobIds: [], // Invalid: empty array
jobIds: [],
status: 'cancelled',
};
@ -182,188 +301,33 @@ describe('Sync Jobs Query Helpers', () => {
});
});
describe('getSearchableColumns', () => {
it('should validate searchable columns output', () => {
const mockColumn: syncJobs.SearchableColumn = {
describe('getSyncJobStatus', () => {
it('should validate get status input', () => {
const validInput: syncJobs.GetSyncJobStatusInput = {
jobId: '123e4567-e89b-12d3-a456-426614174000',
};
const validated = syncJobs.GetSyncJobStatusInputSchema.parse(validInput);
expect(validated.jobId).toBe('123e4567-e89b-12d3-a456-426614174000');
});
it('should validate get status output', () => {
const validOutput: syncJobs.GetSyncJobStatusOutput = {
id: '123e4567-e89b-12d3-a456-426614174000',
datasetId: '123e4567-e89b-12d3-a456-426614174001',
datasetName: 'users_dataset',
databaseName: 'analytics',
dataSourceId: '123e4567-e89b-12d3-a456-426614174001',
databaseName: 'test_db',
schemaName: 'public',
tableName: 'users_dataset',
tableName: 'users',
columnName: 'email',
columnType: 'varchar',
description: null,
semanticType: null,
storedValuesStatus: 'success',
storedValuesError: null,
storedValuesCount: 1000,
storedValuesLastSynced: new Date().toISOString(),
status: 'in_progress',
lastSyncedAt: null,
errorMessage: null,
createdAt: '2024-01-01T00:00:00.000Z',
};
const validated = syncJobs.SearchableColumnSchema.parse(mockColumn);
expect(validated.columnName).toBe('email');
expect(validated.storedValuesCount).toBe(1000);
});
it('should validate columns needing sync output', () => {
const mockOutput: syncJobs.ColumnsNeedingSyncOutput = {
columns: [],
totalCount: 0,
neverSynced: 0,
stale: 0,
};
const validated = syncJobs.ColumnsNeedingSyncOutputSchema.parse(mockOutput);
expect(validated.totalCount).toBe(0);
expect(validated.neverSynced).toBe(0);
});
it('should validate get searchable columns output with grouping', () => {
const mockColumn: syncJobs.SearchableColumn = {
id: '123e4567-e89b-12d3-a456-426614174000',
datasetId: '123e4567-e89b-12d3-a456-426614174001',
datasetName: 'users_dataset',
databaseName: 'analytics',
schemaName: 'public',
tableName: 'users_dataset',
columnName: 'email',
columnType: 'varchar',
description: null,
semanticType: null,
storedValuesStatus: null,
storedValuesError: null,
storedValuesCount: null,
storedValuesLastSynced: null,
};
const mockOutput: syncJobs.GetSearchableColumnsOutput = {
columns: [mockColumn],
totalCount: 1,
byDataset: {
'123e4567-e89b-12d3-a456-426614174001': {
datasetName: 'users_dataset',
columns: [mockColumn],
count: 1,
},
},
};
const validated = syncJobs.GetSearchableColumnsOutputSchema.parse(mockOutput);
expect(validated.totalCount).toBe(1);
expect(validated.byDataset).toHaveProperty('123e4567-e89b-12d3-a456-426614174001');
});
});
describe('Schema Edge Cases', () => {
it('should handle nullable fields correctly', () => {
const column: syncJobs.SearchableColumn = {
id: '123e4567-e89b-12d3-a456-426614174000',
datasetId: '123e4567-e89b-12d3-a456-426614174001',
datasetName: 'test',
databaseName: 'db',
schemaName: 'schema',
tableName: 'table',
columnName: 'col',
columnType: 'text',
description: null,
semanticType: null,
storedValuesStatus: null,
storedValuesError: null,
storedValuesCount: null,
storedValuesLastSynced: null,
};
const validated = syncJobs.SearchableColumnSchema.parse(column);
expect(validated.description).toBeNull();
expect(validated.storedValuesCount).toBeNull();
});
it('should validate datetime strings', () => {
const validDatetime = '2024-01-01T12:00:00.000Z';
const invalidDatetime = '2024-01-01 12:00:00'; // Missing T and Z
const column = {
id: '123e4567-e89b-12d3-a456-426614174000',
datasetId: '123e4567-e89b-12d3-a456-426614174001',
datasetName: 'test',
databaseName: 'db',
schemaName: 'schema',
tableName: 'table',
columnName: 'col',
columnType: 'text',
description: null,
semanticType: null,
storedValuesStatus: null,
storedValuesError: null,
storedValuesCount: null,
storedValuesLastSynced: validDatetime,
};
// Should accept valid datetime
const validated = syncJobs.SearchableColumnSchema.parse(column);
expect(validated.storedValuesLastSynced).toBe(validDatetime);
// Should reject invalid datetime
column.storedValuesLastSynced = invalidDatetime;
expect(() => syncJobs.SearchableColumnSchema.parse(column)).toThrow(z.ZodError);
});
it('should validate UUID formats', () => {
const validUuid = '123e4567-e89b-12d3-a456-426614174000';
const invalidUuid = '123-456-789';
// Valid UUID
const validInput = { dataSourceId: validUuid };
const validated = syncJobs.GetSearchableColumnsInputSchema.parse(validInput);
expect(validated.dataSourceId).toBe(validUuid);
// Invalid UUID
const invalidInput = { dataSourceId: invalidUuid };
expect(() => syncJobs.GetSearchableColumnsInputSchema.parse(invalidInput)).toThrow(
z.ZodError
);
});
});
describe('Type Exports', () => {
it('should export all required types', () => {
// Verify type exports exist (TypeScript will check at compile time)
const _dataSourceType: syncJobs.DataSourceForSync = {
id: '123e4567-e89b-12d3-a456-426614174000',
name: 'Test',
type: 'postgresql',
organizationId: '123e4567-e89b-12d3-a456-426614174001',
columnsWithStoredValues: 0,
};
const _syncJobInput: syncJobs.CreateSyncJobInput = {
dataSourceId: '123e4567-e89b-12d3-a456-426614174000',
databaseName: 'db',
schemaName: 'schema',
tableName: 'table',
columnName: 'column',
};
const _searchableColumn: syncJobs.SearchableColumn = {
id: '123e4567-e89b-12d3-a456-426614174000',
datasetId: '123e4567-e89b-12d3-a456-426614174001',
datasetName: 'test',
databaseName: 'db',
schemaName: 'schema',
tableName: 'table',
columnName: 'col',
columnType: 'text',
description: null,
semanticType: null,
storedValuesStatus: null,
storedValuesError: null,
storedValuesCount: null,
storedValuesLastSynced: null,
};
// Types are valid if this compiles
expect(true).toBe(true);
const validated = syncJobs.GetSyncJobStatusOutputSchema.parse(validOutput);
expect(validated.status).toBe('in_progress');
expect(validated.lastSyncedAt).toBeNull();
});
});
});

View File

@ -12,7 +12,7 @@ export const SyncJobStatusSchema = z.enum([
'pending_manual',
'pending_initial',
'in_progress',
'completed',
'success',
'failed',
'cancelled',
'skipped',
@ -103,7 +103,7 @@ export async function updateSyncJobStatus(
};
// Handle success case
if (validated.status === 'completed' && validated.metadata?.syncedAt) {
if (validated.status === 'success' && validated.metadata?.syncedAt) {
updateData.lastSyncedAt = validated.metadata.syncedAt;
updateData.errorMessage = null; // Clear any previous error
}
@ -131,9 +131,7 @@ export async function updateSyncJobStatus(
return UpdateSyncJobOutputSchema.parse({
...updated,
lastSyncedAt: updated.lastSyncedAt
? new Date(updated.lastSyncedAt).toISOString()
: null,
lastSyncedAt: updated.lastSyncedAt ? new Date(updated.lastSyncedAt).toISOString() : null,
updatedAt: new Date().toISOString(),
});
} catch (error) {
@ -182,9 +180,7 @@ export async function getSyncJobStatus(
return GetSyncJobStatusOutputSchema.parse({
...job,
lastSyncedAt: job.lastSyncedAt
? new Date(job.lastSyncedAt).toISOString()
: null,
lastSyncedAt: job.lastSyncedAt ? new Date(job.lastSyncedAt).toISOString() : null,
createdAt: new Date(job.createdAt).toISOString(),
});
} catch (error) {
@ -225,7 +221,7 @@ export async function markSyncJobCompleted(
): Promise<UpdateSyncJobOutput> {
return updateSyncJobStatus({
jobId,
status: 'completed',
status: 'success',
metadata: {
...metadata,
syncedAt: new Date().toISOString(),

View File

@ -144,7 +144,7 @@ describe('Turbopuffer Client', () => {
expect(keys[1]).toBe('db1:public:users:email:john@example.com');
expect(mockNamespace.query).toHaveBeenCalledWith({
top_k: 10000,
top_k: 1200,
filters: ['database', 'Eq', 'db1'],
include_attributes: ['database', 'schema', 'table', 'column', 'value'],
});
@ -186,7 +186,7 @@ describe('Turbopuffer Client', () => {
await queryExistingKeys({ dataSourceId: mockDataSourceId, query });
expect(mockNamespace.query).toHaveBeenCalledWith({
top_k: 10000,
top_k: 1200,
filters: [
'And',
[
@ -249,6 +249,7 @@ describe('Turbopuffer Client', () => {
value: ['John', 'john@example.com'],
synced_at: expect.arrayContaining([expect.any(String)]),
},
distance_metric: 'cosine_distance',
});
});

View File

@ -45,7 +45,7 @@ const QueryInputSchema = z.object({
const GetAllInputSchema = z.object({
dataSourceId: DataSourceIdSchema,
limit: z.number().int().min(1).max(10000).optional().default(1000),
limit: z.number().int().min(1).max(1200).optional().default(1000),
});
// ============================================================================
@ -154,7 +154,7 @@ export const chunk = <T>(arr: T[], size: number): T[][] => {
export const createClient = (): Turbopuffer => {
const apiKey = process.env.TURBOPUFFER_API_KEY;
const region = process.env.TURBOPUFFER_REGION || 'aws-us-east-1';
if (!apiKey) {
throw new TurbopufferError(
'TURBOPUFFER_API_KEY environment variable is not set',
@ -162,10 +162,10 @@ export const createClient = (): Turbopuffer => {
false
);
}
return new Turbopuffer({
return new Turbopuffer({
apiKey,
region
region,
});
};
@ -242,6 +242,8 @@ export const checkNamespaceExists = async (dataSourceId: string): Promise<boolea
/**
* Query existing keys with validation
 * Note: Turbopuffer enforces a maximum top_k of 1200 per query
 * For larger datasets this returns at most 1200 values, so callers must not assume the key list is complete
*/
export const queryExistingKeys = async (
input: z.infer<typeof QueryInputSchema>
@ -252,7 +254,7 @@ export const queryExistingKeys = async (
return withRetry(async () => {
const response = await ns.query({
top_k: 10000,
top_k: 1200, // Maximum allowed by Turbopuffer
...(filter && { filters: filter }),
include_attributes: ['database', 'schema', 'table', 'column', 'value'],
});
@ -285,7 +287,10 @@ const processBatch = async (
}
const columns = valuesToColumns(batch);
await ns.write({ upsert_columns: columns });
await ns.write({
upsert_columns: columns,
distance_metric: 'cosine_distance'
});
return { success: true, count: batch.length };
} catch (error) {
@ -461,11 +466,29 @@ export const searchSimilarValues = async (
// ============================================================================
/**
* Legacy function signatures for compatibility
* Ensure namespace exists (namespaces are auto-created on first write in TurboPuffer)
* This function verifies the namespace can be accessed and logs the status
*/
export const createNamespaceIfNotExists = async (dataSourceId: string): Promise<void> => {
const exists = await checkNamespaceExists(dataSourceId);
if (!exists) {
console.info(`Namespace ${generateNamespace(dataSourceId)} will be created on first write`);
const validatedId = DataSourceIdSchema.parse(dataSourceId);
const namespaceName = generateNamespace(validatedId);
try {
const exists = await checkNamespaceExists(validatedId);
if (exists) {
console.info(`TurboPuffer namespace ${namespaceName} already exists`);
} else {
console.info(`TurboPuffer namespace ${namespaceName} will be created on first write`);
// Note: TurboPuffer auto-creates namespaces on first write
// We cannot explicitly create empty namespaces
}
} catch (error) {
const errorMsg = error instanceof Error ? error.message : 'Unknown error';
console.error(`Failed to check TurboPuffer namespace ${namespaceName}: ${errorMsg}`);
throw new TurbopufferError(
`Failed to verify namespace ${namespaceName}: ${errorMsg}`,
'NAMESPACE_CHECK_FAILED',
true
);
}
};
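A rough sketch of how these client helpers compose for one column sync. The dataSourceId value and the query filter shape are illustrative assumptions, not taken from this file:

const dataSourceId = '123e4567-e89b-12d3-a456-426614174000'; // example id
await createNamespaceIfNotExists(dataSourceId); // logs whether the namespace already exists
// Pull up to 1200 existing keys (Turbopuffer's top_k ceiling) to deduplicate against
const existingKeys = await queryExistingKeys({
  dataSourceId,
  query: { database: 'analytics', schema: 'public', table: 'users', column: 'email' }, // assumed filter shape
});
console.info(`Found ${existingKeys.length} existing keys for deduplication`);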

View File

@ -3,7 +3,7 @@
* Uses functional composition and Zod validation
*/
import * as duckdb from 'duckdb';
import duckdb from 'duckdb';
import { z } from 'zod';
import {
type DeduplicationResult,
@ -59,32 +59,74 @@ export const formatSqlInClause = (values: string[]): string => {
export interface DuckDBConnection {
db: duckdb.Database;
conn: duckdb.Connection;
dbPath?: string; // Store path for cleanup
}
/**
* Create an in-memory DuckDB connection
* Create a DuckDB connection with optimized settings for large datasets
* Uses disk storage for better memory management
*/
export const createConnection = (): Promise<DuckDBConnection> => {
export const createConnection = (useDisk = true): Promise<DuckDBConnection> => {
return new Promise((resolve, reject) => {
const db = new duckdb.Database(':memory:', (err) => {
// Use disk storage for large datasets to avoid memory issues
// The database file will be automatically cleaned up
const dbPath = useDisk ? `/tmp/duckdb-dedupe-${Date.now()}.db` : ':memory:';
const db = new duckdb.Database(dbPath, (err) => {
if (err) {
reject(new Error(`Failed to create DuckDB database: ${err.message}`));
return;
}
const conn = db.connect();
resolve({ db, conn });
// Configure DuckDB for optimal performance with large datasets
if (useDisk) {
conn.exec("SET memory_limit='2GB';", (err) => {
if (err) console.warn('Failed to set memory limit:', err);
});
conn.exec('SET threads=4;', (err) => {
if (err) console.warn('Failed to set thread count:', err);
});
}
const connection: DuckDBConnection = { db, conn };
if (useDisk && dbPath) {
connection.dbPath = dbPath;
}
resolve(connection);
});
});
};
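A minimal lifecycle sketch for the connection helpers, assuming existingKeys and newKeys arrays are already in scope; the 50000-key cutoff mirrors the heuristic used later in performDeduplication, and closeConnection (defined just below) removes any temporary files:

const totalKeys = existingKeys.length + newKeys.length;
// Spill to /tmp for large key sets; temp files are cleaned up by closeConnection
const connection = await createConnection(totalKeys > 50000);
try {
  await executeQuery(connection.conn, 'SELECT 1'); // placeholder for the real dedup queries
} finally {
  await closeConnection(connection);
}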
/**
* Close DuckDB connection and database
* Also cleans up temporary database files if using disk storage
*/
export const closeConnection = (connection: DuckDBConnection): Promise<void> => {
return new Promise((resolve) => {
connection.conn.close(() => {
connection.db.close(() => {
// Clean up temporary database file if it exists
if (connection.dbPath && connection.dbPath !== ':memory:') {
const fs = require('node:fs');
try {
// DuckDB creates additional WAL and temporary files
const files = [
connection.dbPath,
`${connection.dbPath}.wal`,
`${connection.dbPath}.tmp`,
];
files.forEach((file) => {
if (fs.existsSync(file)) {
fs.unlinkSync(file);
}
});
} catch (err) {
console.warn(`Failed to clean up temporary DuckDB file: ${err}`);
}
}
resolve();
});
});
@ -112,44 +154,72 @@ export const executeQuery = <T = unknown>(conn: duckdb.Connection, sql: string):
/**
* Create and populate temporary tables for deduplication
* Optimized for large datasets with increased batch sizes and indexes
*/
const setupTables = async (
conn: duckdb.Connection,
existingKeys: string[],
newKeys: string[]
): Promise<void> => {
// Create tables
// Create tables without primary key constraint initially for faster bulk loading
await executeQuery(
conn,
`
CREATE TABLE existing_keys (key VARCHAR PRIMARY KEY)
CREATE TABLE existing_keys (key VARCHAR)
`
);
await executeQuery(
conn,
`
CREATE TABLE new_keys (key VARCHAR PRIMARY KEY)
CREATE TABLE new_keys (key VARCHAR)
`
);
// Use larger batches for better performance with disk-based storage
const BATCH_SIZE = 10000; // Increased from 1000 to 10000
// Insert existing keys in batches
if (existingKeys.length > 0) {
const batches = batchArray(existingKeys, 1000);
for (const batch of batches) {
console.info(`Inserting ${existingKeys.length} existing keys in batches of ${BATCH_SIZE}`);
const batches = batchArray(existingKeys, BATCH_SIZE);
for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
if (!batch) continue;
const values = batch.map((key) => `('${escapeSqlString(key)}')`).join(',');
await executeQuery(conn, `INSERT INTO existing_keys VALUES ${values}`);
// Log progress for large datasets
if (i > 0 && i % 10 === 0) {
console.info(
`Inserted ${Math.min((i + 1) * BATCH_SIZE, existingKeys.length)}/${existingKeys.length} existing keys`
);
}
}
}
// Insert new keys in batches
if (newKeys.length > 0) {
const batches = batchArray(newKeys, 1000);
for (const batch of batches) {
console.info(`Inserting ${newKeys.length} new keys in batches of ${BATCH_SIZE}`);
const batches = batchArray(newKeys, BATCH_SIZE);
for (let i = 0; i < batches.length; i++) {
const batch = batches[i];
if (!batch) continue;
const values = batch.map((key) => `('${escapeSqlString(key)}')`).join(',');
await executeQuery(conn, `INSERT INTO new_keys VALUES ${values}`);
// Log progress for large datasets
if (i > 0 && i % 10 === 0) {
console.info(
`Inserted ${Math.min((i + 1) * BATCH_SIZE, newKeys.length)}/${newKeys.length} new keys`
);
}
}
}
// Create indexes after bulk loading for better query performance
await executeQuery(conn, `CREATE INDEX idx_existing_keys ON existing_keys(key)`);
await executeQuery(conn, `CREATE INDEX idx_new_keys ON new_keys(key)`);
};
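A quick sanity check on the batch math, with hypothetical volumes:

// Rough batch math for illustration (volumes are hypothetical):
const exampleKeys = 250000;
const exampleBatches = Math.ceil(exampleKeys / 10000); // 25 INSERT statements per table
// Progress logs fire at batch indexes 10 and 20, reporting 110000/250000 and 210000/250000 keys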
/**
@ -170,6 +240,7 @@ const findUniqueNewKeys = async (conn: duckdb.Connection): Promise<string[]> =>
/**
* Core deduplication logic using DuckDB
* Automatically switches to disk-based storage for large datasets
*/
const performDeduplication = async (
existingKeys: string[],
@ -198,8 +269,17 @@ const performDeduplication = async (
let connection: DuckDBConnection | null = null;
try {
// Determine whether to use disk based on dataset size
// Use disk for large datasets to avoid memory issues
const totalKeys = existingKeys.length + newKeys.length;
const useDisk = totalKeys > 50000; // Switch to disk for datasets over 50k keys
if (useDisk) {
console.info(`Using disk-based DuckDB for deduplication (${totalKeys} total keys)`);
}
// Create DuckDB connection
connection = await createConnection();
connection = await createConnection(useDisk);
// Setup tables and insert data
await setupTables(connection.conn, existingKeys, newKeys);

View File

@ -880,6 +880,9 @@ importers:
'@ai-sdk/gateway':
specifier: ^1.0.15
version: 1.0.15(zod@3.25.76)
'@ai-sdk/openai':
specifier: 2.0.0-beta.16
version: 2.0.0-beta.16(zod@3.25.76)
'@ai-sdk/provider':
specifier: ^2.0.0
version: 2.0.0
@ -12538,14 +12541,28 @@ snapshots:
'@ai-sdk/provider-utils': 3.0.0-beta.10(zod@3.25.1)
zod: 3.25.1
'@ai-sdk/openai@2.0.0-beta.16(zod@3.25.76)':
dependencies:
'@ai-sdk/provider': 2.0.0-beta.2
'@ai-sdk/provider-utils': 3.0.0-beta.10(zod@3.25.76)
zod: 3.25.76
'@ai-sdk/provider-utils@3.0.0-beta.10(zod@3.25.1)':
dependencies:
'@ai-sdk/provider': 2.0.0-beta.2
'@standard-schema/spec': 1.0.0
eventsource-parser: 3.0.3
eventsource-parser: 3.0.6
zod: 3.25.1
zod-to-json-schema: 3.24.6(zod@3.25.1)
'@ai-sdk/provider-utils@3.0.0-beta.10(zod@3.25.76)':
dependencies:
'@ai-sdk/provider': 2.0.0-beta.2
'@standard-schema/spec': 1.0.0
eventsource-parser: 3.0.6
zod: 3.25.76
zod-to-json-schema: 3.24.6(zod@3.25.76)
'@ai-sdk/provider-utils@3.0.1(zod@3.25.1)':
dependencies:
'@ai-sdk/provider': 2.0.0
@ -20689,7 +20706,7 @@ snapshots:
eventsource@3.0.7:
dependencies:
eventsource-parser: 3.0.3
eventsource-parser: 3.0.6
evp_bytestokey@1.0.3:
dependencies: