Merge pull request #436 from buster-so/dal/post-processing-json-output

Dal/post-processing-json-output
This commit is contained in:
dal 2025-07-08 09:57:17 -07:00 committed by GitHub
commit 9b18e791ac
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 86 additions and 35 deletions

View File

@ -93,6 +93,12 @@ pnpm run test:watch
- **No implicit returns** - All code paths must return - **No implicit returns** - All code paths must return
- **Consistent file casing** - Enforced by TypeScript - **Consistent file casing** - Enforced by TypeScript
### Type Safety and Zod Best Practices
- We care deeply about type safety and we use Zod schemas and then export them as types
- We prefer using type abstractions over `.parse()` method calls
- Always export Zod schemas as TypeScript types to leverage static type checking
- Avoid runtime type checking when compile-time type checks are sufficient
### Biome Rules (Key Enforcements) ### Biome Rules (Key Enforcements)
- **`useImportType: "warn"`** - Use type-only imports when possible - **`useImportType: "warn"`** - Use type-only imports when possible
- **`noExplicitAny: "error"`** - Never use `any` type - **`noExplicitAny: "error"`** - Never use `any` type
@ -108,4 +114,11 @@ pnpm run test:watch
- `console.warn` for warning messages - `console.warn` for warning messages
- `console.error` for error messages - `console.error` for error messages
[... rest of the existing content remains the same ...] ## Error Handling and Logging Philosophy
- We care deeply about error handling and logging
- Key principles for error management:
- Catch errors effectively and thoughtfully
- Consider the state errors put the system into
- Implement comprehensive unit tests for error scenarios
- Log errors strategically for effective debugging
- Avoid over-logging while ensuring sufficient context for troubleshooting

View File

@ -4,7 +4,7 @@ import postProcessingWorkflow, {
import { eq, getDb, messages } from '@buster/database'; import { eq, getDb, messages } from '@buster/database';
import { logger, schemaTask } from '@trigger.dev/sdk/v3'; import { logger, schemaTask } from '@trigger.dev/sdk/v3';
import { initLogger, wrapTraced } from 'braintrust'; import { initLogger, wrapTraced } from 'braintrust';
import { z } from 'zod'; import { z } from 'zod/v4';
import { import {
buildWorkflowInput, buildWorkflowInput,
fetchConversationHistory, fetchConversationHistory,
@ -13,23 +13,18 @@ import {
fetchUserDatasets, fetchUserDatasets,
sendSlackNotification, sendSlackNotification,
} from './helpers'; } from './helpers';
import { import { DataFetchError, MessageNotFoundError, TaskInputSchema } from './types';
DataFetchError,
MessageNotFoundError,
TaskInputSchema,
type TaskOutputSchema,
} from './types';
import type { TaskInput, TaskOutput } from './types'; import type { TaskInput, TaskOutput } from './types';
// Schema for the subset of fields we want to save to the database // Schema for the subset of fields we want to save to the database
const PostProcessingDbDataSchema = z.object({ const PostProcessingDbDataSchema = z.object({
summaryMessage: z.string().optional(), confidence_score: z.enum(['low', 'high']),
summaryTitle: z.string().optional(), summary_message: z.string(),
formattedMessage: z.string().nullable().optional(), summary_title: z.string(),
assumptions: z assumptions: z
.array( .array(
z.object({ z.object({
descriptiveTitle: z.string(), descriptive_title: z.string(),
classification: z.enum([ classification: z.enum([
'fieldMapping', 'fieldMapping',
'tableRelationship', 'tableRelationship',
@ -59,9 +54,8 @@ const PostProcessingDbDataSchema = z.object({
}) })
) )
.optional(), .optional(),
message: z.string().optional(), tool_called: z.string(),
toolCalled: z.string(), user_name: z.string().nullable().optional(),
userName: z.string().nullable().optional(),
}); });
type PostProcessingDbData = z.infer<typeof PostProcessingDbDataSchema>; type PostProcessingDbData = z.infer<typeof PostProcessingDbDataSchema>;
@ -73,18 +67,53 @@ function extractDbFields(
workflowOutput: PostProcessingWorkflowOutput, workflowOutput: PostProcessingWorkflowOutput,
userName: string | null userName: string | null
): PostProcessingDbData { ): PostProcessingDbData {
const extracted = { logger.log('Extracting database fields from workflow output', {
summaryMessage: workflowOutput.summaryMessage, workflowOutput,
summaryTitle: workflowOutput.summaryTitle, });
formattedMessage: workflowOutput.formattedMessage,
assumptions: workflowOutput.assumptions, // Check if there are any major assumptions
message: workflowOutput.message, const hasMajorAssumptions =
toolCalled: workflowOutput.toolCalled || 'unknown', // Provide default if missing workflowOutput.assumptions?.some((assumption) => assumption.label === 'major') ?? false;
userName,
// Determine confidence score based on rules:
// - Low if toolCalled is 'flagChat'
// - Low if there are any major assumptions
// - High otherwise
let confidence_score: 'low' | 'high' = 'high';
if (workflowOutput.toolCalled === 'flagChat' || hasMajorAssumptions) {
confidence_score = 'low';
}
// Determine summary message and title
let summaryMessage: string;
let summaryTitle: string;
if (!hasMajorAssumptions && workflowOutput.flagChatMessage) {
// If no major assumptions, use flagChatMessage as summaryMessage
summaryMessage = workflowOutput.flagChatMessage;
summaryTitle = 'No Major Assumptions Identified';
} else {
// Otherwise use the provided summary fields or defaults
summaryMessage = workflowOutput.summaryMessage || 'No summary available';
summaryTitle = workflowOutput.summaryTitle || 'Summary';
}
const extracted: PostProcessingDbData = {
summary_message: summaryMessage,
summary_title: summaryTitle,
confidence_score,
assumptions: workflowOutput.assumptions?.map((assumption) => ({
descriptive_title: assumption.descriptiveTitle,
classification: assumption.classification,
explanation: assumption.explanation,
label: assumption.label,
})),
tool_called: workflowOutput.toolCalled || 'unknown', // Provide default if missing
user_name: userName,
}; };
// Validate the extracted data matches our schema // Validate the extracted data matches our schema
return PostProcessingDbDataSchema.parse(extracted); return extracted;
} }
/** /**
@ -263,21 +292,18 @@ export const messagePostProcessingTask: ReturnType<
logger.log('Checking Slack notification conditions', { logger.log('Checking Slack notification conditions', {
messageId: payload.messageId, messageId: payload.messageId,
organizationId: messageContext.organizationId, organizationId: messageContext.organizationId,
summaryTitle: dbData.summaryTitle, summaryTitle: dbData.summary_title,
summaryMessage: dbData.summaryMessage, summaryMessage: dbData.summary_message,
formattedMessage: dbData.formattedMessage, toolCalled: dbData.tool_called,
toolCalled: dbData.toolCalled,
}); });
const slackResult = await sendSlackNotification({ const slackResult = await sendSlackNotification({
organizationId: messageContext.organizationId, organizationId: messageContext.organizationId,
userName: messageContext.userName, userName: messageContext.userName,
chatId: messageContext.chatId, chatId: messageContext.chatId,
summaryTitle: dbData.summaryTitle, summaryTitle: dbData.summary_title,
summaryMessage: dbData.summaryMessage, summaryMessage: dbData.summary_message,
formattedMessage: dbData.formattedMessage, toolCalled: dbData.tool_called,
toolCalled: dbData.toolCalled,
message: dbData.message,
}); });
if (slackResult.sent) { if (slackResult.sent) {

View File

@ -224,13 +224,25 @@ No conversation history available for analysis.`);
throw new Error('No tool was called by the flag chat agent'); throw new Error('No tool was called by the flag chat agent');
} }
// Handle different tool responses
let flagChatMessage: string | undefined;
let flagChatTitle: string | undefined;
if (toolCall.toolName === 'noIssuesFound') {
flagChatMessage = toolCall.args.message;
flagChatTitle = 'No Issues Found';
} else if (toolCall.toolName === 'flagChat') {
flagChatMessage = toolCall.args.summary_message;
flagChatTitle = toolCall.args.summary_title;
}
return { return {
// Pass through all input fields // Pass through all input fields
...inputData, ...inputData,
// Add new fields from this step // Add new fields from this step
toolCalled: toolCall.toolName, toolCalled: toolCall.toolName,
flagChatMessage: toolCall.args.summary_message, flagChatMessage,
flagChatTitle: toolCall.args.summary_title, flagChatTitle,
}; };
} catch (error) { } catch (error) {
console.error('Failed to analyze chat for flagging:', error); console.error('Failed to analyze chat for flagging:', error);