diff --git a/packages/ai/src/tools/communication-tools/done-tool/done-tool-delta.ts b/packages/ai/src/tools/communication-tools/done-tool/done-tool-delta.ts index ae2fb9251..e502e9209 100644 --- a/packages/ai/src/tools/communication-tools/done-tool/done-tool-delta.ts +++ b/packages/ai/src/tools/communication-tools/done-tool/done-tool-delta.ts @@ -29,11 +29,7 @@ export function createDoneToolDelta(context: DoneToolContext, doneToolState: Don return async function doneToolDelta( options: { inputTextDelta: string } & ToolCallOptions ): Promise { - if (doneToolState.isFinalizing) { - return; - } - - if (isMessageUpdateQueueClosed(context.messageId)) { + if (doneToolState.isFinalizing || isMessageUpdateQueueClosed(context.messageId)) { return; } diff --git a/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts b/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts index 65bbabb01..61a44b1f1 100644 --- a/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts +++ b/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts @@ -79,6 +79,7 @@ async function processDone( // Factory function that creates the execute function with proper context typing const updateMessage = databaseQueries.updateMessage; const updateMessageEntries = databaseQueries.updateMessageEntries; +const closeMessageUpdateQueue = databaseQueries.closeMessageUpdateQueue; const waitForPendingUpdates = databaseQueries.waitForPendingUpdates ?? (async (_messageId: string) => {}); @@ -90,11 +91,8 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS } state.isFinalizing = true; - // Part of temporary solution: wait for 300ms after state is set to isFinalizing to block new requests and allow current pending requests to complete - await new Promise((resolve) => setTimeout(resolve, 300)); - // CRITICAL: Wait for ALL pending updates from delta/finish to complete FIRST - // This ensures execute's update is always the last one in the queue - if (typeof state.latestSequenceNumber === 'number') { + closeMessageUpdateQueue(context.messageId); + if (state.latestSequenceNumber) { await waitForPendingUpdates(context.messageId, { upToSequence: state.latestSequenceNumber, }); @@ -118,13 +116,7 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS state.finalSequenceNumber = sequenceNumber; } - if (typeof state.finalSequenceNumber === 'number') { - await waitForPendingUpdates(context.messageId, { - upToSequence: state.finalSequenceNumber, - }); - } else { - await waitForPendingUpdates(context.messageId); - } + await waitForPendingUpdates(context.messageId); cleanupState(state); return output; diff --git a/packages/ai/src/tools/communication-tools/done-tool/done-tool-start.test.ts b/packages/ai/src/tools/communication-tools/done-tool/done-tool-start.test.ts index 6d0dd20c0..20a9665b8 100644 --- a/packages/ai/src/tools/communication-tools/done-tool/done-tool-start.test.ts +++ b/packages/ai/src/tools/communication-tools/done-tool/done-tool-start.test.ts @@ -21,6 +21,7 @@ vi.mock('@buster/database/queries', () => ({ waitForPendingUpdates: vi.fn().mockResolvedValue(undefined), isMessageUpdateQueueClosed: vi.fn().mockReturnValue(false), getAssetLatestVersion: vi.fn().mockResolvedValue(1), + closeMessageUpdateQueue: vi.fn(), })); // Import mocked functions after the mock definition diff --git a/packages/ai/src/tools/shared/cleanup-state.ts b/packages/ai/src/tools/shared/cleanup-state.ts index b6135ebf7..ba9e6de38 100644 --- a/packages/ai/src/tools/shared/cleanup-state.ts +++ b/packages/ai/src/tools/shared/cleanup-state.ts @@ -5,7 +5,7 @@ export function cleanupState(state: Record): void { // Clear all properties except toolCallId (might be needed for logging) Object.keys(state).forEach((key) => { - if (key !== 'toolCallId') { + if (key !== 'toolCallId' && key !== 'isFinalizing') { state[key] = undefined; } }); diff --git a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-delta.ts b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-delta.ts index 05db58cb3..43240318e 100644 --- a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-delta.ts +++ b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-delta.ts @@ -1,6 +1,11 @@ import { randomUUID } from 'node:crypto'; import { db } from '@buster/database/connection'; -import { updateMessageEntries, updateReportContent } from '@buster/database/queries'; +import { + isReportUpdateQueueClosed, + updateMessageEntries, + updateReportContent, + updateReportWithVersion, +} from '@buster/database/queries'; import { assetPermissions, reportFiles } from '@buster/database/schema'; import type { ToolCallOptions } from 'ai'; import { @@ -48,6 +53,9 @@ function createInitialReportVersionHistory(content: string, createdAt: string): export function createCreateReportsDelta(context: CreateReportsContext, state: CreateReportsState) { return async (options: { inputTextDelta: string } & ToolCallOptions) => { + if (state.file?.id && isReportUpdateQueueClosed(state.file.id)) { + return; + } // Handle string deltas (accumulate JSON text) state.argsText = (state.argsText || '') + options.inputTextDelta; @@ -167,9 +175,19 @@ export function createCreateReportsDelta(context: CreateReportsContext, state: C // Update report content if we have content if (content && reportId) { try { - await updateReportContent({ - reportId: reportId, - content: content, + const now = new Date().toISOString(); + const versionHistory = { + '1': { + content, + updated_at: now, + version_number: 1, + }, + }; + await updateReportWithVersion({ + reportId, + content, + name, + versionHistory, }); // Keep the file status as 'loading' during streaming diff --git a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts index 45de0e3f1..9cf71d52a 100644 --- a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts +++ b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.test.ts @@ -11,6 +11,8 @@ vi.mock('@buster/database/queries', () => ({ updateMessageEntries: vi.fn().mockResolvedValue({ success: true }), updateReportWithVersion: vi.fn().mockResolvedValue(undefined), updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }), + closeReportUpdateQueue: vi.fn(), + waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined), })); vi.mock('./helpers/create-reports-tool-transform-helper', () => ({ diff --git a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts index 6336605c8..a2fa5244d 100644 --- a/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts +++ b/packages/ai/src/tools/visualization-tools/reports/create-reports-tool/create-reports-execute.ts @@ -1,7 +1,9 @@ import { + closeReportUpdateQueue, updateMessageEntries, updateMetricsToReports, updateReportWithVersion, + waitForPendingReportUpdates, } from '@buster/database/queries'; import type { ChatMessageResponseMessage } from '@buster/server-shared/chats'; import { wrapTraced } from 'braintrust'; @@ -81,6 +83,9 @@ export function createCreateReportsExecute( ) { return wrapTraced( async (input: CreateReportsInput): Promise => { + if (state.file?.id) { + closeReportUpdateQueue(state.file.id); + } const startTime = Date.now(); try { @@ -146,12 +151,18 @@ export function createCreateReportsExecute( }; // Update the report with complete content from input (source of truth) - await updateReportWithVersion({ - reportId, - content, - name, - versionHistory, - }); + await updateReportWithVersion( + { + reportId, + content, + name, + versionHistory, + }, + { + isFinal: true, + } + ); + await waitForPendingReportUpdates(reportId); // Update cache with the newly created report content updateCachedSnapshot(reportId, content, versionHistory); diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts index 9967a43d0..3aa78fcfa 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-delta.ts @@ -1,5 +1,7 @@ import { db } from '@buster/database/connection'; import { + isReportUpdateQueueClosed, + reopenReportUpdateQueue, updateMessageEntries, updateReportWithVersion, waitForPendingReportUpdates, @@ -41,6 +43,10 @@ const TOOL_KEYS = { export function createModifyReportsDelta(context: ModifyReportsContext, state: ModifyReportsState) { return async (options: { inputTextDelta: string } & ToolCallOptions) => { + if (state.reportId && isReportUpdateQueueClosed(state.reportId)) { + return; + } + // Handle string deltas (accumulate JSON text) state.argsText = (state.argsText || '') + options.inputTextDelta; @@ -56,6 +62,10 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M TOOL_KEYS.edits, [] ); + if (id && state.firstDelta) { + state.firstDelta = false; + reopenReportUpdateQueue(id); + } // Validate that we have a complete UUID before processing // UUID format: 8-4-4-4-12 characters (36 total with hyphens) diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts index 7db41f5a6..8150c7142 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.test.ts @@ -17,6 +17,7 @@ vi.mock('@buster/database/queries', () => ({ updateReportWithVersion: vi.fn().mockResolvedValue(undefined), updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }), waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined), + closeReportUpdateQueue: vi.fn(), })); vi.mock('@buster/database/schema', () => ({ reportFiles: {}, diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts index 8d303609c..a8a6cca4d 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-execute.ts @@ -1,5 +1,6 @@ import { db } from '@buster/database/connection'; import { + closeReportUpdateQueue, updateMessageEntries, updateMetricsToReports, waitForPendingReportUpdates, @@ -199,12 +200,17 @@ async function processEditOperations( } } - await updateReportWithVersion({ - reportId, - content: currentContent, - name: reportName, - versionHistory: newVersionHistory, - }); + await updateReportWithVersion( + { + reportId, + content: currentContent, + name: reportName, + versionHistory: newVersionHistory, + }, + { + isFinal: true, + } + ); // Wait for the database update to fully complete in the queue await waitForPendingReportUpdates(reportId); @@ -410,6 +416,9 @@ export function createModifyReportsExecute( ) { return wrapTraced( async (input: ModifyReportsInput): Promise => { + if (input.id) { + closeReportUpdateQueue(input.id); + } const startTime = Date.now(); try { diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-start.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-start.ts index 4c548fb28..53162fe6f 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-start.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-start.ts @@ -16,6 +16,7 @@ export function modifyReportsStart(context: ModifyReportsContext, state: ModifyR state.startTime = Date.now(); state.responseMessageCreated = false; state.snapshotContent = undefined; + state.firstDelta = true; if (context.messageId) { try { diff --git a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts index 1788e5e5a..4e61dcc02 100644 --- a/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts +++ b/packages/ai/src/tools/visualization-tools/reports/modify-reports-tool/modify-reports-tool.ts @@ -95,6 +95,7 @@ const ModifyReportsStateSchema = z.object({ ) .optional(), isComplete: z.boolean().optional().describe('Whether the tool execution is complete'), + firstDelta: z.boolean().optional().describe('Whether this is the first delta'), }); // Export types @@ -123,6 +124,7 @@ export function createModifyReportsTool(context: ModifyReportsContext) { responseMessageCreated: false, snapshotContent: undefined, reportModifiedInMessage: false, + firstDelta: true, }; // Create all functions with the context and state passed diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts index e1bfd6e4d..a01f0df03 100644 --- a/packages/database/src/queries/messages/update-message-entries.ts +++ b/packages/database/src/queries/messages/update-message-entries.ts @@ -70,18 +70,6 @@ function getOrCreateQueueState(messageId: string): MessageUpdateQueueState { return initialState; } -// function cleanupQueueIfIdle(messageId: string, state: MessageUpdateQueueState): void { -// if ( -// state.closed && -// state.finalSequence !== undefined && -// state.lastCompletedSequence >= state.finalSequence && -// state.pending.size === 0 -// ) { -// console.info('[cleanupQueueIfIdle] CLEANING UP QUEUE'); -// updateQueues.delete(messageId); -// } -// } - export function isMessageUpdateQueueClosed(messageId: string): boolean { const queue = updateQueues.get(messageId); return queue?.closed ?? false; @@ -108,7 +96,6 @@ export async function waitForPendingUpdates( if (targetSequence === undefined) { await queue.tailPromise; - // cleanupQueueIfIdle(messageId, queue); return; } @@ -116,7 +103,6 @@ export async function waitForPendingUpdates( const effectiveTarget = Math.min(targetSequence, maxKnownSequence); if (effectiveTarget <= queue.lastCompletedSequence) { - // cleanupQueueIfIdle(messageId, queue); return; } @@ -134,8 +120,13 @@ export async function waitForPendingUpdates( } else { await queue.tailPromise; } +} - // cleanupQueueIfIdle(messageId, queue); +export function closeMessageUpdateQueue(messageId: string): void { + const queue = updateQueues.get(messageId); + if (queue) { + queue.closed = true; + } } /** @@ -236,8 +227,9 @@ export async function updateMessageEntries( const { messageId } = params; const queue = getOrCreateQueueState(messageId); + const isFinal = options?.isFinal ?? false; - if (queue.closed) { + if (!isFinal && queue.closed) { const lastKnownSequence = queue.finalSequence ?? queue.nextSequence - 1; return { success: false, @@ -246,12 +238,6 @@ export async function updateMessageEntries( }; } - const isFinal = options?.isFinal ?? false; - - if (isFinal) { - queue.closed = true; - } - const sequenceNumber = queue.nextSequence; queue.nextSequence += 1; @@ -273,7 +259,6 @@ export async function updateMessageEntries( if (isFinal) { queue.finalSequence = sequenceNumber; } - // cleanupQueueIfIdle(messageId, queue); return success; }; diff --git a/packages/database/src/queries/reports/batch-update-report.ts b/packages/database/src/queries/reports/batch-update-report.ts index fa5edbecc..d6fc38b91 100644 --- a/packages/database/src/queries/reports/batch-update-report.ts +++ b/packages/database/src/queries/reports/batch-update-report.ts @@ -62,7 +62,7 @@ type ReportUpdateQueueState = { const updateQueues = new Map(); -function getOrCreateQueueState(reportId: string): ReportUpdateQueueState { +function getOrCreateQueueState(reportId: string, isFinal?: boolean): ReportUpdateQueueState { const existing = updateQueues.get(reportId); if (existing) { return existing; @@ -73,29 +73,26 @@ function getOrCreateQueueState(reportId: string): ReportUpdateQueueState { nextSequence: 0, pending: new Map(), lastCompletedSequence: -1, - closed: false, + // If it is final, the queue should be closed + closed: isFinal ?? false, }; updateQueues.set(reportId, initialState); return initialState; } -function cleanupQueueIfIdle(reportId: string, state: ReportUpdateQueueState): void { - if ( - state.closed && - state.finalSequence !== undefined && - state.lastCompletedSequence >= state.finalSequence && - state.pending.size === 0 - ) { - updateQueues.delete(reportId); - } -} - export function isReportUpdateQueueClosed(reportId: string): boolean { const queue = updateQueues.get(reportId); return queue?.closed ?? false; } +export function reopenReportUpdateQueue(reportId: string): void { + const queue = updateQueues.get(reportId); + if (queue) { + queue.closed = false; + } +} + type WaitForPendingReportUpdateOptions = { upToSequence?: number; }; @@ -117,7 +114,6 @@ export async function waitForPendingReportUpdates( if (targetSequence === undefined) { await queue.tailPromise; - cleanupQueueIfIdle(reportId, queue); return; } @@ -125,7 +121,6 @@ export async function waitForPendingReportUpdates( const effectiveTarget = Math.min(targetSequence, maxKnownSequence); if (effectiveTarget <= queue.lastCompletedSequence) { - cleanupQueueIfIdle(reportId, queue); return; } @@ -143,8 +138,6 @@ export async function waitForPendingReportUpdates( } else { await queue.tailPromise; } - - cleanupQueueIfIdle(reportId, queue); } /** @@ -191,6 +184,13 @@ async function performUpdate(params: BatchUpdateReportInput): Promise { } } +export function closeReportUpdateQueue(reportId: string): void { + const queue = updateQueues.get(reportId); + if (queue) { + queue.closed = true; + } +} + /** * Updates a report's content, name, and version history in a single operation. * Updates are queued per reportId to ensure they execute in order. @@ -209,9 +209,10 @@ export const updateReportWithVersion = async ( options?: UpdateReportWithVersionOptions ): Promise => { const { reportId } = params; - const queue = getOrCreateQueueState(reportId); + const isFinal = options?.isFinal ?? false; + const queue = getOrCreateQueueState(reportId, isFinal); - if (queue.closed) { + if (!isFinal && queue.closed) { const lastKnownSequence = queue.finalSequence ?? queue.nextSequence - 1; return { sequenceNumber: lastKnownSequence >= 0 ? lastKnownSequence : -1, @@ -219,12 +220,6 @@ export const updateReportWithVersion = async ( }; } - const isFinal = options?.isFinal ?? false; - - if (isFinal) { - queue.closed = true; - } - const sequenceNumber = queue.nextSequence; queue.nextSequence += 1; @@ -246,7 +241,6 @@ export const updateReportWithVersion = async ( if (isFinal) { queue.finalSequence = sequenceNumber; } - cleanupQueueIfIdle(reportId, queue); }; const resultPromise = runPromise