// buster/packages/database/src/queries/messages/update-message-entries.ts
import type { ModelMessage } from 'ai';
import { and, eq, isNull } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { messages } from '../../schema';
import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas';
import { fetchMessageEntries } from './helpers/fetch-message-entries';
import {
mergeRawLlmMessages,
mergeReasoningMessages,
mergeResponseMessages,
} from './helpers/merge-entries';
import { messageEntriesCache } from './message-entries-cache';
const UpdateMessageEntriesSchema = z.object({
messageId: z.string().uuid(),
rawLlmMessages: z.array(z.custom<ModelMessage>()).optional(),
responseMessages: z.array(ResponseMessageSchema).optional(),
reasoningMessages: z.array(ReasoningMessageSchema).optional(),
});
2025-08-08 06:36:01 +08:00
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
2025-08-08 06:36:01 +08:00
/**
* Updates message entries with cache-first approach for streaming.
* Cache is the source of truth during streaming, DB is updated for persistence.
*
* Merge logic:
* - responseMessages: upsert by 'id' field, maintaining order
* - reasoningMessages: upsert by 'id' field, maintaining order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
2025-08-08 06:36:01 +08:00
*/
export async function updateMessageEntries({
messageId,
rawLlmMessages,
responseMessages,
reasoningMessages,
2025-08-08 06:36:01 +08:00
}: UpdateMessageEntriesParams): Promise<{ success: boolean }> {
try {
// Fetch existing entries from cache or database
const existingEntries = await fetchMessageEntries(messageId);
2025-08-08 06:36:01 +08:00
if (!existingEntries) {
throw new Error(`Message not found: ${messageId}`);
2025-08-08 06:36:01 +08:00
}
// Merge with new entries
const mergedEntries = {
responseMessages: responseMessages
? mergeResponseMessages(existingEntries.responseMessages, responseMessages)
: existingEntries.responseMessages,
reasoning: reasoningMessages
? mergeReasoningMessages(existingEntries.reasoning, reasoningMessages)
: existingEntries.reasoning,
rawLlmMessages: rawLlmMessages
? mergeRawLlmMessages(existingEntries.rawLlmMessages, rawLlmMessages)
: existingEntries.rawLlmMessages,
};
// Update cache immediately (cache is source of truth during streaming)
messageEntriesCache.set(messageId, mergedEntries);
// Update database asynchronously for persistence (fire-and-forget)
// If this fails, cache still has the latest state for next update
const updateData: Record<string, unknown> = {
updatedAt: new Date().toISOString(),
};
if (responseMessages) {
updateData.responseMessages = mergedEntries.responseMessages;
2025-08-08 06:36:01 +08:00
}
if (reasoningMessages) {
updateData.reasoning = mergedEntries.reasoning;
}
if (rawLlmMessages) {
updateData.rawLlmMessages = mergedEntries.rawLlmMessages;
2025-08-08 06:36:01 +08:00
}
// Non-blocking DB update - don't await
db.update(messages)
.set(updateData)
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)))
.catch((error) => {
// Log but don't fail - cache has the truth
console.error('Background DB update failed (cache still valid):', error);
});
2025-08-08 06:36:01 +08:00
return { success: true };
} catch (error) {
console.error('Failed to update message entries:', error);
throw new Error(`Failed to update message entries for message ${messageId}`);
}
}