metadata and sql tools

This commit is contained in:
dal 2025-10-06 13:00:15 -06:00
parent d776ef6fba
commit 168777f9df
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
17 changed files with 647 additions and 1 deletions

View File

@ -24,6 +24,7 @@ import slackRoutes from './slack';
import sqlRoutes from './sql';
import supportRoutes from './support';
import titleRoutes from './title';
import { tools } from './tools';
import userRoutes from './users';
const app = new Hono()
@ -44,6 +45,7 @@ const app = new Hono()
.route('/support', supportRoutes)
.route('/security', securityRoutes)
.route('/shortcuts', shortcutsRoutes)
.route('/tools', tools)
.route('/organizations', organizationRoutes)
.route('/dictionaries', dictionariesRoutes)
.route('/title', titleRoutes)

View File

@ -0,0 +1,5 @@
import { Hono } from 'hono';
import metadata from './metadata';
import sql from './sql';
export const tools = new Hono().route('/sql', sql).route('/metadata', metadata);

View File

@ -0,0 +1,41 @@
import { getDatasetMetadata } from '@buster/database/queries';
import type { GetMetadataRequest, GetMetadataResponse } from '@buster/server-shared';
import type { ApiKeyContext } from '@buster/server-shared';
import { HTTPException } from 'hono/http-exception';
/**
* Handler for retrieving dataset metadata via API key authentication
*
* This handler:
* 1. Validates API key has access to the organization
* 2. Queries for dataset matching database, schema, name, and organization
* 3. Returns the dataset's metadata column
*
* @param request - The metadata request containing database, schema, and name
* @param apiKeyContext - The authenticated API key context
* @returns The dataset metadata
*/
export async function getMetadataHandler(
request: GetMetadataRequest,
apiKeyContext: ApiKeyContext
): Promise<GetMetadataResponse> {
const { organizationId } = apiKeyContext;
// Get dataset metadata
const result = await getDatasetMetadata({
database: request.database,
schema: request.schema,
name: request.name,
organizationId,
});
if (!result || !result.metadata) {
throw new HTTPException(404, {
message: `Dataset not found: ${request.database}.${request.schema}.${request.name}`,
});
}
return {
metadata: result.metadata,
};
}

View File

@ -0,0 +1,41 @@
import { GetMetadataRequestSchema } from '@buster/server-shared';
import { zValidator } from '@hono/zod-validator';
import { Hono } from 'hono';
import { HTTPException } from 'hono/http-exception';
import { createApiKeyAuthMiddleware } from '../../../../middleware/api-key-auth';
import { getMetadataHandler } from './GET';
const app = new Hono()
// Apply API key authentication middleware to all routes
.use('*', createApiKeyAuthMiddleware())
// GET /tools/metadata - Retrieve dataset metadata with API key auth
.get('/', zValidator('query', GetMetadataRequestSchema), async (c) => {
const request = c.req.valid('query');
const apiKeyContext = c.get('apiKey');
if (!apiKeyContext) {
throw new HTTPException(401, {
message: 'API key authentication required',
});
}
const response = await getMetadataHandler(request, apiKeyContext);
return c.json(response);
})
// Error handler for metadata tool routes
.onError((err, c) => {
console.error('Metadata tool API error:', err);
// Let HTTPException responses pass through
if (err instanceof HTTPException) {
return err.getResponse();
}
// Default error response
return c.json({ error: 'Internal server error' }, 500);
});
export default app;

View File

@ -0,0 +1,124 @@
import { createPermissionErrorMessage, validateSqlPermissions } from '@buster/access-controls';
import { executeMetricQuery } from '@buster/data-source';
import type { Credentials } from '@buster/data-source';
import { getDataSourceById, getDataSourceCredentials } from '@buster/database/queries';
import type { RunSqlRequest, RunSqlResponse } from '@buster/server-shared';
import type { ApiKeyContext } from '@buster/server-shared';
import { HTTPException } from 'hono/http-exception';
/**
* Handler for running SQL queries against data sources via API key authentication
*
* This handler:
* 1. Validates API key has access to the organization
* 2. Verifies data source belongs to API key's organization
* 3. Validates SQL permissions against user's permissioned datasets
* 4. Executes the query with retry logic and timeout handling
* 5. Returns the data with metadata and pagination info
*
* @param request - The SQL query request containing data_source_id and sql
* @param apiKeyContext - The authenticated API key context
* @returns The query results with metadata
*/
export async function runSqlHandler(
request: RunSqlRequest,
apiKeyContext: ApiKeyContext
): Promise<RunSqlResponse> {
const { organizationId, ownerId } = apiKeyContext;
// Get data source details
const dataSource = await getDataSourceById(request.data_source_id);
if (!dataSource) {
throw new HTTPException(404, {
message: 'Data source not found',
});
}
// Verify data source belongs to API key's organization
if (dataSource.organizationId !== organizationId) {
throw new HTTPException(403, {
message: 'You do not have permission to access this data source',
});
}
// Validate SQL against user's permissioned datasets
const permissionResult = await validateSqlPermissions(request.sql, ownerId, dataSource.type);
if (!permissionResult.isAuthorized) {
const errorMessage =
permissionResult.error ||
createPermissionErrorMessage(
permissionResult.unauthorizedTables,
permissionResult.unauthorizedColumns
);
throw new HTTPException(403, {
message: errorMessage,
});
}
// Get data source credentials from vault
let credentials: Credentials;
try {
const rawCredentials = await getDataSourceCredentials({
dataSourceId: request.data_source_id,
});
// Validate credential type matches data source type
if (rawCredentials.type && rawCredentials.type !== dataSource.type) {
console.warn(
`Credential type mismatch: credentials have type '${rawCredentials.type}' but data source has type '${dataSource.type}'. Using data source type.`
);
}
// Use data source type as the source of truth
credentials = {
...rawCredentials,
type: dataSource.type,
} as Credentials;
} catch (error) {
console.error('Failed to retrieve data source credentials:', error);
throw new HTTPException(500, {
message: 'Failed to access data source',
});
}
// Execute query using the shared utility with 5000 row limit
try {
// Request one extra row to detect if there are more records
const result = await executeMetricQuery(request.data_source_id, request.sql, credentials, {
maxRows: 5001,
timeout: 60000, // 60 seconds
retryDelays: [1000, 3000, 6000], // 1s, 3s, 6s
});
// Trim to 5000 rows and check if there are more records
const hasMore = result.data.length > 5000;
const trimmedData = result.data.slice(0, 5000);
// Use the data source's hasMoreRecords if available, otherwise use our local check
// This ensures we have a single source of truth for pagination state
const hasMoreRecords = result.hasMoreRecords !== undefined ? result.hasMoreRecords : hasMore;
const response: RunSqlResponse = {
data: trimmedData,
data_metadata: result.dataMetadata,
has_more_records: hasMoreRecords,
};
return response;
} catch (error) {
console.error('Query execution failed:', error);
if (error instanceof Error) {
throw new HTTPException(500, {
message: `Query execution failed: ${error.message}`,
});
}
throw new HTTPException(500, {
message: 'Query execution failed',
});
}
}

View File

@ -0,0 +1,41 @@
import { RunSqlRequestSchema } from '@buster/server-shared';
import { zValidator } from '@hono/zod-validator';
import { Hono } from 'hono';
import { HTTPException } from 'hono/http-exception';
import { createApiKeyAuthMiddleware } from '../../../../middleware/api-key-auth';
import { runSqlHandler } from './POST';
const app = new Hono()
// Apply API key authentication middleware to all routes
.use('*', createApiKeyAuthMiddleware())
// POST /tools/sql - Execute SQL query against data source with API key auth
.post('/', zValidator('json', RunSqlRequestSchema), async (c) => {
const request = c.req.valid('json');
const apiKeyContext = c.get('apiKey');
if (!apiKeyContext) {
throw new HTTPException(401, {
message: 'API key authentication required',
});
}
const response = await runSqlHandler(request, apiKeyContext);
return c.json(response);
})
// Error handler for SQL tool routes
.onError((err, c) => {
console.error('SQL tool API error:', err);
// Let HTTPException responses pass through
if (err instanceof HTTPException) {
return err.getResponse();
}
// Default error response
return c.json({ error: 'Internal server error' }, 500);
});
export default app;

View File

@ -6,6 +6,8 @@ import {
LS_TOOL_NAME,
MULTI_EDIT_FILE_TOOL_NAME,
READ_FILE_TOOL_NAME,
RETRIEVE_METADATA_TOOL_NAME,
RUN_SQL_TOOL_NAME,
TODO_WRITE_TOOL_NAME,
WRITE_FILE_TOOL_NAME,
createBashTool,
@ -15,6 +17,8 @@ import {
createLsTool,
createMultiEditFileTool,
createReadFileTool,
createRetrieveMetadataTool,
createRunSqlTool,
createTaskTool,
createTodoWriteTool,
createWriteFileTool,
@ -60,6 +64,15 @@ export async function createAnalyticsEngineerToolset(
workingDirectory: analyticsEngineerAgentOptions.folder_structure,
todosList: analyticsEngineerAgentOptions.todosList,
});
const runSqlTool = createRunSqlTool({
messageId: analyticsEngineerAgentOptions.messageId,
apiKey: process.env.BUSTER_API_KEY || '',
apiUrl: process.env.BUSTER_API_URL || 'http://localhost:3000',
});
const retrieveMetadataTool = createRetrieveMetadataTool({
apiKey: process.env.BUSTER_API_KEY || '',
apiUrl: process.env.BUSTER_API_URL || 'http://localhost:3000',
});
// Conditionally create task tool (only for main agent, not for subagents)
const taskTool = !analyticsEngineerAgentOptions.isSubagent
? createTaskTool({
@ -87,6 +100,8 @@ export async function createAnalyticsEngineerToolset(
[MULTI_EDIT_FILE_TOOL_NAME]: multiEditFileTool,
[LS_TOOL_NAME]: lsTool,
[TODO_WRITE_TOOL_NAME]: todosTool,
[RUN_SQL_TOOL_NAME]: runSqlTool,
[RETRIEVE_METADATA_TOOL_NAME]: retrieveMetadataTool,
...(taskTool ? { taskTool } : {}),
};
}

View File

@ -0,0 +1,83 @@
import { wrapTraced } from 'braintrust';
import {
RETRIEVE_METADATA_TOOL_NAME,
type RetrieveMetadataContext,
type RetrieveMetadataInput,
type RetrieveMetadataOutput,
} from './retrieve-metadata';
/**
* Retrieve dataset metadata via API endpoint
*/
async function executeApiRequest(
database: string,
schema: string,
name: string,
context: RetrieveMetadataContext
): Promise<{
success: boolean;
data?: RetrieveMetadataOutput;
error?: string;
}> {
try {
// Build query string
const params = new URLSearchParams({
database,
schema,
name,
});
const apiEndpoint = `${context.apiUrl}/api/v2/tools/metadata?${params.toString()}`;
const response = await fetch(apiEndpoint, {
method: 'GET',
headers: {
Authorization: `Bearer ${context.apiKey}`,
},
});
if (!response.ok) {
const errorData = (await response.json().catch(() => ({}))) as { error?: string };
const errorMessage = errorData.error || `HTTP ${response.status}: ${response.statusText}`;
return {
success: false,
error: errorMessage,
};
}
const result = (await response.json()) as RetrieveMetadataOutput;
return {
success: true,
data: result,
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Metadata retrieval failed';
return {
success: false,
error: errorMessage,
};
}
}
// Factory function that creates the execute function with proper context typing
export function createRetrieveMetadataExecute(context: RetrieveMetadataContext) {
return wrapTraced(
async (input: RetrieveMetadataInput): Promise<RetrieveMetadataOutput> => {
const { database, schema, name } = input;
// Execute API request
const result = await executeApiRequest(database, schema, name, context);
if (result.success && result.data) {
return result.data;
}
// Throw error if retrieval failed
throw new Error(result.error || 'Metadata retrieval failed');
},
{ name: RETRIEVE_METADATA_TOOL_NAME }
);
}

View File

@ -0,0 +1,39 @@
import { tool } from 'ai';
import { z } from 'zod';
import { createRetrieveMetadataExecute } from './retrieve-metadata-execute';
export const RETRIEVE_METADATA_TOOL_NAME = 'retrieveMetadata';
export const RetrieveMetadataInputSchema = z.object({
database: z.string().min(1).describe('Database name where the dataset resides'),
schema: z.string().min(1).describe('Schema name where the dataset resides'),
name: z.string().min(1).describe('Dataset/table name'),
});
const RetrieveMetadataContextSchema = z.object({
apiKey: z.string().describe('API key for authentication'),
apiUrl: z.string().describe('Base URL of the API server'),
});
export type RetrieveMetadataInput = z.infer<typeof RetrieveMetadataInputSchema>;
export type RetrieveMetadataContext = z.infer<typeof RetrieveMetadataContextSchema>;
const RetrieveMetadataOutputSchema = z.object({
metadata: z
.record(z.unknown())
.describe('Dataset metadata containing column profiles and statistics'),
});
export type RetrieveMetadataOutput = z.infer<typeof RetrieveMetadataOutputSchema>;
// Factory function to create the retrieve-metadata tool
export function createRetrieveMetadataTool(context: RetrieveMetadataContext) {
return tool({
description: `Use this to retrieve metadata about a dataset from the data warehouse.
This tool fetches detailed metadata including column profiles, statistics, distributions, and other analytical information about tables/datasets.
The metadata can help understand data structure, quality, and characteristics before querying.`,
inputSchema: RetrieveMetadataInputSchema,
outputSchema: RetrieveMetadataOutputSchema,
execute: createRetrieveMetadataExecute(context),
});
}

View File

@ -0,0 +1,138 @@
import { wrapTraced } from 'braintrust';
import {
RUN_SQL_TOOL_NAME,
type RunSqlContext,
type RunSqlInput,
type RunSqlOutput,
} from './run-sql';
/**
* Execute SQL query via API endpoint
*/
async function executeApiRequest(
dataSourceId: string,
sql: string,
context: RunSqlContext
): Promise<{
success: boolean;
data?: RunSqlOutput;
error?: string;
}> {
const MAX_RETRIES = 3;
const RETRY_DELAYS = [1000, 3000, 6000]; // 1s, 3s, 6s
if (!sql.trim()) {
return { success: false, error: 'SQL query cannot be empty' };
}
// Attempt execution with retries
for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
try {
const apiEndpoint = `${context.apiUrl}/api/v2/tools/sql`;
const response = await fetch(apiEndpoint, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${context.apiKey}`,
},
body: JSON.stringify({
data_source_id: dataSourceId,
sql,
}),
});
if (!response.ok) {
const errorData = (await response.json().catch(() => ({}))) as { error?: string };
const errorMessage = errorData.error || `HTTP ${response.status}: ${response.statusText}`;
// Check if error is timeout-related
const isTimeout =
errorMessage.toLowerCase().includes('timeout') ||
errorMessage.toLowerCase().includes('timed out');
if (isTimeout && attempt < MAX_RETRIES) {
// Wait before retry
const delay = RETRY_DELAYS[attempt] || 6000;
console.warn(
`[run-sql] Query timeout on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`,
{
sql: `${sql.substring(0, 100)}...`,
attempt: attempt + 1,
nextDelay: delay,
}
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue; // Retry
}
// Not a timeout or no more retries
return {
success: false,
error: errorMessage,
};
}
const result = (await response.json()) as RunSqlOutput;
return {
success: true,
data: result,
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'SQL execution failed';
const isTimeout =
errorMessage.toLowerCase().includes('timeout') ||
errorMessage.toLowerCase().includes('timed out') ||
errorMessage.toLowerCase().includes('fetch failed');
if (isTimeout && attempt < MAX_RETRIES) {
// Wait before retry
const delay = RETRY_DELAYS[attempt] || 6000;
console.warn(
`[run-sql] Query timeout (exception) on attempt ${attempt + 1}/${MAX_RETRIES + 1}. Retrying in ${delay}ms...`,
{
sql: `${sql.substring(0, 100)}...`,
attempt: attempt + 1,
nextDelay: delay,
error: errorMessage,
}
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue; // Retry
}
// Not a timeout or no more retries
return {
success: false,
error: errorMessage,
};
}
}
// Should not reach here, but just in case
return {
success: false,
error: 'Max retries exceeded for SQL execution',
};
}
// Factory function that creates the execute function with proper context typing
export function createRunSqlExecute(context: RunSqlContext) {
return wrapTraced(
async (input: RunSqlInput): Promise<RunSqlOutput> => {
const { data_source_id, sql } = input;
// Execute SQL via API
const result = await executeApiRequest(data_source_id, sql, context);
if (result.success && result.data) {
return result.data;
}
// Throw error if execution failed
throw new Error(result.error || 'Query execution failed');
},
{ name: RUN_SQL_TOOL_NAME }
);
}

View File

@ -0,0 +1,49 @@
import { tool } from 'ai';
import { z } from 'zod';
import { createRunSqlExecute } from './run-sql-execute';
export const RUN_SQL_TOOL_NAME = 'runSql';
export const RunSqlInputSchema = z.object({
data_source_id: z.string().uuid().describe('UUID of the data source to execute SQL against'),
sql: z
.string()
.min(1)
.describe(
`SQL query to execute.
YOU MUST USE THE <SCHEMA_NAME>.<TABLE_NAME> syntax/qualifier for all table names.
NEVER use SELECT * on physical tables - for security purposes you must explicitly select the columns you intend to use.
NEVER query system tables or use 'SHOW' statements as these will fail to execute.
Queries without these requirements will fail to execute.`
),
});
const RunSqlContextSchema = z.object({
apiKey: z.string().describe('API key for authentication'),
apiUrl: z.string().describe('Base URL of the API server'),
messageId: z.string().describe('Message ID for database updates'),
});
export type RunSqlInput = z.infer<typeof RunSqlInputSchema>;
export type RunSqlContext = z.infer<typeof RunSqlContextSchema>;
const RunSqlOutputSchema = z.object({
data: z.array(z.record(z.unknown())).describe('Query results'),
data_metadata: z.record(z.unknown()).describe('Metadata about the data columns'),
has_more_records: z.boolean().describe('Whether there are more records available'),
});
export type RunSqlOutput = z.infer<typeof RunSqlOutputSchema>;
// Factory function to create the run-sql tool
export function createRunSqlTool(context: RunSqlContext) {
return tool({
description: `Use this to run SQL queries against a data source via API.
This tool executes queries remotely and returns results with metadata.
You must use the <SCHEMA_NAME>.<TABLE_NAME> syntax/qualifier for all table names.
Results are limited to 5000 rows for performance.`,
inputSchema: RunSqlInputSchema,
outputSchema: RunSqlOutputSchema,
execute: createRunSqlExecute(context),
});
}

View File

@ -46,6 +46,11 @@ export {
createExecuteSqlTool,
EXECUTE_SQL_TOOL_NAME,
} from './database-tools/execute-sql/execute-sql';
export { createRunSqlTool, RUN_SQL_TOOL_NAME } from './database-tools/run-sql/run-sql';
export {
createRetrieveMetadataTool,
RETRIEVE_METADATA_TOOL_NAME,
} from './database-tools/retrieve-metadata/retrieve-metadata';
export { executeSqlDocsAgent } from './database-tools/super-execute-sql/super-execute-sql';
// File tools

View File

@ -20,7 +20,10 @@ const TodoWriteToolInputSchema = z.object({
const TodoWriteToolContextSchema = z.object({
chatId: z.string().describe('The chat/conversation ID to associate todos with'),
workingDirectory: z.string().describe('The working directory for the chat'),
todosList: z.array(TodoItemSchema).default([]).describe('In-memory array of todo items to manipulate'),
todosList: z
.array(TodoItemSchema)
.default([])
.describe('In-memory array of todo items to manipulate'),
});
export type TodoWriteToolInput = z.infer<typeof TodoWriteToolInputSchema>;

View File

@ -0,0 +1,42 @@
import { and, eq, isNull } from 'drizzle-orm';
import { z } from 'zod';
import { db } from '../../connection';
import { datasets } from '../../schema';
/**
* Parameters for getting dataset metadata
*/
const GetDatasetMetadataParamsSchema = z.object({
database: z.string().describe('Database name'),
schema: z.string().describe('Schema name'),
name: z.string().describe('Dataset/table name'),
organizationId: z.string().uuid().describe('Organization UUID'),
});
type GetDatasetMetadataParams = z.infer<typeof GetDatasetMetadataParamsSchema>;
/**
* Get dataset metadata by database, schema, name, and organization
* Returns only the metadata column from the matching dataset
*/
export async function getDatasetMetadata(params: GetDatasetMetadataParams) {
const validated = GetDatasetMetadataParamsSchema.parse(params);
const conditions = [
eq(datasets.databaseName, validated.database),
eq(datasets.schema, validated.schema),
eq(datasets.name, validated.name),
eq(datasets.organizationId, validated.organizationId),
isNull(datasets.deletedAt),
];
const result = await db
.select({
metadata: datasets.metadata,
})
.from(datasets)
.where(and(...conditions))
.limit(1);
return result[0] || null;
}

View File

@ -7,3 +7,4 @@ export { getDatasetsWithYml, getDatasetsWithYmlByOrganization } from './get-data
export { getDatasetById, type Dataset } from './get-dataset-by-id';
export { getDataSourceWithDetails } from './get-data-source-with-details';
export { updateDatasetMetadata } from './update-dataset-metadata';
export { getDatasetMetadata } from './get-dataset-metadata';

View File

@ -18,6 +18,7 @@ export * from './dictionary';
export * from './docs';
export * from './github';
export * from './message';
export * from './metadata';
export * from './metrics';
export * from './organization';
export * from './public-chat';

View File

@ -0,0 +1,16 @@
import type { DatasetMetadata } from '@buster/database/schema-types';
import { z } from 'zod';
// Request schema for getting dataset metadata
export const GetMetadataRequestSchema = z.object({
database: z.string().min(1, 'Database name cannot be empty'),
schema: z.string().min(1, 'Schema name cannot be empty'),
name: z.string().min(1, 'Dataset name cannot be empty'),
});
export type GetMetadataRequest = z.infer<typeof GetMetadataRequestSchema>;
// Response type for metadata retrieval
export interface GetMetadataResponse {
metadata: DatasetMetadata;
}