mirror of https://github.com/buster-so/buster.git
fixing the report and response messages being cutoff
This commit is contained in:
parent
977f5f0f35
commit
84ce78a95e
|
@ -29,11 +29,7 @@ export function createDoneToolDelta(context: DoneToolContext, doneToolState: Don
|
|||
return async function doneToolDelta(
|
||||
options: { inputTextDelta: string } & ToolCallOptions
|
||||
): Promise<void> {
|
||||
if (doneToolState.isFinalizing) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (isMessageUpdateQueueClosed(context.messageId)) {
|
||||
if (doneToolState.isFinalizing || isMessageUpdateQueueClosed(context.messageId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -79,6 +79,7 @@ async function processDone(
|
|||
// Factory function that creates the execute function with proper context typing
|
||||
const updateMessage = databaseQueries.updateMessage;
|
||||
const updateMessageEntries = databaseQueries.updateMessageEntries;
|
||||
const closeMessageUpdateQueue = databaseQueries.closeMessageUpdateQueue;
|
||||
const waitForPendingUpdates =
|
||||
databaseQueries.waitForPendingUpdates ?? (async (_messageId: string) => {});
|
||||
|
||||
|
@ -90,11 +91,8 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS
|
|||
}
|
||||
|
||||
state.isFinalizing = true;
|
||||
// Part of temporary solution: wait for 300ms after state is set to isFinalizing to block new requests and allow current pending requests to complete
|
||||
await new Promise((resolve) => setTimeout(resolve, 300));
|
||||
// CRITICAL: Wait for ALL pending updates from delta/finish to complete FIRST
|
||||
// This ensures execute's update is always the last one in the queue
|
||||
if (typeof state.latestSequenceNumber === 'number') {
|
||||
closeMessageUpdateQueue(context.messageId);
|
||||
if (state.latestSequenceNumber) {
|
||||
await waitForPendingUpdates(context.messageId, {
|
||||
upToSequence: state.latestSequenceNumber,
|
||||
});
|
||||
|
@ -118,13 +116,7 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS
|
|||
state.finalSequenceNumber = sequenceNumber;
|
||||
}
|
||||
|
||||
if (typeof state.finalSequenceNumber === 'number') {
|
||||
await waitForPendingUpdates(context.messageId, {
|
||||
upToSequence: state.finalSequenceNumber,
|
||||
});
|
||||
} else {
|
||||
await waitForPendingUpdates(context.messageId);
|
||||
}
|
||||
await waitForPendingUpdates(context.messageId);
|
||||
|
||||
cleanupState(state);
|
||||
return output;
|
||||
|
|
|
@ -21,6 +21,7 @@ vi.mock('@buster/database/queries', () => ({
|
|||
waitForPendingUpdates: vi.fn().mockResolvedValue(undefined),
|
||||
isMessageUpdateQueueClosed: vi.fn().mockReturnValue(false),
|
||||
getAssetLatestVersion: vi.fn().mockResolvedValue(1),
|
||||
closeMessageUpdateQueue: vi.fn(),
|
||||
}));
|
||||
|
||||
// Import mocked functions after the mock definition
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
export function cleanupState(state: Record<string, unknown>): void {
|
||||
// Clear all properties except toolCallId (might be needed for logging)
|
||||
Object.keys(state).forEach((key) => {
|
||||
if (key !== 'toolCallId') {
|
||||
if (key !== 'toolCallId' && key !== 'isFinalizing') {
|
||||
state[key] = undefined;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
import { randomUUID } from 'node:crypto';
|
||||
import { db } from '@buster/database/connection';
|
||||
import { updateMessageEntries, updateReportContent } from '@buster/database/queries';
|
||||
import {
|
||||
isReportUpdateQueueClosed,
|
||||
updateMessageEntries,
|
||||
updateReportContent,
|
||||
updateReportWithVersion,
|
||||
} from '@buster/database/queries';
|
||||
import { assetPermissions, reportFiles } from '@buster/database/schema';
|
||||
import type { ToolCallOptions } from 'ai';
|
||||
import {
|
||||
|
@ -48,6 +53,9 @@ function createInitialReportVersionHistory(content: string, createdAt: string):
|
|||
|
||||
export function createCreateReportsDelta(context: CreateReportsContext, state: CreateReportsState) {
|
||||
return async (options: { inputTextDelta: string } & ToolCallOptions) => {
|
||||
if (state.file?.id && isReportUpdateQueueClosed(state.file.id)) {
|
||||
return;
|
||||
}
|
||||
// Handle string deltas (accumulate JSON text)
|
||||
state.argsText = (state.argsText || '') + options.inputTextDelta;
|
||||
|
||||
|
@ -167,9 +175,19 @@ export function createCreateReportsDelta(context: CreateReportsContext, state: C
|
|||
// Update report content if we have content
|
||||
if (content && reportId) {
|
||||
try {
|
||||
await updateReportContent({
|
||||
reportId: reportId,
|
||||
content: content,
|
||||
const now = new Date().toISOString();
|
||||
const versionHistory = {
|
||||
'1': {
|
||||
content,
|
||||
updated_at: now,
|
||||
version_number: 1,
|
||||
},
|
||||
};
|
||||
await updateReportWithVersion({
|
||||
reportId,
|
||||
content,
|
||||
name,
|
||||
versionHistory,
|
||||
});
|
||||
|
||||
// Keep the file status as 'loading' during streaming
|
||||
|
|
|
@ -11,6 +11,8 @@ vi.mock('@buster/database/queries', () => ({
|
|||
updateMessageEntries: vi.fn().mockResolvedValue({ success: true }),
|
||||
updateReportWithVersion: vi.fn().mockResolvedValue(undefined),
|
||||
updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }),
|
||||
closeReportUpdateQueue: vi.fn(),
|
||||
waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined),
|
||||
}));
|
||||
|
||||
vi.mock('./helpers/create-reports-tool-transform-helper', () => ({
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
import {
|
||||
closeReportUpdateQueue,
|
||||
updateMessageEntries,
|
||||
updateMetricsToReports,
|
||||
updateReportWithVersion,
|
||||
waitForPendingReportUpdates,
|
||||
} from '@buster/database/queries';
|
||||
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
|
||||
import { wrapTraced } from 'braintrust';
|
||||
|
@ -81,6 +83,9 @@ export function createCreateReportsExecute(
|
|||
) {
|
||||
return wrapTraced(
|
||||
async (input: CreateReportsInput): Promise<CreateReportsOutput> => {
|
||||
if (state.file?.id) {
|
||||
closeReportUpdateQueue(state.file.id);
|
||||
}
|
||||
const startTime = Date.now();
|
||||
|
||||
try {
|
||||
|
@ -146,12 +151,18 @@ export function createCreateReportsExecute(
|
|||
};
|
||||
|
||||
// Update the report with complete content from input (source of truth)
|
||||
await updateReportWithVersion({
|
||||
reportId,
|
||||
content,
|
||||
name,
|
||||
versionHistory,
|
||||
});
|
||||
await updateReportWithVersion(
|
||||
{
|
||||
reportId,
|
||||
content,
|
||||
name,
|
||||
versionHistory,
|
||||
},
|
||||
{
|
||||
isFinal: true,
|
||||
}
|
||||
);
|
||||
await waitForPendingReportUpdates(reportId);
|
||||
|
||||
// Update cache with the newly created report content
|
||||
updateCachedSnapshot(reportId, content, versionHistory);
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
import { db } from '@buster/database/connection';
|
||||
import {
|
||||
isReportUpdateQueueClosed,
|
||||
reopenReportUpdateQueue,
|
||||
updateMessageEntries,
|
||||
updateReportWithVersion,
|
||||
waitForPendingReportUpdates,
|
||||
|
@ -41,6 +43,10 @@ const TOOL_KEYS = {
|
|||
|
||||
export function createModifyReportsDelta(context: ModifyReportsContext, state: ModifyReportsState) {
|
||||
return async (options: { inputTextDelta: string } & ToolCallOptions) => {
|
||||
if (state.reportId && isReportUpdateQueueClosed(state.reportId)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Handle string deltas (accumulate JSON text)
|
||||
state.argsText = (state.argsText || '') + options.inputTextDelta;
|
||||
|
||||
|
@ -56,6 +62,10 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
|
|||
TOOL_KEYS.edits,
|
||||
[]
|
||||
);
|
||||
if (id && state.firstDelta) {
|
||||
state.firstDelta = false;
|
||||
reopenReportUpdateQueue(id);
|
||||
}
|
||||
|
||||
// Validate that we have a complete UUID before processing
|
||||
// UUID format: 8-4-4-4-12 characters (36 total with hyphens)
|
||||
|
|
|
@ -17,6 +17,7 @@ vi.mock('@buster/database/queries', () => ({
|
|||
updateReportWithVersion: vi.fn().mockResolvedValue(undefined),
|
||||
updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }),
|
||||
waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined),
|
||||
closeReportUpdateQueue: vi.fn(),
|
||||
}));
|
||||
vi.mock('@buster/database/schema', () => ({
|
||||
reportFiles: {},
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import { db } from '@buster/database/connection';
|
||||
import {
|
||||
closeReportUpdateQueue,
|
||||
updateMessageEntries,
|
||||
updateMetricsToReports,
|
||||
waitForPendingReportUpdates,
|
||||
|
@ -199,12 +200,17 @@ async function processEditOperations(
|
|||
}
|
||||
}
|
||||
|
||||
await updateReportWithVersion({
|
||||
reportId,
|
||||
content: currentContent,
|
||||
name: reportName,
|
||||
versionHistory: newVersionHistory,
|
||||
});
|
||||
await updateReportWithVersion(
|
||||
{
|
||||
reportId,
|
||||
content: currentContent,
|
||||
name: reportName,
|
||||
versionHistory: newVersionHistory,
|
||||
},
|
||||
{
|
||||
isFinal: true,
|
||||
}
|
||||
);
|
||||
|
||||
// Wait for the database update to fully complete in the queue
|
||||
await waitForPendingReportUpdates(reportId);
|
||||
|
@ -410,6 +416,9 @@ export function createModifyReportsExecute(
|
|||
) {
|
||||
return wrapTraced(
|
||||
async (input: ModifyReportsInput): Promise<ModifyReportsOutput> => {
|
||||
if (input.id) {
|
||||
closeReportUpdateQueue(input.id);
|
||||
}
|
||||
const startTime = Date.now();
|
||||
|
||||
try {
|
||||
|
|
|
@ -16,6 +16,7 @@ export function modifyReportsStart(context: ModifyReportsContext, state: ModifyR
|
|||
state.startTime = Date.now();
|
||||
state.responseMessageCreated = false;
|
||||
state.snapshotContent = undefined;
|
||||
state.firstDelta = true;
|
||||
|
||||
if (context.messageId) {
|
||||
try {
|
||||
|
|
|
@ -95,6 +95,7 @@ const ModifyReportsStateSchema = z.object({
|
|||
)
|
||||
.optional(),
|
||||
isComplete: z.boolean().optional().describe('Whether the tool execution is complete'),
|
||||
firstDelta: z.boolean().optional().describe('Whether this is the first delta'),
|
||||
});
|
||||
|
||||
// Export types
|
||||
|
@ -123,6 +124,7 @@ export function createModifyReportsTool(context: ModifyReportsContext) {
|
|||
responseMessageCreated: false,
|
||||
snapshotContent: undefined,
|
||||
reportModifiedInMessage: false,
|
||||
firstDelta: true,
|
||||
};
|
||||
|
||||
// Create all functions with the context and state passed
|
||||
|
|
|
@ -70,18 +70,6 @@ function getOrCreateQueueState(messageId: string): MessageUpdateQueueState {
|
|||
return initialState;
|
||||
}
|
||||
|
||||
// function cleanupQueueIfIdle(messageId: string, state: MessageUpdateQueueState): void {
|
||||
// if (
|
||||
// state.closed &&
|
||||
// state.finalSequence !== undefined &&
|
||||
// state.lastCompletedSequence >= state.finalSequence &&
|
||||
// state.pending.size === 0
|
||||
// ) {
|
||||
// console.info('[cleanupQueueIfIdle] CLEANING UP QUEUE');
|
||||
// updateQueues.delete(messageId);
|
||||
// }
|
||||
// }
|
||||
|
||||
export function isMessageUpdateQueueClosed(messageId: string): boolean {
|
||||
const queue = updateQueues.get(messageId);
|
||||
return queue?.closed ?? false;
|
||||
|
@ -108,7 +96,6 @@ export async function waitForPendingUpdates(
|
|||
|
||||
if (targetSequence === undefined) {
|
||||
await queue.tailPromise;
|
||||
// cleanupQueueIfIdle(messageId, queue);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -116,7 +103,6 @@ export async function waitForPendingUpdates(
|
|||
const effectiveTarget = Math.min(targetSequence, maxKnownSequence);
|
||||
|
||||
if (effectiveTarget <= queue.lastCompletedSequence) {
|
||||
// cleanupQueueIfIdle(messageId, queue);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -134,8 +120,13 @@ export async function waitForPendingUpdates(
|
|||
} else {
|
||||
await queue.tailPromise;
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupQueueIfIdle(messageId, queue);
|
||||
export function closeMessageUpdateQueue(messageId: string): void {
|
||||
const queue = updateQueues.get(messageId);
|
||||
if (queue) {
|
||||
queue.closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -236,8 +227,9 @@ export async function updateMessageEntries(
|
|||
const { messageId } = params;
|
||||
|
||||
const queue = getOrCreateQueueState(messageId);
|
||||
const isFinal = options?.isFinal ?? false;
|
||||
|
||||
if (queue.closed) {
|
||||
if (!isFinal && queue.closed) {
|
||||
const lastKnownSequence = queue.finalSequence ?? queue.nextSequence - 1;
|
||||
return {
|
||||
success: false,
|
||||
|
@ -246,12 +238,6 @@ export async function updateMessageEntries(
|
|||
};
|
||||
}
|
||||
|
||||
const isFinal = options?.isFinal ?? false;
|
||||
|
||||
if (isFinal) {
|
||||
queue.closed = true;
|
||||
}
|
||||
|
||||
const sequenceNumber = queue.nextSequence;
|
||||
queue.nextSequence += 1;
|
||||
|
||||
|
@ -273,7 +259,6 @@ export async function updateMessageEntries(
|
|||
if (isFinal) {
|
||||
queue.finalSequence = sequenceNumber;
|
||||
}
|
||||
// cleanupQueueIfIdle(messageId, queue);
|
||||
return success;
|
||||
};
|
||||
|
||||
|
|
|
@ -62,7 +62,7 @@ type ReportUpdateQueueState = {
|
|||
|
||||
const updateQueues = new Map<string, ReportUpdateQueueState>();
|
||||
|
||||
function getOrCreateQueueState(reportId: string): ReportUpdateQueueState {
|
||||
function getOrCreateQueueState(reportId: string, isFinal?: boolean): ReportUpdateQueueState {
|
||||
const existing = updateQueues.get(reportId);
|
||||
if (existing) {
|
||||
return existing;
|
||||
|
@ -73,29 +73,26 @@ function getOrCreateQueueState(reportId: string): ReportUpdateQueueState {
|
|||
nextSequence: 0,
|
||||
pending: new Map(),
|
||||
lastCompletedSequence: -1,
|
||||
closed: false,
|
||||
// If it is final, the queue should be closed
|
||||
closed: isFinal ?? false,
|
||||
};
|
||||
|
||||
updateQueues.set(reportId, initialState);
|
||||
return initialState;
|
||||
}
|
||||
|
||||
function cleanupQueueIfIdle(reportId: string, state: ReportUpdateQueueState): void {
|
||||
if (
|
||||
state.closed &&
|
||||
state.finalSequence !== undefined &&
|
||||
state.lastCompletedSequence >= state.finalSequence &&
|
||||
state.pending.size === 0
|
||||
) {
|
||||
updateQueues.delete(reportId);
|
||||
}
|
||||
}
|
||||
|
||||
export function isReportUpdateQueueClosed(reportId: string): boolean {
|
||||
const queue = updateQueues.get(reportId);
|
||||
return queue?.closed ?? false;
|
||||
}
|
||||
|
||||
export function reopenReportUpdateQueue(reportId: string): void {
|
||||
const queue = updateQueues.get(reportId);
|
||||
if (queue) {
|
||||
queue.closed = false;
|
||||
}
|
||||
}
|
||||
|
||||
type WaitForPendingReportUpdateOptions = {
|
||||
upToSequence?: number;
|
||||
};
|
||||
|
@ -117,7 +114,6 @@ export async function waitForPendingReportUpdates(
|
|||
|
||||
if (targetSequence === undefined) {
|
||||
await queue.tailPromise;
|
||||
cleanupQueueIfIdle(reportId, queue);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -125,7 +121,6 @@ export async function waitForPendingReportUpdates(
|
|||
const effectiveTarget = Math.min(targetSequence, maxKnownSequence);
|
||||
|
||||
if (effectiveTarget <= queue.lastCompletedSequence) {
|
||||
cleanupQueueIfIdle(reportId, queue);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -143,8 +138,6 @@ export async function waitForPendingReportUpdates(
|
|||
} else {
|
||||
await queue.tailPromise;
|
||||
}
|
||||
|
||||
cleanupQueueIfIdle(reportId, queue);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -191,6 +184,13 @@ async function performUpdate(params: BatchUpdateReportInput): Promise<void> {
|
|||
}
|
||||
}
|
||||
|
||||
export function closeReportUpdateQueue(reportId: string): void {
|
||||
const queue = updateQueues.get(reportId);
|
||||
if (queue) {
|
||||
queue.closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates a report's content, name, and version history in a single operation.
|
||||
* Updates are queued per reportId to ensure they execute in order.
|
||||
|
@ -209,9 +209,10 @@ export const updateReportWithVersion = async (
|
|||
options?: UpdateReportWithVersionOptions
|
||||
): Promise<UpdateReportWithVersionResult> => {
|
||||
const { reportId } = params;
|
||||
const queue = getOrCreateQueueState(reportId);
|
||||
const isFinal = options?.isFinal ?? false;
|
||||
const queue = getOrCreateQueueState(reportId, isFinal);
|
||||
|
||||
if (queue.closed) {
|
||||
if (!isFinal && queue.closed) {
|
||||
const lastKnownSequence = queue.finalSequence ?? queue.nextSequence - 1;
|
||||
return {
|
||||
sequenceNumber: lastKnownSequence >= 0 ? lastKnownSequence : -1,
|
||||
|
@ -219,12 +220,6 @@ export const updateReportWithVersion = async (
|
|||
};
|
||||
}
|
||||
|
||||
const isFinal = options?.isFinal ?? false;
|
||||
|
||||
if (isFinal) {
|
||||
queue.closed = true;
|
||||
}
|
||||
|
||||
const sequenceNumber = queue.nextSequence;
|
||||
queue.nextSequence += 1;
|
||||
|
||||
|
@ -246,7 +241,6 @@ export const updateReportWithVersion = async (
|
|||
if (isFinal) {
|
||||
queue.finalSequence = sequenceNumber;
|
||||
}
|
||||
cleanupQueueIfIdle(reportId, queue);
|
||||
};
|
||||
|
||||
const resultPromise = runPromise
|
||||
|
|
Loading…
Reference in New Issue