From 8f314268e3d3668ead2a6918ebe542006cb7f2bc Mon Sep 17 00:00:00 2001 From: dal Date: Thu, 25 Sep 2025 18:38:46 -0600 Subject: [PATCH] fixed the streaming loading infinite state --- .../src/agents/analyst-agent/analyst-agent.ts | 24 ++++---- packages/ai/src/llm/providers/gateway.ts | 10 +++- .../messages/update-message-entries.ts | 52 +++++++++++++--- .../queries/reports/batch-update-report.ts | 59 +++++++++++++++++-- 4 files changed, 120 insertions(+), 25 deletions(-) diff --git a/packages/ai/src/agents/analyst-agent/analyst-agent.ts b/packages/ai/src/agents/analyst-agent/analyst-agent.ts index 23ab7f86a..57f8d9108 100644 --- a/packages/ai/src/agents/analyst-agent/analyst-agent.ts +++ b/packages/ai/src/agents/analyst-agent/analyst-agent.ts @@ -98,10 +98,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) { const docsSystemMessage = docsContent ? ({ - role: 'system', - content: `\n${docsContent}\n`, - providerOptions: DEFAULT_ANTHROPIC_OPTIONS, - } as ModelMessage) + role: 'system', + content: `\n${docsContent}\n`, + providerOptions: DEFAULT_ANTHROPIC_OPTIONS, + } as ModelMessage) : null; async function stream({ messages }: AnalystStreamOptions) { @@ -134,19 +134,19 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) { // Create analyst instructions system message with proper escaping const analystInstructionsMessage = analystInstructions ? ({ - role: 'system', - content: `\n${analystInstructions}\n`, - providerOptions: DEFAULT_ANTHROPIC_OPTIONS, - } as ModelMessage) + role: 'system', + content: `\n${analystInstructions}\n`, + providerOptions: DEFAULT_ANTHROPIC_OPTIONS, + } as ModelMessage) : null; // Create user personalization system message const userPersonalizationSystemMessage = userPersonalizationMessageContent ? 
({ - role: 'system', - content: userPersonalizationMessageContent, - providerOptions: DEFAULT_ANTHROPIC_OPTIONS, - } as ModelMessage) + role: 'system', + content: userPersonalizationMessageContent, + providerOptions: DEFAULT_ANTHROPIC_OPTIONS, + } as ModelMessage) : null; return wrapTraced( diff --git a/packages/ai/src/llm/providers/gateway.ts b/packages/ai/src/llm/providers/gateway.ts index 8d18d51ad..ba263c218 100644 --- a/packages/ai/src/llm/providers/gateway.ts +++ b/packages/ai/src/llm/providers/gateway.ts @@ -6,7 +6,15 @@ export const DEFAULT_ANTHROPIC_OPTIONS = { gateway: { order: ['bedrock', 'anthropic', 'vertex'], }, - anthropic: { cacheControl: { type: 'ephemeral' } }, + anthropic: { + cacheControl: { type: 'ephemeral' }, + }, + bedrock: { + cacheControl: { type: 'ephemeral' }, + additionalModelRequestFields: { + anthropic_beta: ['fine-grained-tool-streaming-2025-05-14'], + }, + } }; export const DEFAULT_OPENAI_OPTIONS = { diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts index 6d2b6cb38..52ad14012 100644 --- a/packages/database/src/queries/messages/update-message-entries.ts +++ b/packages/database/src/queries/messages/update-message-entries.ts @@ -21,16 +21,14 @@ const UpdateMessageEntriesSchema = z.object({ export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>; +// Simple in-memory queue for each messageId +const updateQueues = new Map<string, Promise<{ success: boolean }>>(); + /** - * Updates message entries with cache-first approach for streaming. - * Cache is the source of truth during streaming, DB is updated for persistence. - * - * Merge logic: - * - responseMessages: upsert by 'id' field, maintaining order - * - reasoningMessages: upsert by 'id' field, maintaining order - * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order + * Internal function that performs the actual update logic. + * This is separated so it can be queued. 
*/ -export async function updateMessageEntries({ +async function performUpdate({ messageId, rawLlmMessages, responseMessages, @@ -95,3 +93,41 @@ export async function updateMessageEntries({ throw new Error(`Failed to update message entries for message ${messageId}`); } } + +/** + * Updates message entries with cache-first approach for streaming. + * Cache is the source of truth during streaming, DB is updated for persistence. + * + * Updates are queued per messageId to ensure they execute in order. + * + * Merge logic: + * - responseMessages: upsert by 'id' field, maintaining order + * - reasoningMessages: upsert by 'id' field, maintaining order + * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order + */ +export async function updateMessageEntries( + params: UpdateMessageEntriesParams +): Promise<{ success: boolean }> { + const { messageId } = params; + + // Get the current promise for this messageId, or use a resolved promise as the starting point + const currentQueue = updateQueues.get(messageId) ?? 
Promise.resolve({ success: true }); + + // Chain the new update to run after the current queue completes + const newQueue = currentQueue + .then(() => performUpdate(params)) + .catch(() => performUpdate(params)); // Still try to run even if previous failed + + // Update the queue for this messageId + updateQueues.set(messageId, newQueue); + + // Clean up the queue entry once this update completes + newQueue.finally(() => { + // Only remove if this is still the current queue + if (updateQueues.get(messageId) === newQueue) { + updateQueues.delete(messageId); + } + }); + + return newQueue; +} diff --git a/packages/database/src/queries/reports/batch-update-report.ts b/packages/database/src/queries/reports/batch-update-report.ts index bf02c730f..a4d573667 100644 --- a/packages/database/src/queries/reports/batch-update-report.ts +++ b/packages/database/src/queries/reports/batch-update-report.ts @@ -31,18 +31,26 @@ type VersionHistoryEntry = { type VersionHistory = Record<string, VersionHistoryEntry>; +// Simple in-memory queue for each reportId +const updateQueues = new Map<string, Promise<{ id: string; name: string; content: string; versionHistory: VersionHistory | null }>>(); + /** - * Updates a report with new content, optionally name, and version history in a single operation - * This is more efficient than multiple individual updates + * Internal function that performs the actual update logic. + * This is separated so it can be queued. 
*/ -export const batchUpdateReport = async ( +async function performUpdate( params: BatchUpdateReportInput ): Promise<{ id: string; name: string; content: string; versionHistory: VersionHistory | null; -}> => { +}> { const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params); try { @@ -93,4 +101,47 @@ export const batchUpdateReport = async ( throw new Error('Failed to batch update report'); } +} + +/** + * Updates a report with new content, optionally name, and version history in a single operation + * This is more efficient than multiple individual updates + * + * Updates are queued per reportId to ensure they execute in order. + */ +export const batchUpdateReport = async ( + params: BatchUpdateReportInput +): Promise<{ + id: string; + name: string; + content: string; + versionHistory: VersionHistory | null; +}> => { + const { reportId } = params; + + // Get the current promise for this reportId, or use a resolved promise as the starting point + const currentQueue = updateQueues.get(reportId) ?? Promise.resolve({ + id: '', + name: '', + content: '', + versionHistory: null + }); + + // Chain the new update to run after the current queue completes + const newQueue = currentQueue + .then(() => performUpdate(params)) + .catch(() => performUpdate(params)); // Still try to run even if previous failed + + // Update the queue for this reportId + updateQueues.set(reportId, newQueue); + + // Clean up the queue entry once this update completes + newQueue.finally(() => { + // Only remove if this is still the current queue + if (updateQueues.get(reportId) === newQueue) { + updateQueues.delete(reportId); + } + }); + + return newQueue; };