diff --git a/packages/database/src/queries/messages/helpers/merge-entries.ts b/packages/database/src/queries/messages/helpers/merge-entries.ts index 014f16e5b..1ee478eee 100644 --- a/packages/database/src/queries/messages/helpers/merge-entries.ts +++ b/packages/database/src/queries/messages/helpers/merge-entries.ts @@ -19,11 +19,9 @@ export function mergeResponseMessages( // Create a map of new messages by ID const updateMap = new Map(); - const updateMap = new Map(); for (const msg of updates) { updateMap.set(msg.id, msg); - updateIds.add(msg.id); } // Keep track of which IDs we've already processed @@ -67,11 +65,9 @@ export function mergeReasoningMessages( // Create a map of new messages by ID const updateMap = new Map(); - const updateMap = new Map(); for (const msg of updates) { updateMap.set(msg.id, msg); - updateIds.add(msg.id); } // Keep track of which IDs we've already processed diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts index e0c23c43b..ba2a41840 100644 --- a/packages/database/src/queries/messages/update-message-entries.ts +++ b/packages/database/src/queries/messages/update-message-entries.ts @@ -22,8 +22,8 @@ const UpdateMessageEntriesSchema = z.object({ export type UpdateMessageEntriesParams = z.infer; /** - * Updates message entries using TypeScript-based merge logic with write-through caching. - * Fetches existing entries from cache/database, merges with updates, and saves back. + * Updates message entries with cache-first approach for streaming. + * Cache is the source of truth during streaming, DB is updated for persistence. * * Merge logic: * - responseMessages: upsert by 'id' field, maintaining order @@ -57,7 +57,11 @@ export async function updateMessageEntries({ : existingEntries.rawLlmMessages, }; - // Update database with merged entries + // Update cache immediately (cache is source of truth during streaming) + messageEntriesCache.set(messageId, mergedEntries); + + // Update database asynchronously for persistence (fire-and-forget) + // If this fails, cache still has the latest state for next update const updateData: Record = { updatedAt: new Date().toISOString(), }; @@ -74,18 +78,14 @@ export async function updateMessageEntries({ updateData.rawLlmMessages = mergedEntries.rawLlmMessages; } - await db - .update(messages) + // Non-blocking DB update - don't await + db.update(messages) .set(updateData) - .where(and(eq(messages.id, messageId), isNull(messages.deletedAt))); - - await db - .update(messages) - .set(updateData) - .where(and(eq(messages.id, messageId), isNull(messages.deletedAt))); - - // Write-through: update cache with merged entries only after successful DB update - messageEntriesCache.set(messageId, mergedEntries); + .where(and(eq(messages.id, messageId), isNull(messages.deletedAt))) + .catch((error) => { + // Log but don't fail - cache has the truth + console.error('Background DB update failed (cache still valid):', error); + }); return { success: true }; } catch (error) {