Merge pull request #1105 from buster-so/dallin-bus-1910-streaming-broke-towards-the-end-of-its-research

Potential fix on report stream being cut early
This commit is contained in:
dal 2025-09-24 08:01:02 -06:00 committed by GitHub
commit 030d355f87
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 143 additions and 36 deletions

View File

@ -309,39 +309,52 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
},
};
// Update the database with the result of all edits (concurrent, not chained)
// Update the database with the result of all edits (sequentially chained)
try {
// Initialize pending writes array if needed
if (!state.pendingDbWrites) {
state.pendingDbWrites = [];
}
// We're already inside a check for state.reportId being defined
if (!state.reportId) {
throw new Error('Report ID is unexpectedly undefined');
}
// Create the write promise and add to pending writes
const writePromise = batchUpdateReport({
// Initialize the promise chain if this is the first write
if (!state.lastUpdate) {
state.lastUpdate = Promise.resolve();
}
// Chain this write to happen after the previous one completes
state.lastUpdate = state.lastUpdate
.then(async () => {
// Double-check that content has actually changed since last write
// This prevents redundant writes if multiple deltas arrive with same content
if (currentContent === state.lastSavedContent) {
console.info('[modify-reports-delta] Skipping write - content unchanged', {
reportId: state.reportId,
});
return;
}
// Perform the database write
await batchUpdateReport({
reportId: state.reportId as string, // We checked it's defined above
content: currentContent,
name: state.reportName || undefined,
versionHistory,
})
.then(() => {
// Convert to void to match the array type
return;
})
.catch((error) => {
console.error('[modify-reports-delta] Database write failed:', error);
throw error;
});
// Add to pending writes array
state.pendingDbWrites.push(writePromise);
console.info('[modify-reports-delta] Database write completed', {
reportId: state.reportId,
version: newVersion,
});
})
.catch((error) => {
// Log error but don't break the chain - allow subsequent writes to continue
console.error('[modify-reports-delta] Database write failed:', error);
// Don't re-throw - let the chain continue for resilience
});
// Await this specific write to handle errors
await writePromise;
// Wait for this specific write to complete before proceeding
// This ensures we don't mark things as saved until the write is done
await state.lastUpdate;
// No cache update during delta - execute will handle write-through

View File

@ -412,6 +412,104 @@ Updated content with metrics.`;
expect(updateCall.responseMessages).toBeUndefined();
});
it('should wait for lastUpdate promise before executing final write', async () => {
// This test validates the fix for the race condition where delta writes
// could complete out of order, causing data inconsistency
// Create a delayed promise to simulate an in-progress delta write
let resolveLastUpdate: () => void;
const lastUpdatePromise = new Promise<void>((resolve) => {
resolveLastUpdate = resolve;
});
// Set up state with an in-progress lastUpdate
state.lastUpdate = lastUpdatePromise;
state.snapshotContent = '# Original Report';
mockDbLimit.mockResolvedValue([
{
content: '# Original Report',
versionHistory: null,
},
]);
const input: ModifyReportsInput = {
id: 'report-concurrent',
name: 'Concurrent Write Test',
edits: [
{
operation: 'replace' as const,
code_to_replace: '# Original Report',
code: '# Final Version',
},
],
};
const execute = createModifyReportsExecute(context, state);
// Start the execute (it should wait for lastUpdate)
const executePromise = execute(input);
// Give it a moment to ensure it's waiting
await new Promise((resolve) => setTimeout(resolve, 10));
// batchUpdateReport should not have been called yet
const mockBatchUpdateReport = vi.mocked(
await import('@buster/database/queries').then((m) => m.batchUpdateReport)
);
expect(mockBatchUpdateReport).not.toHaveBeenCalled();
// Now resolve the lastUpdate promise
resolveLastUpdate!();
// Wait for execute to complete
const result = await executePromise;
// Now batchUpdateReport should have been called
expect(mockBatchUpdateReport).toHaveBeenCalled();
expect(result.success).toBe(true);
expect(result.file.content).toContain('# Final Version');
});
it('should handle lastUpdate promise rejection gracefully', async () => {
// Create a rejected promise to simulate a failed delta write
const lastUpdatePromise = Promise.reject(new Error('Delta write failed'));
// Set up state with a rejected lastUpdate
state.lastUpdate = lastUpdatePromise;
state.snapshotContent = '# Original Report';
mockDbLimit.mockResolvedValue([
{
content: '# Original Report',
versionHistory: null,
},
]);
const input: ModifyReportsInput = {
id: 'report-error-handling',
name: 'Error Handling Test',
edits: [
{
operation: 'replace' as const,
code_to_replace: '# Original Report',
code: '# Updated Report',
},
],
};
const execute = createModifyReportsExecute(context, state);
// Execute should still complete successfully despite the failed lastUpdate
const result = await execute(input);
expect(result.success).toBe(true);
expect(result.file.content).toContain('# Updated Report');
// Should log a warning about the failed delta write
// (In a real test, you might spy on console.warn)
});
it('should only check metrics on successful modifications', async () => {
// Setup two reports - one success, one failure
mockDbLimit

View File

@ -177,22 +177,18 @@ async function processEditOperations(
// Write all changes to database in one operation
try {
// Wait for ALL pending delta writes to complete before doing final update
if (state?.pendingDbWrites && state.pendingDbWrites.length > 0) {
console.info(
`[modify-reports-execute] Waiting for ${state.pendingDbWrites.length} pending delta writes to complete`
);
// Wait for the last delta write to complete before doing final update
if (state?.lastUpdate) {
console.info('[modify-reports-execute] Waiting for last delta write to complete');
try {
// Wait for all writes to complete (some may fail, that's OK)
await Promise.allSettled(state.pendingDbWrites);
// Add small delay to ensure we're absolutely last
await new Promise((resolve) => setTimeout(resolve, 50));
// Wait for the last write in the chain to complete
await state.lastUpdate;
console.info(
'[modify-reports-execute] All delta writes completed, proceeding with final update'
'[modify-reports-execute] Last delta write completed, proceeding with final update'
);
} catch (error) {
console.warn(
'[modify-reports-execute] Error waiting for delta writes, proceeding with final update:',
'[modify-reports-execute] Error waiting for last delta write, proceeding with final update:',
error
);
}
@ -325,7 +321,7 @@ const modifyReportsFile = wrapTraced(
messageId,
snapshotContent, // Pass immutable snapshot
versionHistory, // Pass snapshot version history
state // Pass state to access lastDbWritePromise
state // Pass state to access lastUpdate
);
// Track file associations if this is a new version (not part of same turn)
@ -417,7 +413,7 @@ export function createModifyReportsExecute(
context,
state.snapshotContent, // Pass immutable snapshot from state
state.versionHistory, // Pass snapshot version history from state
state // Pass state to access lastDbWritePromise
state // Pass state to access lastUpdate
);
if (!result) {

View File

@ -105,7 +105,7 @@ export type ModifyReportsEditState = z.infer<typeof ModifyReportsEditStateSchema
// Extend the inferred type to include Promise fields (not supported by Zod directly)
export type ModifyReportsState = z.infer<typeof ModifyReportsStateSchema> & {
pendingDbWrites?: Promise<void>[]; // Track all pending writes
lastUpdate?: Promise<void>; // Track the last write promise for sequential chaining
};
// Factory function that accepts agent context and maps to tool context