Merge pull request #1183 from buster-so/dallin-bus-1971-final-response-in-angel-studios-chat-got-cut-off-before

dallin bus 1971 final response in angel studios chat got cut off before
This commit is contained in:
dal 2025-09-26 13:08:59 -06:00 committed by GitHub
commit 355149358f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 30 additions and 2 deletions

View File

@ -1,4 +1,5 @@
import type { PermissionedDataset } from '@buster/access-controls';
import { waitForPendingUpdates } from '@buster/database/queries';
import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai';
import { wrapTraced } from 'braintrust';
import z from 'zod';
@ -199,8 +200,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
hasToolResults: !!event.toolResults,
});
},
onFinish: () => {
onFinish: async () => {
console.info('Analyst Agent finished');
// Ensure all pending database updates complete before stream terminates
await waitForPendingUpdates(analystAgentOptions.messageId);
},
}),
{

View File

@ -1,4 +1,5 @@
import type { PermissionedDataset } from '@buster/access-controls';
import { waitForPendingUpdates } from '@buster/database/queries';
import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai';
import { wrapTraced } from 'braintrust';
import z from 'zod';
@ -230,6 +231,11 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge
hasToolResults: !!event.toolResults,
});
},
onFinish: async () => {
console.info('Think and Prep Agent finished');
// Ensure all pending database updates complete before stream terminates
await waitForPendingUpdates(messageId);
},
}),
{
name: 'Think and Prep Agent',

View File

@ -1,4 +1,8 @@
import { updateMessage, updateMessageEntries } from '@buster/database/queries';
import {
updateMessage,
updateMessageEntries,
waitForPendingUpdates,
} from '@buster/database/queries';
import { wrapTraced } from 'braintrust';
import { cleanupState } from '../../shared/cleanup-state';
import { createRawToolResultEntry } from '../../shared/create-raw-llm-tool-result-entry';
@ -57,6 +61,10 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS
}
const result = await processDone(state, state.toolCallId, context.messageId, context);
// Wait for all pending updates from delta/finish to complete before returning
await waitForPendingUpdates(context.messageId);
cleanupState(state);
return result;
},

View File

@ -24,6 +24,17 @@ export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSche
// Simple in-memory queue for each messageId
const updateQueues = new Map<string, Promise<{ success: boolean }>>();
/**
* Wait for all pending updates for a given messageId to complete.
* This ensures all queued updates are flushed to the database before proceeding.
*/
export async function waitForPendingUpdates(messageId: string): Promise<void> {
const pendingQueue = updateQueues.get(messageId);
if (pendingQueue) {
await pendingQueue;
}
}
/**
* Internal function that performs the actual update logic.
* This is separated so it can be queued.