From 6b171aae2e5c76325ab54bde5c9e877d50c2a6f2 Mon Sep 17 00:00:00 2001 From: dal Date: Mon, 18 Aug 2025 10:16:59 -0600 Subject: [PATCH] build error map the reasoning message better in web --- apps/server/src/api/v2/chats/handler.ts | 2 +- .../src/api/v2/chats/services/chat-helpers.ts | 8 + .../ReasoningMessageSelector.tsx | 2 +- .../src/agents/analyst-agent/analyst-agent.ts | 8 + .../think-and-prep-agent.ts | 7 +- packages/ai/src/llm/ai-fallback.test.ts | 4 +- packages/ai/src/llm/ai-fallback.ts | 14 ++ .../create-todos-step/create-todos-step.ts | 51 ++++- .../create-todos.int.test.ts | 113 ++++++----- .../extract-values-search-step.int.test.ts | 38 ++-- .../extract-values-search-step.ts | 60 +++++- .../extract-values-with-llm.ts | 8 +- .../sequential-thinking-tool.test.ts | 4 +- .../analyst-workflow.ts | 8 +- packages/database/src/queries/chats/chats.ts | 7 + .../messages/chatConversationHistory.ts | 119 ++++++++++- .../update-message-entries-optimized.ts | 181 ----------------- .../messages/update-message-entries.ts | 188 ++++++++++-------- 18 files changed, 457 insertions(+), 365 deletions(-) delete mode 100644 packages/database/src/queries/messages/update-message-entries-optimized.ts diff --git a/apps/server/src/api/v2/chats/handler.ts b/apps/server/src/api/v2/chats/handler.ts index b9d42af44..07c943e1a 100644 --- a/apps/server/src/api/v2/chats/handler.ts +++ b/apps/server/src/api/v2/chats/handler.ts @@ -6,7 +6,7 @@ import { ChatErrorCode, type ChatWithMessages, } from '@buster/server-shared/chats'; -import { tasks } from '@trigger.dev/sdk/v3'; +import { tasks } from '@trigger.dev/sdk'; import { handleAssetChat, handleAssetChatWithPrompt } from './services/chat-helpers'; import { initializeChat } from './services/chat-service'; diff --git a/apps/server/src/api/v2/chats/services/chat-helpers.ts b/apps/server/src/api/v2/chats/services/chat-helpers.ts index 287fdb3be..25961f352 100644 --- a/apps/server/src/api/v2/chats/services/chat-helpers.ts +++ b/apps/server/src/api/v2/chats/services/chat-helpers.ts @@ -1,4 +1,5 @@ import { canUserAccessChatCached } from '@buster/access-controls'; +import type { ModelMessage } from '@buster/ai'; import { type User, chats, @@ -328,6 +329,13 @@ export async function handleNewChat({ requestMessage: prompt, title: prompt, isCompleted: false, + // Add the user message as the first raw LLM entry + rawLlmMessages: [ + { + role: 'user', + content: prompt, + } as ModelMessage, + ], }) .returning(); diff --git a/apps/web/src/controllers/ReasoningController/ReasoningMessages/ReasoningMessageSelector.tsx b/apps/web/src/controllers/ReasoningController/ReasoningMessages/ReasoningMessageSelector.tsx index aed97c85f..b042d5c50 100644 --- a/apps/web/src/controllers/ReasoningController/ReasoningMessages/ReasoningMessageSelector.tsx +++ b/apps/web/src/controllers/ReasoningController/ReasoningMessages/ReasoningMessageSelector.tsx @@ -86,7 +86,7 @@ export const ReasoningMessageSelector: React.FC = if (!type || !status) return null; - const ReasoningMessage = ReasoningMessageRecord[type]; + const ReasoningMessage = ReasoningMessageRecord[type as keyof typeof ReasoningMessageRecord]; const animationKey = reasoningMessageId + type; return ( diff --git a/packages/ai/src/agents/analyst-agent/analyst-agent.ts b/packages/ai/src/agents/analyst-agent/analyst-agent.ts index 1d1b87fa6..9733a8c8c 100644 --- a/packages/ai/src/agents/analyst-agent/analyst-agent.ts +++ b/packages/ai/src/agents/analyst-agent/analyst-agent.ts @@ -109,6 +109,14 @@ export function createAnalystAgent(analystAgentOptions: AnalystAgentOptions) { maxOutputTokens: 10000, temperature: 0, experimental_repairToolCall: healToolWithLlm, + onStepFinish: async (event) => { + // Wait for all tool operations to complete before moving to next step + // This ensures done tool's async operations complete before stream terminates + console.info('Analyst Agent step finished', { + toolCalls: event.toolCalls?.length || 0, + hasToolResults: !!event.toolResults, + }); + }, onFinish: () => { console.info('Analyst Agent finished'); }, diff --git a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts index 453c2f9fa..5ae819f6a 100644 --- a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts +++ b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts @@ -136,8 +136,11 @@ export function createThinkAndPrepAgent(thinkAndPrepAgentSchema: ThinkAndPrepAge maxOutputTokens: 10000, temperature: 0, experimental_repairToolCall: healToolWithLlm, - onFinish: () => { - console.info('Think and Prep Agent finished'); + onStepFinish: async (event) => { + console.info('Think and Prep Agent step finished', { + toolCalls: event.toolCalls?.length || 0, + hasToolResults: !!event.toolResults, + }); }, }), { diff --git a/packages/ai/src/llm/ai-fallback.test.ts b/packages/ai/src/llm/ai-fallback.test.ts index 4ce5ade28..d0b1160b8 100644 --- a/packages/ai/src/llm/ai-fallback.test.ts +++ b/packages/ai/src/llm/ai-fallback.test.ts @@ -366,7 +366,7 @@ test('model reset interval resets to first model', async () => { vi.useFakeTimers(); let model1CallCount = 0; - const model1 = new MockLanguageModelV2({ + const model1 = new MockLanguageModelV2({ modelId: 'primary-model', doGenerate: async () => { model1CallCount++; @@ -394,7 +394,7 @@ test('model reset interval resets to first model', async () => { try { await fallback.doGenerate({ prompt: [] }); } catch {} - + expect(fallback.modelId).toBe('fallback-model'); // Advance time past reset interval diff --git a/packages/ai/src/llm/ai-fallback.ts b/packages/ai/src/llm/ai-fallback.ts index 527260bb6..f73b980cf 100644 --- a/packages/ai/src/llm/ai-fallback.ts +++ b/packages/ai/src/llm/ai-fallback.ts @@ -255,6 +255,20 @@ export class FallbackModel implements LanguageModelV2 { } controller.close(); } catch (error) { + // Check if this is a normal stream termination + const errorMessage = error instanceof Error ? error.message : String(error); + const isNormalTermination = + errorMessage === 'terminated' || + errorMessage.includes('terminated') || + errorMessage === 'aborted' || + errorMessage.includes('aborted'); + + // If it's a normal termination and we've already streamed content, just close normally + if (isNormalTermination && hasStreamedAny) { + controller.close(); + return; + } + if (self.settings.onError) { try { await self.settings.onError(error as RetryableError, self.modelId); diff --git a/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts b/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts index d79ac28cd..58211704a 100644 --- a/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts +++ b/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos-step.ts @@ -13,7 +13,9 @@ export const createTodosParamsSchema = z.object({ export const createTodosResultSchema = z.object({ todos: z.string().describe('The TODO list in markdown format with checkboxes'), - todosMessage: z.custom().describe('The TODO list message'), + messages: z + .array(z.custom()) + .describe('Tool call and result messages for the TODO creation'), }); // Context schema for passing to streaming handlers @@ -49,7 +51,6 @@ export type CreateTodosInput = z.infer; import { createTodosStepDelta } from './create-todos-step-delta'; import { createTodosStepFinish } from './create-todos-step-finish'; import { createTodosStepStart } from './create-todos-step-start'; -import { createTodosUserMessage } from './helpers/create-todos-transform-helper'; /** * Generates a TODO list using the LLM with structured output and streaming @@ -135,12 +136,52 @@ export async function runCreateTodosStep(params: CreateTodosParams): Promise\n- Below are the items on your TODO list:\n${todos}\n`, + }); + } return { todos, - todosMessage, + messages: resultMessages, }; } catch (error) { console.error('[create-todos-step] Unexpected error:', error); diff --git a/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos.int.test.ts b/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos.int.test.ts index 066c5e6e8..82380c74b 100644 --- a/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos.int.test.ts +++ b/packages/ai/src/steps/analyst-agent-steps/create-todos-step/create-todos.int.test.ts @@ -19,14 +19,11 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); expect(result.todos).toContain('[ ]'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); // Should contain checkbox format - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); - expect(result.todosMessage.content).toBe(result.todos); + expect(result.messages[result.messages.length - 1]!.content).toContain(result.todos); }); it('should create todos for complex multi-part request', async () => { @@ -42,11 +39,13 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); expect(result.todos).toContain('[ ]'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); }); it('should create todos for specific entity queries', async () => { @@ -62,8 +61,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should contain todos about identifying Baltic Born, return rate, and time period }); @@ -80,11 +80,13 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); expect(result.todos).toContain('[ ]'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); }); it('should create todos for merchant ranking queries', async () => { @@ -101,8 +103,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should include todos about identifying merchants, metrics, filtering, and sorting }); @@ -119,8 +122,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); }); it('should handle vague requests appropriately', async () => { @@ -136,8 +140,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should create todos about determining what "important stuff" means }); @@ -155,8 +160,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should include todos about charts and groupings }); @@ -174,8 +180,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should include todo about inability to do forecasts }); @@ -192,8 +199,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should create todos about identifying both elements }); @@ -219,8 +227,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); }); it('should handle follow-up questions with context', async () => { @@ -244,8 +253,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should leverage context from previous messages }); @@ -263,8 +273,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should create todos about data availability }); @@ -281,8 +292,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should break down "sports car" and "best selling" }); @@ -299,8 +311,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should include todos about smart TVs and online channel }); @@ -317,9 +330,10 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(result.todos).toBe(''); // Empty TODO list for empty prompt - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); - expect(result.todosMessage.content).toBe(''); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); + expect(result.messages[result.messages.length - 1]!.content).toContain(result.todos); }); it('should handle very long complex prompts', async () => { @@ -349,8 +363,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); expect(result.todos.length).toBeGreaterThan(0); }); @@ -367,8 +382,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); // Should include todos about CLV calculation and segmentation }); @@ -386,8 +402,9 @@ describe('create-todos-step integration', () => { expect(result).toBeDefined(); expect(result.todos).toBeDefined(); expect(typeof result.todos).toBe('string'); - expect(result.todosMessage).toBeDefined(); - expect(result.todosMessage.role).toBe('user'); + expect(result.messages).toBeDefined(); + expect(result.messages.length).toBeGreaterThan(0); + expect(result.messages[result.messages.length - 1]!.role).toBe('user'); }); it('should process concurrent todo creation requests', async () => { diff --git a/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.int.test.ts b/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.int.test.ts index 198983ba3..eb2d811cc 100644 --- a/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.int.test.ts +++ b/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.int.test.ts @@ -16,7 +16,7 @@ describe('extract-values-search-step integration', () => { expect(result).toBeDefined(); expect(result.values).toBeDefined(); expect(Array.isArray(result.values)).toBe(true); - expect(result.valuesMessage).toBeUndefined(); + expect(result.messages?.[result.messages.length - 1]).toBeUndefined(); }); it('should extract multiple values from complex query', async () => { @@ -53,7 +53,7 @@ describe('extract-values-search-step integration', () => { expect(result.values).toBeDefined(); expect(Array.isArray(result.values)).toBe(true); expect(result.values.length).toBe(0); // No specific values to extract - expect(result.valuesMessage).toBeUndefined(); + expect(result.messages?.[result.messages.length - 1]).toBeUndefined(); }); it('should use conversation history for context', async () => { @@ -204,7 +204,7 @@ describe('extract-values-search-step integration', () => { expect(result).toBeDefined(); expect(result.values).toEqual([]); - expect(result.valuesMessage).toBeUndefined(); + expect(result.messages?.[result.messages.length - 1]).toBeUndefined(); }); it('should handle very long prompts with multiple values', async () => { @@ -277,7 +277,7 @@ describe('extract-values-search-step integration', () => { expect(result).toBeDefined(); expect(result.values).toBeDefined(); expect(Array.isArray(result.values)).toBe(true); - expect(result.valuesMessage).toBeUndefined(); + expect(result.messages?.[result.messages.length - 1]).toBeUndefined(); }); it('should handle special characters in values', async () => { @@ -345,10 +345,8 @@ describe('extract-values-search-step integration', () => { }); it('should not create valuesMessage with empty content when search returns no results', async () => { - const messages: ModelMessage[] = [ - { role: 'user', content: 'Show me sales for Red Bull' }, - ]; - + const messages: ModelMessage[] = [{ role: 'user', content: 'Show me sales for Red Bull' }]; + // Test with a dataSourceId that would trigger search but return empty results const params = { messages, @@ -359,12 +357,13 @@ describe('extract-values-search-step integration', () => { expect(result).toBeDefined(); expect(result.values).toBeDefined(); - + // If valuesMessage exists, it should have non-empty content - if (result.valuesMessage) { - expect(result.valuesMessage.content).toBeTruthy(); - expect(typeof result.valuesMessage.content).toBe('string'); - expect((result.valuesMessage.content as string).length).toBeGreaterThan(0); + const lastMessage = result.messages?.[result.messages.length - 1]; + if (lastMessage) { + expect(lastMessage.content).toBeTruthy(); + expect(typeof lastMessage.content).toBe('string'); + expect((lastMessage.content as string).length).toBeGreaterThan(0); } }); @@ -372,22 +371,23 @@ describe('extract-values-search-step integration', () => { const messages: ModelMessage[] = [ { role: 'user', content: 'Show me Nike and Adidas products' }, ]; - + const params = { messages, - dataSourceId: 'test-datasource-id', + dataSourceId: 'test-datasource-id', }; const result = await runExtractValuesAndSearchStep(params); expect(result).toBeDefined(); expect(result.values).toBeDefined(); - + // Verify that if we have values but no search results, valuesMessage is undefined // This prevents empty string messages from being created - if (result.values.length > 0 && result.valuesMessage) { - expect(result.valuesMessage.content).toBeTruthy(); - expect((result.valuesMessage.content as string).trim()).not.toBe(''); + const lastMsg = result.messages?.[result.messages.length - 1]; + if (result.values.length > 0 && lastMsg) { + expect(lastMsg.content).toBeTruthy(); + expect((lastMsg.content as string).trim()).not.toBe(''); } }); }); diff --git a/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.ts b/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.ts index c83da7b2a..853d51d03 100644 --- a/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.ts +++ b/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-search-step.ts @@ -12,7 +12,9 @@ export const extractValuesSearchParamsSchema = z.object({ export const extractValuesSearchResultSchema = z.object({ values: z.array(z.string()).describe('The values that were extracted from the prompt'), - valuesMessage: z.custom().optional().describe('The values message'), + messages: z + .array(z.custom()) + .describe('Tool call and result messages for the extraction'), }); // Export types from schemas @@ -182,15 +184,55 @@ export async function runExtractValuesAndSearchStep( // Perform stored values search if we have extracted values and a dataSourceId const storedValuesResult = await searchStoredValues(extractedValues, dataSourceId || ''); + // Generate a unique ID for this tool call + const toolCallId = `extract_values_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + + // Create tool call and result messages + const resultMessages: ModelMessage[] = []; + + // Add assistant message with tool call + resultMessages.push({ + role: 'assistant', + content: [ + { + type: 'tool-call', + toolCallId, + toolName: 'extractValues', + input: { prompt }, + }, + ], + }); + + // Add tool result message + resultMessages.push({ + role: 'tool', + content: [ + { + type: 'tool-result', + toolCallId, + toolName: 'extractValues', + output: { + type: 'text', + value: JSON.stringify({ + values: extractedValues, + searchResults: storedValuesResult.searchResults || '', + }), + }, + }, + ], + }); + + // If we have search results, add them as a user message (for backward compatibility) + if (extractedValues.length > 0 && storedValuesResult.searchResults) { + resultMessages.push({ + role: 'user', + content: storedValuesResult.searchResults, + }); + } + return { values: extractedValues, - valuesMessage: - extractedValues.length > 0 && storedValuesResult.searchResults - ? { - role: 'user', - content: storedValuesResult.searchResults, - } - : undefined, + messages: resultMessages, }; } catch (error) { // Handle AbortError gracefully @@ -199,6 +241,7 @@ export async function runExtractValuesAndSearchStep( // Return empty object when aborted return { values: [], + messages: [], }; } @@ -206,6 +249,7 @@ export async function runExtractValuesAndSearchStep( // Return empty object instead of crashing return { values: [], + messages: [], }; } } diff --git a/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-with-llm.ts b/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-with-llm.ts index d3bd08dbb..76468c282 100644 --- a/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-with-llm.ts +++ b/packages/ai/src/steps/analyst-agent-steps/extract-values-step/extract-values-with-llm.ts @@ -79,11 +79,13 @@ export async function extractValuesWithLLM( messages.push(...conversationHistory); } - // Add the current user prompt - messages.push({ + const userMessage: ModelMessage = { role: 'user', content: prompt, - }); + }; + + // Add the current user prompt + messages.push(userMessage); const tracedValuesExtraction = wrapTraced( async () => { diff --git a/packages/ai/src/tools/planning-thinking-tools/sequential-thinking-tool/sequential-thinking-tool.test.ts b/packages/ai/src/tools/planning-thinking-tools/sequential-thinking-tool/sequential-thinking-tool.test.ts index af151a622..61d0e0d7e 100644 --- a/packages/ai/src/tools/planning-thinking-tools/sequential-thinking-tool/sequential-thinking-tool.test.ts +++ b/packages/ai/src/tools/planning-thinking-tools/sequential-thinking-tool/sequential-thinking-tool.test.ts @@ -40,12 +40,12 @@ describe('Sequential Thinking Tool', () => { expect(tool.execute).toBeDefined(); expect(tool.onInputStart).toBeDefined(); - + // First call onInputStart to set up the state (simulating streaming) if (tool.onInputStart) { await tool.onInputStart({ toolCallId: 'test-tool-call-123', messages: [] }); } - + const execute = tool.execute; if (!execute) throw new Error('execute is undefined'); diff --git a/packages/ai/src/workflows/analyst-agent-workflow/analyst-workflow.ts b/packages/ai/src/workflows/analyst-agent-workflow/analyst-workflow.ts index 83b867605..8b9e0dc35 100644 --- a/packages/ai/src/workflows/analyst-agent-workflow/analyst-workflow.ts +++ b/packages/ai/src/workflows/analyst-agent-workflow/analyst-workflow.ts @@ -33,11 +33,11 @@ export async function runAnalystWorkflow(input: AnalystWorkflowInput) { const { todos, values } = await runAnalystPrepSteps(input); - if (values.valuesMessage) { - messages.push(values.valuesMessage); - } + // Add all messages from extract-values step (tool call, result, and optional user message) + messages.push(...values.messages); - messages.push(todos.todosMessage); + // Add all messages from create-todos step (tool call, result, and user message) + messages.push(...todos.messages); const thinkAndPrepAgentStepResults = await runThinkAndPrepAgentStep({ options: { diff --git a/packages/database/src/queries/chats/chats.ts b/packages/database/src/queries/chats/chats.ts index 1d5a9cb52..415559061 100644 --- a/packages/database/src/queries/chats/chats.ts +++ b/packages/database/src/queries/chats/chats.ts @@ -136,6 +136,13 @@ export async function createMessage(input: CreateMessageInput): Promise requestMessage: validated.content, title: validated.content.substring(0, 255), // Ensure title fits in database isCompleted: false, + // Add the user message as the first raw LLM entry + rawLlmMessages: [ + { + role: 'user', + content: validated.content, + }, + ], }) .returning(); diff --git a/packages/database/src/queries/messages/chatConversationHistory.ts b/packages/database/src/queries/messages/chatConversationHistory.ts index 2a25d9cc0..bc445f4ed 100644 --- a/packages/database/src/queries/messages/chatConversationHistory.ts +++ b/packages/database/src/queries/messages/chatConversationHistory.ts @@ -1,9 +1,73 @@ -import type { CoreMessage, ModelMessage } from 'ai'; +import type { ModelMessage } from 'ai'; import { and, eq, isNull } from 'drizzle-orm'; import { z } from 'zod'; import { db } from '../../connection'; import { messages } from '../../schema'; +/** + * Convert messages from old CoreMessage format (v4) to ModelMessage format (v5) + * Main changes: tool calls 'args' → 'input', tool results 'result' → 'output' + * + * Since we're dealing with unknown data from the database, we cast to ModelMessage[] + * and let the runtime handle any actual format differences. + */ +export function convertCoreToModel(messages: unknown): ModelMessage[] { + if (!Array.isArray(messages)) { + return []; + } + + return messages.map((message: unknown) => { + // Basic validation + if (!message || typeof message !== 'object' || !('role' in message)) { + return message as ModelMessage; + } + + const msg = message as Record; + + // For assistant messages, update tool call args → input + if (msg.role === 'assistant' && Array.isArray(msg.content)) { + return { + ...msg, + content: msg.content.map((part: unknown) => { + if ( + part && + typeof part === 'object' && + 'type' in part && + part.type === 'tool-call' && + 'args' in part + ) { + const { args, ...rest } = part as Record; + return { ...rest, input: args }; + } + return part; + }), + } as ModelMessage; + } + + // For tool messages, update result → output + if (msg.role === 'tool' && Array.isArray(msg.content)) { + return { + ...msg, + content: msg.content.map((part: unknown) => { + if ( + part && + typeof part === 'object' && + 'type' in part && + part.type === 'tool-result' && + 'result' in part + ) { + const { result, ...rest } = part as Record; + return { ...rest, output: result }; + } + return part; + }), + } as ModelMessage; + } + + return message as ModelMessage; + }); +} + // Helper function to get chatId from messageId async function getChatIdFromMessage(messageId: string): Promise { let messageResult: Array<{ chatId: string }>; @@ -76,8 +140,8 @@ export type ChatConversationHistoryOutput = z.infer(); + + for (const message of convertedMessages) { + // Create a unique key based on role and content + // This ensures we don't have duplicate messages with the same role and content + const messageKey = JSON.stringify({ + role: message.role, + content: message.content, + // Include experimental_providerMetadata if it has messageId for better deduplication + messageId: + 'experimental_providerMetadata' in message && + message.experimental_providerMetadata && + typeof message.experimental_providerMetadata === 'object' && + 'messageId' in message.experimental_providerMetadata + ? message.experimental_providerMetadata.messageId + : undefined, + }); + + // Only add if we haven't seen this message before + if (!uniqueMessagesMap.has(messageKey)) { + uniqueMessagesMap.set(messageKey, message); + } + } + + // Convert back to array and maintain chronological order + // Since we're merging from multiple messages, we should preserve the order they appear + const deduplicatedMessages = Array.from(uniqueMessagesMap.values()); + // Validate output try { - return ChatConversationHistoryOutputSchema.parse(chatMessages); + return ChatConversationHistoryOutputSchema.parse(deduplicatedMessages); } catch (validationError) { throw new Error( `Output validation failed: ${validationError instanceof Error ? validationError.message : 'Invalid output format'}` diff --git a/packages/database/src/queries/messages/update-message-entries-optimized.ts b/packages/database/src/queries/messages/update-message-entries-optimized.ts deleted file mode 100644 index 5896c0a65..000000000 --- a/packages/database/src/queries/messages/update-message-entries-optimized.ts +++ /dev/null @@ -1,181 +0,0 @@ -import type { ModelMessage } from 'ai'; -import { type SQL, and, eq, isNull, sql } from 'drizzle-orm'; -import { z } from 'zod'; -import { db } from '../../connection'; -import { messages } from '../../schema'; -import { ReasoningMessageSchema, ResponseMessageSchema } from '../../schemas/message-schemas'; - -const UpdateMessageEntriesSchema = z.object({ - messageId: z.string().uuid(), - rawLlmMessages: z.array(z.custom()).optional(), - responseMessages: z.array(ResponseMessageSchema).optional(), - reasoningMessages: z.array(ReasoningMessageSchema).optional(), -}); - -export type UpdateMessageEntriesParams = z.infer; - -/** - * Optimized version of updateMessageEntries using more efficient JSONB operations. - * Key optimizations: - * 1. Uses jsonb_build_object to construct lookup maps for O(1) lookups - * 2. Reduces the number of jsonb_array_elements calls - * 3. Simplifies the toolCallId comparison logic - * 4. Uses more efficient CASE statements instead of complex subqueries - */ -export async function updateMessageEntriesOptimized({ - messageId, - rawLlmMessages, - responseMessages, - reasoningMessages, -}: UpdateMessageEntriesParams): Promise<{ success: boolean }> { - try { - const updates: Record = { updatedAt: new Date().toISOString() }; - - // Optimized merge for response messages - using jsonb_object for O(1) lookups - if (responseMessages?.length) { - const newData = JSON.stringify(responseMessages); - updates.responseMessages = sql` - CASE - WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb - ELSE ( - WITH new_map AS ( - SELECT jsonb_object_agg(value->>'id', value) AS map - FROM jsonb_array_elements(${newData}::jsonb) AS value - WHERE value->>'id' IS NOT NULL - ), - merged AS ( - SELECT jsonb_agg( - CASE - WHEN new_map.map ? (existing.value->>'id') - THEN new_map.map->(existing.value->>'id') - ELSE existing.value - END - ORDER BY existing.ordinality - ) AS result - FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality) - CROSS JOIN new_map - UNION ALL - SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) - FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) - CROSS JOIN new_map - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS existing - WHERE existing.value->>'id' = new_item.value->>'id' - ) - ) - SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) - FROM ( - SELECT jsonb_array_elements(result) AS value - FROM merged - WHERE result IS NOT NULL - ) t - ) - END`; - } - - // Optimized merge for reasoning messages - if (reasoningMessages?.length) { - const newData = JSON.stringify(reasoningMessages); - updates.reasoning = sql` - CASE - WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb - ELSE ( - WITH new_map AS ( - SELECT jsonb_object_agg(value->>'id', value) AS map - FROM jsonb_array_elements(${newData}::jsonb) AS value - WHERE value->>'id' IS NOT NULL - ), - merged AS ( - SELECT jsonb_agg( - CASE - WHEN new_map.map ? (existing.value->>'id') - THEN new_map.map->(existing.value->>'id') - ELSE existing.value - END - ORDER BY existing.ordinality - ) AS result - FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality) - CROSS JOIN new_map - UNION ALL - SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) - FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) - CROSS JOIN new_map - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS existing - WHERE existing.value->>'id' = new_item.value->>'id' - ) - ) - SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) - FROM ( - SELECT jsonb_array_elements(result) AS value - FROM merged - WHERE result IS NOT NULL - ) t - ) - END`; - } - - // Optimized merge for raw LLM messages - simplified toolCallId comparison - if (rawLlmMessages?.length) { - const newData = JSON.stringify(rawLlmMessages); - updates.rawLlmMessages = sql` - CASE - WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb - ELSE ( - WITH new_messages AS ( - SELECT - value, - value->>'role' AS role, - COALESCE( - (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') - FROM jsonb_array_elements(value->'content') c - WHERE c->>'toolCallId' IS NOT NULL), - '' - ) AS tool_calls - FROM jsonb_array_elements(${newData}::jsonb) AS value - ), - existing_messages AS ( - SELECT - value, - ordinality, - value->>'role' AS role, - COALESCE( - (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') - FROM jsonb_array_elements(value->'content') c - WHERE c->>'toolCallId' IS NOT NULL), - '' - ) AS tool_calls - FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality) - ) - SELECT COALESCE( - jsonb_agg(value ORDER BY ord), - '[]'::jsonb - ) - FROM ( - -- Keep existing messages that aren't being updated - SELECT e.value, e.ordinality AS ord - FROM existing_messages e - WHERE NOT EXISTS ( - SELECT 1 FROM new_messages n - WHERE n.role = e.role AND n.tool_calls = e.tool_calls - ) - UNION ALL - -- Add all new messages - SELECT n.value, 1000000 + row_number() OVER () AS ord - FROM new_messages n - ) combined - ) - END`; - } - - await db - .update(messages) - .set(updates) - .where(and(eq(messages.id, messageId), isNull(messages.deletedAt))); - - return { success: true }; - } catch (error) { - console.error('Failed to update message entries:', error); - throw new Error(`Failed to update message entries for message ${messageId}`); - } -} diff --git a/packages/database/src/queries/messages/update-message-entries.ts b/packages/database/src/queries/messages/update-message-entries.ts index 34b380f42..33fc746b2 100644 --- a/packages/database/src/queries/messages/update-message-entries.ts +++ b/packages/database/src/queries/messages/update-message-entries.ts @@ -22,12 +22,7 @@ export type UpdateMessageEntriesParams = z.infer = { updatedAt: new Date().toISOString() }; - // Optimized merge for response messages - using jsonb_object_agg for O(1) lookups + // Optimized merge for response messages if (responseMessages?.length) { const newData = JSON.stringify(responseMessages); updates.responseMessages = sql` CASE - WHEN ${messages.responseMessages} IS NULL OR jsonb_array_length(${messages.responseMessages}) = 0 - THEN ${newData}::jsonb + WHEN ${messages.responseMessages} IS NULL THEN ${newData}::jsonb ELSE ( - WITH indexed_new AS ( - SELECT jsonb_object_agg(value->>'id', value) AS lookup + WITH new_map AS ( + SELECT jsonb_object_agg(value->>'id', value) AS map FROM jsonb_array_elements(${newData}::jsonb) AS value + WHERE value->>'id' IS NOT NULL + ), + merged AS ( + SELECT jsonb_agg( + CASE + WHEN new_map.map ? (existing.value->>'id') + THEN new_map.map->(existing.value->>'id') + ELSE existing.value + END + ORDER BY existing.ordinality + ) AS result + FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality) + CROSS JOIN new_map + UNION ALL + SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) + FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) + CROSS JOIN new_map + WHERE NOT EXISTS ( + SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS existing + WHERE existing.value->>'id' = new_item.value->>'id' + ) ) - SELECT jsonb_agg( - COALESCE( - indexed_new.lookup->(existing.value->>'id'), - existing.value - ) ORDER BY existing.ordinality - ) || - COALESCE( - (SELECT jsonb_agg(new_item.value) - FROM jsonb_array_elements(${newData}::jsonb) AS new_item - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${messages.responseMessages}) AS e - WHERE e.value->>'id' = new_item.value->>'id' - )), - '[]'::jsonb - ) - FROM jsonb_array_elements(${messages.responseMessages}) WITH ORDINALITY AS existing(value, ordinality) - CROSS JOIN indexed_new + SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) + FROM ( + SELECT jsonb_array_elements(result) AS value + FROM merged + WHERE result IS NOT NULL + ) t ) END`; } @@ -76,86 +80,100 @@ export async function updateMessageEntries({ const newData = JSON.stringify(reasoningMessages); updates.reasoning = sql` CASE - WHEN ${messages.reasoning} IS NULL OR jsonb_array_length(${messages.reasoning}) = 0 - THEN ${newData}::jsonb + WHEN ${messages.reasoning} IS NULL THEN ${newData}::jsonb ELSE ( - WITH indexed_new AS ( - SELECT jsonb_object_agg(value->>'id', value) AS lookup + WITH new_map AS ( + SELECT jsonb_object_agg(value->>'id', value) AS map FROM jsonb_array_elements(${newData}::jsonb) AS value + WHERE value->>'id' IS NOT NULL + ), + merged AS ( + SELECT jsonb_agg( + CASE + WHEN new_map.map ? (existing.value->>'id') + THEN new_map.map->(existing.value->>'id') + ELSE existing.value + END + ORDER BY existing.ordinality + ) AS result + FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality) + CROSS JOIN new_map + UNION ALL + SELECT jsonb_agg(new_item.value ORDER BY new_item.ordinality) + FROM jsonb_array_elements(${newData}::jsonb) WITH ORDINALITY AS new_item(value, ordinality) + CROSS JOIN new_map + WHERE NOT EXISTS ( + SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS existing + WHERE existing.value->>'id' = new_item.value->>'id' + ) ) - SELECT jsonb_agg( - COALESCE( - indexed_new.lookup->(existing.value->>'id'), - existing.value - ) ORDER BY existing.ordinality - ) || - COALESCE( - (SELECT jsonb_agg(new_item.value) - FROM jsonb_array_elements(${newData}::jsonb) AS new_item - WHERE NOT EXISTS ( - SELECT 1 FROM jsonb_array_elements(${messages.reasoning}) AS e - WHERE e.value->>'id' = new_item.value->>'id' - )), - '[]'::jsonb - ) - FROM jsonb_array_elements(${messages.reasoning}) WITH ORDINALITY AS existing(value, ordinality) - CROSS JOIN indexed_new + SELECT COALESCE(jsonb_agg(value), '[]'::jsonb) + FROM ( + SELECT jsonb_array_elements(result) AS value + FROM merged + WHERE result IS NOT NULL + ) t ) END`; } - // Optimized merge for raw LLM messages - using efficient key generation + // Optimized merge for raw LLM messages - handles both string and array content if (rawLlmMessages?.length) { const newData = JSON.stringify(rawLlmMessages); updates.rawLlmMessages = sql` CASE - WHEN ${messages.rawLlmMessages} IS NULL OR jsonb_array_length(${messages.rawLlmMessages}) = 0 - THEN ${newData}::jsonb + WHEN ${messages.rawLlmMessages} IS NULL THEN ${newData}::jsonb ELSE ( - WITH new_with_keys AS ( + WITH new_messages AS ( SELECT value, - value->>'role' || ':' || COALESCE( - (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') - FROM jsonb_array_elements(value->'content') c - WHERE c->>'toolCallId' IS NOT NULL), - 'no-tools' - ) AS match_key + value->>'role' AS role, + COALESCE( + CASE + WHEN jsonb_typeof(value->'content') = 'array' THEN + (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') + FROM jsonb_array_elements(value->'content') c + WHERE c->>'toolCallId' IS NOT NULL) + ELSE NULL + END, + '' + ) AS tool_calls FROM jsonb_array_elements(${newData}::jsonb) AS value ), - existing_with_keys AS ( + existing_messages AS ( SELECT value, ordinality, - value->>'role' || ':' || COALESCE( - (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') - FROM jsonb_array_elements(value->'content') c - WHERE c->>'toolCallId' IS NOT NULL), - 'no-tools' - ) AS match_key + value->>'role' AS role, + COALESCE( + CASE + WHEN jsonb_typeof(value->'content') = 'array' THEN + (SELECT string_agg(c->>'toolCallId', ',' ORDER BY c->>'toolCallId') + FROM jsonb_array_elements(value->'content') c + WHERE c->>'toolCallId' IS NOT NULL) + ELSE NULL + END, + '' + ) AS tool_calls FROM jsonb_array_elements(${messages.rawLlmMessages}) WITH ORDINALITY AS t(value, ordinality) - ), - new_lookup AS ( - SELECT jsonb_object_agg(match_key, value) AS lookup - FROM new_with_keys ) - SELECT jsonb_agg( - COALESCE( - new_lookup.lookup->existing_with_keys.match_key, - existing_with_keys.value - ) ORDER BY existing_with_keys.ordinality - ) || - COALESCE( - (SELECT jsonb_agg(n.value) - FROM new_with_keys n - WHERE NOT EXISTS ( - SELECT 1 FROM existing_with_keys e - WHERE e.match_key = n.match_key - )), + SELECT COALESCE( + jsonb_agg(value ORDER BY ord), '[]'::jsonb ) - FROM existing_with_keys - CROSS JOIN new_lookup + FROM ( + -- Keep existing messages that aren't being updated + SELECT e.value, e.ordinality AS ord + FROM existing_messages e + WHERE NOT EXISTS ( + SELECT 1 FROM new_messages n + WHERE n.role = e.role AND n.tool_calls = e.tool_calls + ) + UNION ALL + -- Add all new messages + SELECT n.value, 1000000 + row_number() OVER () AS ord + FROM new_messages n + ) combined ) END`; }