Potential fix on report stream being cut early

This commit is contained in:
dal 2025-09-24 07:50:48 -06:00
parent 1e43ecf5ec
commit 89813ef82e
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 143 additions and 36 deletions

View File

@ -309,39 +309,52 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
}, },
}; };
// Update the database with the result of all edits (concurrent, not chained) // Update the database with the result of all edits (sequentially chained)
try { try {
// Initialize pending writes array if needed
if (!state.pendingDbWrites) {
state.pendingDbWrites = [];
}
// We're already inside a check for state.reportId being defined // We're already inside a check for state.reportId being defined
if (!state.reportId) { if (!state.reportId) {
throw new Error('Report ID is unexpectedly undefined'); throw new Error('Report ID is unexpectedly undefined');
} }
// Create the write promise and add to pending writes // Initialize the promise chain if this is the first write
const writePromise = batchUpdateReport({ if (!state.lastUpdate) {
reportId: state.reportId, state.lastUpdate = Promise.resolve();
content: currentContent, }
name: state.reportName || undefined,
versionHistory, // Chain this write to happen after the previous one completes
}) state.lastUpdate = state.lastUpdate
.then(() => { .then(async () => {
// Convert to void to match the array type // Double-check that content has actually changed since last write
return; // This prevents redundant writes if multiple deltas arrive with same content
if (currentContent === state.lastSavedContent) {
console.info('[modify-reports-delta] Skipping write - content unchanged', {
reportId: state.reportId,
});
return;
}
// Perform the database write
await batchUpdateReport({
reportId: state.reportId as string, // We checked it's defined above
content: currentContent,
name: state.reportName || undefined,
versionHistory,
});
console.info('[modify-reports-delta] Database write completed', {
reportId: state.reportId,
version: newVersion,
});
}) })
.catch((error) => { .catch((error) => {
// Log error but don't break the chain - allow subsequent writes to continue
console.error('[modify-reports-delta] Database write failed:', error); console.error('[modify-reports-delta] Database write failed:', error);
throw error; // Don't re-throw - let the chain continue for resilience
}); });
// Add to pending writes array // Wait for this specific write to complete before proceeding
state.pendingDbWrites.push(writePromise); // This ensures we don't mark things as saved until the write is done
await state.lastUpdate;
// Await this specific write to handle errors
await writePromise;
// No cache update during delta - execute will handle write-through // No cache update during delta - execute will handle write-through

View File

@ -412,6 +412,104 @@ Updated content with metrics.`;
expect(updateCall.responseMessages).toBeUndefined(); expect(updateCall.responseMessages).toBeUndefined();
}); });
it('should wait for lastUpdate promise before executing final write', async () => {
// This test validates the fix for the race condition where delta writes
// could complete out of order, causing data inconsistency
// Create a delayed promise to simulate an in-progress delta write
let resolveLastUpdate: () => void;
const lastUpdatePromise = new Promise<void>((resolve) => {
resolveLastUpdate = resolve;
});
// Set up state with an in-progress lastUpdate
state.lastUpdate = lastUpdatePromise;
state.snapshotContent = '# Original Report';
mockDbLimit.mockResolvedValue([
{
content: '# Original Report',
versionHistory: null,
},
]);
const input: ModifyReportsInput = {
id: 'report-concurrent',
name: 'Concurrent Write Test',
edits: [
{
operation: 'replace' as const,
code_to_replace: '# Original Report',
code: '# Final Version',
},
],
};
const execute = createModifyReportsExecute(context, state);
// Start the execute (it should wait for lastUpdate)
const executePromise = execute(input);
// Give it a moment to ensure it's waiting
await new Promise((resolve) => setTimeout(resolve, 10));
// batchUpdateReport should not have been called yet
const mockBatchUpdateReport = vi.mocked(
await import('@buster/database/queries').then((m) => m.batchUpdateReport)
);
expect(mockBatchUpdateReport).not.toHaveBeenCalled();
// Now resolve the lastUpdate promise
resolveLastUpdate!();
// Wait for execute to complete
const result = await executePromise;
// Now batchUpdateReport should have been called
expect(mockBatchUpdateReport).toHaveBeenCalled();
expect(result.success).toBe(true);
expect(result.file.content).toContain('# Final Version');
});
it('should handle lastUpdate promise rejection gracefully', async () => {
// Create a rejected promise to simulate a failed delta write
const lastUpdatePromise = Promise.reject(new Error('Delta write failed'));
// Set up state with a rejected lastUpdate
state.lastUpdate = lastUpdatePromise;
state.snapshotContent = '# Original Report';
mockDbLimit.mockResolvedValue([
{
content: '# Original Report',
versionHistory: null,
},
]);
const input: ModifyReportsInput = {
id: 'report-error-handling',
name: 'Error Handling Test',
edits: [
{
operation: 'replace' as const,
code_to_replace: '# Original Report',
code: '# Updated Report',
},
],
};
const execute = createModifyReportsExecute(context, state);
// Execute should still complete successfully despite the failed lastUpdate
const result = await execute(input);
expect(result.success).toBe(true);
expect(result.file.content).toContain('# Updated Report');
// Should log a warning about the failed delta write
// (In a real test, you might spy on console.warn)
});
it('should only check metrics on successful modifications', async () => { it('should only check metrics on successful modifications', async () => {
// Setup two reports - one success, one failure // Setup two reports - one success, one failure
mockDbLimit mockDbLimit

View File

@ -177,22 +177,18 @@ async function processEditOperations(
// Write all changes to database in one operation // Write all changes to database in one operation
try { try {
// Wait for ALL pending delta writes to complete before doing final update // Wait for the last delta write to complete before doing final update
if (state?.pendingDbWrites && state.pendingDbWrites.length > 0) { if (state?.lastUpdate) {
console.info( console.info('[modify-reports-execute] Waiting for last delta write to complete');
`[modify-reports-execute] Waiting for ${state.pendingDbWrites.length} pending delta writes to complete`
);
try { try {
// Wait for all writes to complete (some may fail, that's OK) // Wait for the last write in the chain to complete
await Promise.allSettled(state.pendingDbWrites); await state.lastUpdate;
// Add small delay to ensure we're absolutely last
await new Promise((resolve) => setTimeout(resolve, 50));
console.info( console.info(
'[modify-reports-execute] All delta writes completed, proceeding with final update' '[modify-reports-execute] Last delta write completed, proceeding with final update'
); );
} catch (error) { } catch (error) {
console.warn( console.warn(
'[modify-reports-execute] Error waiting for delta writes, proceeding with final update:', '[modify-reports-execute] Error waiting for last delta write, proceeding with final update:',
error error
); );
} }
@ -325,7 +321,7 @@ const modifyReportsFile = wrapTraced(
messageId, messageId,
snapshotContent, // Pass immutable snapshot snapshotContent, // Pass immutable snapshot
versionHistory, // Pass snapshot version history versionHistory, // Pass snapshot version history
state // Pass state to access lastDbWritePromise state // Pass state to access lastUpdate
); );
// Track file associations if this is a new version (not part of same turn) // Track file associations if this is a new version (not part of same turn)
@ -417,7 +413,7 @@ export function createModifyReportsExecute(
context, context,
state.snapshotContent, // Pass immutable snapshot from state state.snapshotContent, // Pass immutable snapshot from state
state.versionHistory, // Pass snapshot version history from state state.versionHistory, // Pass snapshot version history from state
state // Pass state to access lastDbWritePromise state // Pass state to access lastUpdate
); );
if (!result) { if (!result) {

View File

@ -105,7 +105,7 @@ export type ModifyReportsEditState = z.infer<typeof ModifyReportsEditStateSchema
// Extend the inferred type to include Promise fields (not supported by Zod directly) // Extend the inferred type to include Promise fields (not supported by Zod directly)
export type ModifyReportsState = z.infer<typeof ModifyReportsStateSchema> & { export type ModifyReportsState = z.infer<typeof ModifyReportsStateSchema> & {
pendingDbWrites?: Promise<void>[]; // Track all pending writes lastUpdate?: Promise<void>; // Track the last write promise for sequential chaining
}; };
// Factory function that accepts agent context and maps to tool context // Factory function that accepts agent context and maps to tool context