fixed the streaming loading infinite state

This commit is contained in:
dal 2025-09-25 18:38:46 -06:00
parent aa6523f8e5
commit 8f314268e3
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 120 additions and 25 deletions

View File

@ -98,10 +98,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
const docsSystemMessage = docsContent const docsSystemMessage = docsContent
? ({ ? ({
role: 'system', role: 'system',
content: `<data_catalog_docs>\n${docsContent}\n</data_catalog_docs>`, content: `<data_catalog_docs>\n${docsContent}\n</data_catalog_docs>`,
providerOptions: DEFAULT_ANTHROPIC_OPTIONS, providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
} as ModelMessage) } as ModelMessage)
: null; : null;
async function stream({ messages }: AnalystStreamOptions) { async function stream({ messages }: AnalystStreamOptions) {
@ -134,19 +134,19 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
// Create analyst instructions system message with proper escaping // Create analyst instructions system message with proper escaping
const analystInstructionsMessage = analystInstructions const analystInstructionsMessage = analystInstructions
? ({ ? ({
role: 'system', role: 'system',
content: `<organization_instructions>\n${analystInstructions}\n</organization_instructions>`, content: `<organization_instructions>\n${analystInstructions}\n</organization_instructions>`,
providerOptions: DEFAULT_ANTHROPIC_OPTIONS, providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
} as ModelMessage) } as ModelMessage)
: null; : null;
// Create user personalization system message // Create user personalization system message
const userPersonalizationSystemMessage = userPersonalizationMessageContent const userPersonalizationSystemMessage = userPersonalizationMessageContent
? ({ ? ({
role: 'system', role: 'system',
content: userPersonalizationMessageContent, content: userPersonalizationMessageContent,
providerOptions: DEFAULT_ANTHROPIC_OPTIONS, providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
} as ModelMessage) } as ModelMessage)
: null; : null;
return wrapTraced( return wrapTraced(

View File

@ -6,7 +6,15 @@ export const DEFAULT_ANTHROPIC_OPTIONS = {
gateway: { gateway: {
order: ['bedrock', 'anthropic', 'vertex'], order: ['bedrock', 'anthropic', 'vertex'],
}, },
anthropic: { cacheControl: { type: 'ephemeral' } }, anthropic: {
cacheControl: { type: 'ephemeral' },
},
bedrock: {
cacheControl: { type: 'ephemeral' },
additionalModelRequestFields: {
anthropic_beta: ['fine-grained-tool-streaming-2025-05-14'],
},
}
}; };
export const DEFAULT_OPENAI_OPTIONS = { export const DEFAULT_OPENAI_OPTIONS = {

View File

@ -21,16 +21,14 @@ const UpdateMessageEntriesSchema = z.object({
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>; export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
// Simple in-memory queue for each messageId
const updateQueues = new Map<string, Promise<{ success: boolean }>>();
/** /**
* Updates message entries with cache-first approach for streaming. * Internal function that performs the actual update logic.
* Cache is the source of truth during streaming, DB is updated for persistence. * This is separated so it can be queued.
*
* Merge logic:
* - responseMessages: upsert by 'id' field, maintaining order
* - reasoningMessages: upsert by 'id' field, maintaining order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
*/ */
export async function updateMessageEntries({ async function performUpdate({
messageId, messageId,
rawLlmMessages, rawLlmMessages,
responseMessages, responseMessages,
@ -95,3 +93,41 @@ export async function updateMessageEntries({
throw new Error(`Failed to update message entries for message ${messageId}`); throw new Error(`Failed to update message entries for message ${messageId}`);
} }
} }
/**
* Updates message entries with cache-first approach for streaming.
* Cache is the source of truth during streaming, DB is updated for persistence.
*
* Updates are queued per messageId to ensure they execute in order.
*
* Merge logic:
* - responseMessages: upsert by 'id' field, maintaining order
* - reasoningMessages: upsert by 'id' field, maintaining order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
*/
export async function updateMessageEntries(
params: UpdateMessageEntriesParams
): Promise<{ success: boolean }> {
const { messageId } = params;
// Get the current promise for this messageId, or use a resolved promise as the starting point
const currentQueue = updateQueues.get(messageId) ?? Promise.resolve({ success: true });
// Chain the new update to run after the current queue completes
const newQueue = currentQueue
.then(() => performUpdate(params))
.catch(() => performUpdate(params)); // Still try to run even if previous failed
// Update the queue for this messageId
updateQueues.set(messageId, newQueue);
// Clean up the queue entry once this update completes
newQueue.finally(() => {
// Only remove if this is still the current queue
if (updateQueues.get(messageId) === newQueue) {
updateQueues.delete(messageId);
}
});
return newQueue;
}

View File

@ -31,18 +31,26 @@ type VersionHistoryEntry = {
type VersionHistory = Record<string, VersionHistoryEntry>; type VersionHistory = Record<string, VersionHistoryEntry>;
// Simple in-memory queue for each reportId
const updateQueues = new Map<string, Promise<{
id: string;
name: string;
content: string;
versionHistory: VersionHistory | null;
}>>();
/** /**
* Updates a report with new content, optionally name, and version history in a single operation * Internal function that performs the actual update logic.
* This is more efficient than multiple individual updates * This is separated so it can be queued.
*/ */
export const batchUpdateReport = async ( async function performUpdate(
params: BatchUpdateReportInput params: BatchUpdateReportInput
): Promise<{ ): Promise<{
id: string; id: string;
name: string; name: string;
content: string; content: string;
versionHistory: VersionHistory | null; versionHistory: VersionHistory | null;
}> => { }> {
const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params); const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params);
try { try {
@ -93,4 +101,47 @@ export const batchUpdateReport = async (
throw new Error('Failed to batch update report'); throw new Error('Failed to batch update report');
} }
}
/**
* Updates a report with new content, optionally name, and version history in a single operation
* This is more efficient than multiple individual updates
*
* Updates are queued per reportId to ensure they execute in order.
*/
export const batchUpdateReport = async (
params: BatchUpdateReportInput
): Promise<{
id: string;
name: string;
content: string;
versionHistory: VersionHistory | null;
}> => {
const { reportId } = params;
// Get the current promise for this reportId, or use a resolved promise as the starting point
const currentQueue = updateQueues.get(reportId) ?? Promise.resolve({
id: '',
name: '',
content: '',
versionHistory: null
});
// Chain the new update to run after the current queue completes
const newQueue = currentQueue
.then(() => performUpdate(params))
.catch(() => performUpdate(params)); // Still try to run even if previous failed
// Update the queue for this reportId
updateQueues.set(reportId, newQueue);
// Clean up the queue entry once this update completes
newQueue.finally(() => {
// Only remove if this is still the current queue
if (updateQueues.get(reportId) === newQueue) {
updateQueues.delete(reportId);
}
});
return newQueue;
}; };