mirror of https://github.com/buster-so/buster.git
Refactor SQL execution tools to improve code readability and maintainability. Consolidate function parameters into single lines and enhance error handling in execution results. Update imports for consistency across files.
This commit is contained in:
parent
ee99a835c3
commit
ca2d3e940c
|
@ -19,10 +19,7 @@ import {
|
|||
// Using keyof with the inferred type ensures we're using the actual schema keys
|
||||
const FINAL_RESPONSE_KEY = 'final_response' as const satisfies keyof DoneToolInput;
|
||||
|
||||
export function createDoneToolDelta(
|
||||
doneToolState: DoneToolState,
|
||||
context: DoneToolContext
|
||||
) {
|
||||
export function createDoneToolDelta(doneToolState: DoneToolState, context: DoneToolContext) {
|
||||
return async function doneToolDelta(
|
||||
options: { inputTextDelta: string } & ToolCallOptions
|
||||
): Promise<void> {
|
||||
|
|
|
@ -6,8 +6,8 @@ import {
|
|||
} from '../../../utils/streaming/optimistic-json-parser';
|
||||
import type { ExecuteSqlContext, ExecuteSqlInput, ExecuteSqlState } from './execute-sql';
|
||||
import {
|
||||
createExecuteSqlReasoningEntry,
|
||||
createExecuteSqlRawLlmMessageEntry,
|
||||
createExecuteSqlReasoningEntry,
|
||||
} from './helpers/execute-sql-transform-helper';
|
||||
|
||||
// Type-safe key extraction from the schema
|
||||
|
@ -72,4 +72,4 @@ export function createExecuteSqlDelta(state: ExecuteSqlState, context: ExecuteSq
|
|||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,10 +6,15 @@ import {
|
|||
createPermissionErrorMessage,
|
||||
validateSqlPermissions,
|
||||
} from '../../../utils/sql-permissions';
|
||||
import type { ExecuteSqlContext, ExecuteSqlInput, ExecuteSqlOutput, ExecuteSqlState } from './execute-sql';
|
||||
import type {
|
||||
ExecuteSqlContext,
|
||||
ExecuteSqlInput,
|
||||
ExecuteSqlOutput,
|
||||
ExecuteSqlState,
|
||||
} from './execute-sql';
|
||||
import {
|
||||
createExecuteSqlReasoningEntry,
|
||||
createExecuteSqlRawLlmMessageEntry,
|
||||
createExecuteSqlReasoningEntry,
|
||||
} from './helpers/execute-sql-transform-helper';
|
||||
|
||||
/**
|
||||
|
@ -283,36 +288,38 @@ export function createExecuteSqlExecute(state: ExecuteSqlState, context: Execute
|
|||
const executionResults = await Promise.allSettled(executionPromises);
|
||||
|
||||
// Process results
|
||||
const results: ExecuteSqlOutput['results'] = executionResults.map((executionResult, index) => {
|
||||
const sql = statements[index] || '';
|
||||
const results: ExecuteSqlOutput['results'] = executionResults.map(
|
||||
(executionResult, index) => {
|
||||
const sql = statements[index] || '';
|
||||
|
||||
if (executionResult.status === 'fulfilled') {
|
||||
const { result } = executionResult.value;
|
||||
if (result.success) {
|
||||
if (executionResult.status === 'fulfilled') {
|
||||
const { result } = executionResult.value;
|
||||
if (result.success) {
|
||||
return {
|
||||
status: 'success' as const,
|
||||
sql,
|
||||
results: result.data || [],
|
||||
};
|
||||
}
|
||||
return {
|
||||
status: 'success' as const,
|
||||
status: 'error' as const,
|
||||
sql,
|
||||
results: result.data || [],
|
||||
error_message: result.error || 'Unknown error occurred',
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
status: 'error' as const,
|
||||
sql,
|
||||
error_message: result.error || 'Unknown error occurred',
|
||||
error_message: executionResult.reason?.message || 'Execution failed',
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
status: 'error' as const,
|
||||
sql,
|
||||
error_message: executionResult.reason?.message || 'Execution failed',
|
||||
};
|
||||
});
|
||||
);
|
||||
|
||||
// Update reasoning entry with results
|
||||
const endTime = Date.now();
|
||||
const executionTime = endTime - (state.startTime || endTime);
|
||||
|
||||
|
||||
// Update state with results
|
||||
state.executionResults = results;
|
||||
state.executionTime = executionTime;
|
||||
|
@ -355,4 +362,4 @@ export function createExecuteSqlExecute(state: ExecuteSqlState, context: Execute
|
|||
},
|
||||
{ name: 'execute-sql' }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@ import { updateMessageEntries } from '@buster/database';
|
|||
import type { ToolCallOptions } from 'ai';
|
||||
import type { ExecuteSqlContext, ExecuteSqlInput, ExecuteSqlState } from './execute-sql';
|
||||
import {
|
||||
createExecuteSqlReasoningEntry,
|
||||
createExecuteSqlRawLlmMessageEntry,
|
||||
createExecuteSqlReasoningEntry,
|
||||
} from './helpers/execute-sql-transform-helper';
|
||||
|
||||
export function createExecuteSqlFinish(state: ExecuteSqlState, context: ExecuteSqlContext) {
|
||||
|
@ -36,4 +36,4 @@ export function createExecuteSqlFinish(state: ExecuteSqlState, context: ExecuteS
|
|||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,8 +2,8 @@ import { updateMessageEntries } from '@buster/database';
|
|||
import type { ToolCallOptions } from 'ai';
|
||||
import type { ExecuteSqlContext, ExecuteSqlState } from './execute-sql';
|
||||
import {
|
||||
createExecuteSqlReasoningEntry,
|
||||
createExecuteSqlRawLlmMessageEntry,
|
||||
createExecuteSqlReasoningEntry,
|
||||
} from './helpers/execute-sql-transform-helper';
|
||||
|
||||
export function createExecuteSqlStart(state: ExecuteSqlState, context: ExecuteSqlContext) {
|
||||
|
@ -36,4 +36,4 @@ export function createExecuteSqlStart(state: ExecuteSqlState, context: ExecuteSq
|
|||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,27 +33,29 @@ const ExecuteSqlStateSchema = z.object({
|
|||
isComplete: z.boolean().optional().describe('Whether input is complete'),
|
||||
startTime: z.number().optional().describe('Execution start time'),
|
||||
executionTime: z.number().optional().describe('Total execution time in ms'),
|
||||
executionResults: z.array(
|
||||
z.discriminatedUnion('status', [
|
||||
z.object({
|
||||
status: z.literal('success'),
|
||||
sql: z.string(),
|
||||
results: z.array(z.record(z.unknown())),
|
||||
}),
|
||||
z.object({
|
||||
status: z.literal('error'),
|
||||
sql: z.string(),
|
||||
error_message: z.string(),
|
||||
}),
|
||||
])
|
||||
).optional().describe('Execution results'),
|
||||
executionResults: z
|
||||
.array(
|
||||
z.discriminatedUnion('status', [
|
||||
z.object({
|
||||
status: z.literal('success'),
|
||||
sql: z.string(),
|
||||
results: z.array(z.record(z.unknown())),
|
||||
}),
|
||||
z.object({
|
||||
status: z.literal('error'),
|
||||
sql: z.string(),
|
||||
error_message: z.string(),
|
||||
}),
|
||||
])
|
||||
)
|
||||
.optional()
|
||||
.describe('Execution results'),
|
||||
});
|
||||
|
||||
export type ExecuteSqlInput = z.infer<typeof ExecuteSqlInputSchema>;
|
||||
export type ExecuteSqlContext = z.infer<typeof ExecuteSqlContextSchema>;
|
||||
export type ExecuteSqlState = z.infer<typeof ExecuteSqlStateSchema>;
|
||||
|
||||
|
||||
const ExecuteSqlOutputSchema = z.object({
|
||||
results: z.array(
|
||||
z.discriminatedUnion('status', [
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
import type {
|
||||
ChatMessageReasoningMessage,
|
||||
} from '@buster/server-shared/chats';
|
||||
import type { ChatMessageReasoningMessage } from '@buster/server-shared/chats';
|
||||
import type { CoreMessage } from 'ai';
|
||||
import type { ExecuteSqlOutput, ExecuteSqlState } from '../execute-sql';
|
||||
|
||||
|
@ -22,11 +20,11 @@ export function createExecuteSqlReasoningEntry(
|
|||
let fullContent = statementsYaml;
|
||||
if (state.executionResults && state.executionResults.length > 0) {
|
||||
let resultsYaml = '\n\nresults:';
|
||||
|
||||
|
||||
for (const result of state.executionResults) {
|
||||
resultsYaml += `\n - status: ${result.status}`;
|
||||
resultsYaml += `\n sql: ${result.sql}`;
|
||||
|
||||
|
||||
if (result.status === 'error' && result.error_message) {
|
||||
resultsYaml += `\n error_message: |-\n ${result.error_message}`;
|
||||
} else if (result.status === 'success' && result.results) {
|
||||
|
@ -39,18 +37,18 @@ export function createExecuteSqlReasoningEntry(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fullContent += resultsYaml;
|
||||
}
|
||||
|
||||
// Calculate title based on results
|
||||
let title = 'Executing SQL';
|
||||
let status: 'loading' | 'completed' | 'failed' = 'loading';
|
||||
|
||||
|
||||
if (state.executionResults && state.isComplete) {
|
||||
const successCount = state.executionResults.filter((r) => r.status === 'success').length;
|
||||
const failedCount = state.executionResults.filter((r) => r.status === 'error').length;
|
||||
|
||||
|
||||
if (failedCount > 0) {
|
||||
title = `Ran ${successCount} validation ${successCount === 1 ? 'query' : 'queries'}, ${failedCount} failed`;
|
||||
status = 'failed';
|
||||
|
@ -76,7 +74,7 @@ export function createExecuteSqlReasoningEntry(
|
|||
}
|
||||
|
||||
const fileId = `sql-${toolCallId}`;
|
||||
|
||||
|
||||
return {
|
||||
id: toolCallId,
|
||||
type: 'files',
|
||||
|
@ -123,4 +121,4 @@ export function createExecuteSqlRawLlmMessageEntry(
|
|||
},
|
||||
],
|
||||
} as CoreMessage;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -259,4 +259,4 @@ describe('Sequential Thinking Tool Transform Helper', () => {
|
|||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -346,4 +346,4 @@ describe('Sequential Thinking Tool Streaming Tests', () => {
|
|||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -113,7 +113,8 @@ describe('Sequential Thinking Tool Integration Tests', () => {
|
|||
|
||||
// Stream complete JSON
|
||||
await deltaHandler({
|
||||
inputTextDelta: '{"thought": "Step 1: Understanding the problem", "thoughtNumber": 1, "nextThoughtNeeded": true}',
|
||||
inputTextDelta:
|
||||
'{"thought": "Step 1: Understanding the problem", "thoughtNumber": 1, "nextThoughtNeeded": true}',
|
||||
toolCallId,
|
||||
messages: [],
|
||||
});
|
||||
|
@ -152,7 +153,9 @@ describe('Sequential Thinking Tool Integration Tests', () => {
|
|||
});
|
||||
|
||||
// Verify state was finalized
|
||||
expect(state.thought).toBe('After analyzing the problem, the solution is to use a recursive approach');
|
||||
expect(state.thought).toBe(
|
||||
'After analyzing the problem, the solution is to use a recursive approach'
|
||||
);
|
||||
expect(state.nextThoughtNeeded).toBe(false);
|
||||
expect(state.thoughtNumber).toBe(3);
|
||||
expect(state.entry_id).toBe(toolCallId);
|
||||
|
@ -174,13 +177,13 @@ describe('Sequential Thinking Tool Integration Tests', () => {
|
|||
|
||||
// First thought - start and delta
|
||||
await startHandler({ toolCallId, messages: [] });
|
||||
|
||||
|
||||
await deltaHandler({
|
||||
inputTextDelta: '{"thought": "First, let me understand',
|
||||
toolCallId,
|
||||
messages: [],
|
||||
});
|
||||
|
||||
|
||||
await deltaHandler({
|
||||
inputTextDelta: ' the requirements", "thoughtNumber": 1}',
|
||||
toolCallId,
|
||||
|
@ -227,9 +230,7 @@ describe('Sequential Thinking Tool Integration Tests', () => {
|
|||
const toolCallId = randomUUID();
|
||||
|
||||
// Should not throw, but handle error gracefully
|
||||
await expect(
|
||||
startHandler({ toolCallId, messages: [] })
|
||||
).resolves.not.toThrow();
|
||||
await expect(startHandler({ toolCallId, messages: [] })).resolves.not.toThrow();
|
||||
|
||||
// State should still be updated even if database fails
|
||||
expect(state.entry_id).toBe(toolCallId);
|
||||
|
@ -268,4 +269,4 @@ describe('Sequential Thinking Tool Integration Tests', () => {
|
|||
expect(state.thoughtNumber).toBe(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -19,7 +19,9 @@ describe('Sequential Thinking Tool', () => {
|
|||
|
||||
const tool = createSequentialThinkingTool(context);
|
||||
|
||||
expect(tool.description).toContain('detailed tool for dynamic and reflective problem-solving');
|
||||
expect(tool.description).toContain(
|
||||
'detailed tool for dynamic and reflective problem-solving'
|
||||
);
|
||||
expect(tool.description).toContain('When to use this tool');
|
||||
expect(tool.description).toContain('Key features');
|
||||
expect(tool.inputSchema).toBeDefined();
|
||||
|
@ -129,4 +131,4 @@ describe('Sequential Thinking Tool', () => {
|
|||
expect(floatParseResult.success).toBe(false);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -524,10 +524,7 @@ export function createCreateDashboardsExecute<
|
|||
|
||||
try {
|
||||
// Call the main function directly instead of delegating
|
||||
const result = await createDashboardFiles(
|
||||
input as CreateDashboardFilesParams,
|
||||
context
|
||||
);
|
||||
const result = await createDashboardFiles(input as CreateDashboardFilesParams, context);
|
||||
|
||||
// Update state files with final results (IDs, versions, status)
|
||||
if (result && typeof result === 'object') {
|
||||
|
@ -627,4 +624,4 @@ export function createCreateDashboardsExecute<
|
|||
},
|
||||
{ name: 'create-dashboards-execute' }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -270,7 +270,10 @@ async function processDashboardFileUpdate(
|
|||
|
||||
// Main modify dashboard files function
|
||||
const modifyDashboardFiles = wrapTraced(
|
||||
async (params: UpdateFilesParams, context: ModifyDashboardsAgentContext): Promise<ModifyFilesOutput> => {
|
||||
async (
|
||||
params: UpdateFilesParams,
|
||||
context: ModifyDashboardsAgentContext
|
||||
): Promise<ModifyFilesOutput> => {
|
||||
const startTime = Date.now();
|
||||
|
||||
// Get context values (for logging/tracking)
|
||||
|
@ -518,10 +521,7 @@ export function createModifyDashboardsExecute<
|
|||
}));
|
||||
|
||||
// Call the main function directly instead of delegating
|
||||
const result = await modifyDashboardFiles(
|
||||
input as UpdateFilesParams,
|
||||
context
|
||||
);
|
||||
const result = await modifyDashboardFiles(input as UpdateFilesParams, context);
|
||||
|
||||
// Update state files with final results
|
||||
if (result.files) {
|
||||
|
@ -578,4 +578,4 @@ export function createModifyDashboardsExecute<
|
|||
},
|
||||
{ name: 'modify-dashboards-execute' }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,10 @@ import { eq, inArray } from 'drizzle-orm';
|
|||
import * as yaml from 'yaml';
|
||||
import { z } from 'zod';
|
||||
import { getWorkflowDataSourceManager } from '../../../utils/data-source-manager';
|
||||
import { createPermissionErrorMessage, validateSqlPermissions } from '../../../utils/sql-permissions';
|
||||
import {
|
||||
createPermissionErrorMessage,
|
||||
validateSqlPermissions,
|
||||
} from '../../../utils/sql-permissions';
|
||||
import { validateAndAdjustBarLineAxes } from '../bar-line-axis-validator';
|
||||
import { trackFileAssociations } from '../file-tracking-helper';
|
||||
import { ensureTimeFrameQuoted } from '../time-frame-helper';
|
||||
|
@ -607,7 +610,10 @@ async function processMetricFileUpdate(
|
|||
|
||||
// Main modify metrics function
|
||||
const modifyMetricFiles = wrapTraced(
|
||||
async (params: UpdateFilesParams, context: ModifyMetricsAgentContext): Promise<ModifyFilesOutput> => {
|
||||
async (
|
||||
params: UpdateFilesParams,
|
||||
context: ModifyMetricsAgentContext
|
||||
): Promise<ModifyFilesOutput> => {
|
||||
const startTime = Date.now();
|
||||
|
||||
// Get context values
|
||||
|
@ -857,10 +863,7 @@ export function createModifyMetricsExecute<
|
|||
|
||||
try {
|
||||
// Call the main function directly instead of delegating
|
||||
const result = await modifyMetricFiles(
|
||||
input as UpdateFilesParams,
|
||||
context
|
||||
);
|
||||
const result = await modifyMetricFiles(input as UpdateFilesParams, context);
|
||||
|
||||
// Update state files with results
|
||||
if (result.files && Array.isArray(result.files)) {
|
||||
|
@ -947,4 +950,4 @@ export function createModifyMetricsExecute<
|
|||
},
|
||||
{ name: 'modify-metrics-execute' }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue