diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts index 6b35e5e41..58cf24d0a 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts @@ -309,39 +309,52 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M }, }; - // Update the database with the result of all edits (concurrent, not chained) + // Update the database with the result of all edits (sequentially chained) try { - // Initialize pending writes array if needed - if (!state.pendingDbWrites) { - state.pendingDbWrites = []; - } - // We're already inside a check for state.reportId being defined if (!state.reportId) { throw new Error('Report ID is unexpectedly undefined'); } - // Create the write promise and add to pending writes - const writePromise = batchUpdateReport({ - reportId: state.reportId, - content: currentContent, - name: state.reportName || undefined, - versionHistory, - }) - .then(() => { - // Convert to void to match the array type - return; + // Initialize the promise chain if this is the first write + if (!state.lastUpdate) { + state.lastUpdate = Promise.resolve(); + } + + // Chain this write to happen after the previous one completes + state.lastUpdate = state.lastUpdate + .then(async () => { + // Double-check that content has actually changed since last write + // This prevents redundant writes if multiple deltas arrive with same content + if (currentContent === state.lastSavedContent) { + console.info('[modify-reports-delta] Skipping write - content unchanged', { + reportId: state.reportId, + }); + return; + } + + // Perform the database write + await batchUpdateReport({ + reportId: state.reportId as string, // We checked it's defined above + content: currentContent, + name: state.reportName || undefined, + versionHistory, + }); + + console.info('[modify-reports-delta] Database write completed', { + reportId: state.reportId, + version: newVersion, + }); }) .catch((error) => { + // Log error but don't break the chain - allow subsequent writes to continue console.error('[modify-reports-delta] Database write failed:', error); - throw error; + // Don't re-throw - let the chain continue for resilience }); - // Add to pending writes array - state.pendingDbWrites.push(writePromise); - - // Await this specific write to handle errors - await writePromise; + // Wait for this specific write to complete before proceeding + // This ensures we don't mark things as saved until the write is done + await state.lastUpdate; // No cache update during delta - execute will handle write-through diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts index f0ed023f8..ca2716245 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts @@ -412,6 +412,104 @@ Updated content with metrics.`; expect(updateCall.responseMessages).toBeUndefined(); }); + it('should wait for lastUpdate promise before executing final write', async () => { + // This test validates the fix for the race condition where delta writes + // could complete out of order, causing data inconsistency + + // Create a delayed promise to simulate an in-progress delta write + let resolveLastUpdate: () => void; + const lastUpdatePromise = new Promise((resolve) => { + resolveLastUpdate = resolve; + }); + + // Set up state with an in-progress lastUpdate + state.lastUpdate = lastUpdatePromise; + state.snapshotContent = '# Original Report'; + + mockDbLimit.mockResolvedValue([ + { + content: '# Original Report', + versionHistory: null, + }, + ]); + + const input: ModifyReportsInput = { + id: 'report-concurrent', + name: 'Concurrent Write Test', + edits: [ + { + operation: 'replace' as const, + code_to_replace: '# Original Report', + code: '# Final Version', + }, + ], + }; + + const execute = createModifyReportsExecute(context, state); + + // Start the execute (it should wait for lastUpdate) + const executePromise = execute(input); + + // Give it a moment to ensure it's waiting + await new Promise((resolve) => setTimeout(resolve, 10)); + + // batchUpdateReport should not have been called yet + const mockBatchUpdateReport = vi.mocked( + await import('@buster/database/queries').then((m) => m.batchUpdateReport) + ); + expect(mockBatchUpdateReport).not.toHaveBeenCalled(); + + // Now resolve the lastUpdate promise + resolveLastUpdate!(); + + // Wait for execute to complete + const result = await executePromise; + + // Now batchUpdateReport should have been called + expect(mockBatchUpdateReport).toHaveBeenCalled(); + expect(result.success).toBe(true); + expect(result.file.content).toContain('# Final Version'); + }); + + it('should handle lastUpdate promise rejection gracefully', async () => { + // Create a rejected promise to simulate a failed delta write + const lastUpdatePromise = Promise.reject(new Error('Delta write failed')); + + // Set up state with a rejected lastUpdate + state.lastUpdate = lastUpdatePromise; + state.snapshotContent = '# Original Report'; + + mockDbLimit.mockResolvedValue([ + { + content: '# Original Report', + versionHistory: null, + }, + ]); + + const input: ModifyReportsInput = { + id: 'report-error-handling', + name: 'Error Handling Test', + edits: [ + { + operation: 'replace' as const, + code_to_replace: '# Original Report', + code: '# Updated Report', + }, + ], + }; + + const execute = createModifyReportsExecute(context, state); + + // Execute should still complete successfully despite the failed lastUpdate + const result = await execute(input); + + expect(result.success).toBe(true); + expect(result.file.content).toContain('# Updated Report'); + + // Should log a warning about the failed delta write + // (In a real test, you might spy on console.warn) + }); + it('should only check metrics on successful modifications', async () => { // Setup two reports - one success, one failure mockDbLimit diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts index 6b07439d2..863a6c31f 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts @@ -177,22 +177,18 @@ async function processEditOperations( // Write all changes to database in one operation try { - // Wait for ALL pending delta writes to complete before doing final update - if (state?.pendingDbWrites && state.pendingDbWrites.length > 0) { - console.info( - `[modify-reports-execute] Waiting for ${state.pendingDbWrites.length} pending delta writes to complete` - ); + // Wait for the last delta write to complete before doing final update + if (state?.lastUpdate) { + console.info('[modify-reports-execute] Waiting for last delta write to complete'); try { - // Wait for all writes to complete (some may fail, that's OK) - await Promise.allSettled(state.pendingDbWrites); - // Add small delay to ensure we're absolutely last - await new Promise((resolve) => setTimeout(resolve, 50)); + // Wait for the last write in the chain to complete + await state.lastUpdate; console.info( - '[modify-reports-execute] All delta writes completed, proceeding with final update' + '[modify-reports-execute] Last delta write completed, proceeding with final update' ); } catch (error) { console.warn( - '[modify-reports-execute] Error waiting for delta writes, proceeding with final update:', + '[modify-reports-execute] Error waiting for last delta write, proceeding with final update:', error ); } @@ -325,7 +321,7 @@ const modifyReportsFile = wrapTraced( messageId, snapshotContent, // Pass immutable snapshot versionHistory, // Pass snapshot version history - state // Pass state to access lastDbWritePromise + state // Pass state to access lastUpdate ); // Track file associations if this is a new version (not part of same turn) @@ -417,7 +413,7 @@ export function createModifyReportsExecute( context, state.snapshotContent, // Pass immutable snapshot from state state.versionHistory, // Pass snapshot version history from state - state // Pass state to access lastDbWritePromise + state // Pass state to access lastUpdate ); if (!result) { diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts index a31b1715a..e0fb3ba77 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts @@ -105,7 +105,7 @@ export type ModifyReportsEditState = z.infer & { - pendingDbWrites?: Promise[]; // Track all pending writes + lastUpdate?: Promise; // Track the last write promise for sequential chaining }; // Factory function that accepts agent context and maps to tool context