diff --git a/packages/ai/src/agents/analyst-agent/analyst-agent.ts b/packages/ai/src/agents/analyst-agent/analyst-agent.ts index 23ab7f86a..b79c110cc 100644 --- a/packages/ai/src/agents/analyst-agent/analyst-agent.ts +++ b/packages/ai/src/agents/analyst-agent/analyst-agent.ts @@ -1,4 +1,5 @@ import type { PermissionedDataset } from '@buster/access-controls'; +import { waitForPendingUpdates } from '@buster/database/queries'; import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai'; import { wrapTraced } from 'braintrust'; import z from 'zod'; @@ -199,8 +200,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) { hasToolResults: !!event.toolResults, }); }, - onFinish: () => { + onFinish: async () => { console.info('Analyst Agent finished'); + // Ensure all pending database updates complete before stream terminates + await waitForPendingUpdates(analystAgentOptions.messageId); }, }), { diff --git a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts index cdf89bda9..feac20378 100644 --- a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts +++ b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts @@ -1,4 +1,5 @@ import type { PermissionedDataset } from '@buster/access-controls'; +import { waitForPendingUpdates } from '@buster/database/queries'; import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai'; import { wrapTraced } from 'braintrust'; import z from 'zod'; @@ -230,6 +231,11 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge hasToolResults: !!event.toolResults, }); }, + onFinish: async () => { + console.info('Think and Prep Agent finished'); + // Ensure all pending database updates complete before stream terminates + await waitForPendingUpdates(messageId); + }, }), { name: 'Think and Prep Agent', diff --git a/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts b/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts index 40b2a22b3..fbd7062b7 100644 --- a/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts +++ b/packages/ai/src/tools/communication-tools/done-tool/done-tool-execute.ts @@ -1,4 +1,8 @@ -import { updateMessage, updateMessageEntries } from '@buster/database/queries'; +import { + updateMessage, + updateMessageEntries, + waitForPendingUpdates, +} from '@buster/database/queries'; import { wrapTraced } from 'braintrust'; import { cleanupState } from '../../shared/cleanup-state'; import { createRawToolResultEntry } from '../../shared/create-raw-llm-tool-result-entry'; @@ -57,6 +61,10 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS } const result = await processDone(state, state.toolCallId, context.messageId, context); + + // Wait for all pending updates from delta/finish to complete before returning + await waitForPendingUpdates(context.messageId); + cleanupState(state); return result; }, diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts index 3b56d6d3c..b7f705360 100644 --- a/packages/database/src/queries/messages/update-message-entries.ts +++ b/packages/database/src/queries/messages/update-message-entries.ts @@ -24,6 +24,17 @@ export type UpdateMessageEntriesParams = z.infer>(); +/** + * Wait for all pending updates for a given messageId to complete. + * This ensures all queued updates are flushed to the database before proceeding. + */ +export async function waitForPendingUpdates(messageId: string): Promise { + const pendingQueue = updateQueues.get(messageId); + if (pendingQueue) { + await pendingQueue; + } +} + /** * Internal function that performs the actual update logic. * This is separated so it can be queued.