optimized message queries

dal 2025-08-14 14:38:02 -06:00
parent 82516e4fbb
commit 5ec0af4273
2 changed files with 294 additions and 67 deletions

@ -0,0 +1,181 @@
import type { ModelMessage } from 'ai';
import { type SQL, and, eq, isNull, sql } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { messages } from '../../schema';
import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas';
const UpdateMessageEntriesSchema = z.object({
messageId: z.string().uuid(),
rawLlmMessages: z.array(z.custom<ModelMessage>()).optional(),
responseMessages: z.array(ResponseMessageSchema).optional(),
reasoningMessages: z.array(ReasoningMessageSchema).optional(),
});
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
/**
* Optimized version of updateMessageEntries using more efficient JSONB operations.
* Key optimizations:
* 1. Uses jsonb_object_agg to build per-id lookup maps for O(1) matching
* 2. Reduces the number of jsonb_array_elements calls
* 3. Simplifies the toolCallId comparison logic
* 4. Uses more efficient CASE statements instead of complex subqueries
*/
export async function updateMessageEntriesOptimized({
messageId,
rawLlmMessages,
responseMessages,
reasoningMessages,
}: UpdateMessageEntriesParams): Promise<{ success: boolean }> {
try {
const updates: Record<string, SQL | string> = { updatedAt: new Date().toISOString() };
// Optimized merge for response messages - using jsonb_object_agg for O(1) lookups
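// Entries whose 'id' appears in the new batch are replaced in their existing
// position; ids not present in the stored array are appended after them.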
if (responseMessages?.length) {
const newData = JSON.stringify(responseMessages);
updates.responseMessages = sql`
CASE
WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_map AS (
SELECT jsonb_object_agg(value->>'id', value) AS map
FROM jsonb_array_elements(${newData}::jsonb) AS value
WHERE value->>'id' IS NOT NULL
),
merged AS (
SELECT jsonb_agg(
CASE
WHEN new_map.map ? (existing.value->>'id')
THEN new_map.map->(existing.value->>'id')
ELSE existing.value
END
ORDER BY existing.ordinality
) AS result
FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN new_map
UNION ALL
SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality)
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality)
CROSS JOIN new_map
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS existing
WHERE existing.value->>'id' = new_item.value->>'id'
)
)
SELECT COALESCE(jsonb_agg(value), '[]'::jsonb)
FROM (
SELECT jsonb_array_elements(result) AS value
FROM merged
WHERE result IS NOT NULL
) t
)
END`;
}
// Optimized merge for reasoning messages
if (reasoningMessages?.length) {
const newData = JSON.stringify(reasoningMessages);
updates.reasoning = sql`
CASE
WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_map AS (
SELECT jsonb_object_agg(value->>'id', value) AS map
FROM jsonb_array_elements(${newData}::jsonb) AS value
WHERE value->>'id' IS NOT NULL
),
merged AS (
SELECT jsonb_agg(
CASE
WHEN new_map.map ? (existing.value->>'id')
THEN new_map.map->(existing.value->>'id')
ELSE existing.value
END
ORDER BY existing.ordinality
) AS result
FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN new_map
UNION ALL
SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality)
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality)
CROSS JOIN new_map
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS existing
WHERE existing.value->>'id' = new_item.value->>'id'
)
)
SELECT COALESCE(jsonb_agg(value), '[]'::jsonb)
FROM (
SELECT jsonb_array_elements(result) AS value
FROM merged
WHERE result IS NOT NULL
) t
)
END`;
}
// Optimized merge for raw LLM messages - simplified toolCallId comparison
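// Each message is keyed by its role plus the sorted toolCallIds found in its
// content array; existing rows whose key reappears in the new batch are dropped,
// and all new messages are appended at the end.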
if (rawLlmMessages?.length) {
const newData = JSON.stringify(rawLlmMessages);
updates.rawLlmMessages = sql`
CASE
WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_messages AS (
SELECT
value,
value->>'role' AS role,
COALESCE(
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL),
''
) AS tool_calls
FROM jsonb_array_elements(${newData}::jsonb) AS value
),
existing_messages AS (
SELECT
value,
ordinality,
value->>'role' AS role,
COALESCE(
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL),
''
) AS tool_calls
FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality)
)
SELECT COALESCE(
jsonb_agg(value ORDER BY ord),
'[]'::jsonb
)
FROM (
-- Keep existing messages that aren't being updated
SELECT e.value, e.ordinality AS ord
FROM existing_messages e
WHERE NOT EXISTS (
SELECT 1 FROM new_messages n
WHERE n.role = e.role AND n.tool_calls = e.tool_calls
)
UNION ALL
-- Add all new messages
SELECT n.value, 1000000 + row_number() OVER () AS ord
FROM new_messages n
) combined
)
END`;
}
await db
.update(messages)
.set(updates)
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
return { success: true };
} catch (error) {
console.error('Failed to update message entries:', error);
throw new Error(`Failed to update message entries for message ${messageId}`);
}
}
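
For reference, a minimal usage sketch (not part of the commit): the import path, messageId, and payload below are placeholders; the real field shapes are defined by ResponseMessageSchema, ReasoningMessageSchema, and the AI SDK's ModelMessage type.

import type { ModelMessage } from 'ai';
// Hypothetical module path; the new file's name is not shown in this view.
import { updateMessageEntriesOptimized } from './update-message-entries-optimized';

// Merge the latest assistant turn into one message row.
const rawLlmMessages: ModelMessage[] = [
  { role: 'assistant', content: [{ type: 'text', text: 'Done.' }] },
];

await updateMessageEntriesOptimized({
  messageId: 'b7e6c1a2-0d4f-4f2a-9c1e-1a2b3c4d5e6f', // placeholder UUID
  rawLlmMessages,
  // responseMessages / reasoningMessages are optional and, when provided,
  // follow ResponseMessageSchema / ReasoningMessageSchema.
});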

@ -22,6 +22,12 @@ export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSche
* - responseMessages: upsert by 'id' field
* - reasoningMessages: upsert by 'id' field
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId' in content array
*
* Optimizations applied:
* 1. Short-circuits to the new data when the existing column is NULL or empty
* 2. Builds a jsonb_object_agg lookup map per field so matched entries are replaced via O(1) key access
* 3. Matches rawLlmMessages on a composite role + sorted toolCallId key
* 4. Preserves existing order with WITH ORDINALITY and appends unmatched new entries
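*
* Illustration of the id upsert with hypothetical payloads: merging new
* [{id:'a', v:2}, {id:'d', v:1}] into existing [{id:'a', v:1}, {id:'b', v:1}]
* yields [{id:'a', v:2}, {id:'b', v:1}, {id:'d', v:1}]; matched ids are replaced
* in place, other existing entries are kept, and new ids are appended at the end.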
*/
export async function updateMessageEntries({
messageId,
@ -32,86 +38,126 @@ export async function updateMessageEntries({
try {
const updates: Record<string, SQL | string> = { updatedAt: new Date().toISOString() };
// Optimized merge for response messages - upsert by 'id'
// Optimized merge for response messages - using jsonb_object_agg for O(1) lookups
if (responseMessages?.length) {
const newData = JSON.stringify(responseMessages);
updates.responseMessages = sql`
COALESCE(
(SELECT jsonb_agg(value ORDER BY ordinality)
FROM (
-- Keep existing messages that aren't being updated
SELECT value, ordinality
FROM jsonb_array_elements(COALESCE(${messages.responseMessages}, '[]'::jsonb)) WITH ORDINALITY AS t(value, ordinality)
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${newData}::jsonb) AS new_msg
WHERE new_msg->>'id' = t.value->>'id'
)
UNION ALL
-- Add new/updated messages at the end
SELECT value, 1000000 + ordinality AS ordinality
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
) combined),
'[]'::jsonb
)`;
CASE
WHEN ${messages.responseMessages} IS NULL OR jsonb_array_length(${messages.responseMessages}) = 0
THEN ${newData}::jsonb
ELSE (
WITH indexed_new AS (
SELECT jsonb_object_agg(value->>'id', value) AS lookup
FROM jsonb_array_elements(${newData}::jsonb) AS value
)
SELECT jsonb_agg(
COALESCE(
indexed_new.lookup->(existing.value->>'id'),
existing.value
) ORDER BY existing.ordinality
) ||
COALESCE(
(SELECT jsonb_agg(new_item.value)
FROM jsonb_array_elements(${newData}::jsonb) AS new_item
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS e
WHERE e.value->>'id' = new_item.value->>'id'
)),
'[]'::jsonb
)
FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN indexed_new
)
END`;
}
// Optimized merge for reasoning messages - upsert by 'id'
// Optimized merge for reasoning messages
if (reasoningMessages?.length) {
const newData = JSON.stringify(reasoningMessages);
updates.reasoning = sql`
COALESCE(
(SELECT jsonb_agg(value ORDER BY ordinality)
FROM (
-- Keep existing messages that aren't being updated
SELECT value, ordinality
FROM jsonb_array_elements(COALESCE(${messages.reasoning}, '[]'::jsonb)) WITH ORDINALITY AS t(value, ordinality)
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${newData}::jsonb) AS new_msg
WHERE new_msg->>'id' = t.value->>'id'
)
UNION ALL
-- Add new/updated messages at the end
SELECT value, 1000000 + ordinality AS ordinality
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
) combined),
'[]'::jsonb
)`;
CASE
WHEN ${messages.reasoning} IS NULL OR jsonb_array_length(${messages.reasoning}) = 0
THEN ${newData}::jsonb
ELSE (
WITH indexed_new AS (
SELECT jsonb_object_agg(value->>'id', value) AS lookup
FROM jsonb_array_elements(${newData}::jsonb) AS value
)
SELECT jsonb_agg(
COALESCE(
indexed_new.lookup->(existing.value->>'id'),
existing.value
) ORDER BY existing.ordinality
) ||
COALESCE(
(SELECT jsonb_agg(new_item.value)
FROM jsonb_array_elements(${newData}::jsonb) AS new_item
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS e
WHERE e.value->>'id' = new_item.value->>'id'
)),
'[]'::jsonb
)
FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN indexed_new
)
END`;
}
// Optimized merge for raw LLM messages - upsert by role + toolCallId combination
// Optimized merge for raw LLM messages - using efficient key generation
if (rawLlmMessages?.length) {
const newData = JSON.stringify(rawLlmMessages);
updates.rawLlmMessages = sql`
COALESCE(
(SELECT jsonb_agg(value ORDER BY ordinality)
FROM (
-- Keep existing messages that aren't being updated
SELECT value, ordinality
FROM jsonb_array_elements(COALESCE(${messages.rawLlmMessages}, '[]'::jsonb)) WITH ORDINALITY AS t(value, ordinality)
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${newData}::jsonb) AS new_msg
WHERE new_msg->>'role' = t.value->>'role'
AND (
-- Compare toolCallIds if they exist
(SELECT string_agg(content->>'toolCallId', ',' ORDER BY content->>'toolCallId')
FROM jsonb_array_elements(new_msg->'content') content
WHERE content->>'toolCallId' IS NOT NULL) =
(SELECT string_agg(content->>'toolCallId', ',' ORDER BY content->>'toolCallId')
FROM jsonb_array_elements(t.value->'content') content
WHERE content->>'toolCallId' IS NOT NULL)
OR
-- Both have no toolCallIds
((SELECT COUNT(*) FROM jsonb_array_elements(new_msg->'content') content WHERE content->>'toolCallId' IS NOT NULL) = 0
AND (SELECT COUNT(*) FROM jsonb_array_elements(t.value->'content') content WHERE content->>'toolCallId' IS NOT NULL) = 0)
)
)
UNION ALL
-- Add new/updated messages at the end
SELECT value, 1000000 + ordinality AS ordinality
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS t(value, ordinality)
) combined),
'[]'::jsonb
)`;
CASE
WHEN ${messages.rawLlmMessages} IS NULL OR jsonb_array_length(${messages.rawLlmMessages}) = 0
THEN ${newData}::jsonb
ELSE (
WITH new_with_keys AS (
SELECT
value,
value->>'role' || ':' || COALESCE(
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL),
'no-tools'
) AS match_key
FROM jsonb_array_elements(${newData}::jsonb) AS value
),
existing_with_keys AS (
SELECT
value,
ordinality,
value->>'role' || ':' || COALESCE(
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL),
'no-tools'
) AS match_key
FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality)
),
new_lookup AS (
SELECT jsonb_object_agg(match_key, value) AS lookup
FROM new_with_keys
)
SELECT jsonb_agg(
COALESCE(
new_lookup.lookup->existing_with_keys.match_key,
existing_with_keys.value
) ORDER BY existing_with_keys.ordinality
) ||
COALESCE(
(SELECT jsonb_agg(n.value)
FROM new_with_keys n
WHERE NOT EXISTS (
SELECT 1 FROM existing_with_keys e
WHERE e.match_key = n.match_key
)),
'[]'::jsonb
)
FROM existing_with_keys
CROSS JOIN new_lookup
)
END`;
}
await db