From e2fe53261e00f4a78e1de6934bcb15b5b7adad96 Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 9 Jul 2025 12:08:23 -0600 Subject: [PATCH] Enhance message post-processing to include Slack message existence check. Updated workflow to differentiate between follow-up messages based on Slack context. Added tests to validate new functionality and ensure proper handling of Slack-related follow-ups. --- .../helpers/data-transformers.test.ts | 13 +++++++--- .../helpers/data-transformers.ts | 7 +++++- .../message-post-processing.test.ts | 17 +++++++++++++ .../message-post-processing.ts | 25 +++++++++++++------ .../combine-parallel-results-step.int.test.ts | 2 ++ .../combine-parallel-results-step.ts | 2 ++ .../flag-chat-step.int.test.ts | 1 + .../steps/post-processing/flag-chat-step.ts | 2 ++ .../format-follow-up-message-step.int.test.ts | 1 + .../format-initial-message-step.int.test.ts | 2 ++ .../identify-assumptions-step.int.test.ts | 1 + .../identify-assumptions-step.ts | 2 ++ .../ai/src/steps/post-processing/schemas.ts | 2 ++ .../post-processing-workflow.int.test.ts | 1 + .../src/workflows/post-processing-workflow.ts | 12 ++++++--- 15 files changed, 73 insertions(+), 17 deletions(-) diff --git a/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.test.ts b/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.test.ts index f15e89a38..a78d06dc8 100644 --- a/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.test.ts +++ b/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.test.ts @@ -264,7 +264,8 @@ describe('data-transformers', () => { baseMessageContext, baseConversationMessages, basePreviousResults, - baseDatasets + baseDatasets, + false ); expect(result).toEqual({ @@ -274,6 +275,7 @@ describe('data-transformers', () => { userId: 'user-123', chatId: 'chat-123', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'yaml content', }); @@ -291,10 +293,12 @@ describe('data-transformers', () => { baseMessageContext, baseConversationMessages, previousResults, - baseDatasets + baseDatasets, + true ); expect(result.isFollowUp).toBe(true); + expect(result.isSlackFollowUp).toBe(true); expect(result.previousMessages).toHaveLength(1); expect(result.previousMessages[0]).toContain('Previous assumption'); }); @@ -309,13 +313,14 @@ describe('data-transformers', () => { messageContextWithNullUser, baseConversationMessages, basePreviousResults, - baseDatasets + baseDatasets, + false ); expect(result.userName).toBe('Unknown User'); }); it('should handle empty conversation history', () => { - const result = buildWorkflowInput(baseMessageContext, [], basePreviousResults, baseDatasets); + const result = buildWorkflowInput(baseMessageContext, [], basePreviousResults, baseDatasets, false); expect(result.conversationHistory).toBeUndefined(); }); }); diff --git a/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.ts b/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.ts index 230cc34b2..bd4f43fc1 100644 --- a/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.ts +++ b/apps/trigger/src/tasks/message-post-processing/helpers/data-transformers.ts @@ -82,7 +82,8 @@ export function buildWorkflowInput( messageContext: MessageContext, conversationMessages: ConversationMessage[], previousPostProcessingResults: PostProcessingResult[], - datasets: PermissionedDataset[] + datasets: PermissionedDataset[], + slackMessageExists: boolean ): PostProcessingWorkflowInput { // Build conversation history from all messages const conversationHistory = buildConversationHistory(conversationMessages); @@ -90,6 +91,9 @@ export function buildWorkflowInput( // Determine if this is a follow-up const isFollowUp = previousPostProcessingResults.length > 0; + // Determine if this is a Slack follow-up (both follow-up AND Slack message exists) + const isSlackFollowUp = isFollowUp && slackMessageExists; + // Format previous messages const previousMessages = formatPreviousMessages(previousPostProcessingResults); @@ -103,6 +107,7 @@ export function buildWorkflowInput( userId: messageContext.createdBy, chatId: messageContext.chatId, isFollowUp, + isSlackFollowUp, previousMessages, datasets: datasetsYaml, }; diff --git a/apps/trigger/src/tasks/message-post-processing/message-post-processing.test.ts b/apps/trigger/src/tasks/message-post-processing/message-post-processing.test.ts index ea5a336bb..497b16f89 100644 --- a/apps/trigger/src/tasks/message-post-processing/message-post-processing.test.ts +++ b/apps/trigger/src/tasks/message-post-processing/message-post-processing.test.ts @@ -17,12 +17,21 @@ vi.mock('./helpers', () => ({ buildWorkflowInput: vi.fn(), validateMessageId: vi.fn((id) => id), validateWorkflowOutput: vi.fn((output) => output), + getExistingSlackMessageForChat: vi.fn(), })); vi.mock('@buster/database', () => ({ getDb: vi.fn(), eq: vi.fn((a, b) => ({ type: 'eq', a, b })), messages: { id: 'messages.id' }, + getBraintrustMetadata: vi.fn(() => Promise.resolve({ + userName: 'John Doe', + userId: 'user-123', + organizationName: 'Test Org', + organizationId: 'org-123', + messageId: 'msg-12345', + chatId: 'chat-123', + })), })); vi.mock('@buster/ai/workflows/post-processing-workflow', () => ({ @@ -94,6 +103,7 @@ describe('messagePostProcessingTask', () => { vi.mocked(helpers.fetchConversationHistory).mockResolvedValue(conversationMessages); vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]); vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]); + vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false }); vi.mocked(helpers.buildWorkflowInput).mockReturnValue({ conversationHistory: [{ role: 'user', content: 'Hello' }], userName: 'John Doe', @@ -101,6 +111,7 @@ describe('messagePostProcessingTask', () => { userId: 'user-123', chatId: 'chat-123', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: '', }); @@ -167,6 +178,7 @@ describe('messagePostProcessingTask', () => { vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]); vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue(previousResults); vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]); + vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: true }); vi.mocked(helpers.buildWorkflowInput).mockReturnValue({ conversationHistory: undefined, userName: 'John Doe', @@ -174,6 +186,7 @@ describe('messagePostProcessingTask', () => { userId: 'user-123', chatId: 'chat-123', isFollowUp: true, + isSlackFollowUp: true, previousMessages: ['{"assumptions":["Previous assumption"]}'], datasets: '', }); @@ -216,6 +229,7 @@ describe('messagePostProcessingTask', () => { vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]); vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]); vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]); + vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false }); vi.mocked(helpers.buildWorkflowInput).mockReturnValue({ conversationHistory: undefined, userName: 'John Doe', @@ -223,6 +237,7 @@ describe('messagePostProcessingTask', () => { userId: 'user-123', chatId: 'chat-123', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: '', }); @@ -284,6 +299,7 @@ describe('messagePostProcessingTask', () => { vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]); vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]); vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]); + vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false }); vi.mocked(helpers.buildWorkflowInput).mockReturnValue({ conversationHistory: undefined, userName: 'John Doe', @@ -291,6 +307,7 @@ describe('messagePostProcessingTask', () => { userId: 'user-123', chatId: 'chat-123', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: '', }); diff --git a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts index 23964fc1e..9403b47ca 100644 --- a/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts +++ b/apps/trigger/src/tasks/message-post-processing/message-post-processing.ts @@ -120,13 +120,19 @@ export const messagePostProcessingTask: ReturnType< }); // Step 2: Fetch all required data concurrently - const [conversationMessages, previousPostProcessingResults, datasets, braintrustMetadata] = - await Promise.all([ - fetchConversationHistory(messageContext.chatId), - fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt), - fetchUserDatasets(messageContext.createdBy), - getBraintrustMetadata({ messageId: payload.messageId }), - ]); + const [ + conversationMessages, + previousPostProcessingResults, + datasets, + braintrustMetadata, + existingSlackMessage, + ] = await Promise.all([ + fetchConversationHistory(messageContext.chatId), + fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt), + fetchUserDatasets(messageContext.createdBy), + getBraintrustMetadata({ messageId: payload.messageId }), + getExistingSlackMessageForChat(messageContext.chatId), + ]); logger.log('Fetched required data', { messageId: payload.messageId, @@ -134,6 +140,7 @@ export const messagePostProcessingTask: ReturnType< previousPostProcessingCount: previousPostProcessingResults.length, datasetsCount: datasets.length, braintrustMetadata, // Log the metadata to verify it's working + slackMessageExists: existingSlackMessage?.exists || false, }); // Step 3: Build workflow input @@ -141,12 +148,14 @@ export const messagePostProcessingTask: ReturnType< messageContext, conversationMessages, previousPostProcessingResults, - datasets + datasets, + existingSlackMessage?.exists || false ); logger.log('Built workflow input', { messageId: payload.messageId, isFollowUp: workflowInput.isFollowUp, + isSlackFollowUp: workflowInput.isSlackFollowUp, previousMessagesCount: workflowInput.previousMessages.length, hasConversationHistory: !!workflowInput.conversationHistory, datasetsLength: workflowInput.datasets.length, diff --git a/packages/ai/src/steps/post-processing/combine-parallel-results-step.int.test.ts b/packages/ai/src/steps/post-processing/combine-parallel-results-step.int.test.ts index a65d2ead2..e212e5746 100644 --- a/packages/ai/src/steps/post-processing/combine-parallel-results-step.int.test.ts +++ b/packages/ai/src/steps/post-processing/combine-parallel-results-step.int.test.ts @@ -315,6 +315,7 @@ describe('combine-parallel-results-step integration', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'name: product\ndescription: Product catalog information', toolCalled: 'noIssuesFound', @@ -329,6 +330,7 @@ describe('combine-parallel-results-step integration', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'name: product\ndescription: Product catalog information', toolCalled: 'listAssumptionsResponse', diff --git a/packages/ai/src/steps/post-processing/combine-parallel-results-step.ts b/packages/ai/src/steps/post-processing/combine-parallel-results-step.ts index 21d9128dd..d66f95687 100644 --- a/packages/ai/src/steps/post-processing/combine-parallel-results-step.ts +++ b/packages/ai/src/steps/post-processing/combine-parallel-results-step.ts @@ -19,6 +19,7 @@ export const combineParallelResultsOutputSchema = z.object({ userId: z.string().describe('User ID for the current operation'), chatId: z.string().describe('Chat ID for the current operation'), isFollowUp: z.boolean().describe('Whether this is a follow-up message'), + isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'), previousMessages: z.array(z.string()).describe('Array of previous messages for context'), datasets: z.string().describe('Assembled YAML content of all available datasets for context'), @@ -87,6 +88,7 @@ export const combineParallelResultsStepExecution = async ({ userId: flagChatResult.userId, chatId: flagChatResult.chatId, isFollowUp: flagChatResult.isFollowUp, + isSlackFollowUp: flagChatResult.isSlackFollowUp, previousMessages: flagChatResult.previousMessages, datasets: flagChatResult.datasets, diff --git a/packages/ai/src/steps/post-processing/flag-chat-step.int.test.ts b/packages/ai/src/steps/post-processing/flag-chat-step.int.test.ts index f6137cdbe..0519b349e 100644 --- a/packages/ai/src/steps/post-processing/flag-chat-step.int.test.ts +++ b/packages/ai/src/steps/post-processing/flag-chat-step.int.test.ts @@ -313,6 +313,7 @@ describe('flag-chat-step integration', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n - name: sales_order_header\n description: Sales order header information\n - name: credit_card\n description: Credit card information', diff --git a/packages/ai/src/steps/post-processing/flag-chat-step.ts b/packages/ai/src/steps/post-processing/flag-chat-step.ts index 03550a067..4af76b893 100644 --- a/packages/ai/src/steps/post-processing/flag-chat-step.ts +++ b/packages/ai/src/steps/post-processing/flag-chat-step.ts @@ -15,6 +15,7 @@ const inputSchema = z.object({ userId: z.string().describe('User ID for the current operation'), chatId: z.string().describe('Chat ID for the current operation'), isFollowUp: z.boolean().describe('Whether this is a follow-up message'), + isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'), previousMessages: z.array(z.string()).describe('Array of previous messages for context'), datasets: z.string().describe('Assembled YAML content of all available datasets for context'), }); @@ -27,6 +28,7 @@ export const flagChatOutputSchema = z.object({ userId: z.string().describe('User ID for the current operation'), chatId: z.string().describe('Chat ID for the current operation'), isFollowUp: z.boolean().describe('Whether this is a follow-up message'), + isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'), previousMessages: z.array(z.string()).describe('Array of previous messages for context'), datasets: z.string().describe('Assembled YAML content of all available datasets for context'), diff --git a/packages/ai/src/steps/post-processing/format-follow-up-message-step.int.test.ts b/packages/ai/src/steps/post-processing/format-follow-up-message-step.int.test.ts index 5915cb6f3..8e2dea4a8 100644 --- a/packages/ai/src/steps/post-processing/format-follow-up-message-step.int.test.ts +++ b/packages/ai/src/steps/post-processing/format-follow-up-message-step.int.test.ts @@ -56,6 +56,7 @@ describe('format-follow-up-message-step integration', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: true, + isSlackFollowUp: true, previousMessages: [ 'Mountain-500 Series Analysis: Found assumptions about stock bike interpretation and geographic boundaries that require data team review.', ], diff --git a/packages/ai/src/steps/post-processing/format-initial-message-step.int.test.ts b/packages/ai/src/steps/post-processing/format-initial-message-step.int.test.ts index 2a22c1d4c..07d18effb 100644 --- a/packages/ai/src/steps/post-processing/format-initial-message-step.int.test.ts +++ b/packages/ai/src/steps/post-processing/format-initial-message-step.int.test.ts @@ -56,6 +56,7 @@ describe('format-initial-message-step integration', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n - name: sales_order_header\n description: Sales order header information\n - name: credit_card\n description: Credit card information', @@ -149,6 +150,7 @@ describe('format-initial-message-step integration', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'name: sales\ndescription: Sales data', diff --git a/packages/ai/src/steps/post-processing/identify-assumptions-step.int.test.ts b/packages/ai/src/steps/post-processing/identify-assumptions-step.int.test.ts index d24c97d42..0a1d59558 100644 --- a/packages/ai/src/steps/post-processing/identify-assumptions-step.int.test.ts +++ b/packages/ai/src/steps/post-processing/identify-assumptions-step.int.test.ts @@ -286,6 +286,7 @@ describe('identify-assumptions-step integration', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n columns:\n - name: name\n description: Product name\n - name: finishedgoodsflag\n description: Indicates if finished and ready for sale\n - name: sales_order_header\n description: Sales order header information\n columns:\n - name: onlineorderflag\n description: Boolean indicating if order was placed online\n - name: credit_card\n description: Credit card information\n columns:\n - name: cardtype\n description: Type of credit card', diff --git a/packages/ai/src/steps/post-processing/identify-assumptions-step.ts b/packages/ai/src/steps/post-processing/identify-assumptions-step.ts index 2eeaf79e2..a9b7ff15f 100644 --- a/packages/ai/src/steps/post-processing/identify-assumptions-step.ts +++ b/packages/ai/src/steps/post-processing/identify-assumptions-step.ts @@ -18,6 +18,7 @@ const inputSchema = z.object({ userId: z.string().describe('User ID for the current operation'), chatId: z.string().describe('Chat ID for the current operation'), isFollowUp: z.boolean().describe('Whether this is a follow-up message'), + isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'), previousMessages: z.array(z.string()).describe('Array of previous messages for context'), datasets: z.string().describe('Assembled YAML content of all available datasets for context'), }); @@ -30,6 +31,7 @@ export const identifyAssumptionsOutputSchema = z.object({ userId: z.string().describe('User ID for the current operation'), chatId: z.string().describe('Chat ID for the current operation'), isFollowUp: z.boolean().describe('Whether this is a follow-up message'), + isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'), previousMessages: z.array(z.string()).describe('Array of previous messages for context'), datasets: z.string().describe('Assembled YAML content of all available datasets for context'), diff --git a/packages/ai/src/steps/post-processing/schemas.ts b/packages/ai/src/steps/post-processing/schemas.ts index 90ed5fa4e..143bf907f 100644 --- a/packages/ai/src/steps/post-processing/schemas.ts +++ b/packages/ai/src/steps/post-processing/schemas.ts @@ -9,6 +9,7 @@ export const postProcessingWorkflowInputSchema = z.object({ userId: z.string().describe('User ID for the current operation'), chatId: z.string().describe('Chat ID for the current operation'), isFollowUp: z.boolean().describe('Whether this is a follow-up message'), + isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'), previousMessages: z.array(z.string()).describe('Array of the previous post-processing messages'), datasets: z.string().describe('Assembled YAML content of all available datasets for context'), }); @@ -22,6 +23,7 @@ export const postProcessingWorkflowOutputSchema = z.object({ userId: z.string().describe('User ID for the current operation'), chatId: z.string().describe('Chat ID for the current operation'), isFollowUp: z.boolean().describe('Whether this is a follow-up message'), + isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'), previousMessages: z.array(z.string()).describe('Array of the previous post-processing messages'), datasets: z.string().describe('Assembled YAML content of all available datasets for context'), diff --git a/packages/ai/src/workflows/post-processing-workflow.int.test.ts b/packages/ai/src/workflows/post-processing-workflow.int.test.ts index 028204da6..5a85d2afd 100644 --- a/packages/ai/src/workflows/post-processing-workflow.int.test.ts +++ b/packages/ai/src/workflows/post-processing-workflow.int.test.ts @@ -328,6 +328,7 @@ describe('Post-Processing Workflow Integration Tests', () => { userId: 'user_67890', chatId: 'chat_abcde', isFollowUp: false, + isSlackFollowUp: false, previousMessages: [], datasets: 'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n columns:\n - name: name\n description: Product name\n - name: finishedgoodsflag\n description: Indicates if finished and ready for sale\n - name: sales_order_header\n description: Sales order header information\n columns:\n - name: onlineorderflag\n description: Boolean indicating if order was placed online\n - name: credit_card\n description: Credit card information\n columns:\n - name: cardtype\n description: Type of credit card', diff --git a/packages/ai/src/workflows/post-processing-workflow.ts b/packages/ai/src/workflows/post-processing-workflow.ts index 4143d9b31..92765cfbb 100644 --- a/packages/ai/src/workflows/post-processing-workflow.ts +++ b/packages/ai/src/workflows/post-processing-workflow.ts @@ -21,10 +21,14 @@ const postProcessingWorkflow = createWorkflow({ .parallel([flagChatStep, identifyAssumptionsStep]) .then(combineParallelResultsStep) .branch([ - // Branch for follow-up messages - [async ({ inputData }) => inputData?.isFollowUp === true, formatFollowUpMessageStep], - // Branch for initial messages - [async ({ inputData }) => inputData?.isFollowUp === false, formatInitialMessageStep], + // Branch for follow-up messages (only if it's both a follow-up AND Slack message exists) + [ + async ({ inputData }) => + inputData?.isFollowUp === true && inputData?.isSlackFollowUp === true, + formatFollowUpMessageStep, + ], + // Otherwise use initial message format + [async () => true, formatInitialMessageStep], ]) .commit();