got rid of delta streaming

This commit is contained in:
dal 2025-08-20 16:48:13 -06:00
parent c94ceaa10a
commit 5d9c9e6a77
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
3 changed files with 4 additions and 176 deletions

View File

@ -1,11 +1,9 @@
import { getReportContent, updateMessageEntries, updateReportContent } from '@buster/database';
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
import { updateMessageEntries } from '@buster/database';
import type { ToolCallOptions } from 'ai';
import {
OptimisticJsonParser,
getOptimisticValue,
} from '../../../../utils/streaming/optimistic-json-parser';
import { reportContainsMetrics } from '../helpers/report-metric-helper';
import {
createModifyReportsRawLlmMessageEntry,
createModifyReportsReasoningEntry,
@ -15,7 +13,6 @@ import type {
ModifyReportsEditState,
ModifyReportsInput,
ModifyReportsState,
ModifyReportsStreamingEdit,
} from './modify-reports-tool';
// Define TOOL_KEYS locally since we removed them from the helper
@ -28,14 +25,6 @@ const TOOL_KEYS = {
operation: 'operation' as const,
};
// Helper to check if code_to_replace is completely streamed
function isCodeToReplaceComplete(editObj: Record<string, unknown>, codeToReplace: string): boolean {
// Check if the string ends properly (not mid-token)
// We consider it complete if it has the expected value and doesn't end with incomplete JSON
const value = editObj.code_to_replace as string;
return value === codeToReplace && !value.endsWith('\\');
}
export function createModifyReportsDelta(context: ModifyReportsContext, state: ModifyReportsState) {
return async (options: { inputTextDelta: string } & ToolCallOptions) => {
// Handle string deltas (accumulate JSON text)
@ -80,14 +69,8 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
if (!state.edits) {
state.edits = [];
}
if (!state.streamingEdits) {
state.streamingEdits = [];
}
// Track response messages to create
const responseMessagesToCreate: ChatMessageResponseMessage[] = [];
// Process each edit with streaming updates
// Process each edit and update state only (no database updates)
for (let index = 0; index < editsArray.length; index++) {
const edit = editsArray[index];
if (edit && typeof edit === 'object') {
@ -111,7 +94,7 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
? 'append'
: 'replace';
// Update state edit
// Update state edit - just track the edits, don't apply them
if (!state.edits[index]) {
state.edits[index] = {
operation,
@ -123,157 +106,14 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
// Update existing edit
const existingEdit = state.edits[index];
if (existingEdit) {
existingEdit.operation = operation;
existingEdit.code_to_replace = codeToReplace || '';
existingEdit.code = code;
}
}
// Initialize streaming edit if needed
if (!state.streamingEdits[index]) {
state.streamingEdits[index] = {
operation,
codeToReplaceComplete: false,
streamingCode: '',
lastUpdateIndex: 0,
};
}
const streamingEdit = state.streamingEdits[index];
if (!streamingEdit) continue;
// Initialize snapshot on first edit if needed
if (index === 0 && !state.snapshotContent) {
const currentContent = await getReportContent({ reportId: state.reportId });
state.snapshotContent = currentContent || '';
state.workingContent = state.snapshotContent; // Track working content for sequential edits
// Initialize version number (will be updated in execute phase)
if (!state.version_number) {
state.version_number = 1;
}
}
// Only process if this edit has new content to stream
if (code.length > streamingEdit.lastUpdateIndex) {
// Build content sequentially by applying all edits up to current index
let workingContent = state.snapshotContent || '';
// Apply all previous edits first (they should be complete)
for (let i = 0; i < index; i++) {
const prevEdit = editsArray[i];
if (prevEdit && typeof prevEdit === 'object') {
const prevEditObj = prevEdit as Record<string, unknown>;
const prevEditMap = new Map(Object.entries(prevEditObj));
const prevOperation =
getOptimisticValue<string>(prevEditMap, TOOL_KEYS.operation, '') ||
(getOptimisticValue<string>(prevEditMap, TOOL_KEYS.code_to_replace, '') === ''
? 'append'
: 'replace');
const prevCodeToReplace = getOptimisticValue<string>(
prevEditMap,
TOOL_KEYS.code_to_replace,
''
);
const prevCode = getOptimisticValue<string>(prevEditMap, TOOL_KEYS.code, '');
if (prevOperation === 'append') {
workingContent = workingContent + prevCode;
} else if (
prevOperation === 'replace' &&
prevCodeToReplace &&
workingContent.includes(prevCodeToReplace)
) {
workingContent = workingContent.replace(prevCodeToReplace, prevCode || '');
}
}
}
// Now apply the current edit based on its operation
let newContent = workingContent;
if (operation === 'append') {
// APPEND: Stream directly as content comes in
newContent = workingContent + code;
} else if (operation === 'replace') {
// REPLACE: Wait for code_to_replace to be complete, then stream replacements
if (!streamingEdit.codeToReplaceComplete) {
streamingEdit.codeToReplaceComplete = isCodeToReplaceComplete(
editObj,
codeToReplace || ''
);
}
if (streamingEdit.codeToReplaceComplete) {
if (workingContent.includes(codeToReplace || '')) {
newContent = workingContent.replace(codeToReplace || '', code);
} else {
// If replace text not found, skip updating
continue;
}
} else {
// Wait for code_to_replace to complete
continue;
}
}
// Update the report content with all edits applied
try {
await updateReportContent({
reportId: state.reportId,
content: newContent,
});
// Update state
state.currentContent = newContent;
state.workingContent = newContent;
streamingEdit.lastUpdateIndex = code.length;
streamingEdit.streamingCode = code;
// Check for metrics if not already created response message
if (reportContainsMetrics(newContent) && !state.responseMessageCreated) {
responseMessagesToCreate.push({
id: state.reportId,
type: 'file' as const,
file_type: 'report' as const,
file_name: state.reportName || '',
version_number: state.version_number || 1,
filter_version_id: null,
metadata: [
{
status: 'completed' as const,
message: 'Report modified successfully',
timestamp: Date.now(),
},
],
});
state.responseMessageCreated = true;
}
} catch (error) {
console.error(
'[modify-reports] Error updating report content during streaming:',
error
);
}
}
}
}
}
// Update database with response messages if we have any
if (responseMessagesToCreate.length > 0 && context.messageId) {
try {
await updateMessageEntries({
messageId: context.messageId,
responseMessages: responseMessagesToCreate,
});
console.info('[modify-reports] Created response message during delta', {
reportId: state.reportId,
});
} catch (error) {
console.error('[modify-reports] Error creating response message during delta:', error);
// Don't throw - continue processing
}
}
}
}

View File

@ -20,7 +20,6 @@ export function modifyReportsStart(context: ModifyReportsContext, state: ModifyR
state.startTime = Date.now();
state.responseMessageCreated = false;
state.snapshotContent = undefined;
state.streamingEdits = [];
if (context.messageId) {
try {

View File

@ -68,14 +68,6 @@ const ModifyReportsEditStateSchema = z.object({
error: z.string().optional(),
});
const ModifyReportsStreamingEditSchema = z.object({
operation: z.enum(['replace', 'append']),
codeToReplaceComplete: z.boolean(),
streamingCode: z.string(),
lastUpdateIndex: z.number(),
fullyApplied: z.boolean().optional(), // Track if this edit was fully applied during streaming
});
const ModifyReportsStateSchema = z.object({
toolCallId: z.string().optional(),
argsText: z.string().optional(),
@ -89,7 +81,6 @@ const ModifyReportsStateSchema = z.object({
responseMessageCreated: z.boolean().optional(),
snapshotContent: z.string().optional(),
workingContent: z.string().optional(),
streamingEdits: z.array(ModifyReportsStreamingEditSchema).optional(),
});
// Export types
@ -98,7 +89,6 @@ export type ModifyReportsOutput = z.infer<typeof ModifyReportsOutputSchema>;
export type ModifyReportsContext = z.infer<typeof ModifyReportsContextSchema>;
export type ModifyReportsState = z.infer<typeof ModifyReportsStateSchema>;
export type ModifyReportsEditState = z.infer<typeof ModifyReportsEditStateSchema>;
export type ModifyReportsStreamingEdit = z.infer<typeof ModifyReportsStreamingEditSchema>;
// Factory function that accepts agent context and maps to tool context
export function createModifyReportsTool(context: ModifyReportsContext) {
@ -114,7 +104,6 @@ export function createModifyReportsTool(context: ModifyReportsContext) {
toolCallId: undefined,
responseMessageCreated: false,
snapshotContent: undefined,
streamingEdits: [],
};
// Create all functions with the context and state passed