Enhance message post-processing to include Slack message existence check. Updated workflow to differentiate between follow-up messages based on Slack context. Added tests to validate new functionality and ensure proper handling of Slack-related follow-ups.

This commit is contained in:
dal 2025-07-09 12:08:23 -06:00
parent 12fa022cfe
commit e2fe53261e
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
15 changed files with 73 additions and 17 deletions

View File

@ -264,7 +264,8 @@ describe('data-transformers', () => {
baseMessageContext,
baseConversationMessages,
basePreviousResults,
baseDatasets
baseDatasets,
false
);
expect(result).toEqual({
@ -274,6 +275,7 @@ describe('data-transformers', () => {
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets: 'yaml content',
});
@ -291,10 +293,12 @@ describe('data-transformers', () => {
baseMessageContext,
baseConversationMessages,
previousResults,
baseDatasets
baseDatasets,
true
);
expect(result.isFollowUp).toBe(true);
expect(result.isSlackFollowUp).toBe(true);
expect(result.previousMessages).toHaveLength(1);
expect(result.previousMessages[0]).toContain('Previous assumption');
});
@ -309,13 +313,14 @@ describe('data-transformers', () => {
messageContextWithNullUser,
baseConversationMessages,
basePreviousResults,
baseDatasets
baseDatasets,
false
);
expect(result.userName).toBe('Unknown User');
});
it('should handle empty conversation history', () => {
const result = buildWorkflowInput(baseMessageContext, [], basePreviousResults, baseDatasets);
const result = buildWorkflowInput(baseMessageContext, [], basePreviousResults, baseDatasets, false);
expect(result.conversationHistory).toBeUndefined();
});
});

View File

@ -82,7 +82,8 @@ export function buildWorkflowInput(
messageContext: MessageContext,
conversationMessages: ConversationMessage[],
previousPostProcessingResults: PostProcessingResult[],
datasets: PermissionedDataset[]
datasets: PermissionedDataset[],
slackMessageExists: boolean
): PostProcessingWorkflowInput {
// Build conversation history from all messages
const conversationHistory = buildConversationHistory(conversationMessages);
@ -90,6 +91,9 @@ export function buildWorkflowInput(
// Determine if this is a follow-up
const isFollowUp = previousPostProcessingResults.length > 0;
// Determine if this is a Slack follow-up (both follow-up AND Slack message exists)
const isSlackFollowUp = isFollowUp && slackMessageExists;
// Format previous messages
const previousMessages = formatPreviousMessages(previousPostProcessingResults);
@ -103,6 +107,7 @@ export function buildWorkflowInput(
userId: messageContext.createdBy,
chatId: messageContext.chatId,
isFollowUp,
isSlackFollowUp,
previousMessages,
datasets: datasetsYaml,
};

View File

@ -17,12 +17,21 @@ vi.mock('./helpers', () => ({
buildWorkflowInput: vi.fn(),
validateMessageId: vi.fn((id) => id),
validateWorkflowOutput: vi.fn((output) => output),
getExistingSlackMessageForChat: vi.fn(),
}));
vi.mock('@buster/database', () => ({
getDb: vi.fn(),
eq: vi.fn((a, b) => ({ type: 'eq', a, b })),
messages: { id: 'messages.id' },
getBraintrustMetadata: vi.fn(() => Promise.resolve({
userName: 'John Doe',
userId: 'user-123',
organizationName: 'Test Org',
organizationId: 'org-123',
messageId: 'msg-12345',
chatId: 'chat-123',
})),
}));
vi.mock('@buster/ai/workflows/post-processing-workflow', () => ({
@ -94,6 +103,7 @@ describe('messagePostProcessingTask', () => {
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue(conversationMessages);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false });
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: [{ role: 'user', content: 'Hello' }],
userName: 'John Doe',
@ -101,6 +111,7 @@ describe('messagePostProcessingTask', () => {
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets: '',
});
@ -167,6 +178,7 @@ describe('messagePostProcessingTask', () => {
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue(previousResults);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: true });
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
@ -174,6 +186,7 @@ describe('messagePostProcessingTask', () => {
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: true,
isSlackFollowUp: true,
previousMessages: ['{"assumptions":["Previous assumption"]}'],
datasets: '',
});
@ -216,6 +229,7 @@ describe('messagePostProcessingTask', () => {
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false });
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
@ -223,6 +237,7 @@ describe('messagePostProcessingTask', () => {
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets: '',
});
@ -284,6 +299,7 @@ describe('messagePostProcessingTask', () => {
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false });
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
@ -291,6 +307,7 @@ describe('messagePostProcessingTask', () => {
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets: '',
});

View File

@ -120,13 +120,19 @@ export const messagePostProcessingTask: ReturnType<
});
// Step 2: Fetch all required data concurrently
const [conversationMessages, previousPostProcessingResults, datasets, braintrustMetadata] =
await Promise.all([
fetchConversationHistory(messageContext.chatId),
fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt),
fetchUserDatasets(messageContext.createdBy),
getBraintrustMetadata({ messageId: payload.messageId }),
]);
const [
conversationMessages,
previousPostProcessingResults,
datasets,
braintrustMetadata,
existingSlackMessage,
] = await Promise.all([
fetchConversationHistory(messageContext.chatId),
fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt),
fetchUserDatasets(messageContext.createdBy),
getBraintrustMetadata({ messageId: payload.messageId }),
getExistingSlackMessageForChat(messageContext.chatId),
]);
logger.log('Fetched required data', {
messageId: payload.messageId,
@ -134,6 +140,7 @@ export const messagePostProcessingTask: ReturnType<
previousPostProcessingCount: previousPostProcessingResults.length,
datasetsCount: datasets.length,
braintrustMetadata, // Log the metadata to verify it's working
slackMessageExists: existingSlackMessage?.exists || false,
});
// Step 3: Build workflow input
@ -141,12 +148,14 @@ export const messagePostProcessingTask: ReturnType<
messageContext,
conversationMessages,
previousPostProcessingResults,
datasets
datasets,
existingSlackMessage?.exists || false
);
logger.log('Built workflow input', {
messageId: payload.messageId,
isFollowUp: workflowInput.isFollowUp,
isSlackFollowUp: workflowInput.isSlackFollowUp,
previousMessagesCount: workflowInput.previousMessages.length,
hasConversationHistory: !!workflowInput.conversationHistory,
datasetsLength: workflowInput.datasets.length,

View File

@ -315,6 +315,7 @@ describe('combine-parallel-results-step integration', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets: 'name: product\ndescription: Product catalog information',
toolCalled: 'noIssuesFound',
@ -329,6 +330,7 @@ describe('combine-parallel-results-step integration', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets: 'name: product\ndescription: Product catalog information',
toolCalled: 'listAssumptionsResponse',

View File

@ -19,6 +19,7 @@ export const combineParallelResultsOutputSchema = z.object({
userId: z.string().describe('User ID for the current operation'),
chatId: z.string().describe('Chat ID for the current operation'),
isFollowUp: z.boolean().describe('Whether this is a follow-up message'),
isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'),
previousMessages: z.array(z.string()).describe('Array of previous messages for context'),
datasets: z.string().describe('Assembled YAML content of all available datasets for context'),
@ -87,6 +88,7 @@ export const combineParallelResultsStepExecution = async ({
userId: flagChatResult.userId,
chatId: flagChatResult.chatId,
isFollowUp: flagChatResult.isFollowUp,
isSlackFollowUp: flagChatResult.isSlackFollowUp,
previousMessages: flagChatResult.previousMessages,
datasets: flagChatResult.datasets,

View File

@ -313,6 +313,7 @@ describe('flag-chat-step integration', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets:
'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n - name: sales_order_header\n description: Sales order header information\n - name: credit_card\n description: Credit card information',

View File

@ -15,6 +15,7 @@ const inputSchema = z.object({
userId: z.string().describe('User ID for the current operation'),
chatId: z.string().describe('Chat ID for the current operation'),
isFollowUp: z.boolean().describe('Whether this is a follow-up message'),
isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'),
previousMessages: z.array(z.string()).describe('Array of previous messages for context'),
datasets: z.string().describe('Assembled YAML content of all available datasets for context'),
});
@ -27,6 +28,7 @@ export const flagChatOutputSchema = z.object({
userId: z.string().describe('User ID for the current operation'),
chatId: z.string().describe('Chat ID for the current operation'),
isFollowUp: z.boolean().describe('Whether this is a follow-up message'),
isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'),
previousMessages: z.array(z.string()).describe('Array of previous messages for context'),
datasets: z.string().describe('Assembled YAML content of all available datasets for context'),

View File

@ -56,6 +56,7 @@ describe('format-follow-up-message-step integration', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: true,
isSlackFollowUp: true,
previousMessages: [
'Mountain-500 Series Analysis: Found assumptions about stock bike interpretation and geographic boundaries that require data team review.',
],

View File

@ -56,6 +56,7 @@ describe('format-initial-message-step integration', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets:
'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n - name: sales_order_header\n description: Sales order header information\n - name: credit_card\n description: Credit card information',
@ -149,6 +150,7 @@ describe('format-initial-message-step integration', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets: 'name: sales\ndescription: Sales data',

View File

@ -286,6 +286,7 @@ describe('identify-assumptions-step integration', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets:
'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n columns:\n - name: name\n description: Product name\n - name: finishedgoodsflag\n description: Indicates if finished and ready for sale\n - name: sales_order_header\n description: Sales order header information\n columns:\n - name: onlineorderflag\n description: Boolean indicating if order was placed online\n - name: credit_card\n description: Credit card information\n columns:\n - name: cardtype\n description: Type of credit card',

View File

@ -18,6 +18,7 @@ const inputSchema = z.object({
userId: z.string().describe('User ID for the current operation'),
chatId: z.string().describe('Chat ID for the current operation'),
isFollowUp: z.boolean().describe('Whether this is a follow-up message'),
isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'),
previousMessages: z.array(z.string()).describe('Array of previous messages for context'),
datasets: z.string().describe('Assembled YAML content of all available datasets for context'),
});
@ -30,6 +31,7 @@ export const identifyAssumptionsOutputSchema = z.object({
userId: z.string().describe('User ID for the current operation'),
chatId: z.string().describe('Chat ID for the current operation'),
isFollowUp: z.boolean().describe('Whether this is a follow-up message'),
isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'),
previousMessages: z.array(z.string()).describe('Array of previous messages for context'),
datasets: z.string().describe('Assembled YAML content of all available datasets for context'),

View File

@ -9,6 +9,7 @@ export const postProcessingWorkflowInputSchema = z.object({
userId: z.string().describe('User ID for the current operation'),
chatId: z.string().describe('Chat ID for the current operation'),
isFollowUp: z.boolean().describe('Whether this is a follow-up message'),
isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'),
previousMessages: z.array(z.string()).describe('Array of the previous post-processing messages'),
datasets: z.string().describe('Assembled YAML content of all available datasets for context'),
});
@ -22,6 +23,7 @@ export const postProcessingWorkflowOutputSchema = z.object({
userId: z.string().describe('User ID for the current operation'),
chatId: z.string().describe('Chat ID for the current operation'),
isFollowUp: z.boolean().describe('Whether this is a follow-up message'),
isSlackFollowUp: z.boolean().describe('Whether this is a follow-up message for an existing Slack thread'),
previousMessages: z.array(z.string()).describe('Array of the previous post-processing messages'),
datasets: z.string().describe('Assembled YAML content of all available datasets for context'),

View File

@ -328,6 +328,7 @@ describe('Post-Processing Workflow Integration Tests', () => {
userId: 'user_67890',
chatId: 'chat_abcde',
isFollowUp: false,
isSlackFollowUp: false,
previousMessages: [],
datasets:
'name: product\ndescription: Product catalog information\ntables:\n - name: product\n description: Product information including bikes and accessories\n columns:\n - name: name\n description: Product name\n - name: finishedgoodsflag\n description: Indicates if finished and ready for sale\n - name: sales_order_header\n description: Sales order header information\n columns:\n - name: onlineorderflag\n description: Boolean indicating if order was placed online\n - name: credit_card\n description: Credit card information\n columns:\n - name: cardtype\n description: Type of credit card',

View File

@ -21,10 +21,14 @@ const postProcessingWorkflow = createWorkflow({
.parallel([flagChatStep, identifyAssumptionsStep])
.then(combineParallelResultsStep)
.branch([
// Branch for follow-up messages
[async ({ inputData }) => inputData?.isFollowUp === true, formatFollowUpMessageStep],
// Branch for initial messages
[async ({ inputData }) => inputData?.isFollowUp === false, formatInitialMessageStep],
// Branch for follow-up messages (only if it's both a follow-up AND Slack message exists)
[
async ({ inputData }) =>
inputData?.isFollowUp === true && inputData?.isSlackFollowUp === true,
formatFollowUpMessageStep,
],
// Otherwise use initial message format
[async () => true, formatInitialMessageStep],
])
.commit();