Merge pull request #757 from buster-so/update-message-entries-pass-through-cache

Add lru-cache dependency and refactor updateMessageEntries function
This commit is contained in:
dal 2025-08-22 20:48:39 -06:00 committed by GitHub
commit 39caebef28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 373 additions and 125 deletions

View File

@ -56,6 +56,7 @@
"drizzle-kit": "^0.31.4",
"drizzle-orm": "catalog:",
"drizzle-zod": "^0.8.2",
"lru-cache": "^11.1.0",
"postgres": "^3.4.7",
"zod": "catalog:"
}

View File

@ -0,0 +1,50 @@
import type { ModelMessage } from 'ai';
import { and, eq, isNull } from 'drizzle-orm';
import { db } from '../../../connection';
import { messages } from '../../../schema';
import type {
ChatMessageReasoningMessage,
ChatMessageResponseMessage,
} from '../../../schemas/message-schemas';
import { type MessageEntriesCacheValue, messageEntriesCache } from '../message-entries-cache';
/**
* Fetches message entries from cache or database
* Returns cached value if available, otherwise fetches from DB and caches result
*/
/**
 * Cache-aside lookup for a message's entry arrays.
 *
 * Returns the cached value when present; otherwise loads the row from the
 * database, populates the cache, and returns the result. Yields null when no
 * non-deleted message row exists for the given id.
 */
export async function fetchMessageEntries(
  messageId: string
): Promise<MessageEntriesCacheValue | null> {
  // Fast path: serve straight from the in-process LRU cache.
  const cached = messageEntriesCache.get(messageId);
  if (cached) {
    return cached;
  }

  // Slow path: read the three JSONB columns for the live (non-deleted) row.
  const rows = await db
    .select({
      responseMessages: messages.responseMessages,
      reasoning: messages.reasoning,
      rawLlmMessages: messages.rawLlmMessages,
    })
    .from(messages)
    .where(and(eq(messages.id, messageId), isNull(messages.deletedAt)))
    .limit(1);

  const row = rows[0];
  if (!row) {
    return null;
  }

  // NOTE(review): these casts assume the JSONB columns already hold the
  // expected shapes — no runtime validation is performed here.
  const entries: MessageEntriesCacheValue = {
    responseMessages: (row.responseMessages as ChatMessageResponseMessage[]) || [],
    reasoning: (row.reasoning as ChatMessageReasoningMessage[]) || [],
    rawLlmMessages: (row.rawLlmMessages as ModelMessage[]) || [],
  };

  // Prime the cache so subsequent streaming updates hit the fast path.
  messageEntriesCache.set(messageId, entries);
  return entries;
}

View File

@ -0,0 +1,179 @@
import type { ModelMessage } from 'ai';
import type {
ChatMessageReasoningMessage,
ChatMessageResponseMessage,
} from '../../../schemas/message-schemas';
/**
* Merges response messages by 'id' field, preserving order
* New messages with matching IDs replace existing ones at their original position
* New messages without matching IDs are appended
*/
/**
 * Merges response messages by 'id', preserving order.
 *
 * - An update whose id matches an existing message replaces it at the
 *   existing message's original position.
 * - Updates with new ids are appended in first-occurrence order.
 * - Duplicate ids within `updates` collapse to the last occurrence, so the
 *   append path now agrees with the replace path (previously every
 *   duplicate of a new id was appended).
 */
export function mergeResponseMessages(
  existing: ChatMessageResponseMessage[],
  updates: ChatMessageResponseMessage[]
): ChatMessageResponseMessage[] {
  // Index updates by id. Map keeps first-seen key order and the last value
  // set per key, which gives "last duplicate wins" for free.
  const updateMap = new Map<string, ChatMessageResponseMessage>();
  for (const msg of updates) {
    updateMap.set(msg.id, msg);
  }

  // Nothing to merge into: return the deduplicated updates.
  if (!existing || existing.length === 0) {
    return [...updateMap.values()];
  }

  const matchedIds = new Set<string>();

  // First pass: replace matching messages in place.
  const merged: ChatMessageResponseMessage[] = existing.map((existingMsg) => {
    const replacement = updateMap.get(existingMsg.id);
    if (replacement !== undefined) {
      matchedIds.add(existingMsg.id);
      return replacement;
    }
    return existingMsg;
  });

  // Second pass: append each genuinely new id exactly once.
  for (const [id, msg] of updateMap) {
    if (!matchedIds.has(id)) {
      merged.push(msg);
    }
  }

  return merged;
}
/**
* Merges reasoning messages by 'id' field, preserving order
* New messages with matching IDs replace existing ones at their original position
* New messages without matching IDs are appended
*/
/**
 * Merges reasoning messages by 'id', preserving order.
 *
 * - An update whose id matches an existing message replaces it at the
 *   existing message's original position.
 * - Updates with new ids are appended in first-occurrence order.
 * - Duplicate ids within `updates` collapse to the last occurrence, so the
 *   append path now agrees with the replace path (previously every
 *   duplicate of a new id was appended).
 */
export function mergeReasoningMessages(
  existing: ChatMessageReasoningMessage[],
  updates: ChatMessageReasoningMessage[]
): ChatMessageReasoningMessage[] {
  // Index updates by id. Map keeps first-seen key order and the last value
  // set per key, which gives "last duplicate wins" for free.
  const updateMap = new Map<string, ChatMessageReasoningMessage>();
  for (const msg of updates) {
    updateMap.set(msg.id, msg);
  }

  // Nothing to merge into: return the deduplicated updates.
  if (!existing || existing.length === 0) {
    return [...updateMap.values()];
  }

  const matchedIds = new Set<string>();

  // First pass: replace matching messages in place.
  const merged: ChatMessageReasoningMessage[] = existing.map((existingMsg) => {
    const replacement = updateMap.get(existingMsg.id);
    if (replacement !== undefined) {
      matchedIds.add(existingMsg.id);
      return replacement;
    }
    return existingMsg;
  });

  // Second pass: append each genuinely new id exactly once.
  for (const [id, msg] of updateMap) {
    if (!matchedIds.has(id)) {
      merged.push(msg);
    }
  }

  return merged;
}
/**
* Helper to extract tool call IDs from content
*/
/**
 * Extracts toolCallId strings from array-form message content and returns
 * them sorted and comma-joined. Any other content shape (null, string,
 * plain object) yields the empty string.
 */
function getToolCallIds(content: unknown): string {
  // Only array content can carry tool-call parts.
  if (!Array.isArray(content)) {
    return '';
  }
  const ids = content
    .map((part) =>
      part && typeof part === 'object' ? (part as { toolCallId?: unknown }).toolCallId : undefined
    )
    .filter((id): id is string => typeof id === 'string');
  // Sort so the key is stable regardless of part ordering.
  return ids.sort().join(',');
}
/**
* Creates a unique key for raw LLM messages based on role and tool call IDs
*/
function getRawLlmMessageKey(message: ModelMessage): string {
const role = message.role || '';
const toolCallIds = getToolCallIds(message.content);
return `${role}:${toolCallIds}`;
}
/**
* Merges raw LLM messages by combination of 'role' and 'toolCallId', preserving order
* Messages with the same role and tool call IDs replace existing ones at their original position
* New messages are appended
*/
/**
 * Merges raw LLM messages, matching entries by role + sorted toolCallIds.
 * Matched existing messages are replaced at their original positions; updates
 * whose key matches nothing are appended in their input order.
 *
 * NOTE(review): the "role:toolCallIds" key is not guaranteed unique — e.g.
 * two plain-text messages with the same role share a key, so a single update
 * can replace several existing entries. Confirm this is acceptable for
 * arbitrary histories before relying on it outside the streaming path.
 */
export function mergeRawLlmMessages(
  existing: ModelMessage[],
  updates: ModelMessage[]
): ModelMessage[] {
  if (!existing?.length) {
    return updates;
  }

  // Last update wins per key, via Map.set overwrite semantics.
  const byKey = new Map<string, ModelMessage>();
  for (const update of updates) {
    byKey.set(getRawLlmMessageKey(update), update);
  }

  const consumedKeys = new Set<string>();
  const result: ModelMessage[] = [];

  // First pass: replace each existing message whose key has an update.
  for (const current of existing) {
    const key = getRawLlmMessageKey(current);
    const replacement = byKey.get(key);
    if (replacement !== undefined) {
      consumedKeys.add(key);
      result.push(replacement);
    } else {
      result.push(current);
    }
  }

  // Second pass: append updates whose key never matched an existing entry.
  for (const update of updates) {
    if (!consumedKeys.has(getRawLlmMessageKey(update))) {
      result.push(update);
    }
  }

  return result;
}

View File

@ -0,0 +1,80 @@
import type { ModelMessage } from 'ai';
import { LRUCache } from 'lru-cache';
import type {
ChatMessageReasoningMessage,
ChatMessageResponseMessage,
} from '../../schemas/message-schemas';
/** Shape of a single cached message's entry arrays. */
export interface MessageEntriesCacheValue {
  responseMessages: ChatMessageResponseMessage[];
  reasoning: ChatMessageReasoningMessage[];
  rawLlmMessages: ModelMessage[];
}

/** Tuning knobs for the LRU cache; omitted fields fall back to defaults. */
interface CacheOptions {
  max?: number;
  ttl?: number;
}

/**
 * Process-wide LRU cache of message entries, keyed by message id.
 *
 * Singleton: the instance is created once at module load (see the export
 * at the bottom of this block), so `options` passed to any later
 * getInstance call are silently ignored.
 */
class MessageEntriesCache {
  private static instance: MessageEntriesCache;
  private cache: LRUCache<string, MessageEntriesCacheValue>;

  private constructor(options: CacheOptions = {}) {
    this.cache = new LRUCache<string, MessageEntriesCacheValue>({
      max: options.max ?? 100,
      ttl: options.ttl ?? 1000 * 60 * 2, // default TTL: 2 minutes
      updateAgeOnGet: true, // reads keep hot entries alive
      updateAgeOnHas: true, // has() checks do too
    });
  }

  /** Returns the shared instance, creating it on first use. */
  static getInstance(options?: CacheOptions): MessageEntriesCache {
    if (!MessageEntriesCache.instance) {
      MessageEntriesCache.instance = new MessageEntriesCache(options);
    }
    return MessageEntriesCache.instance;
  }

  get(messageId: string): MessageEntriesCacheValue | undefined {
    return this.cache.get(messageId);
  }

  set(messageId: string, value: MessageEntriesCacheValue): void {
    this.cache.set(messageId, value);
  }

  has(messageId: string): boolean {
    return this.cache.has(messageId);
  }

  delete(messageId: string): boolean {
    return this.cache.delete(messageId);
  }

  clear(): void {
    this.cache.clear();
  }

  /**
   * Merges partial data into the cached entry; when the id is not cached,
   * seeds a new entry using empty arrays for any missing fields.
   */
  update(messageId: string, partialValue: Partial<MessageEntriesCacheValue>): void {
    const current = this.get(messageId);
    if (current) {
      this.set(messageId, { ...current, ...partialValue });
      return;
    }
    this.set(messageId, {
      responseMessages: partialValue.responseMessages ?? [],
      reasoning: partialValue.reasoning ?? [],
      rawLlmMessages: partialValue.rawLlmMessages ?? [],
    });
  }
}

/** Shared singleton used by the message-entries helpers. */
export const messageEntriesCache = MessageEntriesCache.getInstance();

View File

@ -105,7 +105,6 @@ export async function getAllRawLlmMessagesForChat(chatId: string) {
export async function updateMessageFields(
messageId: string,
fields: {
//TODO: Dallin let's make a type for this. It should not just be a jsonb object.
responseMessages?: unknown;
reasoning?: unknown;
rawLlmMessages?: unknown;

View File

@ -1,9 +1,16 @@
import type { ModelMessage } from 'ai';
import { type SQL, and, eq, isNull, sql } from 'drizzle-orm';
import { and, eq, isNull } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { messages } from '../../schema';
import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas';
import { fetchMessageEntries } from './helpers/fetch-message-entries';
import {
mergeRawLlmMessages,
mergeReasoningMessages,
mergeResponseMessages,
} from './helpers/merge-entries';
import { messageEntriesCache } from './message-entries-cache';
const UpdateMessageEntriesSchema = z.object({
messageId: z.string().uuid(),
@ -15,15 +22,13 @@ const UpdateMessageEntriesSchema = z.object({
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
/**
* Updates message entries using order-preserving JSONB merge operations.
* Performs batch upserts for multiple entries in a single database operation.
* PRESERVES the exact order of input arrays during upsert/append operations.
* Updates message entries with cache-first approach for streaming.
* Cache is the source of truth during streaming, DB is updated for persistence.
*
* Upsert logic:
* - responseMessages: upsert by 'id' field, maintaining input array order
* - reasoningMessages: upsert by 'id' field, maintaining input array order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining input array order
* (handles both string content and array content with tool calls)
* Merge logic:
* - responseMessages: upsert by 'id' field, maintaining order
* - reasoningMessages: upsert by 'id' field, maintaining order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
*/
export async function updateMessageEntries({
messageId,
@ -32,125 +37,55 @@ export async function updateMessageEntries({
reasoningMessages,
}: UpdateMessageEntriesParams): Promise<{ success: boolean }> {
try {
const updates: Record<string, SQL | string> = { updatedAt: new Date().toISOString() };
// Fetch existing entries from cache or database
const existingEntries = await fetchMessageEntries(messageId);
// Order-preserving merge for response messages
if (responseMessages?.length) {
const newData = JSON.stringify(responseMessages);
updates.responseMessages = sql`
CASE
WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_data AS (
SELECT value, ordinality as new_order
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
),
existing_data AS (
SELECT value, ordinality as existing_order
FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS t(value, ordinality)
)
SELECT jsonb_agg(
CASE
WHEN nd.value IS NOT NULL THEN nd.value
ELSE ed.value
END
ORDER BY COALESCE(nd.new_order, ed.existing_order)
)
FROM existing_data ed
FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id'
WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL
)
END`;
if (!existingEntries) {
throw new Error(`Message not found: ${messageId}`);
}
// Order-preserving merge for reasoning messages
if (reasoningMessages?.length) {
const newData = JSON.stringify(reasoningMessages);
updates.reasoning = sql`
CASE
WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_data AS (
SELECT value, ordinality as new_order
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
),
existing_data AS (
SELECT value, ordinality as existing_order
FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS t(value, ordinality)
)
SELECT jsonb_agg(
CASE
WHEN nd.value IS NOT NULL THEN nd.value
ELSE ed.value
END
ORDER BY COALESCE(nd.new_order, ed.existing_order)
)
FROM existing_data ed
FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id'
WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL
)
END`;
// Merge with new entries
const mergedEntries = {
responseMessages: responseMessages
? mergeResponseMessages(existingEntries.responseMessages, responseMessages)
: existingEntries.responseMessages,
reasoning: reasoningMessages
? mergeReasoningMessages(existingEntries.reasoning, reasoningMessages)
: existingEntries.reasoning,
rawLlmMessages: rawLlmMessages
? mergeRawLlmMessages(existingEntries.rawLlmMessages, rawLlmMessages)
: existingEntries.rawLlmMessages,
};
// Update cache immediately (cache is source of truth during streaming)
messageEntriesCache.set(messageId, mergedEntries);
// Update database asynchronously for persistence (fire-and-forget)
// If this fails, cache still has the latest state for next update
const updateData: Record<string, unknown> = {
updatedAt: new Date().toISOString(),
};
if (responseMessages) {
updateData.responseMessages = mergedEntries.responseMessages;
}
// Order-preserving merge for raw LLM messages - handles both string and array content
if (rawLlmMessages?.length) {
const newData = JSON.stringify(rawLlmMessages);
updates.rawLlmMessages = sql`
CASE
WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_data AS (
SELECT
value,
ordinality as new_order,
value->>'role' AS role,
COALESCE(
CASE
WHEN jsonb_typeof(value->'content') = 'array' THEN
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL)
ELSE NULL
END,
''
) AS tool_calls
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
),
existing_data AS (
SELECT
value,
ordinality as existing_order,
value->>'role' AS role,
COALESCE(
CASE
WHEN jsonb_typeof(value->'content') = 'array' THEN
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL)
ELSE NULL
END,
''
) AS tool_calls
FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality)
)
SELECT jsonb_agg(
CASE
WHEN nd.value IS NOT NULL THEN nd.value
ELSE ed.value
END
ORDER BY COALESCE(nd.new_order, ed.existing_order)
)
FROM existing_data ed
FULL OUTER JOIN new_data nd ON (ed.role = nd.role AND ed.tool_calls = nd.tool_calls)
WHERE COALESCE(nd.role, ed.role) IS NOT NULL
)
END`;
if (reasoningMessages) {
updateData.reasoning = mergedEntries.reasoning;
}
await db
.update(messages)
.set(updates)
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
if (rawLlmMessages) {
updateData.rawLlmMessages = mergedEntries.rawLlmMessages;
}
// Non-blocking DB update - don't await
db.update(messages)
.set(updateData)
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)))
.catch((error) => {
// Log but don't fail - cache has the truth
console.error('Background DB update failed (cache still valid):', error);
});
return { success: true };
} catch (error) {

View File

@ -1022,6 +1022,9 @@ importers:
drizzle-zod:
specifier: ^0.8.2
version: 0.8.2(drizzle-orm@0.44.4(@opentelemetry/api@1.9.0)(@types/pg@8.15.4)(mysql2@3.14.1)(pg@8.16.3)(postgres@3.4.7))(zod@3.25.76)
lru-cache:
specifier: ^11.1.0
version: 11.1.0
postgres:
specifier: ^3.4.7
version: 3.4.7
@ -6433,6 +6436,7 @@ packages:
bun@1.2.18:
resolution: {integrity: sha512-OR+EpNckoJN4tHMVZPaTPxDj2RgpJgJwLruTIFYbO3bQMguLd0YrmkWKYqsiihcLgm2ehIjF/H1RLfZiRa7+qQ==}
cpu: [arm64, x64, aarch64]
os: [darwin, linux, win32]
hasBin: true
@ -18396,14 +18400,14 @@ snapshots:
msw: 2.10.4(@types/node@20.19.4)(typescript@5.9.2)
vite: 7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0)
'@vitest/mocker@3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))':
'@vitest/mocker@3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))':
dependencies:
'@vitest/spy': 3.2.4
estree-walker: 3.0.3
magic-string: 0.30.17
optionalDependencies:
msw: 2.10.4(@types/node@24.0.10)(typescript@5.9.2)
vite: 7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0)
vite: 7.0.5(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0)
'@vitest/mocker@3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.1.3(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))':
dependencies:
@ -25600,7 +25604,7 @@ snapshots:
dependencies:
'@types/chai': 5.2.2
'@vitest/expect': 3.2.4
'@vitest/mocker': 3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@20.19.4)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))
'@vitest/mocker': 3.2.4(msw@2.10.4(@types/node@24.0.10)(typescript@5.9.2))(vite@7.0.5(@types/node@24.0.10)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.90.0)(terser@5.43.1)(tsx@4.20.4)(yaml@2.8.0))
'@vitest/pretty-format': 3.2.4
'@vitest/runner': 3.2.4
'@vitest/snapshot': 3.2.4