migration, optimization on entry queries, etc.

dal 2025-08-13 11:50:56 -06:00
parent abc070449b
commit d02d809e59
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
7 changed files with 7034 additions and 58 deletions

View File

@@ -1,27 +1,78 @@
 import { updateMessageEntries } from '@buster/database';
 import { wrapTraced } from 'braintrust';
-import type {
-  SequentialThinkingContext,
-  SequentialThinkingInput,
-  SequentialThinkingOutput,
+import { normalizeEscapedText } from '../../../utils/streaming/escape-normalizer';
+import {
+  createSequentialThinkingRawLlmMessageEntry,
+  createSequentialThinkingReasoningMessage,
+} from './helpers/sequential-thinking-tool-transform-helper';
+import {
+  SEQUENTIAL_THINKING_TOOL_NAME,
+  type SequentialThinkingContext,
+  type SequentialThinkingInput,
+  type SequentialThinkingOutput,
+  type SequentialThinkingState,
 } from './sequential-thinking-tool';
 
 // Process sequential thinking execution
 async function processSequentialThinking(
   input: SequentialThinkingInput,
-  messageId?: string
+  state: SequentialThinkingState,
+  context: SequentialThinkingContext
 ): Promise<SequentialThinkingOutput> {
   try {
     // Log the thinking step for debugging
-    if (messageId) {
+    if (context.messageId) {
       console.info('[sequential-thinking] Processing thought:', {
-        messageId,
+        messageId: context.messageId,
         thoughtNumber: input.thoughtNumber,
         nextThoughtNeeded: input.nextThoughtNeeded,
       });
     }
 
-    // The actual thinking logic is handled by the streaming callbacks
-    // This execute function just confirms successful processing
+    // Since we have the full input object, create the reasoning entries directly
+    const toolCallId = state.entry_id;
+    if (toolCallId && context.messageId) {
+      // Create reasoning entry with completed status using the input values directly
+      const reasoningEntry = createSequentialThinkingReasoningMessage(
+        {
+          entry_id: toolCallId,
+          thought: normalizeEscapedText(input.thought),
+          nextThoughtNeeded: input.nextThoughtNeeded,
+          thoughtNumber: input.thoughtNumber,
+        },
+        toolCallId,
+        'completed' // Mark as completed in execute
+      );
+
+      // Create raw LLM message entry using the input values directly
+      const rawLlmMessage = createSequentialThinkingRawLlmMessageEntry(
+        {
+          entry_id: toolCallId,
+          thought: normalizeEscapedText(input.thought),
+          nextThoughtNeeded: input.nextThoughtNeeded,
+          thoughtNumber: input.thoughtNumber,
+        },
+        toolCallId
+      );
+
+      if (reasoningEntry && rawLlmMessage) {
+        await updateMessageEntries({
+          messageId: context.messageId,
+          reasoningEntry,
+          rawLlmMessage,
+          toolCallId,
+        });
+
+        console.info('[sequential-thinking] Completed sequential thinking in execute:', {
+          messageId: context.messageId,
+          toolCallId,
+          thoughtNumber: input.thoughtNumber,
+          nextThoughtNeeded: input.nextThoughtNeeded,
+        });
+      }
+    }
+
     return {
       success: true,
     };
@@ -35,13 +86,14 @@ async function processSequentialThinking(
 }
 
 // Factory function that creates the execute function with proper context typing
-export function createSequentialThinkingExecute(context: SequentialThinkingContext) {
+export function createSequentialThinkingExecute(
+  state: SequentialThinkingState,
+  context: SequentialThinkingContext
+) {
   return wrapTraced(
     async (input: SequentialThinkingInput): Promise<SequentialThinkingOutput> => {
-      // Use the messageId from the passed context
-      const messageId = context.messageId;
-      return await processSequentialThinking(input, messageId);
+      return await processSequentialThinking(input, state, context);
     },
-    { name: 'Sequential Thinking Tool' }
+    { name: SEQUENTIAL_THINKING_TOOL_NAME }
   );
 }
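Reviewer note: the factory now closes over both the streaming state and the request context, so the execute closure can read the entry_id that the streaming callbacks recorded while deltas arrived. A minimal sketch of that pattern, using simplified hypothetical types (State, Context, createExecute) rather than the real ones:

// Minimal sketch of the state-plus-context factory pattern (hypothetical simplified types).
interface State {
  entry_id?: string; // recorded by the streaming onInputStart callback
}

interface Context {
  messageId?: string;
}

function createExecute(state: State, context: Context) {
  // The returned closure shares `state` with the streaming callbacks,
  // so by the time execute runs it can correlate its writes by entry_id.
  return async (input: { thought: string }): Promise<{ success: boolean }> => {
    if (state.entry_id && context.messageId) {
      // ...persist the finished entry keyed by state.entry_id...
    }
    return { success: true };
  };
}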

View File

@@ -69,7 +69,7 @@ export function createSequentialThinkingTool(context: SequentialThinkingContext)
     thoughtNumber: undefined,
   };
 
-  const execute = createSequentialThinkingExecute(context);
+  const execute = createSequentialThinkingExecute(state, context);
   const onInputStart = createSequentialThinkingStart(state, context);
   const onInputDelta = createSequentialThinkingDelta(state, context);
   const onInputAvailable = createSequentialThinkingFinish(state, context);

View File

@@ -0,0 +1,8 @@
+ALTER TABLE "messages" ALTER COLUMN "response_messages" SET DEFAULT '[]'::jsonb;--> statement-breakpoint
+ALTER TABLE "messages" ALTER COLUMN "reasoning" SET DEFAULT '[]'::jsonb;--> statement-breakpoint
+ALTER TABLE "messages" ALTER COLUMN "raw_llm_messages" SET DEFAULT '[]'::jsonb;--> statement-breakpoint
+CREATE INDEX "messages_deleted_at_idx" ON "messages" USING btree ("deleted_at" timestamptz_ops);--> statement-breakpoint
+CREATE INDEX "messages_raw_llm_messages_gin_idx" ON "messages" USING gin ("raw_llm_messages" jsonb_ops);--> statement-breakpoint
+CREATE INDEX "messages_response_messages_gin_idx" ON "messages" USING gin ("response_messages" jsonb_ops);--> statement-breakpoint
+CREATE INDEX "messages_reasoning_gin_idx" ON "messages" USING gin ("reasoning" jsonb_ops);--> statement-breakpoint
+CREATE INDEX "messages_id_deleted_at_idx" ON "messages" USING btree ("id" uuid_ops,"deleted_at" timestamptz_ops);

File diff suppressed because it is too large

View File

@@ -624,6 +624,13 @@
       "when": 1754415507954,
       "tag": "0088_puzzling_vanisher",
       "breakpoints": true
+    },
+    {
+      "idx": 89,
+      "version": "7",
+      "when": 1755104970838,
+      "tag": "0089_black_starhawk",
+      "breakpoints": true
     }
   ]
 }

View File

@@ -11,10 +11,6 @@ export interface UpdateMessageEntriesParams {
   reasoningEntry?: unknown;
 }
 
-/**
- * Mapping of field names to their corresponding message table columns
- * This provides type-safe access to the JSONB columns we need to update
- */
 const MESSAGE_FIELD_MAPPING = {
   rawLlmMessages: messages.rawLlmMessages,
   responseMessages: messages.responseMessages,
@@ -24,16 +20,7 @@ const MESSAGE_FIELD_MAPPING = {
 
 type MessageFieldName = keyof typeof MESSAGE_FIELD_MAPPING;
 
 /**
- * Helper function to generate SQL for upserting entries in a JSONB array field.
- * Ensures only one entry exists per toolCallId - either updates the existing entry
- * or appends a new one if it doesn't exist.
- *
- * Uses jsonb_set for efficient in-place updates instead of rebuilding the entire array.
- * This is optimized for frequent streaming updates.
- *
- * @param fieldName - The field name to update
- * @param jsonString - Pre-stringified JSON to insert/update
- * @param toolCallId - The toolCallId for identifying the entry (must be unique)
+ * Generates SQL for upserting entries in a JSONB array field using jsonb_set.
  */
 function generateJsonbArrayUpsertSql(
   fieldName: MessageFieldName,
@@ -42,48 +29,32 @@ function generateJsonbArrayUpsertSql(
 ): SQL {
   const field = MESSAGE_FIELD_MAPPING[fieldName];
 
-  // Efficient approach: Find index once and use jsonb_set for updates
-  // This avoids rebuilding the entire array for streaming updates
   return sql`
     CASE
       WHEN EXISTS (
        SELECT 1
-        FROM jsonb_array_elements(${field}) AS elem
-        WHERE elem->>'id' = ${toolCallId} OR elem->>'toolCallId' = ${toolCallId}
+        FROM jsonb_array_elements(COALESCE(${field}, '[]'::jsonb)) WITH ORDINALITY AS elem(value, pos)
+        WHERE elem.value->>'id' = ${toolCallId} OR elem.value->>'toolCallId' = ${toolCallId}
       ) THEN
-        -- Update existing entry using jsonb_set at the found index
-        (SELECT jsonb_set(
-          ${field},
-          ARRAY[(idx - 1)::text],
+        jsonb_set(
+          COALESCE(${field}, '[]'::jsonb),
+          ARRAY[(
+            SELECT (elem.pos - 1)::text
+            FROM jsonb_array_elements(COALESCE(${field}, '[]'::jsonb)) WITH ORDINALITY AS elem(value, pos)
+            WHERE elem.value->>'id' = ${toolCallId} OR elem.value->>'toolCallId' = ${toolCallId}
+            LIMIT 1
+          )],
           ${jsonString}::jsonb,
          false
        )
-        FROM (
-          SELECT row_number() OVER () AS idx, elem.value
-          FROM jsonb_array_elements(${field}) AS elem
-        ) AS indexed
-        WHERE indexed.value->>'id' = ${toolCallId}
-          OR indexed.value->>'toolCallId' = ${toolCallId}
-        LIMIT 1)
      ELSE
-        -- No existing entry, append the new one
        COALESCE(${field}, '[]'::jsonb) || ${jsonString}::jsonb
    END
  `;
 }
 
 /**
- * Atomically upsert multiple message entry arrays on a single row.
- * Each entry is identified by its unique toolCallId - if an entry with that toolCallId exists,
- * it will be replaced with the new data; otherwise, a new entry will be appended.
- *
- * IMPORTANT: This function guarantees that only one entry per toolCallId will exist in each array.
- * Multiple calls with the same toolCallId will update the existing entry, not create duplicates.
- *
- * This prevents race conditions between concurrent tool updates by ensuring each tool
- * only modifies its own entries, identified by toolCallId.
- *
- * Any of the entry parameters may be omitted. If none are provided, only updatedAt is modified.
+ * Updates message entries atomically, ensuring only one entry per toolCallId exists.
 */
 export async function updateMessageEntries({
   messageId,
@@ -97,8 +68,6 @@ export async function updateMessageEntries({
     updatedAt: new Date().toISOString(),
   };
 
-  // Add each field conditionally using the upsert helper function
-  // Stringify the entries before passing them to SQL to ensure proper JSONB casting
   if (rawLlmMessage) {
     setValues.rawLlmMessages = generateJsonbArrayUpsertSql(
       'rawLlmMessages',
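Reviewer note: the net behavior of the rewritten upsert is idempotence per toolCallId: the first write appends, repeated writes replace the existing array element in place via jsonb_set, and the added COALESCE guards a NULL column. A usage sketch with hypothetical ids (run inside an async function):

import { updateMessageEntries } from '@buster/database';

// First call: no entry with this toolCallId exists yet, so it is appended.
await updateMessageEntries({
  messageId: 'msg-uuid', // hypothetical
  toolCallId: 'tc-1',    // hypothetical
  reasoningEntry: { toolCallId: 'tc-1', status: 'loading' },
});

// Second call with the same toolCallId: the existing entry is replaced
// in place with jsonb_set rather than appended, so no duplicates build
// up during streaming.
await updateMessageEntries({
  messageId: 'msg-uuid',
  toolCallId: 'tc-1',
  reasoningEntry: { toolCallId: 'tc-1', status: 'completed' },
});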

View File

@@ -881,6 +881,29 @@ export const messages = pgTable(
       'btree',
       table.createdBy.asc().nullsLast().op('uuid_ops')
     ),
+    index('messages_deleted_at_idx').using(
+      'btree',
+      table.deletedAt.asc().nullsLast().op('timestamptz_ops')
+    ),
+    // GIN indexes for JSONB columns
+    index('messages_raw_llm_messages_gin_idx').using(
+      'gin',
+      table.rawLlmMessages.asc().nullsLast().op('jsonb_ops')
+    ),
+    index('messages_response_messages_gin_idx').using(
+      'gin',
+      table.responseMessages.asc().nullsLast().op('jsonb_ops')
+    ),
+    index('messages_reasoning_gin_idx').using(
+      'gin',
+      table.reasoning.asc().nullsLast().op('jsonb_ops')
+    ),
+    // Composite index for WHERE clause
+    index('messages_id_deleted_at_idx').using(
+      'btree',
+      table.id.asc().nullsLast().op('uuid_ops'),
+      table.deletedAt.asc().nullsLast().op('timestamptz_ops')
+    ),
     foreignKey({
       columns: [table.chatId],
       foreignColumns: [chats.id],