streaming fix - need to wait for pending updates

This commit is contained in:
dal 2025-09-26 12:59:48 -06:00
parent 4b80fe7a66
commit 0b17429e10
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 26 additions and 2 deletions

View File

@ -1,4 +1,5 @@
import type { PermissionedDataset } from '@buster/access-controls';
import { waitForPendingUpdates } from '@buster/database/queries';
import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai';
import { wrapTraced } from 'braintrust';
import z from 'zod';
@ -199,8 +200,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
hasToolResults: !!event.toolResults,
});
},
onFinish: () => {
onFinish: async () => {
console.info('Analyst Agent finished');
// Ensure all pending database updates complete before stream terminates
await waitForPendingUpdates(analystAgentOptions.messageId);
},
}),
{

View File

@ -1,4 +1,5 @@
import type { PermissionedDataset } from '@buster/access-controls';
import { waitForPendingUpdates } from '@buster/database/queries';
import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai';
import { wrapTraced } from 'braintrust';
import z from 'zod';
@ -230,6 +231,11 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge
hasToolResults: !!event.toolResults,
});
},
onFinish: async () => {
console.info('Think and Prep Agent finished');
// Ensure all pending database updates complete before stream terminates
await waitForPendingUpdates(messageId);
},
}),
{
name: 'Think and Prep Agent',

View File

@ -1,4 +1,4 @@
import { updateMessage, updateMessageEntries } from '@buster/database/queries';
import { updateMessage, updateMessageEntries, waitForPendingUpdates } from '@buster/database/queries';
import { wrapTraced } from 'braintrust';
import { cleanupState } from '../../shared/cleanup-state';
import { createRawToolResultEntry } from '../../shared/create-raw-llm-tool-result-entry';
@ -57,6 +57,10 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS
}
const result = await processDone(state, state.toolCallId, context.messageId, context);
// Wait for all pending updates from delta/finish to complete before returning
await waitForPendingUpdates(context.messageId);
cleanupState(state);
return result;
},

View File

@ -24,6 +24,17 @@ export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSche
// Simple in-memory queue for each messageId
const updateQueues = new Map<string, Promise<{ success: boolean }>>();
/**
* Wait for all pending updates for a given messageId to complete.
* This ensures all queued updates are flushed to the database before proceeding.
*/
export async function waitForPendingUpdates(messageId: string): Promise<void> {
const pendingQueue = updateQueues.get(messageId);
if (pendingQueue) {
await pendingQueue;
}
}
/**
* Internal function that performs the actual update logic.
* This is separated so it can be queued.