speed up writes to db

dal 2025-09-02 21:58:26 -06:00
parent 02e70b6905
commit 30f7e8ac88
GPG Key ID: 16F4B0E1E9F61122
4 changed files with 84 additions and 239 deletions
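
The speed-up comes from treating the in-memory cache as the source of truth during streaming and pushing the database write into the background, as the hunks below show. A minimal sketch of that shape, where `cache`, `persist`, and `Entries` are hypothetical stand-ins rather than the project's actual modules:

// Illustrative sketch only: `cache`, `persist`, and `Entries` are hypothetical
// stand-ins, not the modules this commit touches.
type Entries = { responseMessages: unknown[] };

const cache = new Map<string, Entries>();

// Placeholder for the real persistence call (a Drizzle update in the actual code).
async function persist(id: string, value: Entries): Promise<void> {
  void id;
  void value;
}

function saveEntries(id: string, merged: Entries): { success: boolean } {
  // The cache is updated synchronously and becomes the source of truth during streaming.
  cache.set(id, merged);

  // The database write is fired without awaiting; a failure is only logged, since the
  // cache still holds the latest state for the next update to merge against.
  persist(id, merged).catch((error) => {
    console.error('Background DB write failed:', { id, error });
  });

  // Callers return as soon as the cache is updated.
  return { success: true };
}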

View File

@@ -15,13 +15,13 @@ import { type MessageEntriesCacheValue, messageEntriesCache } from '../message-e
export async function fetchMessageEntries(
messageId: string
): Promise<MessageEntriesCacheValue | null> {
// Check cache first
// Check cache first - return immediately if found
const cached = messageEntriesCache.get(messageId);
if (cached) {
return cached;
}
// Fetch from database
// Fetch from database only if not in cache
const result = await db
.select({
responseMessages: messages.responseMessages,
@@ -36,14 +36,14 @@ export async function fetchMessageEntries(
return null;
}
// Parse and validate the data
// Direct assignment without additional validation - trust the data structure
const messageEntries: MessageEntriesCacheValue = {
responseMessages: (result[0].responseMessages as ChatMessageResponseMessage[]) || [],
reasoning: (result[0].reasoning as ChatMessageReasoningMessage[]) || [],
rawLlmMessages: (result[0].rawLlmMessages as ModelMessage[]) || [],
};
// Cache the result
// Cache the result for future reads
messageEntriesCache.set(messageId, messageEntries);
return messageEntries;

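For reference, the read side of the pattern in the hunk above, condensed into a self-contained sketch; a plain `Map` and a stubbed `loadFromDb` stand in for the project's LRU cache and Drizzle query:

// Hedged sketch of the cache-aside read; `Entries` and `loadFromDb` are hypothetical.
type Entries = {
  responseMessages: unknown[];
  reasoning: unknown[];
  rawLlmMessages: unknown[];
};

const entriesCache = new Map<string, Entries>();

async function loadFromDb(messageId: string): Promise<Entries | null> {
  // Stand-in for the SELECT of responseMessages / reasoning / rawLlmMessages.
  void messageId;
  return null;
}

async function getEntries(messageId: string): Promise<Entries | null> {
  // Hot path: a cache hit returns immediately, with no database round trip.
  const cached = entriesCache.get(messageId);
  if (cached) return cached;

  // Miss: read once from the database, then cache the result for later reads.
  const fromDb = await loadFromDb(messageId);
  if (fromDb) entriesCache.set(messageId, fromDb);
  return fromDb;
}
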
View File

@@ -19,35 +19,30 @@ export function mergeResponseMessages(
// Create a map of new messages by ID
const updateMap = new Map<string, ChatMessageResponseMessage>();
for (const msg of updates) {
updateMap.set(msg.id, msg);
}
// Keep track of which IDs we've already processed
const processedIds = new Set<string>();
// Single pass: update existing and track what's been processed
const result: ChatMessageResponseMessage[] = [];
// First pass: update existing messages in place
const merged = existing.map((existingMsg) => {
if (updateMap.has(existingMsg.id)) {
processedIds.add(existingMsg.id);
const updated = updateMap.get(existingMsg.id);
if (!updated) {
throw new Error(`Expected to find message with id ${existingMsg.id} in updateMap`);
}
return updated;
}
return existingMsg;
});
// Second pass: append new messages that weren't in existing
for (const updateMsg of updates) {
if (!processedIds.has(updateMsg.id)) {
merged.push(updateMsg);
// Process existing messages, updating if needed
for (const existingMsg of existing) {
const updated = updateMap.get(existingMsg.id);
if (updated) {
result.push(updated);
updateMap.delete(existingMsg.id); // Remove from map once processed
} else {
result.push(existingMsg);
}
}
return merged;
// Append any remaining new messages
for (const updateMsg of updateMap.values()) {
result.push(updateMsg);
}
return result;
}
/**
@@ -65,59 +60,30 @@ export function mergeReasoningMessages(
// Create a map of new messages by ID
const updateMap = new Map<string, ChatMessageReasoningMessage>();
for (const msg of updates) {
updateMap.set(msg.id, msg);
}
// Keep track of which IDs we've already processed
const processedIds = new Set<string>();
// Single pass: update existing and track what's been processed
const result: ChatMessageReasoningMessage[] = [];
// First pass: update existing messages in place
const merged = existing.map((existingMsg) => {
if (updateMap.has(existingMsg.id)) {
processedIds.add(existingMsg.id);
const updated = updateMap.get(existingMsg.id);
if (!updated) {
throw new Error(`Expected to find message with id ${existingMsg.id} in updateMap`);
}
return updated;
}
return existingMsg;
});
// Second pass: append new messages that weren't in existing
for (const updateMsg of updates) {
if (!processedIds.has(updateMsg.id)) {
merged.push(updateMsg);
// Process existing messages, updating if needed
for (const existingMsg of existing) {
const updated = updateMap.get(existingMsg.id);
if (updated) {
result.push(updated);
updateMap.delete(existingMsg.id); // Remove from map once processed
} else {
result.push(existingMsg);
}
}
return merged;
}
/**
* Helper to extract tool call IDs from content
*/
function getToolCallIds(content: unknown): string {
if (!content || typeof content !== 'object') {
return '';
// Append any remaining new messages
for (const updateMsg of updateMap.values()) {
result.push(updateMsg);
}
if (Array.isArray(content)) {
const toolCallIds: string[] = [];
for (const item of content) {
if (typeof item === 'object' && item !== null && 'toolCallId' in item) {
const toolCallId = (item as { toolCallId?: unknown }).toolCallId;
if (typeof toolCallId === 'string') {
toolCallIds.push(toolCallId);
}
}
}
return toolCallIds.sort().join(',');
}
return '';
return result;
}
/**
@@ -125,164 +91,70 @@ function getToolCallIds(content: unknown): string {
*/
function getRawLlmMessageKey(message: ModelMessage): string {
const role = message.role || '';
const toolCallIds = getToolCallIds(message.content);
return `${role}:${toolCallIds}`;
}
/**
* Ensures tool calls always precede their corresponding tool results
* This fixes any ordering issues that may occur during concurrent updates
*/
function sortToolCallsBeforeResults(messages: ModelMessage[]): ModelMessage[] {
// Map to store tool call/result pairs
const toolPairs = new Map<
string,
{ call?: ModelMessage; result?: ModelMessage; callIndex?: number; resultIndex?: number }
>();
const standaloneMessages: { message: ModelMessage; index: number }[] = [];
// Fast path for non-tool messages
if (role !== 'assistant' && role !== 'tool') {
return role;
}
// First pass: identify tool calls and results
messages.forEach((msg, index) => {
const toolCallIds = getToolCallIds(msg.content);
if (toolCallIds) {
// This message has tool call IDs
const toolCallIdList = toolCallIds.split(',');
for (const toolCallId of toolCallIdList) {
if (!toolCallId) continue;
const pair = toolPairs.get(toolCallId) || {};
if (msg.role === 'assistant') {
// This is a tool call
pair.call = msg;
pair.callIndex = index;
} else if (msg.role === 'tool') {
// This is a tool result
pair.result = msg;
pair.resultIndex = index;
}
toolPairs.set(toolCallId, pair);
}
} else {
// Standalone message without tool call IDs
standaloneMessages.push({ message: msg, index });
}
});
// Build the sorted array
const sorted: ModelMessage[] = [];
const processedIndices = new Set<number>();
// Process messages in original order, but ensure tool pairs are correctly ordered
messages.forEach((msg, index) => {
if (processedIndices.has(index)) {
return; // Already processed as part of a tool pair
}
const toolCallIds = getToolCallIds(msg.content);
if (toolCallIds) {
const toolCallIdList = toolCallIds.split(',');
for (const toolCallId of toolCallIdList) {
if (!toolCallId) continue;
const pair = toolPairs.get(toolCallId);
if (!pair) continue;
// If this is a tool result that appears before its call, skip it for now
if (
msg.role === 'tool' &&
pair.call &&
pair.callIndex !== undefined &&
pair.callIndex > index &&
!processedIndices.has(pair.callIndex)
) {
continue; // Will be added when we process the call
}
// If this is a tool call, add both call and result in correct order
if (msg.role === 'assistant' && pair.call && !processedIndices.has(index)) {
sorted.push(pair.call);
processedIndices.add(index);
// Add the corresponding result immediately after
if (
pair.result &&
pair.resultIndex !== undefined &&
!processedIndices.has(pair.resultIndex)
) {
sorted.push(pair.result);
processedIndices.add(pair.resultIndex);
}
}
// If this is an orphaned tool result (no corresponding call), add it
if (msg.role === 'tool' && !pair.call && !processedIndices.has(index)) {
sorted.push(msg);
processedIndices.add(index);
// Extract tool call IDs if present
if (Array.isArray(message.content)) {
const toolCallIds: string[] = [];
for (const item of message.content) {
if (typeof item === 'object' && item !== null && 'toolCallId' in item) {
const toolCallId = (item as { toolCallId?: unknown }).toolCallId;
if (typeof toolCallId === 'string') {
toolCallIds.push(toolCallId);
}
}
} else {
// Standalone message
sorted.push(msg);
processedIndices.add(index);
}
});
if (toolCallIds.length > 0) {
return `${role}:${toolCallIds.sort().join(',')}`;
}
}
return sorted;
return role;
}
/**
* Merges raw LLM messages by combination of 'role' and 'toolCallId', preserving order
* Messages with the same role and tool call IDs replace existing ones at their original position
* New messages are appended
* Tool calls are guaranteed to precede their corresponding tool results
*/
export function mergeRawLlmMessages(
existing: ModelMessage[],
updates: ModelMessage[]
): ModelMessage[] {
if (!existing || existing.length === 0) {
return sortToolCallsBeforeResults(updates);
return updates;
}
// Create a map of new messages by their unique key
const updateMap = new Map<string, ModelMessage>();
for (const msg of updates) {
const key = getRawLlmMessageKey(msg);
updateMap.set(key, msg);
}
// Keep track of which keys we've already processed
const processedKeys = new Set<string>();
// Single pass: update existing and track what's been processed
const result: ModelMessage[] = [];
// First pass: update existing messages in place
const merged = existing.map((existingMsg) => {
// Process existing messages, updating if needed
for (const existingMsg of existing) {
const key = getRawLlmMessageKey(existingMsg);
if (updateMap.has(key)) {
processedKeys.add(key);
const updated = updateMap.get(key);
if (!updated) {
throw new Error(`Expected to find message with key ${key} in updateMap`);
}
return updated;
}
return existingMsg;
});
// Second pass: append new messages that weren't in existing
for (const updateMsg of updates) {
const key = getRawLlmMessageKey(updateMsg);
if (!processedKeys.has(key)) {
merged.push(updateMsg);
const updated = updateMap.get(key);
if (updated) {
result.push(updated);
updateMap.delete(key); // Remove from map once processed
} else {
result.push(existingMsg);
}
}
// Ensure tool calls always precede their results
return sortToolCallsBeforeResults(merged);
// Append any remaining new messages
for (const updateMsg of updateMap.values()) {
result.push(updateMsg);
}
return result;
}

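The merge helpers above now share one single-pass shape: walk `existing` once, substitute an update when its key matches, and append whatever is left in the map (the raw-LLM variant keys on role plus tool-call IDs instead of id). A small generic sketch of that shape with a worked result, using a hypothetical `Item` type in place of the chat message types:

// Generic single-pass merge-by-id; `Item` is a hypothetical stand-in type.
type Item = { id: string; text: string };

function mergeById(existing: Item[], updates: Item[]): Item[] {
  const updateMap = new Map<string, Item>();
  for (const u of updates) {
    updateMap.set(u.id, u);
  }

  const result: Item[] = [];
  // Replace matching entries in place; keep everything else as-is.
  for (const item of existing) {
    const updated = updateMap.get(item.id);
    if (updated) {
      result.push(updated);
      updateMap.delete(item.id); // consumed, so it won't be appended again
    } else {
      result.push(item);
    }
  }

  // Whatever is still in the map is new and gets appended in update order.
  result.push(...updateMap.values());
  return result;
}

// Example: 'b' is replaced at its original position, 'c' is appended.
// mergeById(
//   [{ id: 'a', text: 'one' }, { id: 'b', text: 'two' }],
//   [{ id: 'b', text: 'two (edited)' }, { id: 'c', text: 'three' }]
// )
// -> [{ id: 'a', text: 'one' }, { id: 'b', text: 'two (edited)' }, { id: 'c', text: 'three' }]
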
View File

@@ -22,8 +22,8 @@ class MessageEntriesCache {
private constructor(options: CacheOptions = {}) {
this.cache = new LRUCache<string, MessageEntriesCacheValue>({
max: options.max ?? 100,
ttl: options.ttl ?? 1000 * 60 * 2, // 2 minutes TTL
max: options.max ?? 500, // Increased from 100 for better hit rate
ttl: options.ttl ?? 1000 * 60 * 5, // Increased to 5 minutes TTL
updateAgeOnGet: true, // Refresh TTL on read
updateAgeOnHas: true, // Refresh TTL on has check
});

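The cache hunk above leans on lru-cache's sliding-expiry options; the same configuration shown in isolation below (the named LRUCache export assumes lru-cache v10 or newer, and the value type is a stand-in):

import { LRUCache } from 'lru-cache'; // named export; assumes lru-cache v10+

type Value = { responseMessages: unknown[] };

const cache = new LRUCache<string, Value>({
  max: 500,             // entry cap, raised from 100 for a better hit rate
  ttl: 1000 * 60 * 5,   // 5-minute TTL per entry
  updateAgeOnGet: true, // a get() on an entry resets its TTL
  updateAgeOnHas: true, // a has() check resets it as well
});

cache.set('message-id', { responseMessages: [] });
cache.get('message-id'); // hit: the entry's age is refreshed, so hot entries stay warm
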
View File

@@ -44,39 +44,6 @@ export async function updateMessageEntries({
throw new Error(`Message not found: ${messageId}`);
}
// Fix any stringified JSON inputs in rawLlmMessages before merging
const fixedRawLlmMessages = rawLlmMessages?.map((msg) => {
if (msg.role === 'assistant' && Array.isArray(msg.content)) {
const fixedContent = msg.content.map((item) => {
if (
typeof item === 'object' &&
'type' in item &&
item.type === 'tool-call' &&
'input' in item &&
typeof item.input === 'string'
) {
try {
// Try to parse the stringified JSON
const parsedInput = JSON.parse(item.input);
return {
...item,
input: parsedInput,
};
} catch {
// If parsing fails, keep the original
return item;
}
}
return item;
});
return {
...msg,
content: fixedContent,
};
}
return msg;
});
// Merge with new entries
const mergedEntries = {
responseMessages: responseMessages
@@ -85,16 +52,15 @@
reasoning: reasoningMessages
? mergeReasoningMessages(existingEntries.reasoning, reasoningMessages)
: existingEntries.reasoning,
rawLlmMessages: fixedRawLlmMessages
? mergeRawLlmMessages(existingEntries.rawLlmMessages, fixedRawLlmMessages)
rawLlmMessages: rawLlmMessages
? mergeRawLlmMessages(existingEntries.rawLlmMessages, rawLlmMessages)
: existingEntries.rawLlmMessages,
};
// Update cache immediately (cache is source of truth during streaming)
messageEntriesCache.set(messageId, mergedEntries);
// Update database asynchronously for persistence (fire-and-forget)
// If this fails, cache still has the latest state for next update
// Build update data
const updateData: Record<string, unknown> = {
updatedAt: new Date().toISOString(),
};
@@ -111,11 +77,18 @@
updateData.rawLlmMessages = mergedEntries.rawLlmMessages;
}
// Update database for persistence
await db
.update(messages)
// Fire-and-forget database update for persistence
// Don't await - return immediately after cache update
db.update(messages)
.set(updateData)
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)))
.catch((dbError) => {
// Log error but don't fail the operation - cache has the data
console.error('Background DB update failed for message entries:', {
messageId,
error: dbError,
});
});
return { success: true };
} catch (error) {