From affe7b710b1f45af69c641a152a9486e5abfb967 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 22 Aug 2025 17:03:41 -0600 Subject: [PATCH] update message entries query fix --- .../messages/update-message-entries.ts | 176 ++++++------------ 1 file changed, 56 insertions(+), 120 deletions(-) diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts index 5bfd1b635..cdc51370c 100644 --- a/packages/database/src/queries/messages/update-message-entries.ts +++ b/packages/database/src/queries/messages/update-message-entries.ts @@ -15,13 +15,14 @@ const UpdateMessageEntriesSchema = z.object({ export type UpdateMessageEntriesParams = z.infer; /** - * Updates message entries using optimized JSONB merge operations. + * Updates message entries using order-preserving JSONB merge operations. * Performs batch upserts for multiple entries in a single database operation. + * PRESERVES the exact order of input arrays during upsert/append operations. * * Upsert logic: - * - responseMessages: upsert by 'id' field - * - reasoningMessages: upsert by 'id' field - * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId' in content array + * - responseMessages: upsert by 'id' field, maintaining input array order + * - reasoningMessages: upsert by 'id' field, maintaining input array order + * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId', maintaining input array order * (handles both string content and array content with tool calls) */ export async function updateMessageEntries({ @@ -33,101 +34,75 @@ export async function updateMessageEntries({ try { const updates: Record = { updatedAt: new Date().toISOString() }; - // Optimized merge for response messages + // Order-preserving merge for response messages if (responseMessages?.length) { const newData = JSON.stringify(responseMessages); updates.responseMessages = sql` CASE WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb ELSE ( - WITH new_map AS ( - SELECT jsonb_object_agg(value->>'id', value) AS map - FROM jsonb_array_elements(${newData}::jsonb) AS value - WHERE value->>'id' IS NOT NULL + WITH new_data AS ( + SELECT value, ordinality as new_order + FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality) ), - merged AS ( - SELECT jsonb_agg( - CASE - WHEN new_map.map ? (existing.value->>'id') - THEN new_map.map->(existing.value->>'id') - ELSE existing.value - END - ORDER BY existing.ordinality - ) AS result - FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality) - CROSS JOIN new_map - UNION ALL - SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) - FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) - CROSS JOIN new_map - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS existing - WHERE existing.value->>'id' = new_item.value->>'id' - ) + existing_data AS ( + SELECT value, ordinality as existing_order + FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS t(value, ordinality) ) - SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) - FROM ( - SELECT jsonb_array_elements(result) AS value - FROM merged - WHERE result IS NOT NULL - ) t + SELECT jsonb_agg( + CASE + WHEN nd.value IS NOT NULL THEN nd.value + ELSE ed.value + END + ORDER BY COALESCE(nd.new_order, ed.existing_order) + ) + FROM existing_data ed + FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id' + WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL ) END`; } - // Optimized merge for reasoning messages + // Order-preserving merge for reasoning messages if (reasoningMessages?.length) { const newData = JSON.stringify(reasoningMessages); updates.reasoning = sql` CASE WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb ELSE ( - WITH new_map AS ( - SELECT jsonb_object_agg(value->>'id', value) AS map - FROM jsonb_array_elements(${newData}::jsonb) AS value - WHERE value->>'id' IS NOT NULL + WITH new_data AS ( + SELECT value, ordinality as new_order + FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality) ), - merged AS ( - SELECT jsonb_agg( - CASE - WHEN new_map.map ? (existing.value->>'id') - THEN new_map.map->(existing.value->>'id') - ELSE existing.value - END - ORDER BY existing.ordinality - ) AS result - FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality) - CROSS JOIN new_map - UNION ALL - SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) - FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) - CROSS JOIN new_map - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS existing - WHERE existing.value->>'id' = new_item.value->>'id' - ) + existing_data AS ( + SELECT value, ordinality as existing_order + FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS t(value, ordinality) ) - SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) - FROM ( - SELECT jsonb_array_elements(result) AS value - FROM merged - WHERE result IS NOT NULL - ) t + SELECT jsonb_agg( + CASE + WHEN nd.value IS NOT NULL THEN nd.value + ELSE ed.value + END + ORDER BY COALESCE(nd.new_order, ed.existing_order) + ) + FROM existing_data ed + FULL OUTER JOIN new_data nd ON ed.value->>'id' = nd.value->>'id' + WHERE COALESCE(nd.value->>'id', ed.value->>'id') IS NOT NULL ) END`; } - // Optimized merge for raw LLM messages - handles both string and array content + // Order-preserving merge for raw LLM messages - handles both string and array content if (rawLlmMessages?.length) { const newData = JSON.stringify(rawLlmMessages); updates.rawLlmMessages = sql` CASE WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb ELSE ( - WITH new_messages AS ( + WITH new_data AS ( SELECT - value, - ordinality as input_order, + value, + ordinality as new_order, value->>'role' AS role, COALESCE( CASE @@ -138,19 +113,13 @@ export async function updateMessageEntries({ ELSE NULL END, '' - ) AS tool_calls, - -- Extract toolCallId for tool result messages to find their corresponding call - CASE - WHEN value->>'role' = 'tool' AND jsonb_typeof(value->'content') = 'array' THEN - value->'content'->0->>'toolCallId' - ELSE NULL - END AS result_tool_call_id + ) AS tool_calls FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality) ), - existing_messages AS ( + existing_data AS ( SELECT - value, - ordinality, + value, + ordinality as existing_order, value->>'role' AS role, COALESCE( CASE @@ -163,50 +132,17 @@ export async function updateMessageEntries({ '' ) AS tool_calls FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality) - ), - -- Find positions of tool calls that are being updated - tool_call_positions AS ( - SELECT - n.tool_calls, - MAX(e.ordinality) as call_position - FROM new_messages n - JOIN existing_messages e ON n.role = e.role AND n.tool_calls = e.tool_calls - WHERE n.role = 'assistant' AND n.tool_calls != '' - GROUP BY n.tool_calls ) - SELECT COALESCE( - jsonb_agg(value ORDER BY ord), - '[]'::jsonb + SELECT jsonb_agg( + CASE + WHEN nd.value IS NOT NULL THEN nd.value + ELSE ed.value + END + ORDER BY COALESCE(nd.new_order, ed.existing_order) ) - FROM ( - -- Keep existing messages that aren't being updated - SELECT e.value, e.ordinality AS ord - FROM existing_messages e - WHERE NOT EXISTS ( - SELECT 1 FROM new_messages n - WHERE n.role = e.role AND n.tool_calls = e.tool_calls - ) - UNION ALL - -- Add new messages with smart ordering - SELECT - n.value, - CASE - -- Tool result: place immediately after its corresponding tool call - WHEN n.role = 'tool' AND n.result_tool_call_id IS NOT NULL THEN - COALESCE( - -- If the tool call was just updated, place result right after it - (SELECT tcp.call_position + 0.5 - FROM tool_call_positions tcp - WHERE tcp.tool_calls LIKE '%' || n.result_tool_call_id || '%' - LIMIT 1), - -- Otherwise append at the end - 1000000 + n.input_order - ) - -- Regular messages and tool calls: append at end - ELSE 1000000 + n.input_order - END AS ord - FROM new_messages n - ) combined + FROM existing_data ed + FULL OUTER JOIN new_data nd ON (ed.role = nd.role AND ed.tool_calls = nd.tool_calls) + WHERE COALESCE(nd.role, ed.role) IS NOT NULL ) END`; }