buster/packages/database/src/queries/messages/update-message-entries.ts

91 lines
3.1 KiB
TypeScript
Raw Normal View History

2025-08-08 06:36:01 +08:00
import type { ModelMessage } from 'ai';
import { and, eq, isNull } from 'drizzle-orm';
import { z } from 'zod';
2025-08-08 06:36:01 +08:00
import { db } from '../../connection';
import { messages } from '../../schema';
import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas';
import { fetchMessageEntries } from './helpers/fetch-message-entries';
import {
mergeRawLlmMessages,
mergeReasoningMessages,
mergeResponseMessages,
} from './helpers/merge-entries';
import { messageEntriesCache } from './message-entries-cache';
2025-08-08 06:36:01 +08:00
const UpdateMessageEntriesSchema = z.object({
messageId: z.string().uuid(),
rawLlmMessages: z.array(z.custom<ModelMessage>()).optional(),
responseMessages: z.array(ResponseMessageSchema).optional(),
reasoningMessages: z.array(ReasoningMessageSchema).optional(),
});
2025-08-08 06:36:01 +08:00
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
2025-08-08 06:36:01 +08:00
/**
* Updates message entries using TypeScript-based merge logic with write-through caching.
* Fetches existing entries from cache/database, merges with updates, and saves back.
*
* Merge logic:
* - responseMessages: upsert by 'id' field, maintaining order
* - reasoningMessages: upsert by 'id' field, maintaining order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining order
2025-08-08 06:36:01 +08:00
*/
export async function updateMessageEntries({
messageId,
rawLlmMessages,
responseMessages,
reasoningMessages,
2025-08-08 06:36:01 +08:00
}: UpdateMessageEntriesParams): Promise<{ success: boolean }> {
try {
// Fetch existing entries from cache or database
const existingEntries = await fetchMessageEntries(messageId);
2025-08-08 06:36:01 +08:00
if (!existingEntries) {
throw new Error(`Message not found: ${messageId}`);
2025-08-08 06:36:01 +08:00
}
// Merge with new entries
const mergedEntries = {
responseMessages: responseMessages
? mergeResponseMessages(existingEntries.responseMessages, responseMessages)
: existingEntries.responseMessages,
reasoning: reasoningMessages
? mergeReasoningMessages(existingEntries.reasoning, reasoningMessages)
: existingEntries.reasoning,
rawLlmMessages: rawLlmMessages
? mergeRawLlmMessages(existingEntries.rawLlmMessages, rawLlmMessages)
: existingEntries.rawLlmMessages,
};
// Update database with merged entries
const updateData: Record<string, unknown> = {
updatedAt: new Date().toISOString(),
};
if (responseMessages) {
updateData.responseMessages = mergedEntries.responseMessages;
2025-08-08 06:36:01 +08:00
}
if (reasoningMessages) {
updateData.reasoning = mergedEntries.reasoning;
}
if (rawLlmMessages) {
updateData.rawLlmMessages = mergedEntries.rawLlmMessages;
2025-08-08 06:36:01 +08:00
}
await db
.update(messages)
.set(updateData)
2025-08-08 06:36:01 +08:00
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
// Write-through: update cache with merged entries
messageEntriesCache.set(messageId, mergedEntries);
2025-08-08 06:36:01 +08:00
return { success: true };
} catch (error) {
console.error('Failed to update message entries:', error);
throw new Error(`Failed to update message entries for message ${messageId}`);
}
}