message post processing bugfix no longer loading duplicate conversation history

This commit is contained in:
dal 2025-07-21 12:34:34 -06:00
parent 8bb4e4ad8e
commit cee4483751
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
8 changed files with 148 additions and 274 deletions

View File

@ -14,7 +14,7 @@
"test:coverage": "vitest run --coverage",
"test:integration": "vitest run **/*.int.test.ts **/*.integration.test.ts",
"test:integration:watch": "vitest watch tests/integration",
"test:unit": "vitest run tests/unit",
"test:unit": "vitest run --exclude '**/*.int.test.ts' --exclude '**/*.integration.test.ts'",
"typecheck": "tsc --noEmit"
},
"dependencies": {

View File

@ -1,8 +1,6 @@
import type { CoreMessage } from 'ai';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import type { MessageContext } from '../types';
import {
buildConversationHistory,
buildWorkflowInput,
concatenateDatasets,
formatPreviousMessages,
@ -14,78 +12,6 @@ beforeEach(() => {
});
describe('data-transformers', () => {
describe('buildConversationHistory', () => {
it('should combine multiple message arrays correctly', () => {
const messages = [
{
id: '1',
rawLlmMessages: [
{ role: 'user', content: 'Hello' } as CoreMessage,
{ role: 'assistant', content: 'Hi there' } as CoreMessage,
],
createdAt: new Date('2024-01-01T00:00:00Z'),
},
{
id: '2',
rawLlmMessages: [
{ role: 'user', content: 'How are you?' } as CoreMessage,
{ role: 'assistant', content: 'I am fine' } as CoreMessage,
],
createdAt: new Date('2024-01-01T00:01:00Z'),
},
];
const result = buildConversationHistory(messages);
expect(result).toHaveLength(4);
expect(result?.[0]).toEqual({ role: 'user', content: 'Hello' });
expect(result?.[3]).toEqual({ role: 'assistant', content: 'I am fine' });
});
it('should handle empty messages array', () => {
const result = buildConversationHistory([]);
expect(result).toBeUndefined();
});
it('should skip messages with null rawLlmMessages', () => {
const messages = [
{
id: '1',
rawLlmMessages: null as any,
createdAt: new Date(),
},
{
id: '2',
rawLlmMessages: [{ role: 'user', content: 'Test' } as CoreMessage],
createdAt: new Date(),
},
];
const result = buildConversationHistory(messages);
expect(result).toHaveLength(1);
expect(result?.[0]).toEqual({ role: 'user', content: 'Test' });
});
it('should handle non-array rawLlmMessages gracefully', () => {
const messages = [
{
id: '1',
rawLlmMessages: 'invalid data' as any,
createdAt: new Date(),
},
{
id: '2',
rawLlmMessages: [{ role: 'user', content: 'Valid message' } as CoreMessage],
createdAt: new Date(),
},
];
const result = buildConversationHistory(messages);
expect(result).toHaveLength(1);
expect(result?.[0]).toEqual({ role: 'user', content: 'Valid message' });
});
});
describe('formatPreviousMessages', () => {
it('should extract string representation correctly', () => {
const results = [
@ -233,18 +159,11 @@ describe('data-transformers', () => {
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
rawLlmMessages: [{ role: 'user', content: 'Hello' }] as any,
userName: 'John Doe',
organizationId: 'org-123',
};
const baseConversationMessages = [
{
id: '1',
rawLlmMessages: [{ role: 'user', content: 'Hello' }] as any,
createdAt: new Date(),
},
];
const basePreviousResults: any[] = [];
const baseDatasets = [
@ -262,7 +181,6 @@ describe('data-transformers', () => {
it('should build complete workflow input for initial message', () => {
const result = buildWorkflowInput(
baseMessageContext,
baseConversationMessages,
basePreviousResults,
baseDatasets,
false
@ -289,13 +207,7 @@ describe('data-transformers', () => {
},
];
const result = buildWorkflowInput(
baseMessageContext,
baseConversationMessages,
previousResults,
baseDatasets,
true
);
const result = buildWorkflowInput(baseMessageContext, previousResults, baseDatasets, true);
expect(result.isFollowUp).toBe(true);
expect(result.isSlackFollowUp).toBe(true);
@ -311,7 +223,6 @@ describe('data-transformers', () => {
const result = buildWorkflowInput(
messageContextWithNullUser,
baseConversationMessages,
basePreviousResults,
baseDatasets,
false
@ -320,14 +231,17 @@ describe('data-transformers', () => {
});
it('should handle empty conversation history', () => {
const messageContextNoHistory = {
...baseMessageContext,
rawLlmMessages: [] as any,
};
const result = buildWorkflowInput(
baseMessageContext,
[],
messageContextNoHistory,
basePreviousResults,
baseDatasets,
false
);
expect(result.conversationHistory).toBeUndefined();
expect(result.conversationHistory).toEqual([]);
});
});
});

View File

@ -1,44 +1,7 @@
import type { PermissionedDataset } from '@buster/access-controls';
import type { MessageHistory } from '@buster/ai/utils/memory/types';
import type { PostProcessingWorkflowInput } from '@buster/ai/workflows/post-processing-workflow';
import type { CoreMessage } from 'ai';
import type { ConversationMessage, MessageContext, PostProcessingResult } from '../types';
/**
* Combine raw LLM messages from multiple messages into a single conversation history
*/
export function buildConversationHistory(
messages: ConversationMessage[]
): MessageHistory | undefined {
if (messages.length === 0) {
return undefined;
}
const allMessages: CoreMessage[] = [];
for (const message of messages) {
if (!message.rawLlmMessages) {
continue;
}
try {
// rawLlmMessages from the database is already CoreMessage[]
if (Array.isArray(message.rawLlmMessages)) {
allMessages.push(...message.rawLlmMessages);
}
} catch (_error) {
// Skip messages that can't be parsed
// Continue with other messages
}
}
if (allMessages.length === 0) {
return undefined;
}
// Return as MessageHistory which is CoreMessage[]
return allMessages as MessageHistory;
}
import type { MessageContext, PostProcessingResult } from '../types';
/**
* Extract post-processing messages as string array
@ -80,13 +43,12 @@ export function concatenateDatasets(datasets: PermissionedDataset[]): string {
*/
export function buildWorkflowInput(
messageContext: MessageContext,
conversationMessages: ConversationMessage[],
previousPostProcessingResults: PostProcessingResult[],
datasets: PermissionedDataset[],
slackMessageExists: boolean
): PostProcessingWorkflowInput {
// Build conversation history from all messages
const conversationHistory = buildConversationHistory(conversationMessages);
// Use conversation history directly from the message context
const conversationHistory = messageContext.rawLlmMessages as MessageHistory;
// Determine if this is a follow-up
const isFollowUp = previousPostProcessingResults.length > 0;

View File

@ -3,7 +3,6 @@ import * as database from '@buster/database';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { DataFetchError, MessageNotFoundError } from '../types';
import {
fetchConversationHistory,
fetchMessageWithContext,
fetchPreviousPostProcessingMessages,
fetchUserDatasets,
@ -15,9 +14,18 @@ vi.mock('@buster/database', () => ({
and: vi.fn((...args) => ({ type: 'and', args })),
eq: vi.fn((a, b) => ({ type: 'eq', a, b })),
lt: vi.fn((a, b) => ({ type: 'lt', a, b })),
lte: vi.fn((a, b) => ({ type: 'lte', a, b })),
isNull: vi.fn((a) => ({ type: 'isNull', a })),
isNotNull: vi.fn((a) => ({ type: 'isNotNull', a })),
messages: { id: 'messages.id', chatId: 'messages.chatId', createdBy: 'messages.createdBy' },
messages: {
id: 'messages.id',
chatId: 'messages.chatId',
createdBy: 'messages.createdBy',
createdAt: 'messages.createdAt',
postProcessingMessage: 'messages.postProcessingMessage',
deletedAt: 'messages.deletedAt',
rawLlmMessages: 'messages.rawLlmMessages',
},
chats: { id: 'chats.id', organizationId: 'chats.organizationId' },
users: { id: 'users.id', name: 'users.name' },
}));
@ -38,9 +46,17 @@ describe('message-fetchers', () => {
innerJoin: vi.fn().mockReturnThis(),
leftJoin: vi.fn().mockReturnThis(),
where: vi.fn().mockReturnThis(),
orderBy: vi.fn().mockReturnThis(),
limit: vi.fn().mockReturnThis(),
orderBy: vi.fn(),
limit: vi.fn(),
};
// Set up the mock chain to return itself for most methods
mockDb.select.mockReturnValue(mockDb);
mockDb.from.mockReturnValue(mockDb);
mockDb.innerJoin.mockReturnValue(mockDb);
mockDb.leftJoin.mockReturnValue(mockDb);
mockDb.where.mockReturnValue(mockDb);
vi.mocked(database.getDb).mockReturnValue(mockDb);
});
@ -51,6 +67,7 @@ describe('message-fetchers', () => {
chatId: '223e4567-e89b-12d3-a456-426614174000',
createdBy: '323e4567-e89b-12d3-a456-426614174000',
createdAt: '2024-01-01T00:00:00Z',
rawLlmMessages: [{ role: 'user', content: 'Test message' }],
userName: 'John Doe',
organizationId: '423e4567-e89b-12d3-a456-426614174000',
};
@ -64,6 +81,7 @@ describe('message-fetchers', () => {
chatId: messageData.chatId,
createdBy: messageData.createdBy,
createdAt: new Date(messageData.createdAt),
rawLlmMessages: messageData.rawLlmMessages,
userName: messageData.userName,
organizationId: messageData.organizationId,
});
@ -87,6 +105,7 @@ describe('message-fetchers', () => {
chatId: '223e4567-e89b-12d3-a456-426614174000',
createdBy: '323e4567-e89b-12d3-a456-426614174000',
createdAt: '2024-01-01T00:00:00Z',
rawLlmMessages: [{ role: 'user', content: 'Test' }],
userName: null,
organizationId: '423e4567-e89b-12d3-a456-426614174000',
};
@ -94,7 +113,7 @@ describe('message-fetchers', () => {
mockDb.limit.mockResolvedValue([messageData]);
const result = await fetchMessageWithContext(messageData.id);
expect(result.userName).toBeNull();
expect(result.userName).toBe('Unknown');
});
it('should wrap database errors in DataFetchError', async () => {
@ -105,54 +124,6 @@ describe('message-fetchers', () => {
});
});
describe('fetchConversationHistory', () => {
it('should return messages in chronological order', async () => {
const messages = [
{
id: '1',
rawLlmMessages: [{ role: 'user', content: 'Hello' }],
createdAt: '2024-01-01T00:00:00Z',
},
{
id: '2',
rawLlmMessages: [{ role: 'assistant', content: 'Hi there' }],
createdAt: '2024-01-01T00:01:00Z',
},
];
mockDb.orderBy.mockResolvedValue(messages);
const result = await fetchConversationHistory('chat-id');
expect(result).toHaveLength(2);
expect(result[0]?.id).toBe('1');
expect(result[1]?.id).toBe('2');
expect(mockDb.orderBy).toHaveBeenCalled();
});
it('should return empty array for non-existent chat', async () => {
mockDb.orderBy.mockResolvedValue([]);
const result = await fetchConversationHistory('non-existent-chat');
expect(result).toEqual([]);
});
it('should handle messages with null rawLlmMessages', async () => {
const messages = [
{
id: '1',
rawLlmMessages: null,
createdAt: '2024-01-01T00:00:00Z',
},
];
mockDb.orderBy.mockResolvedValue(messages);
const result = await fetchConversationHistory('chat-id');
expect(result[0]?.rawLlmMessages).toBeNull();
});
});
describe('fetchPreviousPostProcessingMessages', () => {
const beforeTimestamp = new Date('2024-01-01T12:00:00Z');
@ -168,7 +139,8 @@ describe('message-fetchers', () => {
},
];
mockDb.orderBy.mockResolvedValue(messages);
mockDb.orderBy.mockReturnValue(mockDb);
mockDb.orderBy.mockResolvedValueOnce(messages);
const result = await fetchPreviousPostProcessingMessages('chat-id', beforeTimestamp);
@ -189,7 +161,8 @@ describe('message-fetchers', () => {
},
];
mockDb.orderBy.mockResolvedValue(messages);
mockDb.orderBy.mockReturnValue(mockDb);
mockDb.orderBy.mockResolvedValueOnce(messages);
const result = await fetchPreviousPostProcessingMessages('chat-id', beforeTimestamp);
@ -197,7 +170,8 @@ describe('message-fetchers', () => {
});
it('should return empty array when no results', async () => {
mockDb.orderBy.mockResolvedValue([]);
mockDb.orderBy.mockReturnValue(mockDb);
mockDb.orderBy.mockResolvedValueOnce([]);
const result = await fetchPreviousPostProcessingMessages('chat-id', beforeTimestamp);
expect(result).toEqual([]);

View File

@ -1,20 +1,7 @@
import { getPermissionedDatasets } from '@buster/access-controls';
import {
and,
chats,
desc,
eq,
getDb,
isNotNull,
isNull,
lt,
lte,
messages,
users,
} from '@buster/database';
import { and, chats, eq, getDb, isNotNull, isNull, lte, messages, users } from '@buster/database';
import type { CoreMessage } from 'ai';
import {
type ConversationMessage,
DataFetchError,
type MessageContext,
MessageNotFoundError,
@ -34,6 +21,7 @@ export async function fetchMessageWithContext(messageId: string): Promise<Messag
chatId: messages.chatId,
createdBy: messages.createdBy,
createdAt: messages.createdAt,
rawLlmMessages: messages.rawLlmMessages,
userName: users.name,
organizationId: chats.organizationId,
})
@ -53,6 +41,7 @@ export async function fetchMessageWithContext(messageId: string): Promise<Messag
chatId: messageData.chatId,
createdBy: messageData.createdBy,
createdAt: new Date(messageData.createdAt),
rawLlmMessages: messageData.rawLlmMessages as CoreMessage[],
userName: messageData.userName ?? 'Unknown',
organizationId: messageData.organizationId,
};
@ -67,36 +56,6 @@ export async function fetchMessageWithContext(messageId: string): Promise<Messag
}
}
/**
* Fetch all messages for conversation history
*/
export async function fetchConversationHistory(chatId: string): Promise<ConversationMessage[]> {
const db = getDb();
try {
const result = await db
.select({
id: messages.id,
rawLlmMessages: messages.rawLlmMessages,
createdAt: messages.createdAt,
})
.from(messages)
.where(and(eq(messages.chatId, chatId), isNull(messages.deletedAt)))
.orderBy(messages.createdAt);
return result.map((msg) => ({
id: msg.id,
rawLlmMessages: msg.rawLlmMessages as CoreMessage[],
createdAt: new Date(msg.createdAt),
}));
} catch (error) {
throw new DataFetchError(
`Failed to fetch conversation history for chat ${chatId}`,
error instanceof Error ? { cause: error } : undefined
);
}
}
/**
* Fetch previous post-processing results
*/

View File

@ -11,19 +11,25 @@ const runTask = (messagePostProcessingTask as any).run;
// Mock dependencies
vi.mock('./helpers', () => ({
fetchMessageWithContext: vi.fn(),
fetchConversationHistory: vi.fn(),
fetchPreviousPostProcessingMessages: vi.fn(),
fetchUserDatasets: vi.fn(),
buildWorkflowInput: vi.fn(),
validateMessageId: vi.fn((id) => id),
validateWorkflowOutput: vi.fn((output) => output),
getExistingSlackMessageForChat: vi.fn(),
sendSlackNotification: vi.fn(),
sendSlackReplyNotification: vi.fn(),
trackSlackNotification: vi.fn(),
}));
vi.mock('@buster/database', () => ({
getDb: vi.fn(),
eq: vi.fn((a, b) => ({ type: 'eq', a, b })),
messages: { id: 'messages.id' },
messages: { id: 'messages.id', postProcessingMessage: 'messages.postProcessingMessage' },
slackIntegrations: {
id: 'slackIntegrations.id',
tokenVaultKey: 'slackIntegrations.tokenVaultKey',
},
getBraintrustMetadata: vi.fn(() =>
Promise.resolve({
userName: 'John Doe',
@ -63,8 +69,18 @@ describe('messagePostProcessingTask', () => {
mockDb = {
update: vi.fn().mockReturnThis(),
set: vi.fn().mockReturnThis(),
where: vi.fn().mockResolvedValue(undefined),
where: vi.fn().mockReturnThis(),
select: vi.fn().mockReturnThis(),
from: vi.fn().mockReturnThis(),
limit: vi.fn().mockReturnThis(),
orderBy: vi.fn().mockReturnThis(),
};
// Default mock chain behavior
mockDb.where.mockReturnValue(mockDb);
mockDb.limit.mockResolvedValue([{ tokenVaultKey: 'vault-key-123' }]);
mockDb.orderBy.mockResolvedValue([]);
vi.mocked(database.getDb).mockReturnValue(mockDb);
// Setup workflow mock
@ -81,6 +97,7 @@ describe('messagePostProcessingTask', () => {
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
rawLlmMessages: [{ role: 'user', content: 'Hello' }] as any,
userName: 'John Doe',
organizationId: 'org-123',
};
@ -94,15 +111,24 @@ describe('messagePostProcessingTask', () => {
];
const workflowOutput = {
initial: {
assumptions: ['Test assumption'],
flagForReview: false,
'format-initial-message': {
assumptions: [
{
descriptiveTitle: 'Test assumption',
classification: 'business_rules',
explanation: 'Test explanation',
label: 'important',
},
],
flagChatMessage: false,
toolCalled: false,
summaryTitle: 'Test Summary',
summaryMessage: 'Test summary message',
},
};
// Setup mocks
vi.mocked(helpers.fetchMessageWithContext).mockResolvedValue(messageContext);
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue(conversationMessages);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false });
@ -137,7 +163,6 @@ describe('messagePostProcessingTask', () => {
},
});
expect(helpers.fetchMessageWithContext).toHaveBeenCalledWith(messageId);
expect(helpers.fetchConversationHistory).toHaveBeenCalledWith('chat-123');
expect(helpers.fetchPreviousPostProcessingMessages).toHaveBeenCalledWith(
'chat-123',
messageContext.createdAt
@ -147,7 +172,21 @@ describe('messagePostProcessingTask', () => {
expect(mockWorkflowRun.start).toHaveBeenCalled();
expect(mockDb.update).toHaveBeenCalledWith(database.messages);
expect(mockDb.set).toHaveBeenCalledWith({
postProcessingMessage: workflowOutput,
postProcessingMessage: {
summary_message: 'Test summary message',
summary_title: 'Test Summary',
confidence_score: 'high',
assumptions: [
{
descriptive_title: 'Test assumption',
classification: 'business_rules',
explanation: 'Test explanation',
label: 'important',
},
],
tool_called: 'unknown',
user_name: 'John Doe',
},
updatedAt: expect.any(String),
});
});
@ -162,9 +201,13 @@ describe('messagePostProcessingTask', () => {
];
const workflowOutput = {
followUp: {
suggestions: ['Ask about X'],
analysis: 'Based on previous conversation...',
'format-follow-up-message': {
assumptions: [],
flagChatMessage: false,
toolCalled: false,
summaryTitle: 'Follow-up Analysis',
summaryMessage: 'Based on previous conversation...',
followUpSuggestions: ['Ask about X'],
},
};
@ -174,13 +217,24 @@ describe('messagePostProcessingTask', () => {
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
rawLlmMessages: [] as any,
userName: 'John Doe',
organizationId: 'org-123',
});
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue(previousResults);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: true });
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({
exists: true,
slackMessageTs: 'ts-123',
slackThreadTs: 'thread-ts-123',
channelId: 'C123456',
integrationId: 'int-123',
});
vi.mocked(helpers.sendSlackReplyNotification).mockResolvedValue({
sent: true,
messageTs: 'msg-ts-456',
threadTs: 'thread-ts-456',
});
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
@ -211,9 +265,9 @@ describe('messagePostProcessingTask', () => {
});
expect(helpers.buildWorkflowInput).toHaveBeenCalledWith(
expect.objectContaining({ id: messageId }),
[],
previousResults,
[]
[],
true
);
});
@ -225,13 +279,18 @@ describe('messagePostProcessingTask', () => {
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
rawLlmMessages: [] as any,
userName: 'John Doe',
organizationId: 'org-123',
});
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false });
vi.mocked(helpers.sendSlackNotification).mockResolvedValue({
sent: true,
messageTs: 'msg-ts-123',
threadTs: 'thread-ts-123',
});
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
@ -295,13 +354,18 @@ describe('messagePostProcessingTask', () => {
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
rawLlmMessages: [] as any,
userName: 'John Doe',
organizationId: 'org-123',
});
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.getExistingSlackMessageForChat).mockResolvedValue({ exists: false });
vi.mocked(helpers.sendSlackNotification).mockResolvedValue({
sent: true,
messageTs: 'msg-ts-123',
threadTs: 'thread-ts-123',
});
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
@ -315,7 +379,15 @@ describe('messagePostProcessingTask', () => {
});
mockWorkflowRun.start.mockResolvedValue({
status: 'success',
result: { initial: { assumptions: [], flagForReview: false } },
result: {
'format-initial-message': {
assumptions: [],
flagChatMessage: false,
toolCalled: false,
summaryTitle: 'Summary',
summaryMessage: 'Summary message',
},
},
});
mockDb.where.mockRejectedValue(dbError);
@ -326,7 +398,7 @@ describe('messagePostProcessingTask', () => {
messageId,
error: {
code: 'DATABASE_ERROR',
message: 'Database update failed',
message: 'Database update failed: Database update failed',
details: {
operation: 'message_post_processing_task_execution',
messageId,

View File

@ -13,7 +13,6 @@ import { currentSpan, initLogger, wrapTraced } from 'braintrust';
import { z } from 'zod/v4';
import {
buildWorkflowInput,
fetchConversationHistory,
fetchMessageWithContext,
fetchPreviousPostProcessingMessages,
fetchUserDatasets,
@ -120,33 +119,26 @@ export const messagePostProcessingTask: ReturnType<
});
// Step 2: Fetch all required data concurrently
const [
conversationMessages,
previousPostProcessingResults,
datasets,
braintrustMetadata,
existingSlackMessage,
] = await Promise.all([
fetchConversationHistory(messageContext.chatId),
fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt),
fetchUserDatasets(messageContext.createdBy),
getBraintrustMetadata({ messageId: payload.messageId }),
getExistingSlackMessageForChat(messageContext.chatId),
]);
const [previousPostProcessingResults, datasets, braintrustMetadata, existingSlackMessage] =
await Promise.all([
fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt),
fetchUserDatasets(messageContext.createdBy),
getBraintrustMetadata({ messageId: payload.messageId }),
getExistingSlackMessageForChat(messageContext.chatId),
]);
logger.log('Fetched required data', {
messageId: payload.messageId,
conversationMessagesCount: conversationMessages.length,
previousPostProcessingCount: previousPostProcessingResults.length,
datasetsCount: datasets.length,
braintrustMetadata, // Log the metadata to verify it's working
slackMessageExists: existingSlackMessage?.exists || false,
hasRawLlmMessages: !!messageContext.rawLlmMessages,
});
// Step 3: Build workflow input
const workflowInput = buildWorkflowInput(
messageContext,
conversationMessages,
previousPostProcessingResults,
datasets,
existingSlackMessage?.exists || false
@ -238,7 +230,7 @@ export const messagePostProcessingTask: ReturnType<
message: validatedOutput.message,
});
// Step 5: Store result in database
// Step 6: Store result in database
logger.log('Storing post-processing result in database', {
messageId: payload.messageId,
});
@ -273,7 +265,7 @@ export const messagePostProcessingTask: ReturnType<
throw new Error(`Database update failed: ${errorMessage}`);
}
// Step 6: Send Slack notification if conditions are met
// Step 7: Send Slack notification if conditions are met
let slackNotificationSent = false;
// Skip Slack notification if tool_called is "noIssuesFound" and there are no major assumptions

View File

@ -43,6 +43,7 @@ export const MessageContextSchema = z.object({
chatId: z.string(),
createdBy: z.string(),
createdAt: z.date(),
rawLlmMessages: z.custom<CoreMessage[]>(),
userName: z.string(),
organizationId: z.string(),
});