Merge pull request #1253 from buster-so/fixCutOffResponseMessage

fixing the report and response messages being cutoff
This commit is contained in:
wellsbunk5 2025-10-03 13:21:44 -06:00 committed by GitHub
commit 07c9c1354b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 105 additions and 83 deletions

View File

@ -29,11 +29,7 @@ export function createDoneToolDelta(context: DoneToolContext, doneToolState: Don
return async function doneToolDelta(
options: { inputTextDelta: string } & ToolCallOptions
): Promise<void> {
if (doneToolState.isFinalizing) {
return;
}
if (isMessageUpdateQueueClosed(context.messageId)) {
if (doneToolState.isFinalizing || isMessageUpdateQueueClosed(context.messageId)) {
return;
}

View File

@ -79,6 +79,7 @@ async function processDone(
// Factory function that creates the execute function with proper context typing
const updateMessage = databaseQueries.updateMessage;
const updateMessageEntries = databaseQueries.updateMessageEntries;
const closeMessageUpdateQueue = databaseQueries.closeMessageUpdateQueue;
const waitForPendingUpdates =
databaseQueries.waitForPendingUpdates ?? (async (_messageId: string) => {});
@ -90,11 +91,8 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS
}
state.isFinalizing = true;
// Part of temporary solution: wait for 300ms after state is set to isFinalizing to block new requests and allow current pending requests to complete
await new Promise((resolve) => setTimeout(resolve, 300));
// CRITICAL: Wait for ALL pending updates from delta/finish to complete FIRST
// This ensures execute's update is always the last one in the queue
if (typeof state.latestSequenceNumber === 'number') {
closeMessageUpdateQueue(context.messageId);
if (state.latestSequenceNumber) {
await waitForPendingUpdates(context.messageId, {
upToSequence: state.latestSequenceNumber,
});
@ -118,13 +116,7 @@ export function createDoneToolExecute(context: DoneToolContext, state: DoneToolS
state.finalSequenceNumber = sequenceNumber;
}
if (typeof state.finalSequenceNumber === 'number') {
await waitForPendingUpdates(context.messageId, {
upToSequence: state.finalSequenceNumber,
});
} else {
await waitForPendingUpdates(context.messageId);
}
await waitForPendingUpdates(context.messageId);
cleanupState(state);
return output;

View File

@ -21,6 +21,7 @@ vi.mock('@buster/database/queries', () => ({
waitForPendingUpdates: vi.fn().mockResolvedValue(undefined),
isMessageUpdateQueueClosed: vi.fn().mockReturnValue(false),
getAssetLatestVersion: vi.fn().mockResolvedValue(1),
closeMessageUpdateQueue: vi.fn(),
}));
// Import mocked functions after the mock definition

View File

@ -5,7 +5,7 @@
export function cleanupState(state: Record<string, unknown>): void {
// Clear all properties except toolCallId (might be needed for logging)
Object.keys(state).forEach((key) => {
if (key !== 'toolCallId') {
if (key !== 'toolCallId' && key !== 'isFinalizing') {
state[key] = undefined;
}
});

View File

@ -1,6 +1,11 @@
import { randomUUID } from 'node:crypto';
import { db } from '@buster/database/connection';
import { updateMessageEntries, updateReportContent } from '@buster/database/queries';
import {
isReportUpdateQueueClosed,
updateMessageEntries,
updateReportContent,
updateReportWithVersion,
} from '@buster/database/queries';
import { assetPermissions, reportFiles } from '@buster/database/schema';
import type { ToolCallOptions } from 'ai';
import {
@ -48,6 +53,9 @@ function createInitialReportVersionHistory(content: string, createdAt: string):
export function createCreateReportsDelta(context: CreateReportsContext, state: CreateReportsState) {
return async (options: { inputTextDelta: string } & ToolCallOptions) => {
if (state.file?.id && isReportUpdateQueueClosed(state.file.id)) {
return;
}
// Handle string deltas (accumulate JSON text)
state.argsText = (state.argsText || '') + options.inputTextDelta;
@ -167,9 +175,19 @@ export function createCreateReportsDelta(context: CreateReportsContext, state: C
// Update report content if we have content
if (content && reportId) {
try {
await updateReportContent({
reportId: reportId,
content: content,
const now = new Date().toISOString();
const versionHistory = {
'1': {
content,
updated_at: now,
version_number: 1,
},
};
await updateReportWithVersion({
reportId,
content,
name,
versionHistory,
});
// Keep the file status as 'loading' during streaming

View File

@ -11,6 +11,8 @@ vi.mock('@buster/database/queries', () => ({
updateMessageEntries: vi.fn().mockResolvedValue({ success: true }),
updateReportWithVersion: vi.fn().mockResolvedValue(undefined),
updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }),
closeReportUpdateQueue: vi.fn(),
waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined),
}));
vi.mock('./helpers/create-reports-tool-transform-helper', () => ({

View File

@ -1,7 +1,9 @@
import {
closeReportUpdateQueue,
updateMessageEntries,
updateMetricsToReports,
updateReportWithVersion,
waitForPendingReportUpdates,
} from '@buster/database/queries';
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
import { wrapTraced } from 'braintrust';
@ -81,6 +83,9 @@ export function createCreateReportsExecute(
) {
return wrapTraced(
async (input: CreateReportsInput): Promise<CreateReportsOutput> => {
if (state.file?.id) {
closeReportUpdateQueue(state.file.id);
}
const startTime = Date.now();
try {
@ -146,12 +151,18 @@ export function createCreateReportsExecute(
};
// Update the report with complete content from input (source of truth)
await updateReportWithVersion({
reportId,
content,
name,
versionHistory,
});
await updateReportWithVersion(
{
reportId,
content,
name,
versionHistory,
},
{
isFinal: true,
}
);
await waitForPendingReportUpdates(reportId);
// Update cache with the newly created report content
updateCachedSnapshot(reportId, content, versionHistory);

View File

@ -1,5 +1,7 @@
import { db } from '@buster/database/connection';
import {
isReportUpdateQueueClosed,
reopenReportUpdateQueue,
updateMessageEntries,
updateReportWithVersion,
waitForPendingReportUpdates,
@ -41,6 +43,10 @@ const TOOL_KEYS = {
export function createModifyReportsDelta(context: ModifyReportsContext, state: ModifyReportsState) {
return async (options: { inputTextDelta: string } & ToolCallOptions) => {
if (state.reportId && isReportUpdateQueueClosed(state.reportId)) {
return;
}
// Handle string deltas (accumulate JSON text)
state.argsText = (state.argsText || '') + options.inputTextDelta;
@ -56,6 +62,10 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
TOOL_KEYS.edits,
[]
);
if (id && state.firstDelta) {
state.firstDelta = false;
reopenReportUpdateQueue(id);
}
// Validate that we have a complete UUID before processing
// UUID format: 8-4-4-4-12 characters (36 total with hyphens)

View File

@ -17,6 +17,7 @@ vi.mock('@buster/database/queries', () => ({
updateReportWithVersion: vi.fn().mockResolvedValue(undefined),
updateMetricsToReports: vi.fn().mockResolvedValue({ created: 0, updated: 0, deleted: 0 }),
waitForPendingReportUpdates: vi.fn().mockResolvedValue(undefined),
closeReportUpdateQueue: vi.fn(),
}));
vi.mock('@buster/database/schema', () => ({
reportFiles: {},

View File

@ -1,5 +1,6 @@
import { db } from '@buster/database/connection';
import {
closeReportUpdateQueue,
updateMessageEntries,
updateMetricsToReports,
waitForPendingReportUpdates,
@ -199,12 +200,17 @@ async function processEditOperations(
}
}
await updateReportWithVersion({
reportId,
content: currentContent,
name: reportName,
versionHistory: newVersionHistory,
});
await updateReportWithVersion(
{
reportId,
content: currentContent,
name: reportName,
versionHistory: newVersionHistory,
},
{
isFinal: true,
}
);
// Wait for the database update to fully complete in the queue
await waitForPendingReportUpdates(reportId);
@ -410,6 +416,9 @@ export function createModifyReportsExecute(
) {
return wrapTraced(
async (input: ModifyReportsInput): Promise<ModifyReportsOutput> => {
if (input.id) {
closeReportUpdateQueue(input.id);
}
const startTime = Date.now();
try {

View File

@ -16,6 +16,7 @@ export function modifyReportsStart(context: ModifyReportsContext, state: ModifyR
state.startTime = Date.now();
state.responseMessageCreated = false;
state.snapshotContent = undefined;
state.firstDelta = true;
if (context.messageId) {
try {

View File

@ -95,6 +95,7 @@ const ModifyReportsStateSchema = z.object({
)
.optional(),
isComplete: z.boolean().optional().describe('Whether the tool execution is complete'),
firstDelta: z.boolean().optional().describe('Whether this is the first delta'),
});
// Export types
@ -123,6 +124,7 @@ export function createModifyReportsTool(context: ModifyReportsContext) {
responseMessageCreated: false,
snapshotContent: undefined,
reportModifiedInMessage: false,
firstDelta: true,
};
// Create all functions with the context and state passed

View File

@ -70,18 +70,6 @@ function getOrCreateQueueState(messageId: string): MessageUpdateQueueState {
return initialState;
}
// function cleanupQueueIfIdle(messageId: string, state: MessageUpdateQueueState): void {
// if (
// state.closed &&
// state.finalSequence !== undefined &&
// state.lastCompletedSequence >= state.finalSequence &&
// state.pending.size === 0
// ) {
// console.info('[cleanupQueueIfIdle] CLEANING UP QUEUE');
// updateQueues.delete(messageId);
// }
// }
export function isMessageUpdateQueueClosed(messageId: string): boolean {
const queue = updateQueues.get(messageId);
return queue?.closed ?? false;
@ -108,7 +96,6 @@ export async function waitForPendingUpdates(
if (targetSequence === undefined) {
await queue.tailPromise;
// cleanupQueueIfIdle(messageId, queue);
return;
}
@ -116,7 +103,6 @@ export async function waitForPendingUpdates(
const effectiveTarget = Math.min(targetSequence, maxKnownSequence);
if (effectiveTarget <= queue.lastCompletedSequence) {
// cleanupQueueIfIdle(messageId, queue);
return;
}
@ -134,8 +120,13 @@ export async function waitForPendingUpdates(
} else {
await queue.tailPromise;
}
}
// cleanupQueueIfIdle(messageId, queue);
export function closeMessageUpdateQueue(messageId: string): void {
const queue = updateQueues.get(messageId);
if (queue) {
queue.closed = true;
}
}
/**
@ -236,8 +227,9 @@ export async function updateMessageEntries(
const { messageId } = params;
const queue = getOrCreateQueueState(messageId);
const isFinal = options?.isFinal ?? false;
if (queue.closed) {
if (!isFinal && queue.closed) {
const lastKnownSequence = queue.finalSequence ?? queue.nextSequence - 1;
return {
success: false,
@ -246,12 +238,6 @@ export async function updateMessageEntries(
};
}
const isFinal = options?.isFinal ?? false;
if (isFinal) {
queue.closed = true;
}
const sequenceNumber = queue.nextSequence;
queue.nextSequence += 1;
@ -273,7 +259,6 @@ export async function updateMessageEntries(
if (isFinal) {
queue.finalSequence = sequenceNumber;
}
// cleanupQueueIfIdle(messageId, queue);
return success;
};

View File

@ -62,7 +62,7 @@ type ReportUpdateQueueState = {
const updateQueues = new Map<string, ReportUpdateQueueState>();
function getOrCreateQueueState(reportId: string): ReportUpdateQueueState {
function getOrCreateQueueState(reportId: string, isFinal?: boolean): ReportUpdateQueueState {
const existing = updateQueues.get(reportId);
if (existing) {
return existing;
@ -73,29 +73,26 @@ function getOrCreateQueueState(reportId: string): ReportUpdateQueueState {
nextSequence: 0,
pending: new Map(),
lastCompletedSequence: -1,
closed: false,
// If it is final, the queue should be closed
closed: isFinal ?? false,
};
updateQueues.set(reportId, initialState);
return initialState;
}
function cleanupQueueIfIdle(reportId: string, state: ReportUpdateQueueState): void {
if (
state.closed &&
state.finalSequence !== undefined &&
state.lastCompletedSequence >= state.finalSequence &&
state.pending.size === 0
) {
updateQueues.delete(reportId);
}
}
export function isReportUpdateQueueClosed(reportId: string): boolean {
const queue = updateQueues.get(reportId);
return queue?.closed ?? false;
}
export function reopenReportUpdateQueue(reportId: string): void {
const queue = updateQueues.get(reportId);
if (queue) {
queue.closed = false;
}
}
type WaitForPendingReportUpdateOptions = {
upToSequence?: number;
};
@ -117,7 +114,6 @@ export async function waitForPendingReportUpdates(
if (targetSequence === undefined) {
await queue.tailPromise;
cleanupQueueIfIdle(reportId, queue);
return;
}
@ -125,7 +121,6 @@ export async function waitForPendingReportUpdates(
const effectiveTarget = Math.min(targetSequence, maxKnownSequence);
if (effectiveTarget <= queue.lastCompletedSequence) {
cleanupQueueIfIdle(reportId, queue);
return;
}
@ -143,8 +138,6 @@ export async function waitForPendingReportUpdates(
} else {
await queue.tailPromise;
}
cleanupQueueIfIdle(reportId, queue);
}
/**
@ -191,6 +184,13 @@ async function performUpdate(params: BatchUpdateReportInput): Promise<void> {
}
}
export function closeReportUpdateQueue(reportId: string): void {
const queue = updateQueues.get(reportId);
if (queue) {
queue.closed = true;
}
}
/**
* Updates a report's content, name, and version history in a single operation.
* Updates are queued per reportId to ensure they execute in order.
@ -209,9 +209,10 @@ export const updateReportWithVersion = async (
options?: UpdateReportWithVersionOptions
): Promise<UpdateReportWithVersionResult> => {
const { reportId } = params;
const queue = getOrCreateQueueState(reportId);
const isFinal = options?.isFinal ?? false;
const queue = getOrCreateQueueState(reportId, isFinal);
if (queue.closed) {
if (!isFinal && queue.closed) {
const lastKnownSequence = queue.finalSequence ?? queue.nextSequence - 1;
return {
sequenceNumber: lastKnownSequence >= 0 ? lastKnownSequence : -1,
@ -219,12 +220,6 @@ export const updateReportWithVersion = async (
};
}
const isFinal = options?.isFinal ?? false;
if (isFinal) {
queue.closed = true;
}
const sequenceNumber = queue.nextSequence;
queue.nextSequence += 1;
@ -246,7 +241,6 @@ export const updateReportWithVersion = async (
if (isFinal) {
queue.finalSequence = sequenceNumber;
}
cleanupQueueIfIdle(reportId, queue);
};
const resultPromise = runPromise