mirror of https://github.com/buster-so/buster.git
Merge branch 'staging' into big-nate-bus-1959-public-share-url-for-a-chat-doesnt-include-chat-id
This commit is contained in:
commit
b883d1a746
|
@ -1,4 +1,5 @@
|
|||
import type { PermissionedDataset } from '@buster/access-controls';
|
||||
import { waitForPendingUpdates } from '@buster/database/queries';
|
||||
import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai';
|
||||
import { wrapTraced } from 'braintrust';
|
||||
import z from 'zod';
|
||||
|
@ -199,8 +200,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
|
|||
hasToolResults: !!event.toolResults,
|
||||
});
|
||||
},
|
||||
onFinish: () => {
|
||||
onFinish: async () => {
|
||||
console.info('Analyst Agent finished');
|
||||
// Ensure all pending database updates complete before stream terminates
|
||||
await waitForPendingUpdates(analystAgentOptions.messageId);
|
||||
},
|
||||
}),
|
||||
{
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import type { PermissionedDataset } from '@buster/access-controls';
|
||||
import { waitForPendingUpdates } from '@buster/database/queries';
|
||||
import { type ModelMessage, hasToolCall, stepCountIs, streamText } from 'ai';
|
||||
import { wrapTraced } from 'braintrust';
|
||||
import z from 'zod';
|
||||
|
@ -230,6 +231,11 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge
|
|||
hasToolResults: !!event.toolResults,
|
||||
});
|
||||
},
|
||||
onFinish: async () => {
|
||||
console.info('Think and Prep Agent finished');
|
||||
// Ensure all pending database updates complete before stream terminates
|
||||
await waitForPendingUpdates(messageId);
|
||||
},
|
||||
}),
|
||||
{
|
||||
name: 'Think and Prep Agent',
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
import { updateMessage, updateMessageEntries } from '@buster/database/queries';
|
||||
import {
|
||||
updateMessage,
|
||||
updateMessageEntries,
|
||||
waitForPendingUpdates,
|
||||
} from '@buster/database/queries';
|
||||
import { wrapTraced } from 'braintrust';
|
||||
import { cleanupState } from '../../shared/cleanup-state';
|
||||
import { createRawToolResultEntry } from '../../shared/create-raw-llm-tool-result-entry';
|
||||
|
@ -57,6 +61,10 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS
|
|||
}
|
||||
|
||||
const result = await processDone(state, state.toolCallId, context.messageId, context);
|
||||
|
||||
// Wait for all pending updates from delta/finish to complete before returning
|
||||
await waitForPendingUpdates(context.messageId);
|
||||
|
||||
cleanupState(state);
|
||||
return result;
|
||||
},
|
||||
|
|
|
@ -24,6 +24,17 @@ export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSche
|
|||
// Simple in-memory queue for each messageId
|
||||
const updateQueues = new Map<string, Promise<{ success: boolean }>>();
|
||||
|
||||
/**
|
||||
* Wait for all pending updates for a given messageId to complete.
|
||||
* This ensures all queued updates are flushed to the database before proceeding.
|
||||
*/
|
||||
export async function waitForPendingUpdates(messageId: string): Promise<void> {
|
||||
const pendingQueue = updateQueues.get(messageId);
|
||||
if (pendingQueue) {
|
||||
await pendingQueue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal function that performs the actual update logic.
|
||||
* This is separated so it can be queued.
|
||||
|
|
Loading…
Reference in New Issue