diff --git a/packages/ai/src/agents/analyst-agent/analyst-agent-instructions.ts b/packages/ai/src/agents/analyst-agent/analyst-agent-instructions.ts index d2c656fb3..f857a56d8 100644 --- a/packages/ai/src/agents/analyst-agent/analyst-agent-instructions.ts +++ b/packages/ai/src/agents/analyst-agent/analyst-agent-instructions.ts @@ -314,3 +314,13 @@ export const getAnalystInstructions = async ({ sqlDialectGuidance, }); }; + +// Export the template function without dataset context for use in step files +export const createAnalystInstructionsWithoutDatasets = (sqlDialectGuidance: string): string => { + return createAnalystInstructions({ + databaseContext: '', + sqlDialectGuidance, + }) + .replace(/[\s\S]*?<\/database_context>/, '') + .trim(); +}; diff --git a/packages/ai/src/agents/analyst-agent/analyst-agent.ts b/packages/ai/src/agents/analyst-agent/analyst-agent.ts index 80d2f21ae..aa924ae7d 100644 --- a/packages/ai/src/agents/analyst-agent/analyst-agent.ts +++ b/packages/ai/src/agents/analyst-agent/analyst-agent.ts @@ -23,7 +23,7 @@ const DEFAULT_OPTIONS = { export const analystAgent = new Agent({ name: 'Analyst Agent', - instructions: getAnalystInstructions, + instructions: '', // We control the system messages in the step at stream instantiation model: anthropicCachedModel('claude-sonnet-4-20250514'), tools: { createMetrics, diff --git a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts index 2553b69f6..6c2272b37 100644 --- a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts +++ b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-agent.ts @@ -22,7 +22,7 @@ const DEFAULT_OPTIONS = { export const thinkAndPrepAgent = new Agent({ name: 'Think and Prep Agent', - instructions: getThinkAndPrepInstructions, + instructions: '', // We control the system messages in the step at stream instantiation model: anthropicCachedModel('claude-sonnet-4-20250514'), tools: { sequentialThinking, diff --git a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-instructions.ts b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-instructions.ts index f81cbbad6..9cf414452 100644 --- a/packages/ai/src/agents/think-and-prep-agent/think-and-prep-instructions.ts +++ b/packages/ai/src/agents/think-and-prep-agent/think-and-prep-instructions.ts @@ -567,3 +567,15 @@ export const getThinkAndPrepInstructions = async ({ sqlDialectGuidance, }); }; + +// Export the template function without dataset context for use in step files +export const createThinkAndPrepInstructionsWithoutDatasets = ( + sqlDialectGuidance: string +): string => { + return createThinkAndPrepInstructions({ + databaseContext: '', + sqlDialectGuidance, + }) + .replace(/[\s\S]*?<\/database_context>/, '') + .trim(); +}; diff --git a/packages/ai/src/steps/analyst-step.ts b/packages/ai/src/steps/analyst-step.ts index c0aea2a1a..925f00ff3 100644 --- a/packages/ai/src/steps/analyst-step.ts +++ b/packages/ai/src/steps/analyst-step.ts @@ -4,11 +4,14 @@ import type { CoreMessage } from 'ai'; import { wrapTraced } from 'braintrust'; import { z } from 'zod'; +import { getPermissionedDatasets } from '@buster/access-controls'; import type { ChatMessageReasoningMessage, ChatMessageResponseMessage, } from '@buster/server-shared/chats'; import { analystAgent } from '../agents/analyst-agent/analyst-agent'; +import { createAnalystInstructionsWithoutDatasets } from '../agents/analyst-agent/analyst-agent-instructions'; +import { getSqlDialectGuidance } from '../agents/shared/sql-dialect-guidance'; import { ChunkProcessor } from '../utils/database/chunk-processor'; import { MessageHistorySchema, @@ -56,6 +59,10 @@ const outputSchema = z.object({ finalReasoningMessage: z.string().optional(), }); +const DEFAULT_CACHE_OPTIONS = { + anthropic: { cacheControl: { type: 'ephemeral' } }, +}; + /** * Transform reasoning/response history to match ChunkProcessor expected types */ @@ -253,6 +260,28 @@ const analystExecution = async ({ let retryCount = 0; const maxRetries = 5; + // Get database context and SQL dialect guidance + const userId = runtimeContext.get('userId'); + const dataSourceSyntax = runtimeContext.get('dataSourceSyntax'); + + const datasets = await getPermissionedDatasets(userId, 0, 1000); + + // Extract yml_content from each dataset and join with separators + const assembledYmlContent = datasets + .map((dataset: { ymlFile: string | null | undefined }) => dataset.ymlFile) + .filter((content: string | null | undefined) => content !== null && content !== undefined) + .join('\n---\n'); + + // Get dialect-specific guidance + const sqlDialectGuidance = getSqlDialectGuidance(dataSourceSyntax); + + // Create dataset system message + const createDatasetSystemMessage = (databaseContext: string): string => { + return ` +${databaseContext} +`; + }; + // Initialize chunk processor with histories from previous step // IMPORTANT: Pass histories from think-and-prep to accumulate across steps const { reasoningHistory: transformedReasoning, responseHistory: transformedResponse } = @@ -359,8 +388,25 @@ const analystExecution = async ({ const wrappedStream = wrapTraced( async () => { + // Create system messages with dataset context and instructions + const systemMessages: CoreMessage[] = [ + { + role: 'system', + content: createDatasetSystemMessage(assembledYmlContent), + providerOptions: DEFAULT_CACHE_OPTIONS, + }, + { + role: 'system', + content: createAnalystInstructionsWithoutDatasets(sqlDialectGuidance), + providerOptions: DEFAULT_CACHE_OPTIONS, + }, + ]; + + // Combine system messages with conversation messages + const messagesWithSystem = [...systemMessages, ...messages]; + // Create stream directly without retryableAgentStreamWithHealing - const stream = await analystAgent.stream(messages, { + const stream = await analystAgent.stream(messagesWithSystem, { toolCallStreaming: true, runtimeContext, maxRetries: 5, @@ -442,7 +488,7 @@ const analystExecution = async ({ continue; } - // Update messages for the retry + // Update messages for the retry (without system messages) messages = healedMessages; // Update chunk processor with the healed messages diff --git a/packages/ai/src/steps/think-and-prep-step.ts b/packages/ai/src/steps/think-and-prep-step.ts index 17fc7d487..e514556d7 100644 --- a/packages/ai/src/steps/think-and-prep-step.ts +++ b/packages/ai/src/steps/think-and-prep-step.ts @@ -1,10 +1,13 @@ +import { getPermissionedDatasets } from '@buster/access-controls'; import type { ChatMessageReasoningMessage } from '@buster/server-shared/chats'; import { createStep } from '@mastra/core'; import type { RuntimeContext } from '@mastra/core/runtime-context'; import type { CoreMessage } from 'ai'; import { wrapTraced } from 'braintrust'; import { z } from 'zod'; +import { getSqlDialectGuidance } from '../agents/shared/sql-dialect-guidance'; import { thinkAndPrepAgent } from '../agents/think-and-prep-agent/think-and-prep-agent'; +import { createThinkAndPrepInstructionsWithoutDatasets } from '../agents/think-and-prep-agent/think-and-prep-instructions'; import type { thinkAndPrepWorkflowInputSchema } from '../schemas/workflow-schemas'; import { ChunkProcessor } from '../utils/database/chunk-processor'; import { @@ -60,6 +63,10 @@ type BusterChatMessageResponse = z.infer const outputSchema = ThinkAndPrepOutputSchema; +const DEFAULT_CACHE_OPTIONS = { + anthropic: { cacheControl: { type: 'ephemeral' } }, +}; + // Helper function to create the result object const createStepResult = ( finished: boolean, @@ -150,6 +157,27 @@ const thinkAndPrepExecution = async ({ ); try { + // Get database context and SQL dialect guidance + const userId = runtimeContext.get('userId'); + const dataSourceSyntax = runtimeContext.get('dataSourceSyntax'); + + const datasets = await getPermissionedDatasets(userId, 0, 1000); + + // Extract yml_content from each dataset and join with separators + const assembledYmlContent = datasets + .map((dataset: { ymlFile: string | null | undefined }) => dataset.ymlFile) + .filter((content: string | null | undefined) => content !== null && content !== undefined) + .join('\n---\n'); + + // Get dialect-specific guidance + const sqlDialectGuidance = getSqlDialectGuidance(dataSourceSyntax); + + // Create dataset system message + const createDatasetSystemMessage = (databaseContext: string): string => { + return ` +${databaseContext} +`; + }; const todos = inputData['create-todos'].todos; // Standardize messages from workflow inputs @@ -223,8 +251,25 @@ const thinkAndPrepExecution = async ({ const wrappedStream = wrapTraced( async () => { + // Create system messages with dataset context and instructions + const systemMessages: CoreMessage[] = [ + { + role: 'system', + content: createDatasetSystemMessage(assembledYmlContent), + providerOptions: DEFAULT_CACHE_OPTIONS, + }, + { + role: 'system', + content: createThinkAndPrepInstructionsWithoutDatasets(sqlDialectGuidance), + providerOptions: DEFAULT_CACHE_OPTIONS, + }, + ]; + + // Combine system messages with conversation messages + const messagesWithSystem = [...systemMessages, ...messages]; + // Create stream directly without retryableAgentStreamWithHealing - const stream = await thinkAndPrepAgent.stream(messages, { + const stream = await thinkAndPrepAgent.stream(messagesWithSystem, { toolCallStreaming: true, runtimeContext, maxRetries: 5, @@ -316,7 +361,7 @@ const thinkAndPrepExecution = async ({ continue; } - // Update messages for the retry + // Update messages for the retry (without system messages) messages = healedMessages; // Update chunk processor with the healed messages