From 83e70764792e61ece6b54cac434658cf55072ec3 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 26 Sep 2025 13:29:24 -0600 Subject: [PATCH 1/3] report tooling --- .../create-reports-execute.test.ts | 2 +- .../create-reports-execute.ts | 4 +- .../modify-reports-delta.ts | 266 +++++++++--------- .../modify-reports-execute.test.ts | 26 +- .../modify-reports-execute.ts | 18 +- .../modify-reports-finish.ts | 10 + .../modify-reports-tool.ts | 3 +- .../queries/reports/batch-update-report.ts | 62 +--- 8 files changed, 177 insertions(+), 214 deletions(-) diff --git a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts index 44dd56a9b..45de0e3f1 100644 --- a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts +++ b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts @@ -9,7 +9,7 @@ import type { // Mock dependencies vi.mock('@buster/database/queries', () => ({ updateMessageEntries: vi.fn().mockResolvedValue({ success: true }), - batchUpdateReport: vi.fn().mockResolvedValue({ success: true }), + updateReportWithVersion: vi.fn().mockResolvedValue(undefined), updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }), })); diff --git a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts index 278b2198b..6336605c8 100644 --- a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts +++ b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts @@ -1,7 +1,7 @@ import { - batchUpdateReport, updateMessageEntries, updateMetricsToReports, + updateReportWithVersion, } from '@buster/database/queries'; import type { ChatMessageResponseMessage } from '@buster/server-shared/chats'; import { wrapTraced } from 'braintrust'; @@ -146,7 +146,7 @@ export function createCreateReportsExecute( }; // Update the report with complete content from input (source of truth) - await batchUpdateReport({ + await updateReportWithVersion({ reportId, content, name, diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts index 58cf24d0a..9022333c3 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts @@ -1,5 +1,5 @@ import { db } from '@buster/database/connection'; -import { batchUpdateReport, updateMessageEntries } from '@buster/database/queries'; +import { updateMessageEntries, updateReportWithVersion } from '@buster/database/queries'; import { reportFiles } from '@buster/database/schema'; import type { ChatMessageResponseMessage } from '@buster/server-shared/chats'; import type { ToolCallOptions } from 'ai'; @@ -37,11 +37,6 @@ const TOOL_KEYS = { export function createModifyReportsDelta(context: ModifyReportsContext, state: ModifyReportsState) { return async (options: { inputTextDelta: string } & ToolCallOptions) => { - // Skip delta updates if already complete (same pattern as sequential thinking) - if (state.isComplete) { - return; - } - // Handle string deltas (accumulate JSON text) state.argsText = (state.argsText || '') + options.inputTextDelta; @@ -165,8 +160,8 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M // If we have a snapshot and report ID, update the name in the database if (state.snapshotContent !== undefined && state.reportId) { try { - // We need to provide content for batchUpdateReport, so use the snapshot - await batchUpdateReport({ + // We need to provide content for updateReportWithVersion, so use the snapshot + await updateReportWithVersion({ reportId: state.reportId, content: state.snapshotContent, name: name, @@ -247,96 +242,87 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M } } - // Apply ALL edits sequentially starting from the immutable snapshot - // This happens on every delta update to ensure we have the complete result + // Queue the entire processing operation to ensure sequential execution if (state.edits && state.edits.length > 0 && state.snapshotContent !== undefined) { - // Start fresh from snapshot every time - let currentContent = state.snapshotContent; - - // Apply each edit in sequence - for (const edit of state.edits) { - // Skip if edit is null/undefined, but allow empty strings for code - if (!edit || edit.code === undefined || edit.code === null) continue; - - const operation = edit.operation; - const codeToReplace = edit.code_to_replace || ''; - const code = edit.code; - - if (operation === 'append') { - currentContent = currentContent + code; - } else if (operation === 'replace' && codeToReplace) { - // Check if the pattern exists - if (!currentContent.includes(codeToReplace)) { - console.warn('[modify-reports-delta] Pattern not found during final apply', { - codeToReplace: codeToReplace.substring(0, 50), - reportId: state.reportId, - }); - edit.status = 'failed'; - edit.error = 'Pattern not found'; - continue; - } - // Apply the replacement - currentContent = currentContent.replace(codeToReplace, code); - } - - edit.status = 'completed'; + // Initialize the promise chain if this is the first processing + if (!state.lastProcessing) { + state.lastProcessing = Promise.resolve(); } - // Only update if content actually changed from what we last saved - if (currentContent !== state.lastSavedContent) { - // Check if we should increment version (not if report was created in current turn) - const incrementVersion = await shouldIncrementVersion( - state.reportId, - context.messageId - ); + // Chain this processing to happen after the previous one completes + state.lastProcessing = state.lastProcessing + .then(async () => { + // Apply ALL edits sequentially starting from the immutable snapshot + // Start fresh from snapshot every time + let currentContent = state.snapshotContent; // We checked it's defined above - // Calculate new version - const currentVersion = state.snapshotVersion || 1; - const newVersion = incrementVersion ? currentVersion + 1 : currentVersion; - state.version_number = newVersion; + // Apply each edit in sequence + for (const edit of state.edits || []) { + // Skip if edit is null/undefined, but allow empty strings for code + if (!edit || edit.code === undefined || edit.code === null) continue; - // Track this modification for this tool invocation - state.reportModifiedInMessage = true; + const operation = edit.operation; + const codeToReplace = edit.code_to_replace || ''; + const code = edit.code; - // Update version history with the final content after all edits - const now = new Date().toISOString(); - const versionHistory = { - ...(state.versionHistory || {}), - [newVersion.toString()]: { - content: currentContent, - updated_at: now, - version_number: newVersion, - }, - }; - - // Update the database with the result of all edits (sequentially chained) - try { - // We're already inside a check for state.reportId being defined - if (!state.reportId) { - throw new Error('Report ID is unexpectedly undefined'); - } - - // Initialize the promise chain if this is the first write - if (!state.lastUpdate) { - state.lastUpdate = Promise.resolve(); - } - - // Chain this write to happen after the previous one completes - state.lastUpdate = state.lastUpdate - .then(async () => { - // Double-check that content has actually changed since last write - // This prevents redundant writes if multiple deltas arrive with same content - if (currentContent === state.lastSavedContent) { - console.info('[modify-reports-delta] Skipping write - content unchanged', { + if (operation === 'append') { + currentContent = (currentContent || '') + code; + } else if (operation === 'replace' && codeToReplace) { + // Check if the pattern exists + if (!currentContent || !currentContent.includes(codeToReplace)) { + console.warn('[modify-reports-delta] Pattern not found during final apply', { + codeToReplace: codeToReplace.substring(0, 50), reportId: state.reportId, }); - return; + edit.status = 'failed'; + edit.error = 'Pattern not found'; + continue; + } + // Apply the replacement + currentContent = (currentContent || '').replace(codeToReplace, code); + } + + edit.status = 'completed'; + } + + // Only update if content actually changed from what we last saved + if (currentContent !== state.lastSavedContent) { + // Check if we should increment version (not if report was created in current turn) + const incrementVersion = await shouldIncrementVersion( + state.reportId || '', + context.messageId + ); + + // Calculate new version + const currentVersion = state.snapshotVersion || 1; + const newVersion = incrementVersion ? currentVersion + 1 : currentVersion; + state.version_number = newVersion; + + // Track this modification for this tool invocation + state.reportModifiedInMessage = true; + + // Update version history with the final content after all edits + const now = new Date().toISOString(); + const versionHistory = { + ...(state.versionHistory || {}), + [newVersion.toString()]: { + content: currentContent || '', + updated_at: now, + version_number: newVersion, + }, + }; + + // Update the database with the result of all edits + try { + // We're already inside a check for state.reportId being defined + if (!state.reportId) { + throw new Error('Report ID is unexpectedly undefined'); } // Perform the database write - await batchUpdateReport({ - reportId: state.reportId as string, // We checked it's defined above - content: currentContent, + await updateReportWithVersion({ + reportId: state.reportId, + content: currentContent || '', name: state.reportName || undefined, versionHistory, }); @@ -345,65 +331,73 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M reportId: state.reportId, version: newVersion, }); - }) - .catch((error) => { - // Log error but don't break the chain - allow subsequent writes to continue - console.error('[modify-reports-delta] Database write failed:', error); - // Don't re-throw - let the chain continue for resilience - }); - // Wait for this specific write to complete before proceeding - // This ensures we don't mark things as saved until the write is done - await state.lastUpdate; + // Keep lastUpdate in sync for backward compatibility (already set via processing chain) - // No cache update during delta - execute will handle write-through + // No cache update during delta - execute will handle write-through - // Update state with the final content (but keep snapshot immutable) - state.finalContent = currentContent; - state.lastSavedContent = currentContent; // Track what we just saved - state.versionHistory = versionHistory; - // DO NOT update state.snapshotContent - it must remain immutable + // Update state with the final content (but keep snapshot immutable) + state.finalContent = currentContent || ''; + state.lastSavedContent = currentContent || ''; // Track what we just saved + state.versionHistory = versionHistory; + // DO NOT update state.snapshotContent - it must remain immutable - // Create response message if not already created - if (!state.responseMessageCreated && context.messageId) { - const responseMessages: ChatMessageResponseMessage[] = [ - { - id: state.reportId, - type: 'file' as const, - file_type: 'report_file' as const, - file_name: state.reportName || 'Untitled Report', - version_number: newVersion, - filter_version_id: null, - metadata: [ + // Create response message if not already created + if (!state.responseMessageCreated && context.messageId) { + const responseMessages: ChatMessageResponseMessage[] = [ { - status: 'completed' as const, - message: 'Report modified successfully', - timestamp: Date.now(), + id: state.reportId, + type: 'file' as const, + file_type: 'report_file' as const, + file_name: state.reportName || 'Untitled Report', + version_number: newVersion, + filter_version_id: null, + metadata: [ + { + status: 'completed' as const, + message: 'Report modified successfully', + timestamp: Date.now(), + }, + ], }, - ], - }, - ]; + ]; - await updateMessageEntries({ - messageId: context.messageId, - responseMessages, - }); + await updateMessageEntries({ + messageId: context.messageId, + responseMessages, + }); - state.responseMessageCreated = true; - console.info('[modify-reports-delta] Created response message during streaming'); - } - } catch (error) { - console.error('[modify-reports-delta] Error updating report content:', error); - if (state.edits) { - state.edits.forEach((edit) => { - if (edit) { - edit.status = 'failed'; - edit.error = error instanceof Error ? error.message : 'Update failed'; + state.responseMessageCreated = true; + console.info( + '[modify-reports-delta] Created response message during streaming' + ); } - }); + } catch (error) { + console.error('[modify-reports-delta] Error updating report content:', error); + if (state.edits) { + state.edits.forEach((edit) => { + if (edit) { + edit.status = 'failed'; + edit.error = error instanceof Error ? error.message : 'Update failed'; + } + }); + } + // Re-throw to be caught by the outer catch + throw error; + } } - } - } + }) + .catch((error) => { + // Log error but don't break the chain - allow subsequent processing to continue + console.error('[modify-reports-delta] Processing failed:', error); + // Don't re-throw - let the chain continue for resilience + }); + + // Clean up the processing entry once this completes + state.lastProcessing.finally(() => { + // Only remove if this is still the current processing + // This cleanup is handled by the execute function + }); } } } diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts index ca2716245..94be90ab6 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts @@ -14,7 +14,7 @@ const mockDbSelect = vi.fn(); vi.mock('@buster/database/queries', () => ({ updateMessageEntries: vi.fn().mockResolvedValue({ success: true }), - batchUpdateReport: vi.fn().mockResolvedValue({ success: true }), + updateReportWithVersion: vi.fn().mockResolvedValue(undefined), updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }), })); vi.mock('@buster/database/schema', () => ({ @@ -422,8 +422,8 @@ Updated content with metrics.`; resolveLastUpdate = resolve; }); - // Set up state with an in-progress lastUpdate - state.lastUpdate = lastUpdatePromise; + // Set up state with an in-progress lastProcessing + state.lastProcessing = lastUpdatePromise; state.snapshotContent = '# Original Report'; mockDbLimit.mockResolvedValue([ @@ -447,26 +447,26 @@ Updated content with metrics.`; const execute = createModifyReportsExecute(context, state); - // Start the execute (it should wait for lastUpdate) + // Start the execute (it should wait for lastProcessing) const executePromise = execute(input); // Give it a moment to ensure it's waiting await new Promise((resolve) => setTimeout(resolve, 10)); - // batchUpdateReport should not have been called yet - const mockBatchUpdateReport = vi.mocked( - await import('@buster/database/queries').then((m) => m.batchUpdateReport) + // updateReportWithVersion should not have been called yet + const mockUpdateReportWithVersion = vi.mocked( + await import('@buster/database/queries').then((m) => m.updateReportWithVersion) ); - expect(mockBatchUpdateReport).not.toHaveBeenCalled(); + expect(mockUpdateReportWithVersion).not.toHaveBeenCalled(); - // Now resolve the lastUpdate promise + // Now resolve the lastProcessing promise resolveLastUpdate!(); // Wait for execute to complete const result = await executePromise; - // Now batchUpdateReport should have been called - expect(mockBatchUpdateReport).toHaveBeenCalled(); + // Now updateReportWithVersion should have been called + expect(mockUpdateReportWithVersion).toHaveBeenCalled(); expect(result.success).toBe(true); expect(result.file.content).toContain('# Final Version'); }); @@ -475,8 +475,8 @@ Updated content with metrics.`; // Create a rejected promise to simulate a failed delta write const lastUpdatePromise = Promise.reject(new Error('Delta write failed')); - // Set up state with a rejected lastUpdate - state.lastUpdate = lastUpdatePromise; + // Set up state with a rejected lastProcessing + state.lastProcessing = lastUpdatePromise; state.snapshotContent = '# Original Report'; mockDbLimit.mockResolvedValue([ diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts index a9a547ff9..2d74bf9ff 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts @@ -1,6 +1,6 @@ import { db } from '@buster/database/connection'; import { updateMessageEntries, updateMetricsToReports } from '@buster/database/queries'; -import { batchUpdateReport } from '@buster/database/queries'; +import { updateReportWithVersion } from '@buster/database/queries'; import { reportFiles } from '@buster/database/schema'; import type { ChatMessageResponseMessage } from '@buster/server-shared/chats'; import { wrapTraced } from 'braintrust'; @@ -178,24 +178,24 @@ async function processEditOperations( // Write all changes to database in one operation try { - // Wait for the last delta write to complete before doing final update - if (state?.lastUpdate) { - console.info('[modify-reports-execute] Waiting for last delta write to complete'); + // Wait for the last delta processing to complete before doing final update + if (state?.lastProcessing) { + console.info('[modify-reports-execute] Waiting for last delta processing to complete'); try { - // Wait for the last write in the chain to complete - await state.lastUpdate; + // Wait for the last processing in the chain to complete + await state.lastProcessing; console.info( - '[modify-reports-execute] Last delta write completed, proceeding with final update' + '[modify-reports-execute] Last delta processing completed, proceeding with final update' ); } catch (error) { console.warn( - '[modify-reports-execute] Error waiting for last delta write, proceeding with final update:', + '[modify-reports-execute] Error waiting for last delta processing, proceeding with final update:', error ); } } - await batchUpdateReport({ + await updateReportWithVersion({ reportId, content: currentContent, name: reportName, diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-finish.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-finish.ts index f74c24c86..e690e3741 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-finish.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-finish.ts @@ -17,6 +17,16 @@ export function createModifyReportsFinish( return async (options: { input: ModifyReportsInput } & ToolCallOptions) => { const input = options.input; + // Wait for all queued delta processing to complete before setting isComplete + if (state.lastProcessing) { + try { + await state.lastProcessing; + console.info('[modify-reports-finish] All delta processing completed'); + } catch (error) { + console.error('[modify-reports-finish] Error waiting for delta processing:', error); + } + } + // Set isComplete to prevent further delta processing (same as sequential thinking) state.isComplete = true; diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts index e0fb3ba77..1788e5e5a 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts @@ -105,7 +105,8 @@ export type ModifyReportsEditState = z.infer & { - lastUpdate?: Promise; // Track the last write promise for sequential chaining + lastUpdate?: Promise; // Track the last write promise for sequential chaining (deprecated, use lastProcessing) + lastProcessing?: Promise; // Track the entire processing chain for proper sequencing }; // Factory function that accepts agent context and maps to tool context diff --git a/packages/database/src/queries/reports/batch-update-report.ts b/packages/database/src/queries/reports/batch-update-report.ts index 5cd2da81b..5bdc776b3 100644 --- a/packages/database/src/queries/reports/batch-update-report.ts +++ b/packages/database/src/queries/reports/batch-update-report.ts @@ -32,26 +32,13 @@ type VersionHistoryEntry = { type VersionHistory = Record; // Simple in-memory queue for each reportId -const updateQueues = new Map< - string, - Promise<{ - id: string; - name: string; - content: string; - versionHistory: VersionHistory | null; - }> ->(); +const updateQueues = new Map>(); /** * Internal function that performs the actual update logic. * This is separated so it can be queued. */ -async function performUpdate(params: BatchUpdateReportInput): Promise<{ - id: string; - name: string; - content: string; - versionHistory: VersionHistory | null; -}> { +async function performUpdate(params: BatchUpdateReportInput): Promise { const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params); try { @@ -73,25 +60,12 @@ async function performUpdate(params: BatchUpdateReportInput): Promise<{ updateData.versionHistory = versionHistory; } - const result = await db + await db .update(reportFiles) .set(updateData) - .where(and(eq(reportFiles.id, reportId), isNull(reportFiles.deletedAt))) - .returning({ - id: reportFiles.id, - name: reportFiles.name, - content: reportFiles.content, - versionHistory: reportFiles.versionHistory, - }); - - const updatedReport = result[0]; - if (!updatedReport) { - throw new Error('Report not found or already deleted'); - } - - return updatedReport; + .where(and(eq(reportFiles.id, reportId), isNull(reportFiles.deletedAt))); } catch (error) { - console.error('Error batch updating report:', { + console.error('Error updating report with version:', { reportId, error: error instanceof Error ? error.message : error, }); @@ -100,35 +74,19 @@ async function performUpdate(params: BatchUpdateReportInput): Promise<{ throw error; } - throw new Error('Failed to batch update report'); + throw new Error('Failed to update report with version'); } } /** - * Updates a report with new content, optionally name, and version history in a single operation - * This is more efficient than multiple individual updates - * + * Updates a report's content, name, and version history in a single operation. * Updates are queued per reportId to ensure they execute in order. */ -export const batchUpdateReport = async ( - params: BatchUpdateReportInput -): Promise<{ - id: string; - name: string; - content: string; - versionHistory: VersionHistory | null; -}> => { +export const updateReportWithVersion = async (params: BatchUpdateReportInput): Promise => { const { reportId } = params; // Get the current promise for this reportId, or use a resolved promise as the starting point - const currentQueue = - updateQueues.get(reportId) ?? - Promise.resolve({ - id: '', - name: '', - content: '', - versionHistory: null, - }); + const currentQueue = updateQueues.get(reportId) ?? Promise.resolve(); // Chain the new update to run after the current queue completes const newQueue = currentQueue @@ -147,4 +105,4 @@ export const batchUpdateReport = async ( }); return newQueue; -}; +}; \ No newline at end of file From 1f45f91412e25b036740ace86f878c5c597240db Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 26 Sep 2025 14:33:51 -0600 Subject: [PATCH 2/3] update report race condition --- .../modify-reports-tool/modify-reports-delta.ts | 11 ++++++++++- .../modify-reports-tool/modify-reports-execute.ts | 9 ++++++++- .../src/queries/reports/batch-update-report.ts | 13 ++++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts index 9022333c3..9967a43d0 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts @@ -1,5 +1,9 @@ import { db } from '@buster/database/connection'; -import { updateMessageEntries, updateReportWithVersion } from '@buster/database/queries'; +import { + updateMessageEntries, + updateReportWithVersion, + waitForPendingReportUpdates, +} from '@buster/database/queries'; import { reportFiles } from '@buster/database/schema'; import type { ChatMessageResponseMessage } from '@buster/server-shared/chats'; import type { ToolCallOptions } from 'ai'; @@ -166,6 +170,8 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M content: state.snapshotContent, name: name, }); + // Wait for the name update to fully complete in the queue + await waitForPendingReportUpdates(state.reportId); console.info('[modify-reports-delta] Updated report name', { reportId: state.reportId, name, @@ -327,6 +333,9 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M versionHistory, }); + // Wait for the database update to fully complete in the queue + await waitForPendingReportUpdates(state.reportId); + console.info('[modify-reports-delta] Database write completed', { reportId: state.reportId, version: newVersion, diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts index 2d74bf9ff..8d303609c 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts @@ -1,5 +1,9 @@ import { db } from '@buster/database/connection'; -import { updateMessageEntries, updateMetricsToReports } from '@buster/database/queries'; +import { + updateMessageEntries, + updateMetricsToReports, + waitForPendingReportUpdates, +} from '@buster/database/queries'; import { updateReportWithVersion } from '@buster/database/queries'; import { reportFiles } from '@buster/database/schema'; import type { ChatMessageResponseMessage } from '@buster/server-shared/chats'; @@ -202,6 +206,9 @@ async function processEditOperations( versionHistory: newVersionHistory, }); + // Wait for the database update to fully complete in the queue + await waitForPendingReportUpdates(reportId); + // Update cache with the modified content for future operations updateCachedSnapshot(reportId, currentContent, newVersionHistory); diff --git a/packages/database/src/queries/reports/batch-update-report.ts b/packages/database/src/queries/reports/batch-update-report.ts index 5bdc776b3..25041ba6f 100644 --- a/packages/database/src/queries/reports/batch-update-report.ts +++ b/packages/database/src/queries/reports/batch-update-report.ts @@ -34,6 +34,17 @@ type VersionHistory = Record; // Simple in-memory queue for each reportId const updateQueues = new Map>(); +/** + * Wait for all pending updates for a given reportId to complete. + * This ensures all queued updates are flushed to the database before proceeding. + */ +export async function waitForPendingReportUpdates(reportId: string): Promise { + const pendingQueue = updateQueues.get(reportId); + if (pendingQueue) { + await pendingQueue; + } +} + /** * Internal function that performs the actual update logic. * This is separated so it can be queued. @@ -105,4 +116,4 @@ export const updateReportWithVersion = async (params: BatchUpdateReportInput): P }); return newQueue; -}; \ No newline at end of file +}; From 3dc00c4a8b3db23e88704ac165ca71b87fb31e8d Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 26 Sep 2025 14:42:31 -0600 Subject: [PATCH 3/3] fix tests --- .../reports/modify-reports-tool/modify-reports-execute.test.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts index 94be90ab6..7db41f5a6 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts @@ -16,6 +16,7 @@ vi.mock('@buster/database/queries', () => ({ updateMessageEntries: vi.fn().mockResolvedValue({ success: true }), updateReportWithVersion: vi.fn().mockResolvedValue(undefined), updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }), + waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined), })); vi.mock('@buster/database/schema', () => ({ reportFiles: {},