buster/packages/database/src/queries/messages/update-message-entries.ts

161 lines
6.2 KiB
TypeScript
Raw Normal View History

2025-08-08 06:36:01 +08:00
import type { ModelMessage } from 'ai';
import { type SQL, and, eq, isNull, sql } from 'drizzle-orm';
import { z } from 'zod';
2025-08-08 06:36:01 +08:00
import { db } from '../../connection';
import { messages } from '../../schema';
import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas';
2025-08-08 06:36:01 +08:00
const UpdateMessageEntriesSchema = z.object({
messageId: z.string().uuid(),
rawLlmMessages: z.array(z.custom<ModelMessage>()).optional(),
responseMessages: z.array(ResponseMessageSchema).optional(),
reasoningMessages: z.array(ReasoningMessageSchema).optional(),
});
2025-08-08 06:36:01 +08:00
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
2025-08-08 06:36:01 +08:00
/**
2025-08-23 07:03:41 +08:00
* Updates message entries using order-preserving JSONB merge operations.
* Performs batch upserts for multiple entries in a single database operation.
2025-08-23 07:03:41 +08:00
* PRESERVES the exact order of input arrays during upsert/append operations.
*
* Upsert logic:
2025-08-23 07:03:41 +08:00
* - responseMessages: upsert by 'id' field, maintaining input array order
* - reasoningMessages: upsert by 'id' field, maintaining input array order
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining input array order
* (handles both string content and array content with tool calls)
2025-08-08 06:36:01 +08:00
*/
export async function updateMessageEntries({
messageId,
rawLlmMessages,
responseMessages,
reasoningMessages,
2025-08-08 06:36:01 +08:00
}: UpdateMessageEntriesParams): Promise<{ success: boolean }> {
try {
const updates: Record<string, SQL | string> = { updatedAt: new Date().toISOString() };
2025-08-08 06:36:01 +08:00
2025-08-23 07:03:41 +08:00
// Order-preserving merge for response messages
if (responseMessages?.length) {
const newData = JSON.stringify(responseMessages);
updates.responseMessages = sql`
2025-08-15 04:38:02 +08:00
CASE
WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb
2025-08-15 04:38:02 +08:00
ELSE (
2025-08-23 07:03:41 +08:00
WITH new_data AS (
SELECT value, ordinality as new_order
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
),
2025-08-23 07:03:41 +08:00
existing_data AS (
SELECT value, ordinality as existing_order
FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS t(value, ordinality)
)
SELECT jsonb_agg(
CASE
WHEN nd.value IS NOT NULL THEN nd.value
ELSE ed.value
END
ORDER BY COALESCE(nd.new_order, ed.existing_order)
2025-08-15 04:38:02 +08:00
)
2025-08-23 07:03:41 +08:00
FROM existing_data ed
FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id'
WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL
2025-08-15 04:38:02 +08:00
)
END`;
2025-08-08 06:36:01 +08:00
}
2025-08-23 07:03:41 +08:00
// Order-preserving merge for reasoning messages
if (reasoningMessages?.length) {
const newData = JSON.stringify(reasoningMessages);
updates.reasoning = sql`
2025-08-15 04:38:02 +08:00
CASE
WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb
2025-08-15 04:38:02 +08:00
ELSE (
2025-08-23 07:03:41 +08:00
WITH new_data AS (
SELECT value, ordinality as new_order
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
),
2025-08-23 07:03:41 +08:00
existing_data AS (
SELECT value, ordinality as existing_order
FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS t(value, ordinality)
)
SELECT jsonb_agg(
CASE
WHEN nd.value IS NOT NULL THEN nd.value
ELSE ed.value
END
ORDER BY COALESCE(nd.new_order, ed.existing_order)
2025-08-15 04:38:02 +08:00
)
2025-08-23 07:03:41 +08:00
FROM existing_data ed
FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id'
WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL
2025-08-15 04:38:02 +08:00
)
END`;
2025-08-08 06:36:01 +08:00
}
2025-08-23 07:03:41 +08:00
// Order-preserving merge for raw LLM messages - handles both string and array content
if (rawLlmMessages?.length) {
const newData = JSON.stringify(rawLlmMessages);
updates.rawLlmMessages = sql`
2025-08-15 04:38:02 +08:00
CASE
WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb
2025-08-15 04:38:02 +08:00
ELSE (
2025-08-23 07:03:41 +08:00
WITH new_data AS (
2025-08-15 04:38:02 +08:00
SELECT
2025-08-23 07:03:41 +08:00
value,
ordinality as new_order,
value->>'role' AS role,
COALESCE(
CASE
WHEN jsonb_typeof(value->'content') = 'array' THEN
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL)
ELSE NULL
END,
''
2025-08-23 07:03:41 +08:00
) AS tool_calls
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
2025-08-15 04:38:02 +08:00
),
2025-08-23 07:03:41 +08:00
existing_data AS (
2025-08-15 04:38:02 +08:00
SELECT
2025-08-23 07:03:41 +08:00
value,
ordinality as existing_order,
value->>'role' AS role,
COALESCE(
CASE
WHEN jsonb_typeof(value->'content') = 'array' THEN
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL)
ELSE NULL
END,
''
) AS tool_calls
2025-08-15 04:38:02 +08:00
FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality)
)
2025-08-23 07:03:41 +08:00
SELECT jsonb_agg(
CASE
WHEN nd.value IS NOT NULL THEN nd.value
ELSE ed.value
END
ORDER BY COALESCE(nd.new_order, ed.existing_order)
2025-08-15 04:38:02 +08:00
)
2025-08-23 07:03:41 +08:00
FROM existing_data ed
FULL OUTER JOIN new_data nd ON (ed.role = nd.role AND ed.tool_calls = nd.tool_calls)
WHERE COALESCE(nd.role, ed.role) IS NOT NULL
2025-08-15 04:38:02 +08:00
)
END`;
2025-08-08 06:36:01 +08:00
}
await db
.update(messages)
.set(updates)
2025-08-08 06:36:01 +08:00
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
return { success: true };
} catch (error) {
console.error('Failed to update message entries:', error);
throw new Error(`Failed to update message entries for message ${messageId}`);
}
}