Add searchable values sync functionality

- Introduced the `processSyncJob` function to handle the complete workflow for syncing searchable values, including fetching credentials, querying distinct values, deduplication, generating embeddings, and upserting to Turbopuffer.
- Added support for the `@buster/search` package in the project configuration.
- Updated dependencies to version 4.0.2 for `@trigger.dev/build`, `@trigger.dev/core`, and `@trigger.dev/sdk`.
- Enhanced logging for better traceability during the sync process.
This commit is contained in:
dal 2025-09-02 12:57:59 -06:00
parent 5f73d2b14b
commit 836a2b0787
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 1007 additions and 50 deletions

View File

@ -25,6 +25,7 @@
"@buster/ai": "workspace:*",
"@buster/data-source": "workspace:^",
"@buster/database": "workspace:*",
"@buster/search": "workspace:*",
"@buster/server-shared": "workspace:*",
"@buster/slack": "workspace:*",
"@buster/test-utils": "workspace:*",

View File

@ -0,0 +1,670 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { processSyncJob } from './process-sync-job';
import type { SyncJobPayload } from './types';
// Extract the run function from the task.
// NOTE(review): schemaTask is mocked below to return its config object verbatim,
// so `.run` here is the raw task handler; the `as any` cast bypasses the SDK's
// opaque task wrapper type that would otherwise hide `run`.
const runTask = (processSyncJob as any).run;
// Mock all external dependencies. vi.mock calls are hoisted by vitest to the
// top of the module, so they take effect before the static imports above/below
// are resolved — keep them before the mocked-module imports for readability.
vi.mock('@buster/ai', () => ({
  generateSearchableValueEmbeddings: vi.fn(),
}));
vi.mock('@buster/data-source', () => ({
  createAdapter: vi.fn(),
}));
vi.mock('@buster/database', () => ({
  getDataSourceCredentials: vi.fn(),
  markSyncJobCompleted: vi.fn(),
  markSyncJobFailed: vi.fn(),
}));
vi.mock('@buster/search', () => ({
  checkNamespaceExists: vi.fn(),
  createNamespaceIfNotExists: vi.fn(),
  deduplicateValues: vi.fn(),
  generateNamespace: vi.fn(),
  queryExistingKeys: vi.fn(),
  upsertSearchableValues: vi.fn(),
}));
// The trigger.dev SDK mock is what makes `runTask` work: schemaTask returns
// its config (including the real `run` implementation) instead of a Task.
vi.mock('@trigger.dev/sdk', () => ({
  logger: {
    info: vi.fn(),
    error: vi.fn(),
  },
  schemaTask: vi.fn((config) => ({
    ...config,
    run: config.run,
  })),
}));
// Import mocked modules
import { generateSearchableValueEmbeddings } from '@buster/ai';
import { createAdapter } from '@buster/data-source';
import {
getDataSourceCredentials,
markSyncJobCompleted,
markSyncJobFailed,
} from '@buster/database';
import {
checkNamespaceExists,
createNamespaceIfNotExists,
deduplicateValues,
generateNamespace,
queryExistingKeys,
upsertSearchableValues,
} from '@buster/search';
describe('processSyncJob', () => {
// Canonical sync-job payload shared by every test; individual tests spread
// and override fields (e.g. maxValues) rather than mutating this object.
const mockPayload: SyncJobPayload = {
  jobId: 'test-job-123',
  dataSourceId: 'ds-456',
  databaseName: 'test_db',
  schemaName: 'public',
  tableName: 'customers',
  columnName: 'company_name',
  maxValues: 1000,
};
// Stand-in for the DatabaseAdapter returned by createAdapter(); every method
// is a spy so tests can both stub results and assert calls (e.g. close()).
const mockAdapter = {
  testConnection: vi.fn(),
  close: vi.fn(),
  query: vi.fn(),
  initialize: vi.fn(),
  executeBulk: vi.fn(),
  getMetadata: vi.fn(),
  isConnected: vi.fn(),
};
beforeEach(() => {
  vi.clearAllMocks();
  // Reset all mock implementations to default (happy-path) behavior so each
  // test only overrides the specific step it wants to exercise.
  vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
  vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
  vi.mocked(markSyncJobCompleted).mockResolvedValue({
    id: 'test-job-123',
    status: 'completed',
    updatedAt: new Date().toISOString(),
    lastSyncedAt: new Date().toISOString(),
    errorMessage: null,
  });
  vi.mocked(markSyncJobFailed).mockResolvedValue({
    id: 'test-job-123',
    status: 'failed',
    updatedAt: new Date().toISOString(),
    lastSyncedAt: null,
    errorMessage: 'Error message',
  });
  mockAdapter.testConnection.mockResolvedValue(undefined);
  mockAdapter.close.mockResolvedValue(undefined);
  mockAdapter.query.mockResolvedValue({ rows: [], fields: [] });
});
afterEach(() => {
  // Clearing again after each test keeps call counts from leaking into any
  // assertions made outside the beforeEach cycle.
  vi.clearAllMocks();
});
// Happy-path coverage: full pipeline, empty column early-exit, all-duplicates
// early-exit, and lazy namespace creation. Each test stubs every pipeline
// stage (credentials → adapter → query → dedupe → embed → upsert) in order.
describe('successful sync workflow', () => {
  it('should complete full sync workflow with new values', async () => {
    // Setup mocks for successful flow: 3 distinct values, 1 already synced.
    const mockCredentials = { type: 'postgresql', host: 'localhost' };
    const mockDistinctValues = ['Apple Inc.', 'Google LLC', 'Microsoft Corp'];
    const mockExistingKeys = ['test_db:public:customers:company_name:Apple Inc.'];
    const mockEmbeddings = [
      [0.1, 0.2, 0.3], // Google LLC embedding
      [0.4, 0.5, 0.6], // Microsoft Corp embedding
    ];
    vi.mocked(getDataSourceCredentials).mockResolvedValue(mockCredentials);
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: mockDistinctValues.map((v) => ({ value: v })),
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(true);
    vi.mocked(queryExistingKeys).mockResolvedValue(mockExistingKeys);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: [
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Google LLC',
        },
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Microsoft Corp',
        },
      ],
      existingCount: 1,
      newCount: 2,
    });
    vi.mocked(generateSearchableValueEmbeddings).mockResolvedValue(mockEmbeddings);
    vi.mocked(upsertSearchableValues).mockResolvedValue({
      namespace: 'ds_ds-456',
      upserted: 2,
      errors: [],
    });
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify result: processedCount = total distinct, newCount = embedded+upserted.
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: true,
      processedCount: 3,
      existingCount: 1,
      newCount: 2,
      duration: expect.any(Number),
    });
    // Verify workflow steps were called correctly (and in the expected shape).
    expect(getDataSourceCredentials).toHaveBeenCalledWith({
      dataSourceId: 'ds-456',
    });
    expect(createAdapter).toHaveBeenCalledWith(mockCredentials);
    expect(mockAdapter.testConnection).toHaveBeenCalled();
    expect(mockAdapter.query).toHaveBeenCalledWith(
      expect.stringContaining('SELECT DISTINCT "company_name"')
    );
    expect(checkNamespaceExists).toHaveBeenCalledWith('ds_ds-456');
    expect(queryExistingKeys).toHaveBeenCalledWith({
      dataSourceId: 'ds-456',
      query: {
        database: 'test_db',
        schema: 'public',
        table: 'customers',
        column: 'company_name',
      },
    });
    expect(deduplicateValues).toHaveBeenCalledWith({
      existingKeys: mockExistingKeys,
      newValues: expect.arrayContaining([
        expect.objectContaining({ value: 'Apple Inc.' }),
        expect.objectContaining({ value: 'Google LLC' }),
        expect.objectContaining({ value: 'Microsoft Corp' }),
      ]),
    });
    // Only the two NEW values get embedded and upserted.
    expect(generateSearchableValueEmbeddings).toHaveBeenCalledWith([
      'Google LLC',
      'Microsoft Corp',
    ]);
    expect(upsertSearchableValues).toHaveBeenCalledWith({
      dataSourceId: 'ds-456',
      values: expect.arrayContaining([
        expect.objectContaining({
          value: 'Google LLC',
          embedding: [0.1, 0.2, 0.3],
        }),
        expect.objectContaining({
          value: 'Microsoft Corp',
          embedding: [0.4, 0.5, 0.6],
        }),
      ]),
    });
    expect(markSyncJobCompleted).toHaveBeenCalledWith(
      'test-job-123',
      expect.objectContaining({
        processedCount: 3,
        existingCount: 1,
        newCount: 2,
      })
    );
    expect(mockAdapter.close).toHaveBeenCalled();
  });
  it('should handle case when no values need syncing (empty column)', async () => {
    // Setup mocks for empty column — the task should short-circuit before
    // touching Turbopuffer or the embedding service.
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: [],
      fields: [],
    });
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify result
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: true,
      processedCount: 0,
      existingCount: 0,
      newCount: 0,
      duration: expect.any(Number),
    });
    // Verify early exit: job still marked completed, downstream stages skipped.
    expect(markSyncJobCompleted).toHaveBeenCalledWith(
      'test-job-123',
      expect.objectContaining({
        processedCount: 0,
        existingCount: 0,
        newCount: 0,
      })
    );
    expect(queryExistingKeys).not.toHaveBeenCalled();
    expect(generateSearchableValueEmbeddings).not.toHaveBeenCalled();
    expect(mockAdapter.close).toHaveBeenCalled();
  });
  it('should handle case when all values already exist', async () => {
    // Setup mocks for all existing values — dedupe reports newCount 0.
    const mockDistinctValues = ['Apple Inc.', 'Google LLC'];
    const mockExistingKeys = [
      'test_db:public:customers:company_name:Apple Inc.',
      'test_db:public:customers:company_name:Google LLC',
    ];
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: mockDistinctValues.map((v) => ({ value: v })),
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(true);
    vi.mocked(queryExistingKeys).mockResolvedValue(mockExistingKeys);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: [],
      existingCount: 2,
      newCount: 0,
    });
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify result
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: true,
      processedCount: 2,
      existingCount: 2,
      newCount: 0,
      duration: expect.any(Number),
    });
    // Verify no embeddings or upserts were needed
    expect(generateSearchableValueEmbeddings).not.toHaveBeenCalled();
    expect(upsertSearchableValues).not.toHaveBeenCalled();
    expect(markSyncJobCompleted).toHaveBeenCalledWith(
      'test-job-123',
      expect.objectContaining({
        processedCount: 2,
        existingCount: 2,
        newCount: 0,
      })
    );
    expect(mockAdapter.close).toHaveBeenCalled();
  });
  it('should create namespace if it does not exist', async () => {
    // Setup mocks with non-existing namespace
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: [{ value: 'Test Company' }],
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(false); // Namespace doesn't exist
    vi.mocked(queryExistingKeys).mockResolvedValue([]);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: [
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Test Company',
        },
      ],
      existingCount: 0,
      newCount: 1,
    });
    vi.mocked(generateSearchableValueEmbeddings).mockResolvedValue([[0.1, 0.2, 0.3]]);
    vi.mocked(upsertSearchableValues).mockResolvedValue({
      namespace: 'ds_ds-456',
      upserted: 1,
      errors: [],
    });
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify namespace creation — note it is keyed by dataSourceId, not by
    // the generated namespace string.
    expect(checkNamespaceExists).toHaveBeenCalledWith('ds_ds-456');
    expect(createNamespaceIfNotExists).toHaveBeenCalledWith('ds-456');
    expect(result.success).toBe(true);
  });
});
// Failure-path coverage: each test fails one pipeline stage and asserts the
// task returns { success: false }, marks the job failed, and (where an
// adapter was created) still closes the connection in the finally block.
describe('error handling', () => {
  it('should handle credential fetch errors', async () => {
    // Setup mock to fail credential fetch
    vi.mocked(getDataSourceCredentials).mockRejectedValue(
      new Error('Failed to fetch credentials from vault')
    );
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify error handling
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: false,
      error: 'Failed to fetch credentials from vault',
    });
    expect(markSyncJobFailed).toHaveBeenCalledWith(
      'test-job-123',
      'Failed to fetch credentials from vault'
    );
  });
  it('should handle database connection errors', async () => {
    // Setup mock to fail connection
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.testConnection.mockRejectedValue(new Error('Connection timeout'));
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify error handling
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: false,
      error: 'Connection timeout',
    });
    expect(markSyncJobFailed).toHaveBeenCalledWith('test-job-123', 'Connection timeout');
    expect(mockAdapter.close).toHaveBeenCalled(); // Cleanup attempted
  });
  it('should handle query execution errors', async () => {
    // Setup mock to fail query — the task wraps the raw adapter error with
    // table/column context, hence stringContaining instead of exact match.
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockRejectedValue(new Error('Table not found'));
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify error handling
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: false,
      error: expect.stringContaining('Failed to query distinct values'),
    });
    expect(markSyncJobFailed).toHaveBeenCalled();
    expect(mockAdapter.close).toHaveBeenCalled();
  });
  it('should handle embedding generation errors', async () => {
    // Setup mocks up to embedding generation
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: [{ value: 'Test Company' }],
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(true);
    vi.mocked(queryExistingKeys).mockResolvedValue([]);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: [
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Test Company',
        },
      ],
      existingCount: 0,
      newCount: 1,
    });
    vi.mocked(generateSearchableValueEmbeddings).mockRejectedValue(
      new Error('OpenAI API rate limit exceeded')
    );
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify error handling
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: false,
      error: 'OpenAI API rate limit exceeded',
    });
    expect(markSyncJobFailed).toHaveBeenCalledWith(
      'test-job-123',
      'OpenAI API rate limit exceeded'
    );
    expect(mockAdapter.close).toHaveBeenCalled();
  });
  it('should handle Turbopuffer upsert errors', async () => {
    // Setup mocks up to upsert
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: [{ value: 'Test Company' }],
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(true);
    vi.mocked(queryExistingKeys).mockResolvedValue([]);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: [
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Test Company',
        },
      ],
      existingCount: 0,
      newCount: 1,
    });
    vi.mocked(generateSearchableValueEmbeddings).mockResolvedValue([[0.1, 0.2, 0.3]]);
    vi.mocked(upsertSearchableValues).mockRejectedValue(new Error('Turbopuffer API error'));
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify error handling
    expect(result).toEqual({
      jobId: 'test-job-123',
      success: false,
      error: 'Turbopuffer API error',
    });
    expect(markSyncJobFailed).toHaveBeenCalledWith('test-job-123', 'Turbopuffer API error');
    expect(mockAdapter.close).toHaveBeenCalled();
  });
  it('should handle disconnect errors gracefully', async () => {
    // Setup mocks with disconnect failure — the finally-block close() error
    // must be swallowed, not turned into a job failure.
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: [],
      fields: [],
    });
    mockAdapter.close.mockRejectedValue(new Error('Failed to close connection'));
    // Execute the task
    const result = await runTask(mockPayload);
    // Should still return success despite disconnect error
    expect(result.success).toBe(true);
    expect(mockAdapter.close).toHaveBeenCalled();
  });
});
// Edge-case coverage: special characters survive the pipeline intact, null
// and blank rows are filtered before dedupe, and maxValues reaches the SQL
// LIMIT clause.
describe('edge cases', () => {
  it('should handle values with special characters', async () => {
    // Setup mocks with special characters (quotes, ampersands, newlines).
    const specialValues = ["O'Reilly Media", 'AT&T', '"Quoted" Company', 'Line\nBreak Corp'];
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: specialValues.map((v) => ({ value: v })),
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(true);
    vi.mocked(queryExistingKeys).mockResolvedValue([]);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: specialValues.map((value) => ({
        database: 'test_db',
        schema: 'public',
        table: 'customers',
        column: 'company_name',
        value,
      })),
      existingCount: 0,
      newCount: specialValues.length,
    });
    vi.mocked(generateSearchableValueEmbeddings).mockResolvedValue(
      specialValues.map(() => [0.1, 0.2, 0.3])
    );
    vi.mocked(upsertSearchableValues).mockResolvedValue({
      namespace: 'ds_ds-456',
      upserted: specialValues.length,
      errors: [],
    });
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify handling of special characters — values must be passed through
    // to the embedding service unmodified.
    expect(result.success).toBe(true);
    expect(result.newCount).toBe(specialValues.length);
    expect(generateSearchableValueEmbeddings).toHaveBeenCalledWith(specialValues);
  });
  it('should filter out null and empty values from query results', async () => {
    // Setup mocks with mixed valid/invalid values
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: [
        { value: 'Valid Company' },
        { value: null },
        { value: '' },
        { value: ' ' }, // Only whitespace
        { value: 'Another Valid' },
      ],
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(true);
    vi.mocked(queryExistingKeys).mockResolvedValue([]);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: [
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Valid Company',
        },
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Another Valid',
        },
      ],
      existingCount: 0,
      newCount: 2,
    });
    vi.mocked(generateSearchableValueEmbeddings).mockResolvedValue([
      [0.1, 0.2, 0.3],
      [0.4, 0.5, 0.6],
    ]);
    vi.mocked(upsertSearchableValues).mockResolvedValue({
      namespace: 'ds_ds-456',
      upserted: 2,
      errors: [],
    });
    // Execute the task
    const result = await runTask(mockPayload);
    // Verify only valid values were processed
    expect(result.success).toBe(true);
    expect(result.processedCount).toBe(2); // Only valid values
    expect(deduplicateValues).toHaveBeenCalledWith({
      existingKeys: [],
      newValues: expect.arrayContaining([
        expect.objectContaining({ value: 'Valid Company' }),
        expect.objectContaining({ value: 'Another Valid' }),
      ]),
    });
  });
  it('should respect maxValues limit', async () => {
    // Setup payload with small limit
    const limitedPayload = { ...mockPayload, maxValues: 2 };
    // Setup mocks
    vi.mocked(getDataSourceCredentials).mockResolvedValue({ type: 'postgresql' });
    vi.mocked(createAdapter).mockResolvedValue(mockAdapter as any);
    mockAdapter.query.mockResolvedValue({
      rows: [{ value: 'Company 1' }, { value: 'Company 2' }],
      fields: [],
    });
    vi.mocked(generateNamespace).mockReturnValue('ds_ds-456');
    vi.mocked(checkNamespaceExists).mockResolvedValue(true);
    vi.mocked(queryExistingKeys).mockResolvedValue([]);
    vi.mocked(deduplicateValues).mockResolvedValue({
      newValues: [
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Company 1',
        },
        {
          database: 'test_db',
          schema: 'public',
          table: 'customers',
          column: 'company_name',
          value: 'Company 2',
        },
      ],
      existingCount: 0,
      newCount: 2,
    });
    vi.mocked(generateSearchableValueEmbeddings).mockResolvedValue([
      [0.1, 0.2, 0.3],
      [0.4, 0.5, 0.6],
    ]);
    vi.mocked(upsertSearchableValues).mockResolvedValue({
      namespace: 'ds_ds-456',
      upserted: 2,
      errors: [],
    });
    // Execute the task
    const result = await runTask(limitedPayload);
    // Verify limit was respected in query
    expect(result.success).toBe(true);
    expect(mockAdapter.query).toHaveBeenCalledWith(expect.stringContaining('LIMIT 2'));
  });
});
});

View File

@ -1,21 +1,33 @@
import { markSyncJobCompleted } from '@buster/database';
import { generateSearchableValueEmbeddings } from '@buster/ai';
import { type DatabaseAdapter, createAdapter } from '@buster/data-source';
import {
getDataSourceCredentials,
markSyncJobCompleted,
markSyncJobFailed,
} from '@buster/database';
import {
type SearchableValue,
checkNamespaceExists,
createNamespaceIfNotExists,
deduplicateValues,
generateNamespace,
queryExistingKeys,
upsertSearchableValues,
} from '@buster/search';
import { logger, schemaTask } from '@trigger.dev/sdk';
import { type SyncJobPayload, SyncJobPayloadSchema, type SyncJobResult } from './types';
/**
* Task to process an individual searchable values sync job
*
* This is a STUB implementation for Ticket 7.
* The full implementation will be added in Ticket 8.
*
* In Ticket 8, this task will:
* 1. Connect to the data source using the provided credentials
* 2. Query distinct values from the specified column
* 3. Store the values in the searchable values cache
* 4. Update the sync job status with results
*
* For now, it simply logs and returns success to allow the daily
* cron job to be tested end-to-end.
* This task orchestrates the complete sync workflow:
* 1. Fetches credentials and connects to the data source
* 2. Queries distinct values from the specified column
* 3. Queries existing values from Turbopuffer
* 4. Deduplicates to find new values
* 5. Generates embeddings for new values
* 6. Upserts values with embeddings to Turbopuffer
* 7. Updates sync job and column metadata
*/
export const processSyncJob: ReturnType<
typeof schemaTask<'process-sync-job', typeof SyncJobPayloadSchema, SyncJobResult>
@ -32,7 +44,7 @@ export const processSyncJob: ReturnType<
run: async (payload): Promise<SyncJobResult> => {
const startTime = Date.now();
logger.info('Processing sync job (STUB)', {
logger.info('Starting sync job processing', {
jobId: payload.jobId,
dataSourceId: payload.dataSourceId,
column: {
@ -41,33 +53,211 @@ export const processSyncJob: ReturnType<
table: payload.tableName,
column: payload.columnName,
},
maxValues: payload.maxValues,
});
let adapter: DatabaseAdapter | null = null;
try {
// TODO (Ticket 8): Implement actual sync logic
// 1. Get data source credentials
// 2. Connect to data source
// 3. Query distinct values with limit
// 4. Store values in cache
// 5. Update column metadata
// Step 1: Get data source credentials
logger.info('Fetching data source credentials', { dataSourceId: payload.dataSourceId });
const credentials = await getDataSourceCredentials({
dataSourceId: payload.dataSourceId,
});
// Simulate some processing time
await new Promise((resolve) => setTimeout(resolve, 100));
// Step 2: Create and connect adapter
logger.info('Creating database adapter', { dataSourceId: payload.dataSourceId });
// @ts-expect-error - credentials type is flexible across different database types
adapter = await createAdapter(credentials);
await adapter.testConnection();
// For now, just mark the job as completed with stub data
// Step 3: Query distinct values from the source column
logger.info('Querying distinct values from source', {
table: `${payload.databaseName}.${payload.schemaName}.${payload.tableName}`,
column: payload.columnName,
limit: payload.maxValues,
});
const distinctValues = await queryDistinctColumnValues({
adapter,
databaseName: payload.databaseName,
schemaName: payload.schemaName,
tableName: payload.tableName,
columnName: payload.columnName,
limit: payload.maxValues,
});
logger.info('Retrieved distinct values', {
jobId: payload.jobId,
totalValues: distinctValues.length,
});
if (distinctValues.length === 0) {
// No values to sync
const result: SyncJobResult = {
jobId: payload.jobId,
success: true,
processedCount: 0,
existingCount: 0,
newCount: 0,
duration: Date.now() - startTime,
};
await markSyncJobCompleted(payload.jobId, {
processedCount: 0,
existingCount: 0,
newCount: 0,
duration: result.duration || 0,
});
logger.info('No values to sync', { jobId: payload.jobId });
return result;
}
// Step 4: Ensure Turbopuffer namespace exists
const namespace = generateNamespace(payload.dataSourceId);
const namespaceExists = await checkNamespaceExists(namespace);
if (!namespaceExists) {
logger.info('Creating Turbopuffer namespace', { namespace });
await createNamespaceIfNotExists(payload.dataSourceId);
}
// Step 5: Query existing keys from Turbopuffer
logger.info('Querying existing values from Turbopuffer', {
namespace,
database: payload.databaseName,
schema: payload.schemaName,
table: payload.tableName,
column: payload.columnName,
});
const existingKeys = await queryExistingKeys({
dataSourceId: payload.dataSourceId,
query: {
database: payload.databaseName,
schema: payload.schemaName,
table: payload.tableName,
column: payload.columnName,
},
});
logger.info('Retrieved existing keys', {
jobId: payload.jobId,
existingCount: existingKeys.length,
});
// Step 6: Prepare searchable values for deduplication
const searchableValues: SearchableValue[] = distinctValues.map((value) => ({
database: payload.databaseName,
schema: payload.schemaName,
table: payload.tableName,
column: payload.columnName,
value,
}));
// Step 7: Deduplicate using DuckDB
logger.info('Deduplicating values', {
jobId: payload.jobId,
totalValues: searchableValues.length,
existingKeys: existingKeys.length,
});
const deduplicationResult = await deduplicateValues({
existingKeys,
newValues: searchableValues,
});
logger.info('Deduplication complete', {
jobId: payload.jobId,
newCount: deduplicationResult.newCount,
existingCount: deduplicationResult.existingCount,
});
if (deduplicationResult.newCount === 0) {
// All values already exist
const result: SyncJobResult = {
jobId: payload.jobId,
success: true,
processedCount: searchableValues.length,
existingCount: deduplicationResult.existingCount,
newCount: 0,
duration: Date.now() - startTime,
};
await markSyncJobCompleted(payload.jobId, {
processedCount: searchableValues.length,
existingCount: deduplicationResult.existingCount,
newCount: 0,
duration: result.duration || 0,
});
// TODO: Update column sync metadata when we have column ID
// await updateColumnSyncMetadata(columnId, {
// status: 'success',
// count: searchableValues.length,
// lastSynced: new Date().toISOString(),
// });
logger.info('All values already exist in Turbopuffer', { jobId: payload.jobId });
return result;
}
// Step 8: Generate embeddings for new values
logger.info('Generating embeddings for new values', {
jobId: payload.jobId,
newCount: deduplicationResult.newCount,
});
const newValueTexts = deduplicationResult.newValues.map((v) => v.value);
const embeddings = await generateSearchableValueEmbeddings(newValueTexts);
// Step 9: Combine values with embeddings
const valuesWithEmbeddings: SearchableValue[] = deduplicationResult.newValues.map(
(value, index) => ({
...value,
embedding: embeddings[index],
synced_at: new Date().toISOString(),
})
);
// Step 10: Upsert to Turbopuffer
logger.info('Upserting values to Turbopuffer', {
jobId: payload.jobId,
count: valuesWithEmbeddings.length,
});
const upsertResult = await upsertSearchableValues({
dataSourceId: payload.dataSourceId,
values: valuesWithEmbeddings,
});
logger.info('Upsert complete', {
jobId: payload.jobId,
upserted: upsertResult.upserted,
errors: upsertResult.errors,
});
// Step 11: Update sync job status
const metadata = {
processedCount: 100, // Stub value
existingCount: 20, // Stub value
newCount: 80, // Stub value
processedCount: searchableValues.length,
existingCount: deduplicationResult.existingCount,
newCount: deduplicationResult.newCount,
duration: Date.now() - startTime,
syncedAt: new Date().toISOString(),
};
await markSyncJobCompleted(payload.jobId, metadata);
logger.info('Sync job completed (STUB)', {
// TODO: Update column sync metadata when we have column ID
// await updateColumnSyncMetadata(columnId, {
// status: 'success',
// count: searchableValues.length,
// lastSynced: new Date().toISOString(),
// });
logger.info('Sync job completed successfully', {
jobId: payload.jobId,
processedCount: metadata.processedCount,
newCount: metadata.newCount,
duration: metadata.duration,
});
@ -82,26 +272,119 @@ export const processSyncJob: ReturnType<
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
logger.error('Sync job failed (STUB)', {
logger.error('Sync job failed', {
jobId: payload.jobId,
error: errorMessage,
stack: error instanceof Error ? error.stack : undefined,
});
// Mark job as failed
await markSyncJobFailed(payload.jobId, errorMessage);
// TODO: Update column sync metadata with error when we have column ID
// await updateColumnSyncMetadata(columnId, {
// status: 'failed',
// error: errorMessage,
// }).catch((updateError) => {
// logger.error('Failed to update column metadata', {
// jobId: payload.jobId,
// error: updateError instanceof Error ? updateError.message : 'Unknown error',
// });
// });
return {
jobId: payload.jobId,
success: false,
error: errorMessage,
};
} finally {
// Clean up database connection
if (adapter) {
try {
await adapter.close();
logger.info('Database connection closed', { jobId: payload.jobId });
} catch (disconnectError) {
logger.error('Failed to disconnect database adapter', {
jobId: payload.jobId,
error: disconnectError instanceof Error ? disconnectError.message : 'Unknown error',
});
}
}
}
},
});
// ============================================================================
// HELPER FUNCTIONS
// ============================================================================
/**
* TODO (Ticket 8): Helper functions to be implemented
*
* - connectToDataSource(dataSourceId: string): Promise<DataSourceConnection>
* - queryDistinctValues(connection, column, limit): Promise<string[]>
* - storeSearchableValues(values, column): Promise<StorageResult>
* - updateColumnMetadata(columnId, metadata): Promise<void>
* - cleanupConnection(connection): Promise<void>
* Query distinct values from a specific column in the data source
* Handles different database types and returns unique text values
*/
/**
 * Query distinct non-null, non-blank values from one column of the data
 * source and return them as an array of strings (sorted by the database).
 *
 * The adapter interface accepts only a raw SQL string, so identifiers are
 * interpolated into the query text. To keep that safe:
 *  - embedded double quotes in `columnName` are escaped by doubling them,
 *    so an unusual or hostile column name cannot terminate the quoted
 *    identifier and inject SQL;
 *  - `limit` is coerced to a positive integer so the LIMIT clause can never
 *    contain `NaN` or arbitrary text.
 * NOTE(review): databaseName/schemaName/tableName are interpolated unquoted
 * to preserve the existing case-folding behavior; they are assumed to come
 * from trusted sync-job configuration — confirm upstream validation.
 *
 * @param adapter - connected DatabaseAdapter used to run the query
 * @param databaseName - database portion of the fully qualified table name
 * @param schemaName - schema portion of the fully qualified table name
 * @param tableName - table to read from
 * @param columnName - column whose distinct values are fetched
 * @param limit - maximum number of distinct values to return
 * @returns distinct string values with null/undefined/blank rows filtered out
 * @throws Error wrapping the underlying adapter failure with table/column context
 */
async function queryDistinctColumnValues({
  adapter,
  databaseName,
  schemaName,
  tableName,
  columnName,
  limit,
}: {
  adapter: DatabaseAdapter;
  databaseName: string;
  schemaName: string;
  tableName: string;
  columnName: string;
  limit: number;
}): Promise<string[]> {
  // Build the fully qualified table name
  const fullyQualifiedTable = `${databaseName}.${schemaName}.${tableName}`;
  // Escape embedded double quotes (SQL standard: "" inside a quoted
  // identifier) so the column name cannot break out of its quoting.
  const quotedColumn = `"${columnName.replace(/"/g, '""')}"`;
  // Clamp to a positive integer; a fractional, zero, or negative limit would
  // otherwise be interpolated verbatim into the SQL text.
  const safeLimit = Math.max(1, Math.floor(limit));
  // Build the query to get distinct non-null values
  const query = `
    SELECT DISTINCT ${quotedColumn} AS value
    FROM ${fullyQualifiedTable}
    WHERE ${quotedColumn} IS NOT NULL
      AND TRIM(${quotedColumn}) != ''
    ORDER BY ${quotedColumn}
    LIMIT ${safeLimit}
  `;
  logger.info('Executing distinct values query', {
    table: fullyQualifiedTable,
    column: columnName,
    limit,
  });
  try {
    const result = await adapter.query(query);
    // Extract values from the result, defensively re-filtering null,
    // undefined, non-string, and whitespace-only rows (some adapters may
    // not honor the TRIM predicate identically).
    const values: string[] = [];
    for (const row of result.rows) {
      const value = row.value;
      if (
        value !== null &&
        value !== undefined &&
        typeof value === 'string' &&
        value.trim() !== ''
      ) {
        values.push(value);
      }
    }
    return values;
  } catch (error) {
    logger.error('Failed to query distinct values', {
      table: fullyQualifiedTable,
      column: columnName,
      error: error instanceof Error ? error.message : 'Unknown error',
    });
    // Wrap with table/column context; callers (and the task's error path)
    // surface this message via markSyncJobFailed.
    throw new Error(
      `Failed to query distinct values from ${fullyQualifiedTable}.${columnName}: ${
        error instanceof Error ? error.message : 'Unknown error'
      }`
    );
  }
}

View File

@ -263,6 +263,9 @@ importers:
'@buster/database':
specifier: workspace:*
version: link:../../packages/database
'@buster/search':
specifier: workspace:*
version: link:../../packages/search
'@buster/server-shared':
specifier: workspace:*
version: link:../../packages/server-shared
@ -282,8 +285,8 @@ importers:
specifier: workspace:*
version: link:../../packages/web-tools
'@trigger.dev/sdk':
specifier: 4.0.1
version: 4.0.1(ai@5.0.5(zod@3.25.76))(zod@3.25.76)
specifier: 4.0.2
version: 4.0.2(ai@5.0.5(zod@3.25.76))(zod@3.25.76)
ai:
specifier: 'catalog:'
version: 5.0.5(zod@3.25.76)
@ -301,8 +304,8 @@ importers:
version: 3.25.76
devDependencies:
'@trigger.dev/build':
specifier: 4.0.1
version: 4.0.1(typescript@5.9.2)
specifier: 4.0.2
version: 4.0.2(typescript@5.9.2)
apps/web:
dependencies:
@ -5426,16 +5429,16 @@ packages:
resolution: {integrity: sha512-XCuKFP5PS55gnMVu3dty8KPatLqUoy/ZYzDzAGCQ8JNFCkLXzmI7vNHCR+XpbZaMWQK/vQubr7PkYq8g470J/A==}
engines: {node: '>= 10'}
'@trigger.dev/build@4.0.1':
resolution: {integrity: sha512-PGOnCPjVSKkj72xmJb6mdRbzDSP3Ti/C5/tfaBFdSZ7qcoVctSzDfS5iwEGsSoSWSIv+MVy12c4v7Ji/r7MO1A==}
'@trigger.dev/build@4.0.2':
resolution: {integrity: sha512-GOqjGIUXWEEIfWqY2o+xr//4KTHYoRQIML8cCoP/L8x1wPb45qFJWTwNaECgYp9i+9vPMI4/G3Jm/jvWp9xznQ==}
engines: {node: '>=18.20.0'}
'@trigger.dev/core@4.0.0':
resolution: {integrity: sha512-VlRMN6RPeqU66e/j0fGmWTn97DY1b+ChsMDDBm62jZ3N9XtiOlDkrWNtggPoxPtyXsHuShllo/3gpiZDvhtKww==}
engines: {node: '>=18.20.0'}
'@trigger.dev/core@4.0.1':
resolution: {integrity: sha512-NTffiVPy/zFopujdptGGoy3lj3/CKV16JA8CobCfsEpDfu+K+wEys+9p8PFY8j5I0UI86aqlFpJu9/VRqUQ/yQ==}
'@trigger.dev/core@4.0.2':
resolution: {integrity: sha512-hc/alfT7iVdJNZ5YSMbGR9FirLjURqdZ7tCBX4btKas0GDg6M5onwcQsJ3oom5TDp/Nrt+dHaviNMhFxhKCu3g==}
engines: {node: '>=18.20.0'}
'@trigger.dev/sdk@4.0.0':
@ -5448,8 +5451,8 @@ packages:
ai:
optional: true
'@trigger.dev/sdk@4.0.1':
resolution: {integrity: sha512-cdEgrwIl2Kg2jd85dA4tdePPPe+iMjAGX0Q8QrO2CNo/iBcjl7jB7uzvmSjDKYmJoC+8a30fCWviYy6ljOs1oQ==}
'@trigger.dev/sdk@4.0.2':
resolution: {integrity: sha512-ulhWJRSHPXOHz0bMvkhAKThkW63x7lnjAb87LPi6dUps1YwwoOL8Nkr15xLXa73UrldPFT+9Y/GvQ9qpzU478w==}
engines: {node: '>=18.20.0'}
peerDependencies:
ai: ^4.2.0 || ^5.0.0
@ -18001,9 +18004,9 @@ snapshots:
'@tootallnate/once@2.0.0': {}
'@trigger.dev/build@4.0.1(typescript@5.9.2)':
'@trigger.dev/build@4.0.2(typescript@5.9.2)':
dependencies:
'@trigger.dev/core': 4.0.1
'@trigger.dev/core': 4.0.2
pkg-types: 1.3.1
tinyglobby: 0.2.14
tsconfck: 3.1.3(typescript@5.9.2)
@ -18053,7 +18056,7 @@ snapshots:
- supports-color
- utf-8-validate
'@trigger.dev/core@4.0.1':
'@trigger.dev/core@4.0.2':
dependencies:
'@bugsnag/cuid': 3.2.1
'@electric-sql/client': 1.0.0-beta.1
@ -18114,11 +18117,11 @@ snapshots:
- supports-color
- utf-8-validate
'@trigger.dev/sdk@4.0.1(ai@5.0.5(zod@3.25.76))(zod@3.25.76)':
'@trigger.dev/sdk@4.0.2(ai@5.0.5(zod@3.25.76))(zod@3.25.76)':
dependencies:
'@opentelemetry/api': 1.9.0
'@opentelemetry/semantic-conventions': 1.36.0
'@trigger.dev/core': 4.0.1
'@trigger.dev/core': 4.0.2
chalk: 5.4.1
cronstrue: 2.59.0
debug: 4.4.1