diff --git a/packages/database/src/queries/messages/update-message-entries-optimized.ts b/packages/database/src/queries/messages/update-message-entries-optimized.ts new file mode 100644 index 000000000..3ae37543f --- /dev/null +++ b/packages/database/src/queries/messages/update-message-entries-optimized.ts @@ -0,0 +1,181 @@ +import type { ModelMessage } from 'ai'; +import { type SQL, and, eq, isNull, sql } from 'drizzle-orm'; +import { z } from 'zod'; +import { db } from '../../connection'; +import { messages } from '../../schema'; +import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas'; + +const UpdateMessageEntriesSchema = z.object({ + messageId: z.string().uuid(), + rawLlmMessages: z.array(z.custom()).optional(), + responseMessages: z.array(ResponseMessageSchema).optional(), + reasoningMessages: z.array(ReasoningMessageSchema).optional(), +}); + +export type UpdateMessageEntriesParams = z.infer; + +/** + * Optimized version of updateMessageEntries using more efficient JSONB operations. + * Key optimizations: + * 1. Uses jsonb_build_object to construct lookup maps for O(1) lookups + * 2. Reduces the number of jsonb_array_elements calls + * 3. Simplifies the toolCallId comparison logic + * 4. Uses more efficient CASE statements instead of complex subqueries + */ +export async function updateMessageEntriesOptimized({ + messageId, + rawLlmMessages, + responseMessages, + reasoningMessages, +}: UpdateMessageEntriesParams): Promise<{ success: boolean }> { + try { + const updates: Record = { updatedAt: new Date().toISOString() }; + + // Optimized merge for response messages - using jsonb_object for O(1) lookups + if (responseMessages?.length) { + const newData = JSON.stringify(responseMessages); + updates.responseMessages = sql` + CASE + WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb + ELSE ( + WITH new_map AS ( + SELECT jsonb_object_agg(value->>'id', value) AS map + FROM jsonb_array_elements(${newData}::jsonb) AS value + WHERE value->>'id' IS NOT NULL + ), + merged AS ( + SELECT jsonb_agg( + CASE + WHEN new_map.map ? (existing.value->>'id') + THEN new_map.map->(existing.value->>'id') + ELSE existing.value + END + ORDER BY existing.ordinality + ) AS result + FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality) + CROSS JOIN new_map + UNION ALL + SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) + FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) + CROSS JOIN new_map + WHERE NOT EXISTS ( + SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS existing + WHERE existing.value->>'id' = new_item.value->>'id' + ) + ) + SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) + FROM ( + SELECT jsonb_array_elements(result) AS value + FROM merged + WHERE result IS NOT NULL + ) t + ) + END`; + } + + // Optimized merge for reasoning messages + if (reasoningMessages?.length) { + const newData = JSON.stringify(reasoningMessages); + updates.reasoning = sql` + CASE + WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb + ELSE ( + WITH new_map AS ( + SELECT jsonb_object_agg(value->>'id', value) AS map + FROM jsonb_array_elements(${newData}::jsonb) AS value + WHERE value->>'id' IS NOT NULL + ), + merged AS ( + SELECT jsonb_agg( + CASE + WHEN new_map.map ? (existing.value->>'id') + THEN new_map.map->(existing.value->>'id') + ELSE existing.value + END + ORDER BY existing.ordinality + ) AS result + FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality) + CROSS JOIN new_map + UNION ALL + SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) + FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) + CROSS JOIN new_map + WHERE NOT EXISTS ( + SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS existing + WHERE existing.value->>'id' = new_item.value->>'id' + ) + ) + SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) + FROM ( + SELECT jsonb_array_elements(result) AS value + FROM merged + WHERE result IS NOT NULL + ) t + ) + END`; + } + + // Optimized merge for raw LLM messages - simplified toolCallId comparison + if (rawLlmMessages?.length) { + const newData = JSON.stringify(rawLlmMessages); + updates.rawLlmMessages = sql` + CASE + WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb + ELSE ( + WITH new_messages AS ( + SELECT + value, + value->>'role' AS role, + COALESCE( + (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') + FROM jsonb_array_elements(value->'content') c + WHERE c->>'toolCallId' IS NOT NULL), + '' + ) AS tool_calls + FROM jsonb_array_elements(${newData}::jsonb) AS value + ), + existing_messages AS ( + SELECT + value, + ordinality, + value->>'role' AS role, + COALESCE( + (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') + FROM jsonb_array_elements(value->'content') c + WHERE c->>'toolCallId' IS NOT NULL), + '' + ) AS tool_calls + FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality) + ) + SELECT COALESCE( + jsonb_agg(value ORDER BY ord), + '[]'::jsonb + ) + FROM ( + -- Keep existing messages that aren't being updated + SELECT e.value, e.ordinality AS ord + FROM existing_messages e + WHERE NOT EXISTS ( + SELECT 1 FROM new_messages n + WHERE n.role = e.role AND n.tool_calls = e.tool_calls + ) + UNION ALL + -- Add all new messages + SELECT n.value, 1000000 + row_number() OVER () AS ord + FROM new_messages n + ) combined + ) + END`; + } + + await db + .update(messages) + .set(updates) + .where(and(eq(messages.id, messageId), isNull(messages.deletedAt))); + + return { success: true }; + } catch (error) { + console.error('Failed to update message entries:', error); + throw new Error(`Failed to update message entries for message ${messageId}`); + } +} \ No newline at end of file diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts index bfa8b2db1..b390ec476 100644 --- a/packages/database/src/queries/messages/update-message-entries.ts +++ b/packages/database/src/queries/messages/update-message-entries.ts @@ -22,6 +22,12 @@ export type UpdateMessageEntriesParams = z.infer = { updatedAt: new Date().toISOString() }; - // Optimized merge for response messages - upsert by 'id' + // Optimized merge for response messages - using jsonb_object_agg for O(1) lookups if (responseMessages?.length) { const newData = JSON.stringify(responseMessages); updates.responseMessages = sql` - COALESCE( - (SELECT jsonb_agg(value ORDER BY ordinality) - FROM ( - -- Keep existing messages that aren't being updated - SELECT value, ordinality - FROM jsonb_array_elements(COALESCE(${messages.responseMessages}, '[]'::jsonb)) WITH ORDINALITY AS t(value, ordinality) - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${newData}::jsonb) AS new_msg - WHERE new_msg->>'id' = t.value->>'id' - ) - UNION ALL - -- Add new/updated messages at the end - SELECT value, 1000000 + ordinality AS ordinality - FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality) - ) combined), - '[]'::jsonb - )`; + CASE + WHEN ${messages.responseMessages} IS NULL OR jsonb_array_length(${messages.responseMessages}) = 0 + THEN ${newData}::jsonb + ELSE ( + WITH indexed_new AS ( + SELECT jsonb_object_agg(value->>'id', value) AS lookup + FROM jsonb_array_elements(${newData}::jsonb) AS value + ) + SELECT jsonb_agg( + COALESCE( + indexed_new.lookup->(existing.value->>'id'), + existing.value + ) ORDER BY existing.ordinality + ) || + COALESCE( + (SELECT jsonb_agg(new_item.value) + FROM jsonb_array_elements(${newData}::jsonb) AS new_item + WHERE NOT EXISTS ( + SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS e + WHERE e.value->>'id' = new_item.value->>'id' + )), + '[]'::jsonb + ) + FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality) + CROSS JOIN indexed_new + ) + END`; } - // Optimized merge for reasoning messages - upsert by 'id' + // Optimized merge for reasoning messages if (reasoningMessages?.length) { const newData = JSON.stringify(reasoningMessages); updates.reasoning = sql` - COALESCE( - (SELECT jsonb_agg(value ORDER BY ordinality) - FROM ( - -- Keep existing messages that aren't being updated - SELECT value, ordinality - FROM jsonb_array_elements(COALESCE(${messages.reasoning}, '[]'::jsonb)) WITH ORDINALITY AS t(value, ordinality) - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${newData}::jsonb) AS new_msg - WHERE new_msg->>'id' = t.value->>'id' - ) - UNION ALL - -- Add new/updated messages at the end - SELECT value, 1000000 + ordinality AS ordinality - FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality) - ) combined), - '[]'::jsonb - )`; + CASE + WHEN ${messages.reasoning} IS NULL OR jsonb_array_length(${messages.reasoning}) = 0 + THEN ${newData}::jsonb + ELSE ( + WITH indexed_new AS ( + SELECT jsonb_object_agg(value->>'id', value) AS lookup + FROM jsonb_array_elements(${newData}::jsonb) AS value + ) + SELECT jsonb_agg( + COALESCE( + indexed_new.lookup->(existing.value->>'id'), + existing.value + ) ORDER BY existing.ordinality + ) || + COALESCE( + (SELECT jsonb_agg(new_item.value) + FROM jsonb_array_elements(${newData}::jsonb) AS new_item + WHERE NOT EXISTS ( + SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS e + WHERE e.value->>'id' = new_item.value->>'id' + )), + '[]'::jsonb + ) + FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality) + CROSS JOIN indexed_new + ) + END`; } - // Optimized merge for raw LLM messages - upsert by role + toolCallId combination + // Optimized merge for raw LLM messages - using efficient key generation if (rawLlmMessages?.length) { const newData = JSON.stringify(rawLlmMessages); updates.rawLlmMessages = sql` - COALESCE( - (SELECT jsonb_agg(value ORDER BY ordinality) - FROM ( - -- Keep existing messages that aren't being updated - SELECT value, ordinality - FROM jsonb_array_elements(COALESCE(${messages.rawLlmMessages}, '[]'::jsonb)) WITH ORDINALITY AS t(value, ordinality) - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${newData}::jsonb) AS new_msg - WHERE new_msg->>'role' = t.value->>'role' - AND ( - -- Compare toolCallIds if they exist - (SELECT string_agg(content->>'toolCallId', ',' ORDER BY content->>'toolCallId') - FROM jsonb_array_elements(new_msg->'content') content - WHERE content->>'toolCallId' IS NOT NULL) = - (SELECT string_agg(content->>'toolCallId', ',' ORDER BY content->>'toolCallId') - FROM jsonb_array_elements(t.value->'content') content - WHERE content->>'toolCallId' IS NOT NULL) - OR - -- Both have no toolCallIds - ((SELECT COUNT(*) FROM jsonb_array_elements(new_msg->'content') content WHERE content->>'toolCallId' IS NOT NULL) = 0 - AND (SELECT COUNT(*) FROM jsonb_array_elements(t.value->'content') content WHERE content->>'toolCallId' IS NOT NULL) = 0) - ) - ) - UNION ALL - -- Add new/updated messages at the end - SELECT value, 1000000 + ordinality AS ordinality - FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality) - ) combined), - '[]'::jsonb - )`; + CASE + WHEN ${messages.rawLlmMessages} IS NULL OR jsonb_array_length(${messages.rawLlmMessages}) = 0 + THEN ${newData}::jsonb + ELSE ( + WITH new_with_keys AS ( + SELECT + value, + value->>'role' || ':' || COALESCE( + (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') + FROM jsonb_array_elements(value->'content') c + WHERE c->>'toolCallId' IS NOT NULL), + 'no-tools' + ) AS match_key + FROM jsonb_array_elements(${newData}::jsonb) AS value + ), + existing_with_keys AS ( + SELECT + value, + ordinality, + value->>'role' || ':' || COALESCE( + (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') + FROM jsonb_array_elements(value->'content') c + WHERE c->>'toolCallId' IS NOT NULL), + 'no-tools' + ) AS match_key + FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality) + ), + new_lookup AS ( + SELECT jsonb_object_agg(match_key, value) AS lookup + FROM new_with_keys + ) + SELECT jsonb_agg( + COALESCE( + new_lookup.lookup->existing_with_keys.match_key, + existing_with_keys.value + ) ORDER BY existing_with_keys.ordinality + ) || + COALESCE( + (SELECT jsonb_agg(n.value) + FROM new_with_keys n + WHERE NOT EXISTS ( + SELECT 1 FROM existing_with_keys e + WHERE e.match_key = n.match_key + )), + '[]'::jsonb + ) + FROM existing_with_keys + CROSS JOIN new_lookup + ) + END`; } await db