diff --git a/packages/ai/src/agents/analyst-agent/analyst-agent.ts b/packages/ai/src/agents/analyst-agent/analyst-agent.ts
index 23ab7f86a..57f8d9108 100644
--- a/packages/ai/src/agents/analyst-agent/analyst-agent.ts
+++ b/packages/ai/src/agents/analyst-agent/analyst-agent.ts
@@ -98,10 +98,10 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
const docsSystemMessage = docsContent
? ({
- role: 'system',
- content: `\n${docsContent}\n`,
- providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
- } as ModelMessage)
+ role: 'system',
+ content: `\n${docsContent}\n`,
+ providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
+ } as ModelMessage)
: null;
async function stream({ messages }: AnalystStreamOptions) {
@@ -134,19 +134,19 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
// Create analyst instructions system message with proper escaping
const analystInstructionsMessage = analystInstructions
? ({
- role: 'system',
- content: `\n${analystInstructions}\n`,
- providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
- } as ModelMessage)
+ role: 'system',
+ content: `\n${analystInstructions}\n`,
+ providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
+ } as ModelMessage)
: null;
// Create user personalization system message
const userPersonalizationSystemMessage = userPersonalizationMessageContent
? ({
- role: 'system',
- content: userPersonalizationMessageContent,
- providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
- } as ModelMessage)
+ role: 'system',
+ content: userPersonalizationMessageContent,
+ providerOptions: DEFAULT_ANTHROPIC_OPTIONS,
+ } as ModelMessage)
: null;
return wrapTraced(
diff --git a/packages/ai/src/llm/providers/gateway.ts b/packages/ai/src/llm/providers/gateway.ts
index 8d18d51ad..ba263c218 100644
--- a/packages/ai/src/llm/providers/gateway.ts
+++ b/packages/ai/src/llm/providers/gateway.ts
@@ -6,7 +6,15 @@ export const DEFAULT_ANTHROPIC_OPTIONS = {
gateway: {
order: ['bedrock', 'anthropic', 'vertex'],
},
- anthropic: { cacheControl: { type: 'ephemeral' } },
+ anthropic: {
+ cacheControl: { type: 'ephemeral' },
+ },
+ bedrock: {
+ cacheControl: { type: 'ephemeral' },
+ additionalModelRequestFields: {
+ anthropic_beta: ['fine-grained-tool-streaming-2025-05-14'],
+ },
+ }
};
export const DEFAULT_OPENAI_OPTIONS = {
diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts
index 6d2b6cb38..52ad14012 100644
--- a/packages/database/src/queries/messages/update-message-entries.ts
+++ b/packages/database/src/queries/messages/update-message-entries.ts
@@ -21,16 +21,14 @@ const UpdateMessageEntriesSchema = z.object({
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
+// Per-messageId in-memory queue: serializes concurrent updates to the same message
+const updateQueues = new Map<string, Promise<{ success: boolean }>>();
+
/**
- * Updates message entries with cache-first approach for streaming.
- * Cache is the source of truth during streaming, DB is updated for persistence.
- *
- * Merge logic:
- * - responseMessages: upsert by 'id' field, maintaining order
- * - reasoningMessages: upsert by 'id' field, maintaining order
- * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
+ * Internal function that performs the actual update logic.
+ * This is separated so it can be queued.
*/
-export async function updateMessageEntries({
+async function performUpdate({
messageId,
rawLlmMessages,
responseMessages,
@@ -95,3 +93,41 @@ export async function updateMessageEntries({
throw new Error(`Failed to update message entries for message ${messageId}`);
}
}
+
+/**
+ * Updates message entries with cache-first approach for streaming.
+ * Cache is the source of truth during streaming, DB is updated for persistence.
+ *
+ * Updates are queued per messageId to ensure they execute in order.
+ *
+ * Merge logic:
+ * - responseMessages: upsert by 'id' field, maintaining order
+ * - reasoningMessages: upsert by 'id' field, maintaining order
+ * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
+ */
+export async function updateMessageEntries(
+ params: UpdateMessageEntriesParams
+): Promise<{ success: boolean }> {
+ const { messageId } = params;
+
+ // Get the current promise for this messageId, or use a resolved promise as the starting point
+ const currentQueue = updateQueues.get(messageId) ?? Promise.resolve({ success: true });
+
+  // Chain the new update to run after the current queue settles.
+  const newQueue = currentQueue
+    .catch(() => {}) // Ignore the PREVIOUS update's failure so this one still runs
+    .then(() => performUpdate(params));
+
+ // Update the queue for this messageId
+ updateQueues.set(messageId, newQueue);
+
+ // Clean up the queue entry once this update completes
+  void newQueue.catch(() => {}).finally(() => {
+ // Only remove if this is still the current queue
+ if (updateQueues.get(messageId) === newQueue) {
+ updateQueues.delete(messageId);
+ }
+ });
+
+ return newQueue;
+}
diff --git a/packages/database/src/queries/reports/batch-update-report.ts b/packages/database/src/queries/reports/batch-update-report.ts
index bf02c730f..a4d573667 100644
--- a/packages/database/src/queries/reports/batch-update-report.ts
+++ b/packages/database/src/queries/reports/batch-update-report.ts
@@ -31,18 +31,26 @@ type VersionHistoryEntry = {
type VersionHistory = Record<string, VersionHistoryEntry>;
+// Per-reportId in-memory queue: serializes concurrent updates to the same report
+const updateQueues = new Map<
+  string,
+  Promise<{ id: string; name: string; content: string; versionHistory: VersionHistory | null }>
+>();
+
/**
- * Updates a report with new content, optionally name, and version history in a single operation
- * This is more efficient than multiple individual updates
+ * Internal function that performs the actual update logic.
+ * This is separated so it can be queued.
*/
-export const batchUpdateReport = async (
+async function performUpdate(
params: BatchUpdateReportInput
): Promise<{
id: string;
name: string;
content: string;
versionHistory: VersionHistory | null;
-}> => {
+}> {
const { reportId, content, name, versionHistory } = BatchUpdateReportInputSchema.parse(params);
try {
@@ -93,4 +101,47 @@ export const batchUpdateReport = async (
throw new Error('Failed to batch update report');
}
+}
+
+/**
+ * Updates a report with new content, optionally name, and version history in a single operation
+ * This is more efficient than multiple individual updates
+ *
+ * Updates are queued per reportId to ensure they execute in order.
+ */
+export const batchUpdateReport = async (
+ params: BatchUpdateReportInput
+): Promise<{
+ id: string;
+ name: string;
+ content: string;
+ versionHistory: VersionHistory | null;
+}> => {
+ const { reportId } = params;
+
+ // Get the current promise for this reportId, or use a resolved promise as the starting point
+ const currentQueue = updateQueues.get(reportId) ?? Promise.resolve({
+ id: '',
+ name: '',
+ content: '',
+ versionHistory: null
+ });
+
+  // Chain the new update to run after the current queue settles.
+  const newQueue = currentQueue
+    .catch(() => {}) // Ignore the PREVIOUS update's failure so this one still runs
+    .then(() => performUpdate(params));
+
+ // Update the queue for this reportId
+ updateQueues.set(reportId, newQueue);
+
+ // Clean up the queue entry once this update completes
+  void newQueue.catch(() => {}).finally(() => {
+ // Only remove if this is still the current queue
+ if (updateQueues.get(reportId) === newQueue) {
+ updateQueues.delete(reportId);
+ }
+ });
+
+ return newQueue;
};