Merge pull request #1190 from buster-so/report-sequential-writes

Report sequential writes
This commit is contained in:
dal 2025-09-26 14:53:26 -06:00 committed by GitHub
commit 5ed6bdffbd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 205 additions and 214 deletions

View File

@ -9,7 +9,7 @@ import type {
// Mock dependencies
vi.mock('@buster/database/queries', () => ({
updateMessageEntries: vi.fn().mockResolvedValue({ success: true }),
batchUpdateReport: vi.fn().mockResolvedValue({ success: true }),
updateReportWithVersion: vi.fn().mockResolvedValue(undefined),
updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }),
}));

View File

@ -1,7 +1,7 @@
import {
batchUpdateReport,
updateMessageEntries,
updateMetricsToReports,
updateReportWithVersion,
} from '@buster/database/queries';
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
import { wrapTraced } from 'braintrust';
@ -146,7 +146,7 @@ export function createCreateReportsExecute(
};
// Update the report with complete content from input (source of truth)
await batchUpdateReport({
await updateReportWithVersion({
reportId,
content,
name,

View File

@ -1,5 +1,9 @@
import { db } from '@buster/database/connection';
import { batchUpdateReport, updateMessageEntries } from '@buster/database/queries';
import {
updateMessageEntries,
updateReportWithVersion,
waitForPendingReportUpdates,
} from '@buster/database/queries';
import { reportFiles } from '@buster/database/schema';
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
import type { ToolCallOptions } from 'ai';
@ -37,11 +41,6 @@ const TOOL_KEYS = {
export function createModifyReportsDelta(context: ModifyReportsContext, state: ModifyReportsState) {
return async (options: { inputTextDelta: string } & ToolCallOptions) => {
// Skip delta updates if already complete (same pattern as sequential thinking)
if (state.isComplete) {
return;
}
// Handle string deltas (accumulate JSON text)
state.argsText = (state.argsText || '') + options.inputTextDelta;
@ -165,12 +164,14 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
// If we have a snapshot and report ID, update the name in the database
if (state.snapshotContent !== undefined && state.reportId) {
try {
// We need to provide content for batchUpdateReport, so use the snapshot
await batchUpdateReport({
// We need to provide content for updateReportWithVersion, so use the snapshot
await updateReportWithVersion({
reportId: state.reportId,
content: state.snapshotContent,
name: name,
});
// Wait for the name update to fully complete in the queue
await waitForPendingReportUpdates(state.reportId);
console.info('[modify-reports-delta] Updated report name', {
reportId: state.reportId,
name,
@ -247,163 +248,165 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
}
}
// Apply ALL edits sequentially starting from the immutable snapshot
// This happens on every delta update to ensure we have the complete result
// Queue the entire processing operation to ensure sequential execution
if (state.edits && state.edits.length > 0 && state.snapshotContent !== undefined) {
// Start fresh from snapshot every time
let currentContent = state.snapshotContent;
// Apply each edit in sequence
for (const edit of state.edits) {
// Skip if edit is null/undefined, but allow empty strings for code
if (!edit || edit.code === undefined || edit.code === null) continue;
const operation = edit.operation;
const codeToReplace = edit.code_to_replace || '';
const code = edit.code;
if (operation === 'append') {
currentContent = currentContent + code;
} else if (operation === 'replace' && codeToReplace) {
// Check if the pattern exists
if (!currentContent.includes(codeToReplace)) {
console.warn('[modify-reports-delta] Pattern not found during final apply', {
codeToReplace: codeToReplace.substring(0, 50),
reportId: state.reportId,
});
edit.status = 'failed';
edit.error = 'Pattern not found';
continue;
}
// Apply the replacement
currentContent = currentContent.replace(codeToReplace, code);
}
edit.status = 'completed';
// Initialize the promise chain if this is the first processing
if (!state.lastProcessing) {
state.lastProcessing = Promise.resolve();
}
// Only update if content actually changed from what we last saved
if (currentContent !== state.lastSavedContent) {
// Check if we should increment version (not if report was created in current turn)
const incrementVersion = await shouldIncrementVersion(
state.reportId,
context.messageId
);
// Chain this processing to happen after the previous one completes
state.lastProcessing = state.lastProcessing
.then(async () => {
// Apply ALL edits sequentially starting from the immutable snapshot
// Start fresh from snapshot every time
let currentContent = state.snapshotContent; // We checked it's defined above
// Calculate new version
const currentVersion = state.snapshotVersion || 1;
const newVersion = incrementVersion ? currentVersion + 1 : currentVersion;
state.version_number = newVersion;
// Apply each edit in sequence
for (const edit of state.edits || []) {
// Skip if edit is null/undefined, but allow empty strings for code
if (!edit || edit.code === undefined || edit.code === null) continue;
// Track this modification for this tool invocation
state.reportModifiedInMessage = true;
const operation = edit.operation;
const codeToReplace = edit.code_to_replace || '';
const code = edit.code;
// Update version history with the final content after all edits
const now = new Date().toISOString();
const versionHistory = {
...(state.versionHistory || {}),
[newVersion.toString()]: {
content: currentContent,
updated_at: now,
version_number: newVersion,
},
};
// Update the database with the result of all edits (sequentially chained)
try {
// We're already inside a check for state.reportId being defined
if (!state.reportId) {
throw new Error('Report ID is unexpectedly undefined');
}
// Initialize the promise chain if this is the first write
if (!state.lastUpdate) {
state.lastUpdate = Promise.resolve();
}
// Chain this write to happen after the previous one completes
state.lastUpdate = state.lastUpdate
.then(async () => {
// Double-check that content has actually changed since last write
// This prevents redundant writes if multiple deltas arrive with same content
if (currentContent === state.lastSavedContent) {
console.info('[modify-reports-delta] Skipping write - content unchanged', {
if (operation === 'append') {
currentContent = (currentContent || '') + code;
} else if (operation === 'replace' && codeToReplace) {
// Check if the pattern exists
if (!currentContent || !currentContent.includes(codeToReplace)) {
console.warn('[modify-reports-delta] Pattern not found during final apply', {
codeToReplace: codeToReplace.substring(0, 50),
reportId: state.reportId,
});
return;
edit.status = 'failed';
edit.error = 'Pattern not found';
continue;
}
// Apply the replacement
currentContent = (currentContent || '').replace(codeToReplace, code);
}
edit.status = 'completed';
}
// Only update if content actually changed from what we last saved
if (currentContent !== state.lastSavedContent) {
// Check if we should increment version (not if report was created in current turn)
const incrementVersion = await shouldIncrementVersion(
state.reportId || '',
context.messageId
);
// Calculate new version
const currentVersion = state.snapshotVersion || 1;
const newVersion = incrementVersion ? currentVersion + 1 : currentVersion;
state.version_number = newVersion;
// Track this modification for this tool invocation
state.reportModifiedInMessage = true;
// Update version history with the final content after all edits
const now = new Date().toISOString();
const versionHistory = {
...(state.versionHistory || {}),
[newVersion.toString()]: {
content: currentContent || '',
updated_at: now,
version_number: newVersion,
},
};
// Update the database with the result of all edits
try {
// We're already inside a check for state.reportId being defined
if (!state.reportId) {
throw new Error('Report ID is unexpectedly undefined');
}
// Perform the database write
await batchUpdateReport({
reportId: state.reportId as string, // We checked it's defined above
content: currentContent,
await updateReportWithVersion({
reportId: state.reportId,
content: currentContent || '',
name: state.reportName || undefined,
versionHistory,
});
// Wait for the database update to fully complete in the queue
await waitForPendingReportUpdates(state.reportId);
console.info('[modify-reports-delta] Database write completed', {
reportId: state.reportId,
version: newVersion,
});
})
.catch((error) => {
// Log error but don't break the chain - allow subsequent writes to continue
console.error('[modify-reports-delta] Database write failed:', error);
// Don't re-throw - let the chain continue for resilience
});
// Wait for this specific write to complete before proceeding
// This ensures we don't mark things as saved until the write is done
await state.lastUpdate;
// Keep lastUpdate in sync for backward compatibility (already set via processing chain)
// No cache update during delta - execute will handle write-through
// No cache update during delta - execute will handle write-through
// Update state with the final content (but keep snapshot immutable)
state.finalContent = currentContent;
state.lastSavedContent = currentContent; // Track what we just saved
state.versionHistory = versionHistory;
// DO NOT update state.snapshotContent - it must remain immutable
// Update state with the final content (but keep snapshot immutable)
state.finalContent = currentContent || '';
state.lastSavedContent = currentContent || ''; // Track what we just saved
state.versionHistory = versionHistory;
// DO NOT update state.snapshotContent - it must remain immutable
// Create response message if not already created
if (!state.responseMessageCreated && context.messageId) {
const responseMessages: ChatMessageResponseMessage[] = [
{
id: state.reportId,
type: 'file' as const,
file_type: 'report_file' as const,
file_name: state.reportName || 'Untitled Report',
version_number: newVersion,
filter_version_id: null,
metadata: [
// Create response message if not already created
if (!state.responseMessageCreated && context.messageId) {
const responseMessages: ChatMessageResponseMessage[] = [
{
status: 'completed' as const,
message: 'Report modified successfully',
timestamp: Date.now(),
id: state.reportId,
type: 'file' as const,
file_type: 'report_file' as const,
file_name: state.reportName || 'Untitled Report',
version_number: newVersion,
filter_version_id: null,
metadata: [
{
status: 'completed' as const,
message: 'Report modified successfully',
timestamp: Date.now(),
},
],
},
],
},
];
];
await updateMessageEntries({
messageId: context.messageId,
responseMessages,
});
await updateMessageEntries({
messageId: context.messageId,
responseMessages,
});
state.responseMessageCreated = true;
console.info('[modify-reports-delta] Created response message during streaming');
}
} catch (error) {
console.error('[modify-reports-delta] Error updating report content:', error);
if (state.edits) {
state.edits.forEach((edit) => {
if (edit) {
edit.status = 'failed';
edit.error = error instanceof Error ? error.message : 'Update failed';
state.responseMessageCreated = true;
console.info(
'[modify-reports-delta] Created response message during streaming'
);
}
});
} catch (error) {
console.error('[modify-reports-delta] Error updating report content:', error);
if (state.edits) {
state.edits.forEach((edit) => {
if (edit) {
edit.status = 'failed';
edit.error = error instanceof Error ? error.message : 'Update failed';
}
});
}
// Re-throw to be caught by the outer catch
throw error;
}
}
}
}
})
.catch((error) => {
// Log error but don't break the chain - allow subsequent processing to continue
console.error('[modify-reports-delta] Processing failed:', error);
// Don't re-throw - let the chain continue for resilience
});
// Clean up the processing entry once this completes
state.lastProcessing.finally(() => {
// Only remove if this is still the current processing
// This cleanup is handled by the execute function
});
}
}
}

View File

@ -14,8 +14,9 @@ const mockDbSelect = vi.fn();
vi.mock('@buster/database/queries', () => ({
updateMessageEntries: vi.fn().mockResolvedValue({ success: true }),
batchUpdateReport: vi.fn().mockResolvedValue({ success: true }),
updateReportWithVersion: vi.fn().mockResolvedValue(undefined),
updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }),
waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined),
}));
vi.mock('@buster/database/schema', () => ({
reportFiles: {},
@ -422,8 +423,8 @@ Updated content with metrics.`;
resolveLastUpdate = resolve;
});
// Set up state with an in-progress lastUpdate
state.lastUpdate = lastUpdatePromise;
// Set up state with an in-progress lastProcessing
state.lastProcessing = lastUpdatePromise;
state.snapshotContent = '# Original Report';
mockDbLimit.mockResolvedValue([
@ -447,26 +448,26 @@ Updated content with metrics.`;
const execute = createModifyReportsExecute(context, state);
// Start the execute (it should wait for lastUpdate)
// Start the execute (it should wait for lastProcessing)
const executePromise = execute(input);
// Give it a moment to ensure it's waiting
await new Promise((resolve) => setTimeout(resolve, 10));
// batchUpdateReport should not have been called yet
const mockBatchUpdateReport = vi.mocked(
await import('@buster/database/queries').then((m) => m.batchUpdateReport)
// updateReportWithVersion should not have been called yet
const mockUpdateReportWithVersion = vi.mocked(
await import('@buster/database/queries').then((m) => m.updateReportWithVersion)
);
expect(mockBatchUpdateReport).not.toHaveBeenCalled();
expect(mockUpdateReportWithVersion).not.toHaveBeenCalled();
// Now resolve the lastUpdate promise
// Now resolve the lastProcessing promise
resolveLastUpdate!();
// Wait for execute to complete
const result = await executePromise;
// Now batchUpdateReport should have been called
expect(mockBatchUpdateReport).toHaveBeenCalled();
// Now updateReportWithVersion should have been called
expect(mockUpdateReportWithVersion).toHaveBeenCalled();
expect(result.success).toBe(true);
expect(result.file.content).toContain('# Final Version');
});
@ -475,8 +476,8 @@ Updated content with metrics.`;
// Create a rejected promise to simulate a failed delta write
const lastUpdatePromise = Promise.reject(new Error('Delta write failed'));
// Set up state with a rejected lastUpdate
state.lastUpdate = lastUpdatePromise;
// Set up state with a rejected lastProcessing
state.lastProcessing = lastUpdatePromise;
state.snapshotContent = '# Original Report';
mockDbLimit.mockResolvedValue([

View File

@ -1,6 +1,10 @@
import { db } from '@buster/database/connection';
import { updateMessageEntries, updateMetricsToReports } from '@buster/database/queries';
import { batchUpdateReport } from '@buster/database/queries';
import {
updateMessageEntries,
updateMetricsToReports,
waitForPendingReportUpdates,
} from '@buster/database/queries';
import { updateReportWithVersion } from '@buster/database/queries';
import { reportFiles } from '@buster/database/schema';
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
import { wrapTraced } from 'braintrust';
@ -178,30 +182,33 @@ async function processEditOperations(
// Write all changes to database in one operation
try {
// Wait for the last delta write to complete before doing final update
if (state?.lastUpdate) {
console.info('[modify-reports-execute] Waiting for last delta write to complete');
// Wait for the last delta processing to complete before doing final update
if (state?.lastProcessing) {
console.info('[modify-reports-execute] Waiting for last delta processing to complete');
try {
// Wait for the last write in the chain to complete
await state.lastUpdate;
// Wait for the last processing in the chain to complete
await state.lastProcessing;
console.info(
'[modify-reports-execute] Last delta write completed, proceeding with final update'
'[modify-reports-execute] Last delta processing completed, proceeding with final update'
);
} catch (error) {
console.warn(
'[modify-reports-execute] Error waiting for last delta write, proceeding with final update:',
'[modify-reports-execute] Error waiting for last delta processing, proceeding with final update:',
error
);
}
}
await batchUpdateReport({
await updateReportWithVersion({
reportId,
content: currentContent,
name: reportName,
versionHistory: newVersionHistory,
});
// Wait for the database update to fully complete in the queue
await waitForPendingReportUpdates(reportId);
// Update cache with the modified content for future operations
updateCachedSnapshot(reportId, currentContent, newVersionHistory);

View File

@ -17,6 +17,16 @@ export function createModifyReportsFinish(
return async (options: { input: ModifyReportsInput } & ToolCallOptions) => {
const input = options.input;
// Wait for all queued delta processing to complete before setting isComplete
if (state.lastProcessing) {
try {
await state.lastProcessing;
console.info('[modify-reports-finish] All delta processing completed');
} catch (error) {
console.error('[modify-reports-finish] Error waiting for delta processing:', error);
}
}
// Set isComplete to prevent further delta processing (same as sequential thinking)
state.isComplete = true;

View File

@ -105,7 +105,8 @@ export type ModifyReportsEditState = z.infer<typeof ModifyReportsEditStateSchema
// Extend the inferred type to include Promise fields (not supported by Zod directly)
export type ModifyReportsState = z.infer<typeof ModifyReportsStateSchema> & {
lastUpdate?: Promise<void>; // Track the last write promise for sequential chaining
lastUpdate?: Promise<void>; // Track the last write promise for sequential chaining (deprecated, use lastProcessing)
lastProcessing?: Promise<void>; // Track the entire processing chain for proper sequencing
};
// Factory function that accepts agent context and maps to tool context

View File

@ -32,26 +32,24 @@ type VersionHistoryEntry = {
type VersionHistory = Record<string, VersionHistoryEntry>;
// Simple in-memory queue for each reportId
const updateQueues = new Map<
string,
Promise<{
id: string;
name: string;
content: string;
versionHistory: VersionHistory | null;
}>
>();
const updateQueues = new Map<string, Promise<void>>();
/**
* Wait for all pending updates for a given reportId to complete.
* This ensures all queued updates are flushed to the database before proceeding.
*/
export async function waitForPendingReportUpdates(reportId: string): Promise<void> {
const pendingQueue = updateQueues.get(reportId);
if (pendingQueue) {
await pendingQueue;
}
}
/**
* Internal function that performs the actual update logic.
* This is separated so it can be queued.
*/
async function performUpdate(params: BatchUpdateReportInput): Promise<{
id: string;
name: string;
content: string;
versionHistory: VersionHistory | null;
}> {
async function performUpdate(params: BatchUpdateReportInput): Promise<void> {
const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params);
try {
@ -73,25 +71,12 @@ async function performUpdate(params: BatchUpdateReportInput): Promise<{
updateData.versionHistory = versionHistory;
}
const result = await db
await db
.update(reportFiles)
.set(updateData)
.where(and(eq(reportFiles.id, reportId), isNull(reportFiles.deletedAt)))
.returning({
id: reportFiles.id,
name: reportFiles.name,
content: reportFiles.content,
versionHistory: reportFiles.versionHistory,
});
const updatedReport = result[0];
if (!updatedReport) {
throw new Error('Report not found or already deleted');
}
return updatedReport;
.where(and(eq(reportFiles.id, reportId), isNull(reportFiles.deletedAt)));
} catch (error) {
console.error('Error batch updating report:', {
console.error('Error updating report with version:', {
reportId,
error: error instanceof Error ? error.message : error,
});
@ -100,35 +85,19 @@ async function performUpdate(params: BatchUpdateReportInput): Promise<{
throw error;
}
throw new Error('Failed to batch update report');
throw new Error('Failed to update report with version');
}
}
/**
* Updates a report with new content, optionally name, and version history in a single operation
* This is more efficient than multiple individual updates
*
* Updates a report's content, name, and version history in a single operation.
* Updates are queued per reportId to ensure they execute in order.
*/
export const batchUpdateReport = async (
params: BatchUpdateReportInput
): Promise<{
id: string;
name: string;
content: string;
versionHistory: VersionHistory | null;
}> => {
export const updateReportWithVersion = async (params: BatchUpdateReportInput): Promise<void> => {
const { reportId } = params;
// Get the current promise for this reportId, or use a resolved promise as the starting point
const currentQueue =
updateQueues.get(reportId) ??
Promise.resolve({
id: '',
name: '',
content: '',
versionHistory: null,
});
const currentQueue = updateQueues.get(reportId) ?? Promise.resolve();
// Chain the new update to run after the current queue completes
const newQueue = currentQueue