build error map the reasoning message better in web

This commit is contained in:
dal 2025-08-18 10:16:59 -06:00
parent b4e34d22e0
commit 6b171aae2e
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
18 changed files with 457 additions and 365 deletions

View File

@ -6,7 +6,7 @@ import {
ChatErrorCode, ChatErrorCode,
type ChatWithMessages, type ChatWithMessages,
} from '@buster/server-shared/chats'; } from '@buster/server-shared/chats';
import { tasks } from '@trigger.dev/sdk/v3'; import { tasks } from '@trigger.dev/sdk';
import { handleAssetChat, handleAssetChatWithPrompt } from './services/chat-helpers'; import { handleAssetChat, handleAssetChatWithPrompt } from './services/chat-helpers';
import { initializeChat } from './services/chat-service'; import { initializeChat } from './services/chat-service';

View File

@ -1,4 +1,5 @@
import { canUserAccessChatCached } from '@buster/access-controls'; import { canUserAccessChatCached } from '@buster/access-controls';
import type { ModelMessage } from '@buster/ai';
import { import {
type User, type User,
chats, chats,
@ -328,6 +329,13 @@ export async function handleNewChat({
requestMessage: prompt, requestMessage: prompt,
title: prompt, title: prompt,
isCompleted: false, isCompleted: false,
// Add the user message as the first raw LLM entry
rawLlmMessages: [
{
role: 'user',
content: prompt,
} as ModelMessage,
],
}) })
.returning(); .returning();

View File

@ -86,7 +86,7 @@ export const ReasoningMessageSelector: React.FC<ReasoningMessageSelectorProps> =
if (!type || !status) return null; if (!type || !status) return null;
const ReasoningMessage = ReasoningMessageRecord[type]; const ReasoningMessage = ReasoningMessageRecord[type as keyof typeof ReasoningMessageRecord];
const animationKey = reasoningMessageId + type; const animationKey = reasoningMessageId + type;
return ( return (

View File

@ -109,6 +109,14 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) {
maxOutputTokens: 10000, maxOutputTokens: 10000,
temperature: 0, temperature: 0,
experimental_repairToolCall: healToolWithLlm, experimental_repairToolCall: healToolWithLlm,
onStepFinish: async (event) => {
// Wait for all tool operations to complete before moving to next step
// This ensures done tool's async operations complete before stream terminates
console.info('Analyst Agent step finished', {
toolCalls: event.toolCalls?.length || 0,
hasToolResults: !!event.toolResults,
});
},
onFinish: () => { onFinish: () => {
console.info('Analyst Agent finished'); console.info('Analyst Agent finished');
}, },

View File

@ -136,8 +136,11 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge
maxOutputTokens: 10000, maxOutputTokens: 10000,
temperature: 0, temperature: 0,
experimental_repairToolCall: healToolWithLlm, experimental_repairToolCall: healToolWithLlm,
onFinish: () => { onStepFinish: async (event) => {
console.info('Think and Prep Agent finished'); console.info('Think and Prep Agent step finished', {
toolCalls: event.toolCalls?.length || 0,
hasToolResults: !!event.toolResults,
});
}, },
}), }),
{ {

View File

@ -366,7 +366,7 @@ test('model reset interval resets to first model', async () => {
vi.useFakeTimers(); vi.useFakeTimers();
let model1CallCount = 0; let model1CallCount = 0;
const model1 = new MockLanguageModelV2({ const model1 = new MockLanguageModelV2({
modelId: 'primary-model', modelId: 'primary-model',
doGenerate: async () => { doGenerate: async () => {
model1CallCount++; model1CallCount++;
@ -394,7 +394,7 @@ test('model reset interval resets to first model', async () => {
try { try {
await fallback.doGenerate({ prompt: [] }); await fallback.doGenerate({ prompt: [] });
} catch {} } catch {}
expect(fallback.modelId).toBe('fallback-model'); expect(fallback.modelId).toBe('fallback-model');
// Advance time past reset interval // Advance time past reset interval

View File

@ -255,6 +255,20 @@ export class FallbackModel implements LanguageModelV2 {
} }
controller.close(); controller.close();
} catch (error) { } catch (error) {
// Check if this is a normal stream termination
const errorMessage = error instanceof Error ? error.message : String(error);
const isNormalTermination =
errorMessage === 'terminated' ||
errorMessage.includes('terminated') ||
errorMessage === 'aborted' ||
errorMessage.includes('aborted');
// If it's a normal termination and we've already streamed content, just close normally
if (isNormalTermination && hasStreamedAny) {
controller.close();
return;
}
if (self.settings.onError) { if (self.settings.onError) {
try { try {
await self.settings.onError(error as RetryableError, self.modelId); await self.settings.onError(error as RetryableError, self.modelId);

View File

@ -13,7 +13,9 @@ export const createTodosParamsSchema = z.object({
export const createTodosResultSchema = z.object({ export const createTodosResultSchema = z.object({
todos: z.string().describe('The TODO list in markdown format with checkboxes'), todos: z.string().describe('The TODO list in markdown format with checkboxes'),
todosMessage: z.custom<ModelMessage>().describe('The TODO list message'), messages: z
.array(z.custom<ModelMessage>())
.describe('Tool call and result messages for the TODO creation'),
}); });
// Context schema for passing to streaming handlers // Context schema for passing to streaming handlers
@ -49,7 +51,6 @@ export type CreateTodosInput = z.infer<typeof llmOutputSchema>;
import { createTodosStepDelta } from './create-todos-step-delta'; import { createTodosStepDelta } from './create-todos-step-delta';
import { createTodosStepFinish } from './create-todos-step-finish'; import { createTodosStepFinish } from './create-todos-step-finish';
import { createTodosStepStart } from './create-todos-step-start'; import { createTodosStepStart } from './create-todos-step-start';
import { createTodosUserMessage } from './helpers/create-todos-transform-helper';
/** /**
* Generates a TODO list using the LLM with structured output and streaming * Generates a TODO list using the LLM with structured output and streaming
@ -135,12 +136,52 @@ export async function runCreateTodosStep(params: CreateTodosParams): Promise<Cre
const todos = await generateTodosWithLLM(params.messages, context); const todos = await generateTodosWithLLM(params.messages, context);
// Create user message for conversation history (backward compatibility) // Generate a unique ID for this tool call
const todosMessage = createTodosUserMessage(todos); const toolCallId = `create_todos_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// Create tool call and result messages
const resultMessages: ModelMessage[] = [];
// Add assistant message with tool call
resultMessages.push({
role: 'assistant',
content: [
{
type: 'tool-call',
toolCallId,
toolName: 'createTodos',
input: {},
},
],
});
// Add tool result message
resultMessages.push({
role: 'tool',
content: [
{
type: 'tool-result',
toolCallId,
toolName: 'createTodos',
output: {
type: 'text',
value: todos,
},
},
],
});
// Add the todos as a user message (for backward compatibility)
if (todos) {
resultMessages.push({
role: 'user',
content: `<todo_list>\n- Below are the items on your TODO list:\n${todos}\n</todo_list>`,
});
}
return { return {
todos, todos,
todosMessage, messages: resultMessages,
}; };
} catch (error) { } catch (error) {
console.error('[create-todos-step] Unexpected error:', error); console.error('[create-todos-step] Unexpected error:', error);

View File

@ -19,14 +19,11 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
expect(result.todos).toContain('[ ]'); expect(result.todos).toContain('[ ]');
expect(result.todosMessage).toBeDefined(); expect(result.messages[result.messages.length - 1]!.content).toContain(result.todos);
expect(result.todosMessage.role).toBe('user'); // Should contain checkbox format
expect(result.todosMessage).toBeDefined();
expect(result.todosMessage.role).toBe('user');
expect(result.todosMessage.content).toBe(result.todos);
}); });
it('should create todos for complex multi-part request', async () => { it('should create todos for complex multi-part request', async () => {
@ -42,11 +39,13 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
expect(result.todos).toContain('[ ]'); expect(result.todos).toContain('[ ]');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
}); });
it('should create todos for specific entity queries', async () => { it('should create todos for specific entity queries', async () => {
@ -62,8 +61,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should contain todos about identifying Baltic Born, return rate, and time period // Should contain todos about identifying Baltic Born, return rate, and time period
}); });
@ -80,11 +80,13 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
expect(result.todos).toContain('[ ]'); expect(result.todos).toContain('[ ]');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
}); });
it('should create todos for merchant ranking queries', async () => { it('should create todos for merchant ranking queries', async () => {
@ -101,8 +103,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should include todos about identifying merchants, metrics, filtering, and sorting // Should include todos about identifying merchants, metrics, filtering, and sorting
}); });
@ -119,8 +122,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
}); });
it('should handle vague requests appropriately', async () => { it('should handle vague requests appropriately', async () => {
@ -136,8 +140,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should create todos about determining what "important stuff" means // Should create todos about determining what "important stuff" means
}); });
@ -155,8 +160,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should include todos about charts and groupings // Should include todos about charts and groupings
}); });
@ -174,8 +180,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should include todo about inability to do forecasts // Should include todo about inability to do forecasts
}); });
@ -192,8 +199,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should create todos about identifying both elements // Should create todos about identifying both elements
}); });
@ -219,8 +227,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
}); });
it('should handle follow-up questions with context', async () => { it('should handle follow-up questions with context', async () => {
@ -244,8 +253,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should leverage context from previous messages // Should leverage context from previous messages
}); });
@ -263,8 +273,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should create todos about data availability // Should create todos about data availability
}); });
@ -281,8 +292,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should break down "sports car" and "best selling" // Should break down "sports car" and "best selling"
}); });
@ -299,8 +311,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should include todos about smart TVs and online channel // Should include todos about smart TVs and online channel
}); });
@ -317,9 +330,10 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(result.todos).toBe(''); // Empty TODO list for empty prompt expect(result.todos).toBe(''); // Empty TODO list for empty prompt
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.todosMessage.content).toBe(''); expect(result.messages[result.messages.length - 1]!.role).toBe('user');
expect(result.messages[result.messages.length - 1]!.content).toContain(result.todos);
}); });
it('should handle very long complex prompts', async () => { it('should handle very long complex prompts', async () => {
@ -349,8 +363,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
expect(result.todos.length).toBeGreaterThan(0); expect(result.todos.length).toBeGreaterThan(0);
}); });
@ -367,8 +382,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
// Should include todos about CLV calculation and segmentation // Should include todos about CLV calculation and segmentation
}); });
@ -386,8 +402,9 @@ describe('create-todos-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.todos).toBeDefined(); expect(result.todos).toBeDefined();
expect(typeof result.todos).toBe('string'); expect(typeof result.todos).toBe('string');
expect(result.todosMessage).toBeDefined(); expect(result.messages).toBeDefined();
expect(result.todosMessage.role).toBe('user'); expect(result.messages.length).toBeGreaterThan(0);
expect(result.messages[result.messages.length - 1]!.role).toBe('user');
}); });
it('should process concurrent todo creation requests', async () => { it('should process concurrent todo creation requests', async () => {

View File

@ -16,7 +16,7 @@ describe('extract-values-search-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.values).toBeDefined(); expect(result.values).toBeDefined();
expect(Array.isArray(result.values)).toBe(true); expect(Array.isArray(result.values)).toBe(true);
expect(result.valuesMessage).toBeUndefined(); expect(result.messages?.[result.messages.length - 1]).toBeUndefined();
}); });
it('should extract multiple values from complex query', async () => { it('should extract multiple values from complex query', async () => {
@ -53,7 +53,7 @@ describe('extract-values-search-step integration', () => {
expect(result.values).toBeDefined(); expect(result.values).toBeDefined();
expect(Array.isArray(result.values)).toBe(true); expect(Array.isArray(result.values)).toBe(true);
expect(result.values.length).toBe(0); // No specific values to extract expect(result.values.length).toBe(0); // No specific values to extract
expect(result.valuesMessage).toBeUndefined(); expect(result.messages?.[result.messages.length - 1]).toBeUndefined();
}); });
it('should use conversation history for context', async () => { it('should use conversation history for context', async () => {
@ -204,7 +204,7 @@ describe('extract-values-search-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.values).toEqual([]); expect(result.values).toEqual([]);
expect(result.valuesMessage).toBeUndefined(); expect(result.messages?.[result.messages.length - 1]).toBeUndefined();
}); });
it('should handle very long prompts with multiple values', async () => { it('should handle very long prompts with multiple values', async () => {
@ -277,7 +277,7 @@ describe('extract-values-search-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.values).toBeDefined(); expect(result.values).toBeDefined();
expect(Array.isArray(result.values)).toBe(true); expect(Array.isArray(result.values)).toBe(true);
expect(result.valuesMessage).toBeUndefined(); expect(result.messages?.[result.messages.length - 1]).toBeUndefined();
}); });
it('should handle special characters in values', async () => { it('should handle special characters in values', async () => {
@ -345,10 +345,8 @@ describe('extract-values-search-step integration', () => {
}); });
it('should not create valuesMessage with empty content when search returns no results', async () => { it('should not create valuesMessage with empty content when search returns no results', async () => {
const messages: ModelMessage[] = [ const messages: ModelMessage[] = [{ role: 'user', content: 'Show me sales for Red Bull' }];
{ role: 'user', content: 'Show me sales for Red Bull' },
];
// Test with a dataSourceId that would trigger search but return empty results // Test with a dataSourceId that would trigger search but return empty results
const params = { const params = {
messages, messages,
@ -359,12 +357,13 @@ describe('extract-values-search-step integration', () => {
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.values).toBeDefined(); expect(result.values).toBeDefined();
// If valuesMessage exists, it should have non-empty content // If valuesMessage exists, it should have non-empty content
if (result.valuesMessage) { const lastMessage = result.messages?.[result.messages.length - 1];
expect(result.valuesMessage.content).toBeTruthy(); if (lastMessage) {
expect(typeof result.valuesMessage.content).toBe('string'); expect(lastMessage.content).toBeTruthy();
expect((result.valuesMessage.content as string).length).toBeGreaterThan(0); expect(typeof lastMessage.content).toBe('string');
expect((lastMessage.content as string).length).toBeGreaterThan(0);
} }
}); });
@ -372,22 +371,23 @@ describe('extract-values-search-step integration', () => {
const messages: ModelMessage[] = [ const messages: ModelMessage[] = [
{ role: 'user', content: 'Show me Nike and Adidas products' }, { role: 'user', content: 'Show me Nike and Adidas products' },
]; ];
const params = { const params = {
messages, messages,
dataSourceId: 'test-datasource-id', dataSourceId: 'test-datasource-id',
}; };
const result = await runExtractValuesAndSearchStep(params); const result = await runExtractValuesAndSearchStep(params);
expect(result).toBeDefined(); expect(result).toBeDefined();
expect(result.values).toBeDefined(); expect(result.values).toBeDefined();
// Verify that if we have values but no search results, valuesMessage is undefined // Verify that if we have values but no search results, valuesMessage is undefined
// This prevents empty string messages from being created // This prevents empty string messages from being created
if (result.values.length > 0 && result.valuesMessage) { const lastMsg = result.messages?.[result.messages.length - 1];
expect(result.valuesMessage.content).toBeTruthy(); if (result.values.length > 0 && lastMsg) {
expect((result.valuesMessage.content as string).trim()).not.toBe(''); expect(lastMsg.content).toBeTruthy();
expect((lastMsg.content as string).trim()).not.toBe('');
} }
}); });
}); });

View File

@ -12,7 +12,9 @@ export const extractValuesSearchParamsSchema = z.object({
export const extractValuesSearchResultSchema = z.object({ export const extractValuesSearchResultSchema = z.object({
values: z.array(z.string()).describe('The values that were extracted from the prompt'), values: z.array(z.string()).describe('The values that were extracted from the prompt'),
valuesMessage: z.custom<ModelMessage>().optional().describe('The values message'), messages: z
.array(z.custom<ModelMessage>())
.describe('Tool call and result messages for the extraction'),
}); });
// Export types from schemas // Export types from schemas
@ -182,15 +184,55 @@ export async function runExtractValuesAndSearchStep(
// Perform stored values search if we have extracted values and a dataSourceId // Perform stored values search if we have extracted values and a dataSourceId
const storedValuesResult = await searchStoredValues(extractedValues, dataSourceId || ''); const storedValuesResult = await searchStoredValues(extractedValues, dataSourceId || '');
// Generate a unique ID for this tool call
const toolCallId = `extract_values_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
// Create tool call and result messages
const resultMessages: ModelMessage[] = [];
// Add assistant message with tool call
resultMessages.push({
role: 'assistant',
content: [
{
type: 'tool-call',
toolCallId,
toolName: 'extractValues',
input: { prompt },
},
],
});
// Add tool result message
resultMessages.push({
role: 'tool',
content: [
{
type: 'tool-result',
toolCallId,
toolName: 'extractValues',
output: {
type: 'text',
value: JSON.stringify({
values: extractedValues,
searchResults: storedValuesResult.searchResults || '',
}),
},
},
],
});
// If we have search results, add them as a user message (for backward compatibility)
if (extractedValues.length > 0 && storedValuesResult.searchResults) {
resultMessages.push({
role: 'user',
content: storedValuesResult.searchResults,
});
}
return { return {
values: extractedValues, values: extractedValues,
valuesMessage: messages: resultMessages,
extractedValues.length > 0 && storedValuesResult.searchResults
? {
role: 'user',
content: storedValuesResult.searchResults,
}
: undefined,
}; };
} catch (error) { } catch (error) {
// Handle AbortError gracefully // Handle AbortError gracefully
@ -199,6 +241,7 @@ export async function runExtractValuesAndSearchStep(
// Return empty object when aborted // Return empty object when aborted
return { return {
values: [], values: [],
messages: [],
}; };
} }
@ -206,6 +249,7 @@ export async function runExtractValuesAndSearchStep(
// Return empty object instead of crashing // Return empty object instead of crashing
return { return {
values: [], values: [],
messages: [],
}; };
} }
} }

View File

@ -79,11 +79,13 @@ export async function extractValuesWithLLM(
messages.push(...conversationHistory); messages.push(...conversationHistory);
} }
// Add the current user prompt const userMessage: ModelMessage = {
messages.push({
role: 'user', role: 'user',
content: prompt, content: prompt,
}); };
// Add the current user prompt
messages.push(userMessage);
const tracedValuesExtraction = wrapTraced( const tracedValuesExtraction = wrapTraced(
async () => { async () => {

View File

@ -40,12 +40,12 @@ describe('Sequential Thinking Tool', () => {
expect(tool.execute).toBeDefined(); expect(tool.execute).toBeDefined();
expect(tool.onInputStart).toBeDefined(); expect(tool.onInputStart).toBeDefined();
// First call onInputStart to set up the state (simulating streaming) // First call onInputStart to set up the state (simulating streaming)
if (tool.onInputStart) { if (tool.onInputStart) {
await tool.onInputStart({ toolCallId: 'test-tool-call-123', messages: [] }); await tool.onInputStart({ toolCallId: 'test-tool-call-123', messages: [] });
} }
const execute = tool.execute; const execute = tool.execute;
if (!execute) throw new Error('execute is undefined'); if (!execute) throw new Error('execute is undefined');

View File

@ -33,11 +33,11 @@ export async function runAnalystWorkflow(input: AnalystWorkflowInput) {
const { todos, values } = await runAnalystPrepSteps(input); const { todos, values } = await runAnalystPrepSteps(input);
if (values.valuesMessage) { // Add all messages from extract-values step (tool call, result, and optional user message)
messages.push(values.valuesMessage); messages.push(...values.messages);
}
messages.push(todos.todosMessage); // Add all messages from create-todos step (tool call, result, and user message)
messages.push(...todos.messages);
const thinkAndPrepAgentStepResults = await runThinkAndPrepAgentStep({ const thinkAndPrepAgentStepResults = await runThinkAndPrepAgentStep({
options: { options: {

View File

@ -136,6 +136,13 @@ export async function createMessage(input: CreateMessageInput): Promise<Message>
requestMessage: validated.content, requestMessage: validated.content,
title: validated.content.substring(0, 255), // Ensure title fits in database title: validated.content.substring(0, 255), // Ensure title fits in database
isCompleted: false, isCompleted: false,
// Add the user message as the first raw LLM entry
rawLlmMessages: [
{
role: 'user',
content: validated.content,
},
],
}) })
.returning(); .returning();

View File

@ -1,9 +1,73 @@
import type { CoreMessage, ModelMessage } from 'ai'; import type { ModelMessage } from 'ai';
import { and, eq, isNull } from 'drizzle-orm'; import { and, eq, isNull } from 'drizzle-orm';
import { z } from 'zod'; import { z } from 'zod';
import { db } from '../../connection'; import { db } from '../../connection';
import { messages } from '../../schema'; import { messages } from '../../schema';
/**
* Convert messages from old CoreMessage format (v4) to ModelMessage format (v5)
* Main changes: tool calls 'args' 'input', tool results 'result' 'output'
*
* Since we're dealing with unknown data from the database, we cast to ModelMessage[]
* and let the runtime handle any actual format differences.
*/
export function convertCoreToModel(messages: unknown): ModelMessage[] {
if (!Array.isArray(messages)) {
return [];
}
return messages.map((message: unknown) => {
// Basic validation
if (!message || typeof message !== 'object' || !('role' in message)) {
return message as ModelMessage;
}
const msg = message as Record<string, unknown>;
// For assistant messages, update tool call args → input
if (msg.role === 'assistant' && Array.isArray(msg.content)) {
return {
...msg,
content: msg.content.map((part: unknown) => {
if (
part &&
typeof part === 'object' &&
'type' in part &&
part.type === 'tool-call' &&
'args' in part
) {
const { args, ...rest } = part as Record<string, unknown>;
return { ...rest, input: args };
}
return part;
}),
} as ModelMessage;
}
// For tool messages, update result → output
if (msg.role === 'tool' && Array.isArray(msg.content)) {
return {
...msg,
content: msg.content.map((part: unknown) => {
if (
part &&
typeof part === 'object' &&
'type' in part &&
part.type === 'tool-result' &&
'result' in part
) {
const { result, ...rest } = part as Record<string, unknown>;
return { ...rest, output: result };
}
return part;
}),
} as ModelMessage;
}
return message as ModelMessage;
});
}
// Helper function to get chatId from messageId // Helper function to get chatId from messageId
async function getChatIdFromMessage(messageId: string): Promise<string> { async function getChatIdFromMessage(messageId: string): Promise<string> {
let messageResult: Array<{ chatId: string }>; let messageResult: Array<{ chatId: string }>;
@ -76,8 +140,8 @@ export type ChatConversationHistoryOutput = z.infer<typeof ChatConversationHisto
/** /**
* Get complete conversation history for a chat from any message in that chat * Get complete conversation history for a chat from any message in that chat
* Finds the chat from the given messageId, then returns the most recent message's rawLlmMessages * Finds the chat from the given messageId, then merges and deduplicates all rawLlmMessages
* which contains the complete conversation history up to that point * from all messages in the chat to create a complete conversation history
*/ */
export async function getChatConversationHistory( export async function getChatConversationHistory(
input: ChatConversationHistoryInput input: ChatConversationHistoryInput
@ -92,9 +156,56 @@ export async function getChatConversationHistory(
// Get all messages for this chat // Get all messages for this chat
const chatMessages = await getAllMessagesForChat(chatId); const chatMessages = await getAllMessagesForChat(chatId);
// Collect all rawLlmMessages from all messages
const allRawMessages: unknown[] = [];
for (const message of chatMessages) {
if (message.rawLlmMessages && Array.isArray(message.rawLlmMessages)) {
allRawMessages.push(...message.rawLlmMessages);
}
}
if (allRawMessages.length === 0) {
// If no messages with LLM data, return empty array
return [];
}
// Convert from old CoreMessage format to new ModelMessage format if needed
const convertedMessages = convertCoreToModel(allRawMessages);
// Deduplicate messages based on content and role
// We'll use a Map to track unique messages, using a combination of role and stringified content as the key
const uniqueMessagesMap = new Map<string, ModelMessage>();
for (const message of convertedMessages) {
// Create a unique key based on role and content
// This ensures we don't have duplicate messages with the same role and content
const messageKey = JSON.stringify({
role: message.role,
content: message.content,
// Include experimental_providerMetadata if it has messageId for better deduplication
messageId:
'experimental_providerMetadata' in message &&
message.experimental_providerMetadata &&
typeof message.experimental_providerMetadata === 'object' &&
'messageId' in message.experimental_providerMetadata
? message.experimental_providerMetadata.messageId
: undefined,
});
// Only add if we haven't seen this message before
if (!uniqueMessagesMap.has(messageKey)) {
uniqueMessagesMap.set(messageKey, message);
}
}
// Convert back to array and maintain chronological order
// Since we're merging from multiple messages, we should preserve the order they appear
const deduplicatedMessages = Array.from(uniqueMessagesMap.values());
// Validate output // Validate output
try { try {
return ChatConversationHistoryOutputSchema.parse(chatMessages); return ChatConversationHistoryOutputSchema.parse(deduplicatedMessages);
} catch (validationError) { } catch (validationError) {
throw new Error( throw new Error(
`Output validation failed: ${validationError instanceof Error ? validationError.message : 'Invalid output format'}` `Output validation failed: ${validationError instanceof Error ? validationError.message : 'Invalid output format'}`

View File

@ -1,181 +0,0 @@
import type { ModelMessage } from 'ai';
import { type SQL, and, eq, isNull, sql } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { messages } from '../../schema';
import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas';
const UpdateMessageEntriesSchema = z.object({
messageId: z.string().uuid(),
rawLlmMessages: z.array(z.custom<ModelMessage>()).optional(),
responseMessages: z.array(ResponseMessageSchema).optional(),
reasoningMessages: z.array(ReasoningMessageSchema).optional(),
});
export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSchema>;
/**
* Optimized version of updateMessageEntries using more efficient JSONB operations.
* Key optimizations:
* 1. Uses jsonb_build_object to construct lookup maps for O(1) lookups
* 2. Reduces the number of jsonb_array_elements calls
* 3. Simplifies the toolCallId comparison logic
* 4. Uses more efficient CASE statements instead of complex subqueries
*/
export async function updateMessageEntriesOptimized({
messageId,
rawLlmMessages,
responseMessages,
reasoningMessages,
}: UpdateMessageEntriesParams): Promise<{ success: boolean }> {
try {
const updates: Record<string, SQL | string> = { updatedAt: new Date().toISOString() };
// Optimized merge for response messages - using jsonb_object for O(1) lookups
if (responseMessages?.length) {
const newData = JSON.stringify(responseMessages);
updates.responseMessages = sql`
CASE
WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_map AS (
SELECT jsonb_object_agg(value->>'id', value) AS map
FROM jsonb_array_elements(${newData}::jsonb) AS value
WHERE value->>'id' IS NOT NULL
),
merged AS (
SELECT jsonb_agg(
CASE
WHEN new_map.map ? (existing.value->>'id')
THEN new_map.map->(existing.value->>'id')
ELSE existing.value
END
ORDER BY existing.ordinality
) AS result
FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN new_map
UNION ALL
SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality)
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality)
CROSS JOIN new_map
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS existing
WHERE existing.value->>'id' = new_item.value->>'id'
)
)
SELECT COALESCE(jsonb_agg(value), '[]'::jsonb)
FROM (
SELECT jsonb_array_elements(result) AS value
FROM merged
WHERE result IS NOT NULL
) t
)
END`;
}
// Optimized merge for reasoning messages
if (reasoningMessages?.length) {
const newData = JSON.stringify(reasoningMessages);
updates.reasoning = sql`
CASE
WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_map AS (
SELECT jsonb_object_agg(value->>'id', value) AS map
FROM jsonb_array_elements(${newData}::jsonb) AS value
WHERE value->>'id' IS NOT NULL
),
merged AS (
SELECT jsonb_agg(
CASE
WHEN new_map.map ? (existing.value->>'id')
THEN new_map.map->(existing.value->>'id')
ELSE existing.value
END
ORDER BY existing.ordinality
) AS result
FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN new_map
UNION ALL
SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality)
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality)
CROSS JOIN new_map
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS existing
WHERE existing.value->>'id' = new_item.value->>'id'
)
)
SELECT COALESCE(jsonb_agg(value), '[]'::jsonb)
FROM (
SELECT jsonb_array_elements(result) AS value
FROM merged
WHERE result IS NOT NULL
) t
)
END`;
}
// Optimized merge for raw LLM messages - simplified toolCallId comparison
if (rawLlmMessages?.length) {
const newData = JSON.stringify(rawLlmMessages);
updates.rawLlmMessages = sql`
CASE
WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb
ELSE (
WITH new_messages AS (
SELECT
value,
value->>'role' AS role,
COALESCE(
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL),
''
) AS tool_calls
FROM jsonb_array_elements(${newData}::jsonb) AS value
),
existing_messages AS (
SELECT
value,
ordinality,
value->>'role' AS role,
COALESCE(
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL),
''
) AS tool_calls
FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality)
)
SELECT COALESCE(
jsonb_agg(value ORDER BY ord),
'[]'::jsonb
)
FROM (
-- Keep existing messages that aren't being updated
SELECT e.value, e.ordinality AS ord
FROM existing_messages e
WHERE NOT EXISTS (
SELECT 1 FROM new_messages n
WHERE n.role = e.role AND n.tool_calls = e.tool_calls
)
UNION ALL
-- Add all new messages
SELECT n.value, 1000000 + row_number() OVER () AS ord
FROM new_messages n
) combined
)
END`;
}
await db
.update(messages)
.set(updates)
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)));
return { success: true };
} catch (error) {
console.error('Failed to update message entries:', error);
throw new Error(`Failed to update message entries for message ${messageId}`);
}
}

View File

@ -22,12 +22,7 @@ export type UpdateMessageEntriesParams = z.infer<typeof UpdateMessageEntriesSche
* - responseMessages: upsert by 'id' field * - responseMessages: upsert by 'id' field
* - reasoningMessages: upsert by 'id' field * - reasoningMessages: upsert by 'id' field
* - rawLlmMessages: upsert by combination of 'role' and 'toolCallId' in content array * - rawLlmMessages: upsert by combination of 'role' and 'toolCallId' in content array
* * (handles both string content and array content with tool calls)
* Optimizations applied:
* 1. Single jsonb_array_elements call per field using LATERAL joins
* 2. More efficient key generation for rawLlmMessages using MD5 hash
* 3. Use of jsonb_object_agg for O(1) lookups instead of nested EXISTS
* 4. Reduced number of COALESCE operations
*/ */
export async function updateMessageEntries({ export async function updateMessageEntries({
messageId, messageId,
@ -38,35 +33,44 @@ export async function updateMessageEntries({
try { try {
const updates: Record<string, SQL | string> = { updatedAt: new Date().toISOString() }; const updates: Record<string, SQL | string> = { updatedAt: new Date().toISOString() };
// Optimized merge for response messages - using jsonb_object_agg for O(1) lookups // Optimized merge for response messages
if (responseMessages?.length) { if (responseMessages?.length) {
const newData = JSON.stringify(responseMessages); const newData = JSON.stringify(responseMessages);
updates.responseMessages = sql` updates.responseMessages = sql`
CASE CASE
WHEN ${messages.responseMessages} IS NULL OR jsonb_array_length(${messages.responseMessages}) = 0 WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb
THEN ${newData}::jsonb
ELSE ( ELSE (
WITH indexed_new AS ( WITH new_map AS (
SELECT jsonb_object_agg(value->>'id', value) AS lookup SELECT jsonb_object_agg(value->>'id', value) AS map
FROM jsonb_array_elements(${newData}::jsonb) AS value FROM jsonb_array_elements(${newData}::jsonb) AS value
WHERE value->>'id' IS NOT NULL
),
merged AS (
SELECT jsonb_agg(
CASE
WHEN new_map.map ? (existing.value->>'id')
THEN new_map.map->(existing.value->>'id')
ELSE existing.value
END
ORDER BY existing.ordinality
) AS result
FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN new_map
UNION ALL
SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality)
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality)
CROSS JOIN new_map
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS existing
WHERE existing.value->>'id' = new_item.value->>'id'
)
) )
SELECT jsonb_agg( SELECT COALESCE(jsonb_agg(value), '[]'::jsonb)
COALESCE( FROM (
indexed_new.lookup->(existing.value->>'id'), SELECT jsonb_array_elements(result) AS value
existing.value FROM merged
) ORDER BY existing.ordinality WHERE result IS NOT NULL
) || ) t
COALESCE(
(SELECT jsonb_agg(new_item.value)
FROM jsonb_array_elements(${newData}::jsonb) AS new_item
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS e
WHERE e.value->>'id' = new_item.value->>'id'
)),
'[]'::jsonb
)
FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN indexed_new
) )
END`; END`;
} }
@ -76,86 +80,100 @@ export async function updateMessageEntries({
const newData = JSON.stringify(reasoningMessages); const newData = JSON.stringify(reasoningMessages);
updates.reasoning = sql` updates.reasoning = sql`
CASE CASE
WHEN ${messages.reasoning} IS NULL OR jsonb_array_length(${messages.reasoning}) = 0 WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb
THEN ${newData}::jsonb
ELSE ( ELSE (
WITH indexed_new AS ( WITH new_map AS (
SELECT jsonb_object_agg(value->>'id', value) AS lookup SELECT jsonb_object_agg(value->>'id', value) AS map
FROM jsonb_array_elements(${newData}::jsonb) AS value FROM jsonb_array_elements(${newData}::jsonb) AS value
WHERE value->>'id' IS NOT NULL
),
merged AS (
SELECT jsonb_agg(
CASE
WHEN new_map.map ? (existing.value->>'id')
THEN new_map.map->(existing.value->>'id')
ELSE existing.value
END
ORDER BY existing.ordinality
) AS result
FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN new_map
UNION ALL
SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality)
FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality)
CROSS JOIN new_map
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS existing
WHERE existing.value->>'id' = new_item.value->>'id'
)
) )
SELECT jsonb_agg( SELECT COALESCE(jsonb_agg(value), '[]'::jsonb)
COALESCE( FROM (
indexed_new.lookup->(existing.value->>'id'), SELECT jsonb_array_elements(result) AS value
existing.value FROM merged
) ORDER BY existing.ordinality WHERE result IS NOT NULL
) || ) t
COALESCE(
(SELECT jsonb_agg(new_item.value)
FROM jsonb_array_elements(${newData}::jsonb) AS new_item
WHERE NOT EXISTS (
SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS e
WHERE e.value->>'id' = new_item.value->>'id'
)),
'[]'::jsonb
)
FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality)
CROSS JOIN indexed_new
) )
END`; END`;
} }
// Optimized merge for raw LLM messages - using efficient key generation // Optimized merge for raw LLM messages - handles both string and array content
if (rawLlmMessages?.length) { if (rawLlmMessages?.length) {
const newData = JSON.stringify(rawLlmMessages); const newData = JSON.stringify(rawLlmMessages);
updates.rawLlmMessages = sql` updates.rawLlmMessages = sql`
CASE CASE
WHEN ${messages.rawLlmMessages} IS NULL OR jsonb_array_length(${messages.rawLlmMessages}) = 0 WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb
THEN ${newData}::jsonb
ELSE ( ELSE (
WITH new_with_keys AS ( WITH new_messages AS (
SELECT SELECT
value, value,
value->>'role' || ':' || COALESCE( value->>'role' AS role,
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') COALESCE(
FROM jsonb_array_elements(value->'content') c CASE
WHERE c->>'toolCallId' IS NOT NULL), WHEN jsonb_typeof(value->'content') = 'array' THEN
'no-tools' (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
) AS match_key FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL)
ELSE NULL
END,
''
) AS tool_calls
FROM jsonb_array_elements(${newData}::jsonb) AS value FROM jsonb_array_elements(${newData}::jsonb) AS value
), ),
existing_with_keys AS ( existing_messages AS (
SELECT SELECT
value, value,
ordinality, ordinality,
value->>'role' || ':' || COALESCE( value->>'role' AS role,
(SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') COALESCE(
FROM jsonb_array_elements(value->'content') c CASE
WHERE c->>'toolCallId' IS NOT NULL), WHEN jsonb_typeof(value->'content') = 'array' THEN
'no-tools' (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId')
) AS match_key FROM jsonb_array_elements(value->'content') c
WHERE c->>'toolCallId' IS NOT NULL)
ELSE NULL
END,
''
) AS tool_calls
FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality) FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality)
),
new_lookup AS (
SELECT jsonb_object_agg(match_key, value) AS lookup
FROM new_with_keys
) )
SELECT jsonb_agg( SELECT COALESCE(
COALESCE( jsonb_agg(value ORDER BY ord),
new_lookup.lookup->existing_with_keys.match_key,
existing_with_keys.value
) ORDER BY existing_with_keys.ordinality
) ||
COALESCE(
(SELECT jsonb_agg(n.value)
FROM new_with_keys n
WHERE NOT EXISTS (
SELECT 1 FROM existing_with_keys e
WHERE e.match_key = n.match_key
)),
'[]'::jsonb '[]'::jsonb
) )
FROM existing_with_keys FROM (
CROSS JOIN new_lookup -- Keep existing messages that aren't being updated
SELECT e.value, e.ordinality AS ord
FROM existing_messages e
WHERE NOT EXISTS (
SELECT 1 FROM new_messages n
WHERE n.role = e.role AND n.tool_calls = e.tool_calls
)
UNION ALL
-- Add all new messages
SELECT n.value, 1000000 + row_number() OVER () AS ord
FROM new_messages n
) combined
) )
END`; END`;
} }