moving over slack processing work

This commit is contained in:
dal 2025-07-07 09:52:26 -06:00
parent 21e08b9ac9
commit bc4ce29fc6
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
16 changed files with 3907 additions and 1775 deletions

View File

@ -0,0 +1,322 @@
import type { CoreMessage } from 'ai';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import type { MessageContext } from '../types';
import {
buildConversationHistory,
buildWorkflowInput,
concatenateDatasets,
formatPreviousMessages,
} from './data-transformers';
// Mock console.error to avoid noise in tests
beforeEach(() => {
vi.spyOn(console, 'error').mockImplementation(() => {});
});
describe('data-transformers', () => {
describe('buildConversationHistory', () => {
it('should combine multiple message arrays correctly', () => {
const messages = [
{
id: '1',
rawLlmMessages: [
{ role: 'user', content: 'Hello' } as CoreMessage,
{ role: 'assistant', content: 'Hi there' } as CoreMessage,
],
createdAt: new Date('2024-01-01T00:00:00Z'),
},
{
id: '2',
rawLlmMessages: [
{ role: 'user', content: 'How are you?' } as CoreMessage,
{ role: 'assistant', content: 'I am fine' } as CoreMessage,
],
createdAt: new Date('2024-01-01T00:01:00Z'),
},
];
const result = buildConversationHistory(messages);
expect(result).toHaveLength(4);
expect(result?.[0]).toEqual({ role: 'user', content: 'Hello' });
expect(result?.[3]).toEqual({ role: 'assistant', content: 'I am fine' });
});
it('should handle empty messages array', () => {
const result = buildConversationHistory([]);
expect(result).toBeUndefined();
});
it('should skip messages with null rawLlmMessages', () => {
const messages = [
{
id: '1',
rawLlmMessages: null as any,
createdAt: new Date(),
},
{
id: '2',
rawLlmMessages: [{ role: 'user', content: 'Test' } as CoreMessage],
createdAt: new Date(),
},
];
const result = buildConversationHistory(messages);
expect(result).toHaveLength(1);
expect(result?.[0]).toEqual({ role: 'user', content: 'Test' });
});
it('should handle non-array rawLlmMessages gracefully', () => {
const messages = [
{
id: '1',
rawLlmMessages: 'invalid data' as any,
createdAt: new Date(),
},
{
id: '2',
rawLlmMessages: [{ role: 'user', content: 'Valid message' } as CoreMessage],
createdAt: new Date(),
},
];
const result = buildConversationHistory(messages);
expect(result).toHaveLength(1);
expect(result?.[0]).toEqual({ role: 'user', content: 'Valid message' });
});
});
describe('formatPreviousMessages', () => {
it('should extract string representation correctly', () => {
const results = [
{
postProcessingMessage: { assumptions: ['Test assumption'] },
createdAt: new Date(),
},
{
postProcessingMessage: { message: 'Direct string message' },
createdAt: new Date(),
},
];
const formatted = formatPreviousMessages(results);
expect(formatted).toHaveLength(2);
expect(formatted[0]).toContain('assumptions');
expect(formatted[1]).toContain('Direct string message');
});
it('should handle complex nested objects', () => {
const results = [
{
postProcessingMessage: {
initial: {
assumptions: ['Complex assumption'],
flagForReview: true,
nested: {
deep: 'value',
},
},
},
createdAt: new Date(),
},
];
const formatted = formatPreviousMessages(results);
expect(formatted[0]).toContain('Complex assumption');
expect(formatted[0]).toContain('flagForReview');
expect(formatted[0]).toContain('deep');
});
it('should return empty array for no messages', () => {
const formatted = formatPreviousMessages([]);
expect(formatted).toEqual([]);
});
it('should filter out empty strings from errors', () => {
const results = [
{
postProcessingMessage: {}, // This will cause an error/empty result
createdAt: new Date(),
},
{
postProcessingMessage: { message: 'Valid message' },
createdAt: new Date(),
},
];
const formatted = formatPreviousMessages(results);
expect(formatted).toHaveLength(2);
expect(formatted[1]).toContain('Valid message');
});
});
describe('concatenateDatasets', () => {
it('should join with correct separator', () => {
const datasets = [
{
id: '1',
name: 'Dataset 1',
ymlFile: 'content1',
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds1',
},
{
id: '2',
name: 'Dataset 2',
ymlFile: 'content2',
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds2',
},
];
const result = concatenateDatasets(datasets);
expect(result).toBe('content1\n---\ncontent2');
});
it('should filter null ymlFile entries', () => {
const datasets = [
{
id: '1',
name: 'Dataset 1',
ymlFile: 'content1',
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds1',
},
{
id: '2',
name: 'Dataset 2',
ymlFile: null,
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds2',
},
];
const result = concatenateDatasets(datasets);
expect(result).toBe('content1');
});
it('should return empty string for no datasets', () => {
const result = concatenateDatasets([]);
expect(result).toBe('');
});
it('should return empty string if all datasets have null ymlFile', () => {
const datasets = [
{
id: '1',
name: 'Dataset 1',
ymlFile: null,
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds1',
},
];
const result = concatenateDatasets(datasets);
expect(result).toBe('');
});
});
describe('buildWorkflowInput', () => {
const baseMessageContext: MessageContext = {
id: 'msg-123',
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
userName: 'John Doe',
organizationId: 'org-123',
};
const baseConversationMessages = [
{
id: '1',
rawLlmMessages: [{ role: 'user', content: 'Hello' }] as any,
createdAt: new Date(),
},
];
const basePreviousResults: any[] = [];
const baseDatasets = [
{
id: '1',
name: 'Dataset 1',
ymlFile: 'yaml content',
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds1',
},
];
it('should build complete workflow input for initial message', () => {
const result = buildWorkflowInput(
baseMessageContext,
baseConversationMessages,
basePreviousResults,
baseDatasets
);
expect(result).toEqual({
conversationHistory: [{ role: 'user', content: 'Hello' }],
userName: 'John Doe',
messageId: 'msg-123',
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
previousMessages: [],
datasets: 'yaml content',
});
});
it('should build workflow input for follow-up message', () => {
const previousResults = [
{
postProcessingMessage: { assumptions: ['Previous assumption'] },
createdAt: new Date(),
},
];
const result = buildWorkflowInput(
baseMessageContext,
baseConversationMessages,
previousResults,
baseDatasets
);
expect(result.isFollowUp).toBe(true);
expect(result.previousMessages).toHaveLength(1);
expect(result.previousMessages[0]).toContain('Previous assumption');
});
it('should handle null userName', () => {
const messageContextWithNullUser = {
...baseMessageContext,
userName: null,
};
const result = buildWorkflowInput(
messageContextWithNullUser,
baseConversationMessages,
basePreviousResults,
baseDatasets
);
expect(result.userName).toBe('Unknown User');
});
it('should handle empty conversation history', () => {
const result = buildWorkflowInput(baseMessageContext, [], basePreviousResults, baseDatasets);
expect(result.conversationHistory).toBeUndefined();
});
});
});

View File

@ -0,0 +1,109 @@
import type { PermissionedDataset } from '@buster/access-controls';
import type { MessageHistory } from '@buster/ai/utils/memory/types';
import type { PostProcessingWorkflowInput } from '@buster/ai/workflows/post-processing-workflow';
import type { CoreMessage } from 'ai';
import type { ConversationMessage, MessageContext, PostProcessingResult } from '../types';
/**
* Combine raw LLM messages from multiple messages into a single conversation history
*/
export function buildConversationHistory(
messages: ConversationMessage[]
): MessageHistory | undefined {
if (messages.length === 0) {
return undefined;
}
const allMessages: CoreMessage[] = [];
for (const message of messages) {
if (!message.rawLlmMessages) {
continue;
}
try {
// rawLlmMessages from the database is already CoreMessage[]
if (Array.isArray(message.rawLlmMessages)) {
allMessages.push(...message.rawLlmMessages);
}
} catch (_error) {
// Skip messages that can't be parsed
// Continue with other messages
}
}
if (allMessages.length === 0) {
return undefined;
}
// Return as MessageHistory which is CoreMessage[]
return allMessages as MessageHistory;
}
/**
* Extract post-processing messages as string array
*/
export function formatPreviousMessages(results: PostProcessingResult[]): string[] {
return results
.map((result) => {
try {
if (typeof result.postProcessingMessage === 'string') {
return result.postProcessingMessage;
}
// Convert object to formatted string
return JSON.stringify(result.postProcessingMessage, null, 2);
} catch (_error) {
// Skip messages that can't be formatted
return '';
}
})
.filter((msg) => msg.length > 0);
}
/**
* Concatenate dataset YAML files
*/
export function concatenateDatasets(datasets: PermissionedDataset[]): string {
const validDatasets = datasets.filter(
(dataset) => dataset.ymlFile !== null && dataset.ymlFile !== undefined
);
if (validDatasets.length === 0) {
return '';
}
return validDatasets.map((dataset) => dataset.ymlFile).join('\n---\n');
}
/**
* Build complete workflow input from collected data
*/
export function buildWorkflowInput(
messageContext: MessageContext,
conversationMessages: ConversationMessage[],
previousPostProcessingResults: PostProcessingResult[],
datasets: PermissionedDataset[]
): PostProcessingWorkflowInput {
// Build conversation history from all messages
const conversationHistory = buildConversationHistory(conversationMessages);
// Determine if this is a follow-up
const isFollowUp = previousPostProcessingResults.length > 0;
// Format previous messages
const previousMessages = formatPreviousMessages(previousPostProcessingResults);
// Concatenate datasets
const datasetsYaml = concatenateDatasets(datasets);
return {
conversationHistory,
userName: messageContext.userName || 'Unknown User',
messageId: messageContext.id,
userId: messageContext.createdBy,
chatId: messageContext.chatId,
isFollowUp,
previousMessages,
datasets: datasetsYaml,
};
}

View File

@ -0,0 +1,4 @@
// Export all helper functions
export * from './message-fetchers';
export * from './data-transformers';
export * from './slack-notifier';

View File

@ -0,0 +1,252 @@
import * as accessControls from '@buster/access-controls';
import * as database from '@buster/database';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { DataFetchError, MessageNotFoundError } from '../types';
import {
fetchConversationHistory,
fetchMessageWithContext,
fetchPreviousPostProcessingMessages,
fetchUserDatasets,
} from './message-fetchers';
// Mock the database module
vi.mock('@buster/database', () => ({
getDb: vi.fn(),
and: vi.fn((...args) => ({ type: 'and', args })),
eq: vi.fn((a, b) => ({ type: 'eq', a, b })),
lt: vi.fn((a, b) => ({ type: 'lt', a, b })),
isNull: vi.fn((a) => ({ type: 'isNull', a })),
isNotNull: vi.fn((a) => ({ type: 'isNotNull', a })),
messages: { id: 'messages.id', chatId: 'messages.chatId', createdBy: 'messages.createdBy' },
chats: { id: 'chats.id', organizationId: 'chats.organizationId' },
users: { id: 'users.id', name: 'users.name' },
}));
// Mock access controls
vi.mock('@buster/access-controls', () => ({
getPermissionedDatasets: vi.fn(),
}));
describe('message-fetchers', () => {
let mockDb: any;
beforeEach(() => {
vi.clearAllMocks();
mockDb = {
select: vi.fn().mockReturnThis(),
from: vi.fn().mockReturnThis(),
innerJoin: vi.fn().mockReturnThis(),
leftJoin: vi.fn().mockReturnThis(),
where: vi.fn().mockReturnThis(),
orderBy: vi.fn().mockReturnThis(),
limit: vi.fn().mockReturnThis(),
};
vi.mocked(database.getDb).mockReturnValue(mockDb);
});
describe('fetchMessageWithContext', () => {
it('should return message with user and chat data', async () => {
const messageData = {
id: '123e4567-e89b-12d3-a456-426614174000',
chatId: '223e4567-e89b-12d3-a456-426614174000',
createdBy: '323e4567-e89b-12d3-a456-426614174000',
createdAt: '2024-01-01T00:00:00Z',
userName: 'John Doe',
organizationId: '423e4567-e89b-12d3-a456-426614174000',
};
mockDb.limit.mockResolvedValue([messageData]);
const result = await fetchMessageWithContext(messageData.id);
expect(result).toEqual({
id: messageData.id,
chatId: messageData.chatId,
createdBy: messageData.createdBy,
createdAt: new Date(messageData.createdAt),
userName: messageData.userName,
organizationId: messageData.organizationId,
});
expect(mockDb.select).toHaveBeenCalled();
expect(mockDb.where).toHaveBeenCalled();
expect(mockDb.limit).toHaveBeenCalledWith(1);
});
it('should throw MessageNotFoundError when message not found', async () => {
mockDb.limit.mockResolvedValue([]);
await expect(fetchMessageWithContext('non-existent-id')).rejects.toThrow(
MessageNotFoundError
);
});
it('should handle null user name', async () => {
const messageData = {
id: '123e4567-e89b-12d3-a456-426614174000',
chatId: '223e4567-e89b-12d3-a456-426614174000',
createdBy: '323e4567-e89b-12d3-a456-426614174000',
createdAt: '2024-01-01T00:00:00Z',
userName: null,
organizationId: '423e4567-e89b-12d3-a456-426614174000',
};
mockDb.limit.mockResolvedValue([messageData]);
const result = await fetchMessageWithContext(messageData.id);
expect(result.userName).toBeNull();
});
it('should wrap database errors in DataFetchError', async () => {
const dbError = new Error('Database connection failed');
mockDb.limit.mockRejectedValue(dbError);
await expect(fetchMessageWithContext('123')).rejects.toThrow(DataFetchError);
});
});
describe('fetchConversationHistory', () => {
it('should return messages in chronological order', async () => {
const messages = [
{
id: '1',
rawLlmMessages: [{ role: 'user', content: 'Hello' }],
createdAt: '2024-01-01T00:00:00Z',
},
{
id: '2',
rawLlmMessages: [{ role: 'assistant', content: 'Hi there' }],
createdAt: '2024-01-01T00:01:00Z',
},
];
mockDb.orderBy.mockResolvedValue(messages);
const result = await fetchConversationHistory('chat-id');
expect(result).toHaveLength(2);
expect(result[0]?.id).toBe('1');
expect(result[1]?.id).toBe('2');
expect(mockDb.orderBy).toHaveBeenCalled();
});
it('should return empty array for non-existent chat', async () => {
mockDb.orderBy.mockResolvedValue([]);
const result = await fetchConversationHistory('non-existent-chat');
expect(result).toEqual([]);
});
it('should handle messages with null rawLlmMessages', async () => {
const messages = [
{
id: '1',
rawLlmMessages: null,
createdAt: '2024-01-01T00:00:00Z',
},
];
mockDb.orderBy.mockResolvedValue(messages);
const result = await fetchConversationHistory('chat-id');
expect(result[0]?.rawLlmMessages).toBeNull();
});
});
describe('fetchPreviousPostProcessingMessages', () => {
const beforeTimestamp = new Date('2024-01-01T12:00:00Z');
it('should return only messages with postProcessingMessage', async () => {
const messages = [
{
postProcessingMessage: { assumptions: ['test'] },
createdAt: '2024-01-01T10:00:00Z',
},
{
postProcessingMessage: { followUp: { suggestions: ['ask more'] } },
createdAt: '2024-01-01T11:00:00Z',
},
];
mockDb.orderBy.mockResolvedValue(messages);
const result = await fetchPreviousPostProcessingMessages('chat-id', beforeTimestamp);
expect(result).toHaveLength(2);
expect(result[0]?.postProcessingMessage).toHaveProperty('assumptions');
expect(result[1]?.postProcessingMessage).toHaveProperty('followUp');
});
it('should order by createdAt ascending', async () => {
const messages = [
{
postProcessingMessage: { id: 1 },
createdAt: '2024-01-01T10:00:00Z',
},
{
postProcessingMessage: { id: 2 },
createdAt: '2024-01-01T11:00:00Z',
},
];
mockDb.orderBy.mockResolvedValue(messages);
const result = await fetchPreviousPostProcessingMessages('chat-id', beforeTimestamp);
expect(result[0]!.createdAt < result[1]!.createdAt).toBe(true);
});
it('should return empty array when no results', async () => {
mockDb.orderBy.mockResolvedValue([]);
const result = await fetchPreviousPostProcessingMessages('chat-id', beforeTimestamp);
expect(result).toEqual([]);
});
});
describe('fetchUserDatasets', () => {
it('should return datasets with non-null ymlFile', async () => {
const datasets = [
{
id: '1',
name: 'Dataset 1',
ymlFile: 'content1',
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds1',
},
{
id: '2',
name: 'Dataset 2',
ymlFile: 'content2',
createdAt: new Date(),
updatedAt: new Date(),
deletedAt: null,
dataSourceId: 'ds2',
},
];
vi.mocked(accessControls.getPermissionedDatasets).mockResolvedValue(datasets);
const result = await fetchUserDatasets('user-id');
expect(result).toEqual(datasets);
expect(accessControls.getPermissionedDatasets).toHaveBeenCalledWith('user-id', 0, 1000);
});
it('should handle empty dataset list', async () => {
vi.mocked(accessControls.getPermissionedDatasets).mockResolvedValue([]);
const result = await fetchUserDatasets('user-id');
expect(result).toEqual([]);
});
it('should wrap errors in DataFetchError', async () => {
const error = new Error('Access denied');
vi.mocked(accessControls.getPermissionedDatasets).mockRejectedValue(error);
await expect(fetchUserDatasets('user-id')).rejects.toThrow(DataFetchError);
});
});
});

View File

@ -0,0 +1,151 @@
import { getPermissionedDatasets } from '@buster/access-controls';
import {
and,
chats,
desc,
eq,
getDb,
isNotNull,
isNull,
lt,
messages,
users,
} from '@buster/database';
import type { CoreMessage } from 'ai';
import {
type ConversationMessage,
DataFetchError,
type MessageContext,
MessageNotFoundError,
type PostProcessingResult,
} from '../types';
/**
* Fetch current message with user and chat info
*/
export async function fetchMessageWithContext(messageId: string): Promise<MessageContext> {
const db = getDb();
try {
const result = await db
.select({
id: messages.id,
chatId: messages.chatId,
createdBy: messages.createdBy,
createdAt: messages.createdAt,
userName: users.name,
organizationId: chats.organizationId,
})
.from(messages)
.innerJoin(chats, eq(messages.chatId, chats.id))
.leftJoin(users, eq(messages.createdBy, users.id))
.where(and(eq(messages.id, messageId), isNull(messages.deletedAt)))
.limit(1);
const messageData = result[0];
if (!messageData) {
throw new MessageNotFoundError(messageId);
}
return {
id: messageData.id,
chatId: messageData.chatId,
createdBy: messageData.createdBy,
createdAt: new Date(messageData.createdAt),
userName: messageData.userName,
organizationId: messageData.organizationId,
};
} catch (error) {
if (error instanceof MessageNotFoundError) {
throw error;
}
throw new DataFetchError(
`Failed to fetch message context for ${messageId}`,
error instanceof Error ? { cause: error } : undefined
);
}
}
/**
* Fetch all messages for conversation history
*/
export async function fetchConversationHistory(chatId: string): Promise<ConversationMessage[]> {
const db = getDb();
try {
const result = await db
.select({
id: messages.id,
rawLlmMessages: messages.rawLlmMessages,
createdAt: messages.createdAt,
})
.from(messages)
.where(and(eq(messages.chatId, chatId), isNull(messages.deletedAt)))
.orderBy(messages.createdAt);
return result.map((msg) => ({
id: msg.id,
rawLlmMessages: msg.rawLlmMessages as CoreMessage[],
createdAt: new Date(msg.createdAt),
}));
} catch (error) {
throw new DataFetchError(
`Failed to fetch conversation history for chat ${chatId}`,
error instanceof Error ? { cause: error } : undefined
);
}
}
/**
* Fetch previous post-processing results
*/
export async function fetchPreviousPostProcessingMessages(
chatId: string,
beforeTimestamp: Date
): Promise<PostProcessingResult[]> {
const db = getDb();
try {
const result = await db
.select({
postProcessingMessage: messages.postProcessingMessage,
createdAt: messages.createdAt,
})
.from(messages)
.where(
and(
eq(messages.chatId, chatId),
isNotNull(messages.postProcessingMessage),
isNull(messages.deletedAt),
lt(messages.createdAt, beforeTimestamp.toISOString())
)
)
.orderBy(messages.createdAt);
return result.map((msg) => ({
postProcessingMessage: msg.postProcessingMessage as Record<string, unknown>,
createdAt: new Date(msg.createdAt),
}));
} catch (error) {
throw new DataFetchError(
`Failed to fetch previous post-processing messages for chat ${chatId}`,
error instanceof Error ? { cause: error } : undefined
);
}
}
/**
* Fetch user's permissioned datasets
*/
export async function fetchUserDatasets(userId: string) {
try {
// Using the existing access control function
const datasets = await getPermissionedDatasets(userId, 0, 1000);
return datasets;
} catch (error) {
throw new DataFetchError(
`Failed to fetch datasets for user ${userId}`,
error instanceof Error ? { cause: error } : undefined
);
}
}

View File

@ -0,0 +1,274 @@
import { and, eq, getDb, getSecretByName, isNull, slackIntegrations } from '@buster/database';
import { logger } from '@trigger.dev/sdk/v3';
export interface SlackNotificationParams {
organizationId: string;
userName: string | null;
summaryTitle?: string | undefined;
summaryMessage?: string | undefined;
formattedMessage?: string | null | undefined;
toolCalled: string;
message?: string | undefined;
}
export interface SlackNotificationResult {
sent: boolean;
error?: string;
}
interface SlackBlock {
type: string;
text?: {
type: string;
text: string;
verbatim?: boolean;
};
}
interface SlackMessage {
blocks?: SlackBlock[];
text?: string;
}
/**
* Send a Slack notification based on post-processing results
*/
export async function sendSlackNotification(
params: SlackNotificationParams
): Promise<SlackNotificationResult> {
try {
// Step 1: Check if organization has active Slack integration
const db = getDb();
const [integration] = await db
.select()
.from(slackIntegrations)
.where(
and(
eq(slackIntegrations.organizationId, params.organizationId),
eq(slackIntegrations.status, 'active'),
isNull(slackIntegrations.deletedAt)
)
)
.limit(1);
if (!integration) {
logger.log('No active Slack integration found', { organizationId: params.organizationId });
return { sent: false, error: 'No active Slack integration' };
}
if (!integration.defaultChannel) {
logger.log('No default channel configured for Slack integration', {
organizationId: params.organizationId,
integrationId: integration.id,
});
return { sent: false, error: 'No default channel configured' };
}
// Step 2: Check if we should send a notification
const shouldSendNotification = shouldSendSlackNotification(params);
if (!shouldSendNotification) {
logger.log('Notification conditions not met', { params });
return { sent: false, error: 'Notification conditions not met' };
}
// Step 3: Retrieve access token from vault
if (!integration.tokenVaultKey) {
logger.error('No token vault key found for integration', {
integrationId: integration.id,
organizationId: params.organizationId,
});
return { sent: false, error: 'No token vault key found' };
}
const tokenSecret = await getSecretByName(integration.tokenVaultKey);
if (!tokenSecret) {
logger.error('Failed to retrieve token from vault', {
tokenVaultKey: integration.tokenVaultKey,
organizationId: params.organizationId,
});
return { sent: false, error: 'Failed to retrieve access token' };
}
// Step 4: Format the Slack message
const slackMessage = formatSlackMessage(params);
// Step 5: Send the message via Slack API
const result = await sendSlackMessage(
tokenSecret.secret,
integration.defaultChannel.id,
slackMessage
);
if (result.success) {
logger.log('Successfully sent Slack notification', {
organizationId: params.organizationId,
channelId: integration.defaultChannel.id,
messageTs: result.messageTs,
});
return { sent: true };
}
logger.error('Failed to send Slack notification', {
organizationId: params.organizationId,
channelId: integration.defaultChannel.id,
error: result.error,
});
return { sent: false, error: result.error || 'Failed to send message' };
} catch (error) {
logger.error('Error in sendSlackNotification', {
error: error instanceof Error ? error.message : 'Unknown error',
organizationId: params.organizationId,
});
return {
sent: false,
error: error instanceof Error ? error.message : 'Unknown error occurred',
};
}
}
/**
* Determine if we should send a Slack notification based on the parameters
*/
function shouldSendSlackNotification(params: SlackNotificationParams): boolean {
// Condition 1: formattedMessage is present (from format-message steps)
if (params.formattedMessage) {
return true;
}
// Condition 2: summaryTitle and summaryMessage are present (legacy)
if (params.summaryTitle && params.summaryMessage) {
return true;
}
// Condition 3: toolCalled is 'flagChat' and message is present (legacy)
if (params.toolCalled === 'flagChat' && params.message) {
return true;
}
return false;
}
/**
* Format the Slack message based on the notification type
*/
function formatSlackMessage(params: SlackNotificationParams): SlackMessage {
const userName = params.userName || 'Unknown User';
// Case 1: Formatted message from workflow (highest priority)
if (params.formattedMessage) {
return {
blocks: [
{
type: 'section',
text: {
type: 'mrkdwn',
text: `Buster flagged a chat for review:\n*<fakeLink.toEmployeeProfile.com|${userName}>*`,
},
},
{
type: 'section',
text: {
type: 'mrkdwn',
text: params.formattedMessage,
verbatim: false,
},
},
],
};
}
// Case 2: Summary notification (summaryTitle and summaryMessage present)
if (params.summaryTitle && params.summaryMessage) {
return {
blocks: [
{
type: 'section',
text: {
type: 'mrkdwn',
text: `Buster flagged a chat for review:\n*<fakeLink.toEmployeeProfile.com|${userName} - ${params.summaryTitle}>*`,
},
},
{
type: 'section',
text: {
type: 'mrkdwn',
text: params.summaryMessage,
verbatim: false,
},
},
],
};
}
// Case 3: Flagged chat notification (toolCalled is 'flagChat' and message present)
if (params.toolCalled === 'flagChat' && params.message) {
return {
blocks: [
{
type: 'section',
text: {
type: 'mrkdwn',
text: `Buster flagged a chat for review:\n*<fakeLink.toEmployeeProfile.com|${userName} - Flagged Chat>*`,
},
},
{
type: 'section',
text: {
type: 'mrkdwn',
text: params.message,
verbatim: false,
},
},
],
};
}
// This shouldn't happen if shouldSendSlackNotification is working correctly
throw new Error('Invalid notification parameters');
}
/**
* Send a message to Slack using the Web API
*/
async function sendSlackMessage(
accessToken: string,
channelId: string,
message: SlackMessage
): Promise<{ success: boolean; messageTs?: string; error?: string }> {
try {
const response = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
channel: channelId,
blocks: message.blocks,
text: message.text || ' ', // Fallback text required by Slack
}),
});
const data = (await response.json()) as { ok: boolean; ts?: string; error?: string };
if (data.ok) {
return {
success: true,
...(data.ts && { messageTs: data.ts }),
};
}
return {
success: false,
error: data.error || 'Failed to send message',
};
} catch (error) {
logger.error('Failed to send Slack message', {
error: error instanceof Error ? error.message : 'Unknown error',
});
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to send message',
};
}
}

View File

@ -0,0 +1,6 @@
export { messagePostProcessingTask } from './message-post-processing';
export type { MessagePostProcessingTask } from './message-post-processing';
export type {
TaskInput as MessagePostProcessingTaskInput,
TaskOutput as MessagePostProcessingTaskOutput,
} from './types';

View File

@ -0,0 +1,217 @@
import { eq, getDb, messages } from '@buster/database';
import {
cleanupTestChats,
cleanupTestMessages,
createTestChat,
createTestMessage,
createTestUser,
} from '@buster/test-utils';
import { tasks } from '@trigger.dev/sdk/v3';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import type { messagePostProcessingTask } from './message-post-processing';
// Skip integration tests if TEST_DATABASE_URL is not set
const skipIntegrationTests = !process.env.DATABASE_URL;
describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', () => {
let testUserId: string;
let testChatId: string;
let testMessageId: string;
let testOrgId: string;
beforeAll(async () => {
// Use specific test user with datasets and permissions
testUserId = 'c2dd64cd-f7f3-4884-bc91-d46ae431901e';
const testChatResult = await createTestChat();
testChatId = testChatResult.chatId;
testOrgId = testChatResult.organizationId;
});
afterAll(async () => {
// Cleanup test data
if (testChatId) {
// Note: cleanupTestMessages expects message IDs, not chat IDs
// For now, we'll just clean up the chat which should cascade delete messages
await cleanupTestChats([testChatId]);
}
});
it('should successfully process new message (not follow-up)', async () => {
// Use prepopulated message ID
const messageId = 'a3206f20-35d1-4a6c-84a7-48f8f222c39f';
// Execute task
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
'message-post-processing',
{ messageId },
{ pollIntervalMs: 2000 }
);
// Verify result structure
expect(result).toBeDefined();
expect(result.status).toBe('COMPLETED');
expect(result.output).toBeDefined();
expect(result.output?.success).toBe(true);
expect(result.output?.messageId).toBe(messageId);
expect(result.output?.result?.success).toBe(true);
expect(result.output?.result?.workflowCompleted).toBe(true);
// Verify database was updated
const db = getDb();
const updatedMessage = await db
.select({ postProcessingMessage: messages.postProcessingMessage })
.from(messages)
.where(eq(messages.id, messageId))
.limit(1);
expect(updatedMessage[0]?.postProcessingMessage).toBeDefined();
// Cleanup - reset postProcessingMessage to null
await db
.update(messages)
.set({ postProcessingMessage: null })
.where(eq(messages.id, messageId));
});
it('should successfully process follow-up message', async () => {
// Create first message with post-processing result
const firstMessageId = await createTestMessage(testChatId, testUserId, {
requestMessage: 'Tell me about databases',
rawLlmMessages: [
{ role: 'user' as const, content: 'Tell me about databases' },
{ role: 'assistant' as const, content: 'Databases are organized collections of data.' },
],
});
// Manually add post-processing result to first message
const db = getDb();
await db
.update(messages)
.set({
postProcessingMessage: {
initial: {
assumptions: ['User wants general database information'],
flagForReview: false,
},
},
})
.where(eq(messages.id, firstMessageId));
// Create follow-up message
const followUpMessageId = await createTestMessage(testChatId, testUserId, {
requestMessage: 'What about NoSQL databases?',
rawLlmMessages: [
{ role: 'user' as const, content: 'What about NoSQL databases?' },
{ role: 'assistant' as const, content: 'NoSQL databases are non-relational databases.' },
],
});
// Execute task for follow-up
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
'message-post-processing',
{ messageId: followUpMessageId },
{ pollIntervalMs: 2000 }
);
// Verify it's a follow-up result
expect(result).toBeDefined();
expect(result.status).toBe('COMPLETED');
expect(result.output?.success).toBe(true);
expect(result.output?.messageId).toBe(followUpMessageId);
expect(result.output?.result?.success).toBe(true);
expect(result.output?.result?.workflowCompleted).toBe(true);
});
it('should handle message with no conversation history', async () => {
// Use prepopulated message ID
const messageId = 'a3206f20-35d1-4a6c-84a7-48f8f222c39f';
// Execute task
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
'message-post-processing',
{ messageId },
{ pollIntervalMs: 2000 }
);
// Should still process successfully
expect(result).toBeDefined();
expect(result.status).toBe('COMPLETED');
expect(result.output?.success).toBe(true);
expect(result.output?.messageId).toBe(messageId);
// Cleanup - reset postProcessingMessage to null
const db = getDb();
await db
.update(messages)
.set({ postProcessingMessage: null })
.where(eq(messages.id, messageId));
});
it('should fail gracefully when message does not exist', async () => {
const nonExistentId = '00000000-0000-0000-0000-000000000000';
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
'message-post-processing',
{ messageId: nonExistentId },
{ pollIntervalMs: 2000 }
);
expect(result.status).toBe('COMPLETED');
expect(result.output?.success).toBe(false);
expect(result.output?.error?.code).toBe('MESSAGE_NOT_FOUND');
});
it('should complete within timeout', async () => {
// Use prepopulated message ID
const messageId = 'a3206f20-35d1-4a6c-84a7-48f8f222c39f';
const startTime = Date.now();
await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
'message-post-processing',
{ messageId },
{ pollIntervalMs: 2000 }
);
const duration = Date.now() - startTime;
// Should complete within 60 seconds (task timeout)
expect(duration).toBeLessThan(60000);
// Cleanup - reset postProcessingMessage to null
const db = getDb();
await db
.update(messages)
.set({ postProcessingMessage: null })
.where(eq(messages.id, messageId));
});
it('should handle large conversation histories', async () => {
// Create many messages in the chat
const largeHistory = [];
for (let i = 0; i < 50; i++) {
largeHistory.push(
{ role: 'user' as const, content: `Question ${i}` },
{ role: 'assistant' as const, content: `Answer ${i}` }
);
}
const largeMessageId = await createTestMessage(testChatId, testUserId, {
requestMessage: 'Large history test',
rawLlmMessages: largeHistory,
});
// Should still process successfully
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
'message-post-processing',
{ messageId: largeMessageId },
{ pollIntervalMs: 2000 }
);
expect(result).toBeDefined();
expect(result.status).toBe('COMPLETED');
expect(result.output?.success).toBe(true);
expect(result.output?.messageId).toBe(largeMessageId);
});
});

View File

@ -0,0 +1,318 @@
import postProcessingWorkflow from '@buster/ai/workflows/post-processing-workflow';
import * as database from '@buster/database';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import * as helpers from './helpers';
import { messagePostProcessingTask } from './message-post-processing';
import { DataFetchError, MessageNotFoundError } from './types';
// Extract the run function from the task
const runTask = (messagePostProcessingTask as any).run;
// Mock dependencies
vi.mock('./helpers', () => ({
fetchMessageWithContext: vi.fn(),
fetchConversationHistory: vi.fn(),
fetchPreviousPostProcessingMessages: vi.fn(),
fetchUserDatasets: vi.fn(),
buildWorkflowInput: vi.fn(),
validateMessageId: vi.fn((id) => id),
validateWorkflowOutput: vi.fn((output) => output),
}));
vi.mock('@buster/database', () => ({
getDb: vi.fn(),
eq: vi.fn((a, b) => ({ type: 'eq', a, b })),
messages: { id: 'messages.id' },
}));
vi.mock('@buster/ai/workflows/post-processing-workflow', () => ({
default: {
createRun: vi.fn(),
},
}));
// Mock Trigger.dev logger
vi.mock('@trigger.dev/sdk/v3', () => ({
logger: {
log: vi.fn(),
error: vi.fn(),
},
schemaTask: vi.fn((config) => ({
...config,
run: config.run,
})),
}));
describe('messagePostProcessingTask', () => {
let mockDb: any;
let mockWorkflowRun: any;
beforeEach(() => {
vi.clearAllMocks();
mockDb = {
update: vi.fn().mockReturnThis(),
set: vi.fn().mockReturnThis(),
where: vi.fn().mockResolvedValue(undefined),
};
vi.mocked(database.getDb).mockReturnValue(mockDb);
// Setup workflow mock
mockWorkflowRun = {
start: vi.fn(),
};
vi.mocked(postProcessingWorkflow.createRun).mockReturnValue(mockWorkflowRun);
});
it('should process message successfully for initial message', async () => {
const messageId = '123e4567-e89b-12d3-a456-426614174000';
const messageContext = {
id: messageId,
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
userName: 'John Doe',
organizationId: 'org-123',
};
const conversationMessages = [
{
id: '1',
rawLlmMessages: [{ role: 'user' as const, content: 'Hello' }],
createdAt: new Date(),
},
];
const workflowOutput = {
initial: {
assumptions: ['Test assumption'],
flagForReview: false,
},
};
// Setup mocks
vi.mocked(helpers.fetchMessageWithContext).mockResolvedValue(messageContext);
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue(conversationMessages);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: [{ role: 'user', content: 'Hello' }],
userName: 'John Doe',
messageId,
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
previousMessages: [],
datasets: '',
});
mockWorkflowRun.start.mockResolvedValue({
status: 'success',
result: workflowOutput,
});
// Execute task
const result = await runTask({ messageId });
// Verify results
expect(result).toEqual({
success: true,
messageId,
result: {
success: true,
messageId,
executionTimeMs: expect.any(Number),
workflowCompleted: true,
},
});
expect(helpers.fetchMessageWithContext).toHaveBeenCalledWith(messageId);
expect(helpers.fetchConversationHistory).toHaveBeenCalledWith('chat-123');
expect(helpers.fetchPreviousPostProcessingMessages).toHaveBeenCalledWith(
'chat-123',
messageContext.createdAt
);
expect(helpers.fetchUserDatasets).toHaveBeenCalledWith('user-123');
expect(postProcessingWorkflow.createRun).toHaveBeenCalled();
expect(mockWorkflowRun.start).toHaveBeenCalled();
expect(mockDb.update).toHaveBeenCalledWith(database.messages);
expect(mockDb.set).toHaveBeenCalledWith({
postProcessingMessage: workflowOutput,
updatedAt: expect.any(String),
});
});
it('should process follow-up message correctly', async () => {
const messageId = '123e4567-e89b-12d3-a456-426614174000';
const previousResults = [
{
postProcessingMessage: { assumptions: ['Previous assumption'] },
createdAt: new Date(),
},
];
const workflowOutput = {
followUp: {
suggestions: ['Ask about X'],
analysis: 'Based on previous conversation...',
},
};
// Setup mocks for follow-up scenario
vi.mocked(helpers.fetchMessageWithContext).mockResolvedValue({
id: messageId,
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
userName: 'John Doe',
organizationId: 'org-123',
});
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue(previousResults);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
messageId,
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: true,
previousMessages: ['{"assumptions":["Previous assumption"]}'],
datasets: '',
});
mockWorkflowRun.start.mockResolvedValue({
status: 'success',
result: workflowOutput,
});
const result = await runTask({ messageId });
expect(result).toEqual({
success: true,
messageId,
result: {
success: true,
messageId,
executionTimeMs: expect.any(Number),
workflowCompleted: true,
},
});
expect(helpers.buildWorkflowInput).toHaveBeenCalledWith(
expect.objectContaining({ id: messageId }),
[],
previousResults,
[]
);
});
it('should return error result when workflow returns no output', async () => {
const messageId = '123e4567-e89b-12d3-a456-426614174000';
vi.mocked(helpers.fetchMessageWithContext).mockResolvedValue({
id: messageId,
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
userName: 'John Doe',
organizationId: 'org-123',
});
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
messageId,
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
previousMessages: [],
datasets: '',
});
mockWorkflowRun.start.mockResolvedValue({
status: 'failed',
result: null,
});
const result = await runTask({ messageId });
expect(result).toEqual({
success: false,
messageId,
error: {
code: 'WORKFLOW_EXECUTION_ERROR',
message: 'Post-processing workflow returned no output',
details: {
operation: 'message_post_processing_task_execution',
messageId,
},
},
});
});
it('should return error result for message not found', async () => {
const messageId = 'non-existent-id';
const error = new MessageNotFoundError(messageId);
vi.mocked(helpers.fetchMessageWithContext).mockRejectedValue(error);
const result = await runTask({ messageId });
expect(result).toEqual({
success: false,
messageId,
error: {
code: 'MESSAGE_NOT_FOUND',
message: `Message not found: ${messageId}`,
details: {
operation: 'message_post_processing_task_execution',
messageId,
},
},
});
});
it('should return error result for database update failure', async () => {
const messageId = '123e4567-e89b-12d3-a456-426614174000';
const dbError = new Error('Database update failed');
vi.mocked(helpers.fetchMessageWithContext).mockResolvedValue({
id: messageId,
chatId: 'chat-123',
createdBy: 'user-123',
createdAt: new Date(),
userName: 'John Doe',
organizationId: 'org-123',
});
vi.mocked(helpers.fetchConversationHistory).mockResolvedValue([]);
vi.mocked(helpers.fetchPreviousPostProcessingMessages).mockResolvedValue([]);
vi.mocked(helpers.fetchUserDatasets).mockResolvedValue([]);
vi.mocked(helpers.buildWorkflowInput).mockReturnValue({
conversationHistory: undefined,
userName: 'John Doe',
messageId,
userId: 'user-123',
chatId: 'chat-123',
isFollowUp: false,
previousMessages: [],
datasets: '',
});
mockWorkflowRun.start.mockResolvedValue({
status: 'success',
result: { initial: { assumptions: [], flagForReview: false } },
});
mockDb.where.mockRejectedValue(dbError);
const result = await runTask({ messageId });
expect(result).toEqual({
success: false,
messageId,
error: {
code: 'DATABASE_ERROR',
message: 'Database update failed',
details: {
operation: 'message_post_processing_task_execution',
messageId,
},
},
});
});
});

View File

@ -0,0 +1,341 @@
import postProcessingWorkflow, {
type PostProcessingWorkflowOutput,
} from '@buster/ai/workflows/post-processing-workflow';
import { eq, getDb, messages } from '@buster/database';
import { logger, schemaTask } from '@trigger.dev/sdk/v3';
import { initLogger, wrapTraced } from 'braintrust';
import { z } from 'zod';
import {
buildWorkflowInput,
fetchConversationHistory,
fetchMessageWithContext,
fetchPreviousPostProcessingMessages,
fetchUserDatasets,
sendSlackNotification,
} from './helpers';
import {
DataFetchError,
MessageNotFoundError,
TaskInputSchema,
type TaskOutputSchema,
} from './types';
import type { TaskInput, TaskOutput } from './types';
// Schema for the subset of fields we want to save to the database
const PostProcessingDbDataSchema = z.object({
summaryMessage: z.string().optional(),
summaryTitle: z.string().optional(),
formattedMessage: z.string().nullable().optional(),
assumptions: z
.array(
z.object({
descriptiveTitle: z.string(),
classification: z.enum([
'fieldMapping',
'tableRelationship',
'dataQuality',
'dataFormat',
'dataAvailability',
'timePeriodInterpretation',
'timePeriodGranularity',
'metricInterpretation',
'segmentInterpretation',
'quantityInterpretation',
'requestScope',
'metricDefinition',
'segmentDefinition',
'businessLogic',
'policyInterpretation',
'optimization',
'aggregation',
'filtering',
'sorting',
'grouping',
'calculationMethod',
'dataRelevance',
]),
explanation: z.string(),
label: z.enum(['timeRelated', 'vagueRequest', 'major', 'minor']),
})
)
.optional(),
message: z.string().optional(),
toolCalled: z.string(),
userName: z.string().nullable().optional(),
});
type PostProcessingDbData = z.infer<typeof PostProcessingDbDataSchema>;
/**
* Extract only the specific fields we want to save to the database
*/
function extractDbFields(
workflowOutput: PostProcessingWorkflowOutput,
userName: string | null
): PostProcessingDbData {
const extracted = {
summaryMessage: workflowOutput.summaryMessage,
summaryTitle: workflowOutput.summaryTitle,
formattedMessage: workflowOutput.formattedMessage,
assumptions: workflowOutput.assumptions,
message: workflowOutput.message,
toolCalled: workflowOutput.toolCalled || 'unknown', // Provide default if missing
userName,
};
// Validate the extracted data matches our schema
return PostProcessingDbDataSchema.parse(extracted);
}
/**
* Message Post-Processing Task
*
* Processes messages after creation to extract insights, assumptions,
* and generate follow-up suggestions using AI workflows.
*/
export const messagePostProcessingTask: ReturnType<
typeof schemaTask<'message-post-processing', typeof TaskInputSchema, TaskOutput>
> = schemaTask<'message-post-processing', typeof TaskInputSchema, TaskOutput>({
id: 'message-post-processing',
schema: TaskInputSchema,
maxDuration: 300, // 300 seconds timeout
run: async (payload: TaskInput): Promise<TaskOutput> => {
const startTime = Date.now();
if (!process.env.BRAINTRUST_KEY) {
throw new Error('BRAINTRUST_KEY is not set');
}
// Initialize Braintrust logging for observability
initLogger({
apiKey: process.env.BRAINTRUST_KEY,
projectName: process.env.ENVIRONMENT || 'development',
});
try {
logger.log('Starting message post-processing task', {
messageId: payload.messageId,
});
// Step 1: Fetch message context (this will throw if message not found)
const messageContext = await fetchMessageWithContext(payload.messageId);
logger.log('Fetched message context', {
chatId: messageContext.chatId,
userId: messageContext.createdBy,
organizationId: messageContext.organizationId,
});
// Step 2: Fetch all required data concurrently
const [conversationMessages, previousPostProcessingResults, datasets] = await Promise.all([
fetchConversationHistory(messageContext.chatId),
fetchPreviousPostProcessingMessages(messageContext.chatId, messageContext.createdAt),
fetchUserDatasets(messageContext.createdBy),
]);
logger.log('Fetched required data', {
messageId: payload.messageId,
conversationMessagesCount: conversationMessages.length,
previousPostProcessingCount: previousPostProcessingResults.length,
datasetsCount: datasets.length,
});
// Step 3: Build workflow input
const workflowInput = buildWorkflowInput(
messageContext,
conversationMessages,
previousPostProcessingResults,
datasets
);
logger.log('Built workflow input', {
messageId: payload.messageId,
isFollowUp: workflowInput.isFollowUp,
previousMessagesCount: workflowInput.previousMessages.length,
hasConversationHistory: !!workflowInput.conversationHistory,
datasetsLength: workflowInput.datasets.length,
});
// Step 4: Execute post-processing workflow
logger.log('Starting post-processing workflow execution', {
messageId: payload.messageId,
});
const tracedWorkflow = wrapTraced(
async () => {
const run = postProcessingWorkflow.createRun();
return await run.start({
inputData: workflowInput,
});
},
{
name: 'Message Post-Processing Workflow',
}
);
const workflowResult = await tracedWorkflow();
if (!workflowResult || workflowResult.status !== 'success' || !workflowResult.result) {
throw new Error('Post-processing workflow returned no output');
}
// Handle branch results - the result will have one of the branch step IDs as a key
let validatedOutput: PostProcessingWorkflowOutput;
const branchResult = workflowResult.result as any; // Type assertion needed for branch results
if ('format-follow-up-message' in branchResult && branchResult['format-follow-up-message']) {
validatedOutput = branchResult['format-follow-up-message'] as PostProcessingWorkflowOutput;
} else if (
'format-initial-message' in branchResult &&
branchResult['format-initial-message']
) {
validatedOutput = branchResult['format-initial-message'] as PostProcessingWorkflowOutput;
} else {
logger.error('Unexpected workflow result structure', {
messageId: payload.messageId,
resultKeys: Object.keys(branchResult),
result: branchResult,
});
throw new Error('Post-processing workflow returned unexpected result structure');
}
logger.log('Validated output', {
messageId: payload.messageId,
summaryTitle: validatedOutput.summaryTitle,
summaryMessage: validatedOutput.summaryMessage,
flagChatMessage: validatedOutput.flagChatMessage,
flagChatTitle: validatedOutput.flagChatTitle,
toolCalled: validatedOutput.toolCalled,
assumptions: validatedOutput.assumptions,
message: validatedOutput.message,
});
// Step 5: Store result in database
logger.log('Storing post-processing result in database', {
messageId: payload.messageId,
});
const db = getDb();
const dbData = extractDbFields(validatedOutput, messageContext.userName);
await db
.update(messages)
.set({
postProcessingMessage: dbData,
updatedAt: new Date().toISOString(),
})
.where(eq(messages.id, payload.messageId));
// Step 6: Send Slack notification if conditions are met
logger.log('Checking Slack notification conditions', {
messageId: payload.messageId,
organizationId: messageContext.organizationId,
summaryTitle: dbData.summaryTitle,
summaryMessage: dbData.summaryMessage,
formattedMessage: dbData.formattedMessage,
toolCalled: dbData.toolCalled,
});
const slackResult = await sendSlackNotification({
organizationId: messageContext.organizationId,
userName: messageContext.userName,
summaryTitle: dbData.summaryTitle,
summaryMessage: dbData.summaryMessage,
formattedMessage: dbData.formattedMessage,
toolCalled: dbData.toolCalled,
message: dbData.message,
});
if (slackResult.sent) {
logger.log('Slack notification sent successfully', {
messageId: payload.messageId,
organizationId: messageContext.organizationId,
});
} else {
logger.log('Slack notification not sent', {
messageId: payload.messageId,
organizationId: messageContext.organizationId,
reason: slackResult.error,
});
}
logger.log('Message post-processing completed successfully', {
messageId: payload.messageId,
executionTimeMs: Date.now() - startTime,
});
// Wait 500ms to allow Braintrust to clean up its trace before completing
await new Promise((resolve) => setTimeout(resolve, 500));
return {
success: true,
messageId: payload.messageId,
result: {
success: true,
messageId: payload.messageId,
executionTimeMs: Date.now() - startTime,
workflowCompleted: true,
},
};
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
logger.error('Post-processing task execution failed', {
messageId: payload.messageId,
error: errorMessage,
executionTimeMs: Date.now() - startTime,
});
return {
success: false,
messageId: payload.messageId,
error: {
code: getErrorCode(error),
message: errorMessage,
details: {
operation: 'message_post_processing_task_execution',
messageId: payload.messageId,
},
},
};
}
},
});
/**
* Get error code from error object for consistent error handling
*/
function getErrorCode(error: unknown): string {
if (error instanceof MessageNotFoundError) {
return 'MESSAGE_NOT_FOUND';
}
if (error instanceof DataFetchError) {
return 'DATA_FETCH_ERROR';
}
if (error instanceof Error) {
// Validation errors
if (error.name === 'ZodError' || error.name === 'ValidationError') {
return 'VALIDATION_ERROR';
}
// Workflow errors
if (error.message.includes('workflow')) {
return 'WORKFLOW_EXECUTION_ERROR';
}
// Database errors
if (error.message.includes('database') || error.message.includes('Database')) {
return 'DATABASE_ERROR';
}
// Permission errors
if (error.message.includes('permission') || error.message.includes('access')) {
return 'ACCESS_DENIED';
}
}
return 'UNKNOWN_ERROR';
}
export type MessagePostProcessingTask = typeof messagePostProcessingTask;

View File

@ -0,0 +1,81 @@
import type { CoreMessage } from 'ai';
import { z } from 'zod';
// Input schema - simple UUID validation
export const UUIDSchema = z.string().uuid('Must be a valid UUID');
export const TaskInputSchema = z.object({
messageId: UUIDSchema,
});
// Task execution result for internal monitoring
export const TaskExecutionResultSchema = z.object({
success: z.boolean(),
messageId: z.string(),
executionTimeMs: z.number(),
workflowCompleted: z.boolean(),
error: z
.object({
code: z.string(),
message: z.string(),
details: z.record(z.any()).optional(),
})
.optional(),
});
// Main output schema - what Trigger.dev expects
export const TaskOutputSchema = z.object({
success: z.boolean(),
messageId: z.string(),
result: TaskExecutionResultSchema.optional(),
error: z
.object({
code: z.string(),
message: z.string(),
details: z.record(z.any()).optional(),
})
.optional(),
});
// Database output schemas
export const MessageContextSchema = z.object({
id: z.string(),
chatId: z.string(),
createdBy: z.string(),
createdAt: z.date(),
userName: z.string().nullable(),
organizationId: z.string(),
});
export const ConversationMessageSchema = z.object({
id: z.string(),
rawLlmMessages: z.custom<CoreMessage[]>(),
createdAt: z.date(),
});
export const PostProcessingResultSchema = z.object({
postProcessingMessage: z.record(z.unknown()),
createdAt: z.date(),
});
// Infer TypeScript types from schemas
export type TaskInput = z.infer<typeof TaskInputSchema>;
export type TaskOutput = z.infer<typeof TaskOutputSchema>;
export type MessageContext = z.infer<typeof MessageContextSchema>;
export type ConversationMessage = z.infer<typeof ConversationMessageSchema>;
export type PostProcessingResult = z.infer<typeof PostProcessingResultSchema>;
// Error types
export class MessageNotFoundError extends Error {
constructor(messageId: string) {
super(`Message not found: ${messageId}`);
this.name = 'MessageNotFoundError';
}
}
export class DataFetchError extends Error {
constructor(message: string, options?: { cause?: Error }) {
super(message, options);
this.name = 'DataFetchError';
}
}

View File

@ -268,4 +268,4 @@ export const formatFollowUpMessageStep = createStep({
inputSchema,
outputSchema: formatFollowUpMessageOutputSchema,
execute: formatFollowUpMessageStepExecution,
});
});

View File

@ -53,4 +53,4 @@ const postProcessingWorkflow = createWorkflow({
export default postProcessingWorkflow;
// Re-export schemas for external use
export { postProcessingWorkflowInputSchema, postProcessingWorkflowOutputSchema };
export { postProcessingWorkflowInputSchema, postProcessingWorkflowOutputSchema };

View File

@ -1 +1 @@
ALTER TABLE "slack_integrations" ADD COLUMN "default_channel" jsonb DEFAULT '{}'::jsonb;
ALTER TABLE "slack_integrations" ADD COLUMN "default_channel" jsonb DEFAULT 'null'::jsonb;

File diff suppressed because it is too large Load Diff

View File

@ -170,8 +170,8 @@ importers:
specifier: 'catalog:'
version: 4.3.16(react@18.3.1)(zod@3.25.67)
braintrust:
specifier: ^0.0.206
version: 0.0.206(@aws-sdk/credential-provider-web-identity@3.840.0)(react@18.3.1)(sswr@2.2.0(svelte@5.34.9))(svelte@5.34.9)(vue@3.5.17(typescript@5.8.3))(zod@3.25.67)
specifier: ^0.0.209
version: 0.0.209(@aws-sdk/credential-provider-web-identity@3.840.0)(react@18.3.1)(sswr@2.2.0(svelte@5.34.9))(svelte@5.34.9)(vue@3.5.17(typescript@5.8.3))(zod@3.25.67)
vitest:
specifier: 'catalog:'
version: 3.2.4(@edge-runtime/vm@3.2.0)(@types/debug@4.1.12)(@types/node@24.0.10)(@vitest/ui@3.2.4)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(msw@2.10.3(@types/node@24.0.10)(typescript@5.8.3))(sass@1.89.2)(terser@5.43.1)(tsx@4.20.3)(yaml@2.8.0)
@ -1880,6 +1880,9 @@ packages:
'@braintrust/core@0.0.88':
resolution: {integrity: sha512-asVr//nyiXvnagf2Av+k3Ggv2UFiygwvlzreI8rS87+9DYRlw0ofy13gSxr7a0ycd0yfRomdVSEpDRlEzpQm5w==}
'@braintrust/core@0.0.89':
resolution: {integrity: sha512-BBLVfFxM6/d4B+i4LUTDW/FvZa4C0HN1/Cqo1W1vOflxnCJ8QVXFSqEUl+MhIU9+cJV9vwjTUVukmyscMT24hA==}
'@bugsnag/cuid@3.2.1':
resolution: {integrity: sha512-zpvN8xQ5rdRWakMd/BcVkdn2F8HKlDSbM3l7duueK590WmI1T0ObTLc1V/1e55r14WNjPd5AJTYX4yPEAFVi+Q==}
@ -5952,6 +5955,12 @@ packages:
peerDependencies:
zod: ^3.0.0
braintrust@0.0.209:
resolution: {integrity: sha512-acsjb06ttD/gllfb59idiq1lDAdvsoHcHSJPkmddSIRPORy3vIYt3kfKXiW+WlhwZs2pl3lN8X8pTVuLyj5NNw==}
hasBin: true
peerDependencies:
zod: ^3.0.0
brorand@1.1.0:
resolution: {integrity: sha512-cKV8tMCEpQs4hK/ik71d6LrPOnpkpGBR0wzxqr68g2m/LB2GxVYQroAjMJZRVM1Y4BCjCKc3vAamxSzOY2RP+w==}
@ -6004,7 +6013,6 @@ packages:
bun@1.2.18:
resolution: {integrity: sha512-OR+EpNckoJN4tHMVZPaTPxDj2RgpJgJwLruTIFYbO3bQMguLd0YrmkWKYqsiihcLgm2ehIjF/H1RLfZiRa7+qQ==}
cpu: [arm64, x64, aarch64]
os: [darwin, linux, win32]
hasBin: true
@ -11821,6 +11829,11 @@ snapshots:
openapi3-ts: 4.5.0
zod: 3.25.67
'@asteasolutions/zod-to-openapi@6.4.0(zod@3.25.75)':
dependencies:
openapi3-ts: 4.5.0
zod: 3.25.75
'@aws-crypto/crc32@3.0.0':
dependencies:
'@aws-crypto/util': 3.0.0
@ -13407,6 +13420,12 @@ snapshots:
uuid: 9.0.1
zod: 3.25.67
'@braintrust/core@0.0.89':
dependencies:
'@asteasolutions/zod-to-openapi': 6.4.0(zod@3.25.75)
uuid: 9.0.1
zod: 3.25.75
'@bugsnag/cuid@3.2.1': {}
'@bundled-es-modules/cookie@2.0.1':
@ -18204,6 +18223,42 @@ snapshots:
- svelte
- vue
braintrust@0.0.209(@aws-sdk/credential-provider-web-identity@3.840.0)(react@18.3.1)(sswr@2.2.0(svelte@5.34.9))(svelte@5.34.9)(vue@3.5.17(typescript@5.8.3))(zod@3.25.67):
dependencies:
'@ai-sdk/provider': 1.1.3
'@braintrust/core': 0.0.89
'@next/env': 14.2.30
'@vercel/functions': 1.6.0(@aws-sdk/credential-provider-web-identity@3.840.0)
ai: 3.4.33(react@18.3.1)(sswr@2.2.0(svelte@5.34.9))(svelte@5.34.9)(vue@3.5.17(typescript@5.8.3))(zod@3.25.67)
argparse: 2.0.1
chalk: 4.1.2
cli-progress: 3.12.0
cors: 2.8.5
dotenv: 16.6.1
esbuild: 0.25.5
eventsource-parser: 1.1.2
express: 4.21.2
graceful-fs: 4.2.11
http-errors: 2.0.0
minimatch: 9.0.5
mustache: 4.2.0
pluralize: 8.0.0
simple-git: 3.28.0
slugify: 1.6.6
source-map: 0.7.4
uuid: 9.0.1
zod: 3.25.67
zod-to-json-schema: 3.24.6(zod@3.25.67)
transitivePeerDependencies:
- '@aws-sdk/credential-provider-web-identity'
- openai
- react
- solid-js
- sswr
- supports-color
- svelte
- vue
brorand@1.1.0: {}
browser-assert@1.2.1: {}