From b14054a0334f8a5bb2dec2019fe096871107c9b0 Mon Sep 17 00:00:00 2001
From: dal
Date: Fri, 22 Aug 2025 17:34:40 -0600
Subject: [PATCH] Add lru-cache dependency and refactor updateMessageEntries function

- Introduced lru-cache version 11.1.0 in package.json and pnpm-lock.yaml.
- Refactored updateMessageEntries to utilize TypeScript-based merge logic with write-through caching.
- Improved error handling for fetching existing message entries and updated database logic to merge new entries with existing ones.
---
 packages/database/package.json | 1 +
 .../messages/helpers/fetch-message-entries.ts | 50 +++++
 .../queries/messages/helpers/merge-entries.ts | 183 ++++++++++++++++++
 .../queries/messages/message-entries-cache.ts | 80 ++++++++
 .../database/src/queries/messages/messages.ts | 1 -
 .../messages/update-message-entries.ts | 166 +++++-----------
 pnpm-lock.yaml | 10 +-
 7 files changed, 369 insertions(+), 122 deletions(-)
 create mode 100644 packages/database/src/queries/messages/helpers/fetch-message-entries.ts
 create mode 100644 packages/database/src/queries/messages/helpers/merge-entries.ts
 create mode 100644 packages/database/src/queries/messages/message-entries-cache.ts

diff --git a/packages/database/package.json b/packages/database/package.json
index cf986be6f..c733a5ad6 100644
--- a/packages/database/package.json
+++ b/packages/database/package.json
@@ -56,6 +56,7 @@
     "drizzle-kit": "^0.31.4",
     "drizzle-orm": "catalog:",
     "drizzle-zod": "^0.8.2",
+    "lru-cache": "^11.1.0",
     "postgres": "^3.4.7",
     "zod": "catalog:"
   }
diff --git a/packages/database/src/queries/messages/helpers/fetch-message-entries.ts b/packages/database/src/queries/messages/helpers/fetch-message-entries.ts
new file mode 100644
index 000000000..6a128f9da
--- /dev/null
+++ b/packages/database/src/queries/messages/helpers/fetch-message-entries.ts
@@ -0,0 +1,50 @@
+import type { ModelMessage } from 'ai';
+import { and, eq, isNull } from 'drizzle-orm';
+import { db } from '../../../connection';
+import { messages } from '../../../schema';
+import type {
+  ChatMessageReasoningMessage,
+  ChatMessageResponseMessage,
+} from '../../../schemas/message-schemas';
+import { type MessageEntriesCacheValue, messageEntriesCache } from '../message-entries-cache';
+
+/**
+ * Fetches a message's response, reasoning, and raw LLM entries, checking the message entries cache first.
+ * On a cache miss it reads the non-deleted row, caches and returns it; returns null when no row matches.
+ */
+export async function fetchMessageEntries(
+  messageId: string
+): Promise { // NOTE(review): Promise has no type argument although null is returned below; the generic looks stripped, confirm against the original commit
+  // Check cache first
+  const cached = messageEntriesCache.get(messageId);
+  if (cached) {
+    return cached;
+  }
+
+  // Fetch from database
+  const result = await db
+    .select({
+      responseMessages: messages.responseMessages,
+      reasoning: messages.reasoning,
+      rawLlmMessages: messages.rawLlmMessages,
+    })
+    .from(messages)
+    .where(and(eq(messages.id, messageId), isNull(messages.deletedAt)))
+    .limit(1);
+
+  if (!result[0]) {
+    return null;
+  }
+
+  // Cast the stored columns to their entry types (no runtime validation happens here); falsy columns default to []
+  const messageEntries: MessageEntriesCacheValue = {
+    responseMessages: (result[0].responseMessages as ChatMessageResponseMessage[]) || [],
+    reasoning: (result[0].reasoning as ChatMessageReasoningMessage[]) || [],
+    rawLlmMessages: (result[0].rawLlmMessages as ModelMessage[]) || [],
+  };
+
+  // Cache the result
+  messageEntriesCache.set(messageId, messageEntries);
+
+  return messageEntries;
+}
diff --git a/packages/database/src/queries/messages/helpers/merge-entries.ts b/packages/database/src/queries/messages/helpers/merge-entries.ts
new file mode 100644
index 000000000..ca56565c4
--- /dev/null
+++ b/packages/database/src/queries/messages/helpers/merge-entries.ts
@@ -0,0 +1,183 @@
+import type { ModelMessage } from 'ai';
+import type {
+  ChatMessageReasoningMessage,
+  ChatMessageResponseMessage,
+} from '../../../schemas/message-schemas';
+
+/**
+ * Merges response messages by their 'id' field while keeping the order of `existing`.
+ * An update whose id matches replaces the existing entry in place (the last update per id wins).
+ * Updates whose id was not found in `existing` are appended in input order.
+ */
+export function mergeResponseMessages(
+  existing: ChatMessageResponseMessage[],
+  updates: ChatMessageResponseMessage[]
+): ChatMessageResponseMessage[] {
+  if (!existing || existing.length === 0) {
+    return updates; // nothing to merge into: hands back the caller's array itself, not a copy
+  }
+
+  // Create a map of new messages by ID
+  const updateMap = new Map();
+  const updateIds = new Set(); // NOTE(review): filled below but never read in this function
+
+  for (const msg of updates) {
+    updateMap.set(msg.id, msg);
+    updateIds.add(msg.id);
+  }
+
+  // Keep track of which IDs we've already processed
+  const processedIds = new Set();
+
+  // First pass: update existing messages in place
+  const merged = existing.map((existingMsg) => {
+    if (updateMap.has(existingMsg.id)) {
+      processedIds.add(existingMsg.id);
+      const updated = updateMap.get(existingMsg.id);
+      if (!updated) {
+        throw new Error(`Expected to find message with id ${existingMsg.id} in updateMap`);
+      }
+      return updated;
+    }
+    return existingMsg;
+  });
+
+  // Second pass: append updates whose id was not matched above (NOTE(review): a repeated new id is pushed once per occurrence)
+  for (const updateMsg of updates) {
+    if (!processedIds.has(updateMsg.id)) {
+      merged.push(updateMsg);
+    }
+  }
+
+  return merged;
+}
+
+/**
+ * Merges reasoning messages by their 'id' field while keeping the order of `existing`.
+ * An update whose id matches replaces the existing entry in place (the last update per id wins).
+ * Updates whose id was not found in `existing` are appended in input order.
+ */
+export function mergeReasoningMessages(
+  existing: ChatMessageReasoningMessage[],
+  updates: ChatMessageReasoningMessage[]
+): ChatMessageReasoningMessage[] {
+  if (!existing || existing.length === 0) {
+    return updates; // nothing to merge into: hands back the caller's array itself, not a copy
+  }
+
+  // Create a map of new messages by ID
+  const updateMap = new Map();
+  const updateIds = new Set(); // NOTE(review): filled below but never read in this function
+
+  for (const msg of updates) {
+    updateMap.set(msg.id, msg);
+    updateIds.add(msg.id);
+  }
+
+  // Keep track of which IDs we've already processed
+  const processedIds = new Set();
+
+  // First pass: update existing messages in place
+  const merged = existing.map((existingMsg) => {
+    if (updateMap.has(existingMsg.id)) {
+      processedIds.add(existingMsg.id);
+      const updated = updateMap.get(existingMsg.id);
+      if (!updated) {
+        throw new Error(`Expected to find message with id ${existingMsg.id} in updateMap`);
+      }
+      return updated;
+    }
+    return existingMsg;
+  });
+
+  // Second pass: append updates whose id was not matched above (NOTE(review): a repeated new id is pushed once per occurrence)
+  for (const updateMsg of updates) {
+    if (!processedIds.has(updateMsg.id)) {
+      merged.push(updateMsg);
+    }
+  }
+
+  return merged;
+}
+
+/**
+ * Returns the sorted, comma-joined string `toolCallId` values found on the items of array content; '' for any other content
+ */
+function getToolCallIds(content: unknown): string {
+  if (!content || typeof content !== 'object') {
+    return '';
+  }
+
+  if (Array.isArray(content)) {
+    const toolCallIds: string[] = [];
+    for (const item of content) {
+      if (typeof item === 'object' && item !== null && 'toolCallId' in item) {
+        const toolCallId = (item as { toolCallId?: unknown }).toolCallId;
+        if (typeof toolCallId === 'string') {
+          toolCallIds.push(toolCallId);
+        }
+      }
+    }
+    return toolCallIds.sort().join(',');
+  }
+
+  return '';
+}
+
+/**
+ * Builds the merge key "<role>:<toolCallIds>". NOTE(review): not unique; every message without tool call ids shares "<role>:", confirm the collisions are intended
+ */
+function getRawLlmMessageKey(message: ModelMessage): string {
+  const role = message.role || '';
+  const toolCallIds = getToolCallIds(message.content);
+  return `${role}:${toolCallIds}`;
+}
+
+/**
+ * Merges raw LLM messages keyed by role plus tool call IDs (see getRawLlmMessageKey), keeping the order of `existing`.
+ * Every existing message whose key matches an update is replaced in place by the last update with that key.
+ * Updates whose key was not found in `existing` are appended in input order.
+ */
+export function mergeRawLlmMessages(
+  existing: ModelMessage[],
+  updates: ModelMessage[]
+): ModelMessage[] {
+  if (!existing || existing.length === 0) {
+    return updates; // nothing to merge into: hands back the caller's array itself, not a copy
+  }
+
+  // Create a map of new messages by their key (later updates with the same key overwrite earlier ones)
+  const updateMap = new Map();
+
+  for (const msg of updates) {
+    const key = getRawLlmMessageKey(msg);
+    updateMap.set(key, msg);
+  }
+
+  // Keep track of which keys we've already processed
+  const processedKeys = new Set();
+
+  // First pass: update existing messages in place
+  const merged = existing.map((existingMsg) => {
+    const key = getRawLlmMessageKey(existingMsg);
+    if (updateMap.has(key)) {
+      processedKeys.add(key);
+      const updated = updateMap.get(key);
+      if (!updated) {
+        throw new Error(`Expected to find message with key ${key} in updateMap`);
+      }
+      return updated;
+    }
+    return existingMsg;
+  });
+
+  // Second pass: append updates whose key was not matched above (NOTE(review): a repeated new key is pushed once per occurrence)
+  for (const updateMsg of updates) {
+    const key = getRawLlmMessageKey(updateMsg);
+    if (!processedKeys.has(key)) {
+      merged.push(updateMsg);
+    }
+  }
+
+  return merged;
+}
diff --git a/packages/database/src/queries/messages/message-entries-cache.ts b/packages/database/src/queries/messages/message-entries-cache.ts
new file mode 100644
index 000000000..25f814a41
--- /dev/null
+++ b/packages/database/src/queries/messages/message-entries-cache.ts
@@ -0,0 +1,80 @@
+import type { ModelMessage } from 'ai';
+import { LRUCache } from 'lru-cache';
+import type {
+  ChatMessageReasoningMessage,
+  ChatMessageResponseMessage,
+} from '../../schemas/message-schemas';
+
+export interface MessageEntriesCacheValue {
+  responseMessages: ChatMessageResponseMessage[];
+  reasoning: ChatMessageReasoningMessage[];
+  rawLlmMessages: ModelMessage[];
+}
+
+interface CacheOptions {
+  max?: number;
+  ttl?: number;
+}
+
+class MessageEntriesCache {
+  private cache: LRUCache; // NOTE(review): no key/value type arguments; generics look stripped, confirm against the original commit
+  private static instance: MessageEntriesCache;
+
+  private constructor(options: CacheOptions = {}) {
+    this.cache = new LRUCache({
+      max: options.max ?? 100,
+      ttl: options.ttl ?? 1000 * 60 * 2, // 2 minutes TTL
+      updateAgeOnGet: true, // Refresh TTL on read
+      updateAgeOnHas: true, // Refresh TTL on has check
+    });
+  }
+
+  static getInstance(options?: CacheOptions): MessageEntriesCache {
+    if (!MessageEntriesCache.instance) { // options only take effect on the call that creates the instance
+      MessageEntriesCache.instance = new MessageEntriesCache(options);
+    }
+    return MessageEntriesCache.instance;
+  }
+
+  get(messageId: string): MessageEntriesCacheValue | undefined {
+    return this.cache.get(messageId);
+  }
+
+  set(messageId: string, value: MessageEntriesCacheValue): void {
+    this.cache.set(messageId, value);
+  }
+
+  has(messageId: string): boolean {
+    return this.cache.has(messageId);
+  }
+
+  delete(messageId: string): boolean {
+    return this.cache.delete(messageId);
+  }
+
+  clear(): void {
+    this.cache.clear();
+  }
+
+  /**
+   * Shallow-merges partial data into the cached entry; when nothing is cached, stores a new entry with [] for missing fields
+   */
+  update(messageId: string, partialValue: Partial): void { // NOTE(review): Partial has no type argument; looks stripped, confirm
+    const existing = this.get(messageId);
+    if (existing) {
+      this.set(messageId, {
+        ...existing,
+        ...partialValue,
+      });
+    } else {
+      // If not in cache, set with defaults for missing fields
+      this.set(messageId, {
+        responseMessages: partialValue.responseMessages ?? [],
+        reasoning: partialValue.reasoning ?? [],
+        rawLlmMessages: partialValue.rawLlmMessages ?? [],
+      });
+    }
+  }
+}
+
+export const messageEntriesCache = MessageEntriesCache.getInstance();
diff --git a/packages/database/src/queries/messages/messages.ts b/packages/database/src/queries/messages/messages.ts
index 7994600dc..7cb94381b 100644
--- a/packages/database/src/queries/messages/messages.ts
+++ b/packages/database/src/queries/messages/messages.ts
@@ -105,7 +105,6 @@ export async function getAllRawLlmMessagesForChat(chatId: string) {
 export async function updateMessageFields(
   messageId: string,
   fields: {
-    //TODO: Dallin let's make a type for this. It should not just be a jsonb object.
     responseMessages?: unknown;
     reasoning?: unknown;
     rawLlmMessages?: unknown;
diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts
index cdc51370c..a603c3d74 100644
--- a/packages/database/src/queries/messages/update-message-entries.ts
+++ b/packages/database/src/queries/messages/update-message-entries.ts
@@ -1,9 +1,16 @@
 import type { ModelMessage } from 'ai';
-import { type SQL, and, eq, isNull, sql } from 'drizzle-orm';
+import { and, eq, isNull } from 'drizzle-orm';
 import { z } from 'zod';
 import { db } from '../../connection';
 import { messages } from '../../schema';
 import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas';
+import { fetchMessageEntries } from './helpers/fetch-message-entries';
+import {
+  mergeRawLlmMessages,
+  mergeReasoningMessages,
+  mergeResponseMessages,
+} from './helpers/merge-entries';
+import { messageEntriesCache } from './message-entries-cache';
 
 const UpdateMessageEntriesSchema = z.object({
   messageId: z.string().uuid(),
@@ -15,15 +22,13 @@ const UpdateMessageEntriesSchema = z.object({
 export type UpdateMessageEntriesParams = z.infer;
 
 /**
- * Updates message entries using order-preserving JSONB merge operations.
- * Performs batch upserts for multiple entries in a single database operation.
- * PRESERVES the exact order of input arrays during upsert/append operations.
+ * Updates message entries using TypeScript-based merge logic with write-through caching.
+ * Fetches existing entries from cache/database, merges with updates, and saves back (separate read and write steps).
  *
- * Upsert logic:
- * - responseMessages: upsert by 'id' field, maintaining input array order
- * - reasoningMessages: upsert by 'id' field, maintaining input array order
- * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining input array order
- *   (handles both string content and array content with tool calls)
+ * Merge logic (existing entries keep their position; unmatched updates are appended):
+ * - responseMessages: upsert by 'id' field, maintaining order
+ * - reasoningMessages: upsert by 'id' field, maintaining order
+ * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
  */
 export async function updateMessageEntries({
   messageId,
@@ -32,126 +37,51 @@ export async function updateMessageEntries({
   reasoningMessages,
 }: UpdateMessageEntriesParams): Promise<{ success: boolean }> {
   try {
-    const updates: Record = { updatedAt: new Date().toISOString() };
+    // Fetch existing entries from cache or database (null means no non-deleted row was found)
+    const existingEntries = await fetchMessageEntries(messageId);
 
-    // Order-preserving merge for response messages
-    if (responseMessages?.length) {
-      const newData = JSON.stringify(responseMessages);
-      updates.responseMessages = sql`
-        CASE
-          WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb
-          ELSE (
-            WITH new_data AS (
-              SELECT value, ordinality as new_order
-              FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
-            ),
-            existing_data AS (
-              SELECT value, ordinality as existing_order
-              FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS t(value, ordinality)
-            )
-            SELECT jsonb_agg(
-              CASE
-                WHEN nd.value IS NOT NULL THEN nd.value
-                ELSE ed.value
-              END
-              ORDER BY COALESCE(nd.new_order, ed.existing_order)
-            )
-            FROM existing_data ed
-            FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id'
-            WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL
-          )
-        END`;
+    if (!existingEntries) {
+      throw new Error(`Message not found: ${messageId}`);
     }
 
-    // Order-preserving merge for reasoning messages
-    if (reasoningMessages?.length) {
-      const newData = JSON.stringify(reasoningMessages);
-      updates.reasoning = sql`
-        CASE
-          WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb
-          ELSE (
-            WITH new_data AS (
-              SELECT value, ordinality as new_order
-              FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
-            ),
-            existing_data AS (
-              SELECT value, ordinality as existing_order
-              FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS t(value, ordinality)
-            )
-            SELECT jsonb_agg(
-              CASE
-                WHEN nd.value IS NOT NULL THEN nd.value
-                ELSE ed.value
-              END
-              ORDER BY COALESCE(nd.new_order, ed.existing_order)
-            )
-            FROM existing_data ed
-            FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id'
-            WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL
-          )
-        END`;
+    // Merge only the lists that were passed; an omitted list keeps its existing value
+    const mergedEntries = {
+      responseMessages: responseMessages
+        ? mergeResponseMessages(existingEntries.responseMessages, responseMessages)
+        : existingEntries.responseMessages,
+      reasoning: reasoningMessages
+        ? mergeReasoningMessages(existingEntries.reasoning, reasoningMessages)
+        : existingEntries.reasoning,
+      rawLlmMessages: rawLlmMessages
+        ? mergeRawLlmMessages(existingEntries.rawLlmMessages, rawLlmMessages)
+        : existingEntries.rawLlmMessages,
+    };
+
+    // Update database: updatedAt is always written, plus only the columns whose list was passed
+    const updateData: Record = { // NOTE(review): Record has no type arguments; looks stripped, confirm against the original commit
+      updatedAt: new Date().toISOString(),
+    };
+
+    if (responseMessages) {
+      updateData.responseMessages = mergedEntries.responseMessages;
     }
 
-    // Order-preserving merge for raw LLM messages - handles both string and array content
-    if (rawLlmMessages?.length) {
-      const newData = JSON.stringify(rawLlmMessages);
-      updates.rawLlmMessages = sql`
-        CASE
-          WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb
-          ELSE (
-            WITH new_data AS (
-              SELECT
-                value,
-                ordinality as new_order,
-                value->>'role' AS role,
-                COALESCE(
-                  CASE
-                    WHEN jsonb_typeof(value->'content') = 'array' THEN
-                      (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
-                       FROM jsonb_array_elements(value->'content') c
-                       WHERE c->>'toolCallId' IS NOT NULL)
-                    ELSE NULL
-                  END,
-                  ''
-                ) AS tool_calls
-              FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
-            ),
-            existing_data AS (
-              SELECT
-                value,
-                ordinality as existing_order,
-                value->>'role' AS role,
-                COALESCE(
-                  CASE
-                    WHEN jsonb_typeof(value->'content') = 'array' THEN
-                      (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
-                       FROM jsonb_array_elements(value->'content') c
-                       WHERE c->>'toolCallId' IS NOT NULL)
-                    ELSE NULL
-                  END,
-                  ''
-                ) AS tool_calls
-              FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality)
-            )
-            SELECT jsonb_agg(
-              CASE
-                WHEN nd.value IS NOT NULL THEN nd.value
-                ELSE ed.value
-              END
-              ORDER BY COALESCE(nd.new_order, ed.existing_order)
-            )
-            FROM existing_data ed
-            FULL OUTER JOIN new_data nd ON (ed.role = nd.role AND ed.tool_calls = nd.tool_calls)
-            WHERE COALESCE(nd.role, ed.role) IS NOT NULL
-          )
-        END`;
+    if (reasoningMessages) {
+      updateData.reasoning = mergedEntries.reasoning;
+    }
+
+    if (rawLlmMessages) {
+      updateData.rawLlmMessages = mergedEntries.rawLlmMessages;
     }
 
     await db
       .update(messages)
-      .set(updates)
+      .set(updateData)
       .where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
 
+    // Write-through: update cache with merged entries. NOTE(review): the fetch, merge and write are separate steps, so concurrent calls for one messageId may overwrite each other; confirm callers serialize updates
+    messageEntriesCache.set(messageId, mergedEntries);
+
     return { success: true };
   } catch (error) {
     console.error('Failed to update message entries:', error);
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 4fdc6877e..705a0ee9e 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -1022,6 +1022,9 @@ importers:
       drizzle-zod:
         specifier: ^0.8.2
         version: 0.8.2(drizzle-orm@0.44.4(@opentelemetry/api@1.9.0)(@types/pg@8.15.4)(mysql2@3.14.1)(pg@8.16.3)(postgres@3.4.7))(zod@3.25.76)
+      lru-cache:
+        specifier: ^11.1.0
+        version: 11.1.0
       postgres:
         specifier: ^3.4.7
         version: 3.4.7
@@ -6433,6 +6436,7 @@ packages:
 
   bun@1.2.18:
     resolution: {integrity: sha512-OR+EpNckoJN4tHMVZPaTPxDj2RgpJgJwLruTIFYbO3bQMguLd0YrmkWKYqsiihcLgm2ehIjF/H1RLfZiRa7+qQ==}
+    cpu: [arm64, x64, aarch64]
     os: [darwin, linux, win32]
     hasBin: true
 
@@ -18396,14 +18400,14 @@ snapshots:
       msw: 2.10.4(@types/node@20.19.4)(typescript@5.9.2)
       vite: 7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0)
 
-  '@vitest/mocker@3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))':
+  '@vitest/mocker@3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))':
     dependencies:
       '@vitest/spy': 3.2.4
       estree-walker: 3.0.3
       magic-string: 0.30.17
     optionalDependencies:
       msw: 2.10.4(@types/node@24.0.10)(typescript@5.9.2)
-      vite: 7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0)
+      vite: 7.0.5(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0)
 
   '@vitest/mocker@3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.1.3(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))':
     dependencies:
@@ -25600,7 +25604,7 @@ snapshots:
     dependencies:
       '@types/chai': 5.2.2
       '@vitest/expect': 3.2.4
-      '@vitest/mocker': 3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))
+      '@vitest/mocker': 3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))
       '@vitest/pretty-format': 3.2.4
       '@vitest/runner': 3.2.4
       '@vitest/snapshot': 3.2.4