retrieve metadata on the fly.

This commit is contained in:
dal 2025-10-08 14:43:56 -06:00
parent b0503c6270
commit de64341958
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
6 changed files with 422 additions and 22 deletions

View File

@ -1,17 +1,178 @@
import { getDatasetMetadata } from '@buster/database/queries'; import { type Credentials, DataSourceType, createAdapter } from '@buster/data-source';
import type { GetMetadataRequest, GetMetadataResponse } from '@buster/server-shared'; import { getDataSourceCredentials } from '@buster/database/queries';
import type {
GetMetadataRequest,
GetMetadataResponse,
GetTableStatisticsOutput,
} from '@buster/server-shared';
import type { ApiKeyContext } from '@buster/server-shared'; import type { ApiKeyContext } from '@buster/server-shared';
import { runs, tasks } from '@trigger.dev/sdk';
import { HTTPException } from 'hono/http-exception'; import { HTTPException } from 'hono/http-exception';
/**
 * Validates that identifier doesn't contain SQL injection attempts
 * Allows alphanumeric, underscores, hyphens, and dots (for qualified names)
 *
 * @param identifier - The raw identifier supplied by the caller
 * @param fieldName - Human-readable field name used in error messages
 * @throws HTTPException(400) when the identifier fails either check
 */
function validateIdentifier(identifier: string, fieldName: string): void {
  // Whitelist: letters, digits, underscores, hyphens, dots, and spaces
  // (spaces occur in some database names).
  const allowedChars = /^[a-zA-Z0-9_\-.\s]+$/;
  // Blocklist: common SQL keywords matched as whole words, case-insensitively.
  const forbiddenKeywords =
    /\b(?:DROP|DELETE|INSERT|UPDATE|ALTER|CREATE|EXEC|EXECUTE|UNION|SELECT|WHERE|FROM|JOIN)\b/i;

  if (!allowedChars.test(identifier)) {
    throw new HTTPException(400, {
      message: `Invalid ${fieldName}: contains disallowed characters. Only alphanumeric, underscores, hyphens, dots, and spaces are allowed.`,
    });
  }

  if (forbiddenKeywords.test(identifier)) {
    throw new HTTPException(400, {
      message: `Invalid ${fieldName}: contains disallowed SQL keywords.`,
    });
  }
}
/**
 * Type guard: a vault value is Credentials when it is a non-null object
 * whose `type` field is one of the known DataSourceType string values.
 */
function isCredentials(value: unknown): value is Credentials {
  if (value === null || typeof value !== 'object') {
    return false;
  }
  const candidateType = (value as { type?: unknown }).type;
  return (
    typeof candidateType === 'string' &&
    (Object.values(DataSourceType) as string[]).includes(candidateType)
  );
}
/**
 * Quick metadata lookup from warehouse information schema
 * Returns table size, row count, and type without full sampling
 *
 * NOTE(review): for Snowflake and BigQuery, `database`/`schema` are
 * interpolated directly into the SQL text (information_schema references
 * cannot be bound as parameters there). Callers MUST pre-validate those
 * identifiers (see validateIdentifier) before invoking this function.
 *
 * @param dataSourceId - Vault key used to fetch data source credentials
 * @param database - Database/catalog containing the table
 * @param schema - Schema containing the table
 * @param tableName - Table to look up
 * @returns Row count, optional size in bytes, and table type
 * @throws HTTPException(404) when the table is not found
 * @throws Error when vault credentials fail validation
 */
async function getQuickTableMetadata(
  dataSourceId: string,
  database: string,
  schema: string,
  tableName: string
): Promise<{ rowCount: number; sizeBytes?: number; type: string }> {
  // Many drivers return BIGINT columns (row counts, byte sizes) as strings
  // to avoid IEEE-754 precision loss (node-postgres does this by default).
  // Accept both numeric and numeric-string forms.
  const toFiniteNumber = (value: unknown): number | undefined => {
    if (typeof value === 'number' && Number.isFinite(value)) return value;
    if (typeof value === 'string' && value.trim() !== '') {
      const parsed = Number(value);
      if (Number.isFinite(parsed)) return parsed;
    }
    return undefined;
  };

  let adapter = null;
  try {
    // Fetch credentials
    const credentials = await getDataSourceCredentials({ dataSourceId });
    if (!isCredentials(credentials)) {
      throw new Error('Invalid credentials returned from vault');
    }

    // Create adapter
    adapter = await createAdapter(credentials);

    // Query information schema based on database type
    // This is fast and doesn't require sampling
    let query: string;
    let params: unknown[] = [];

    switch (credentials.type) {
      case DataSourceType.PostgreSQL:
      case DataSourceType.Redshift:
        query = `
          SELECT
            COALESCE(s.n_live_tup, 0) as row_count,
            pg_relation_size(quote_ident($1)||'.'||quote_ident($2)) as size_bytes,
            t.table_type
          FROM information_schema.tables t
          LEFT JOIN pg_stat_user_tables s
            ON s.schemaname = t.table_schema
            AND s.relname = t.table_name
          WHERE t.table_schema = $1
            AND t.table_name = $2
          LIMIT 1
        `;
        params = [schema, tableName];
        break;

      case DataSourceType.Snowflake:
        query = `
          SELECT
            row_count,
            bytes as size_bytes,
            table_type
          FROM ${database}.information_schema.tables
          WHERE table_schema = ?
            AND table_name = ?
          LIMIT 1
        `;
        params = [schema, tableName];
        break;

      case DataSourceType.BigQuery:
        query = `
          SELECT
            row_count,
            size_bytes,
            table_type
          FROM \`${database}.${schema}.INFORMATION_SCHEMA.TABLES\`
          WHERE table_name = ?
          LIMIT 1
        `;
        params = [tableName];
        break;

      case DataSourceType.MySQL:
        query = `
          SELECT
            table_rows as row_count,
            data_length as size_bytes,
            table_type
          FROM information_schema.tables
          WHERE table_schema = ?
            AND table_name = ?
          LIMIT 1
        `;
        params = [schema, tableName];
        break;

      default:
        // Fallback for unknown types - use reasonable defaults
        return { rowCount: 100000, type: 'TABLE' };
    }

    const result = await adapter.query(query, params as (string | number)[]);

    if (!result.rows || result.rows.length === 0) {
      throw new HTTPException(404, {
        message: `Table not found: ${database}.${schema}.${tableName}`,
      });
    }

    const row = result.rows[0] as Record<string, unknown>;

    // Some adapters (notably Snowflake) return column keys upper-cased;
    // fall back to the upper-case key when the lower-case one is absent.
    const columnValue = (key: string): unknown => row[key] ?? row[key.toUpperCase()];

    // Fix: the previous `typeof === 'number'` check silently discarded real
    // counts delivered as bigint strings and fell back to the 100000 default.
    const rowCount = toFiniteNumber(columnValue('row_count')) ?? 100000;
    const sizeBytes = toFiniteNumber(columnValue('size_bytes'));
    const rawType = columnValue('table_type');
    const tableType = typeof rawType === 'string' ? rawType : 'TABLE';

    return {
      rowCount,
      ...(sizeBytes !== undefined && { sizeBytes }),
      type: tableType,
    };
  } finally {
    if (adapter) {
      await adapter.close().catch(() => {
        // Ignore cleanup errors
      });
    }
  }
}
/** /**
* Handler for retrieving dataset metadata via API key authentication * Handler for retrieving dataset metadata via API key authentication
* *
* This handler: * This handler:
* 1. Validates API key has access to the organization * 1. Validates API key has access to the organization
* 2. Queries for dataset matching database, schema, name, and organization * 2. Validates identifiers for SQL injection protection
* 3. Returns the dataset's metadata column * 3. Triggers get-table-statistics task to compute fresh metadata
* 4. Waits for task completion (max 2 minutes)
* 5. Transforms and returns the metadata
* *
* @param request - The metadata request containing database, schema, and name * Note: This works on ANY table in the data source, not just registered datasets
*
* @param request - The metadata request containing dataSourceId, database, schema, and name
* @param apiKeyContext - The authenticated API key context * @param apiKeyContext - The authenticated API key context
* @returns The dataset metadata * @returns The dataset metadata
*/ */
@ -20,22 +181,127 @@ export async function getMetadataHandler(
apiKeyContext: ApiKeyContext apiKeyContext: ApiKeyContext
): Promise<GetMetadataResponse> { ): Promise<GetMetadataResponse> {
const { organizationId } = apiKeyContext; const { organizationId } = apiKeyContext;
const { dataSourceId, database, schema, name } = request;
// Get dataset metadata // Validate identifiers for SQL injection protection
const result = await getDatasetMetadata({ validateIdentifier(database, 'database');
database: request.database, validateIdentifier(schema, 'schema');
schema: request.schema, validateIdentifier(name, 'table name');
name: request.name,
organizationId,
});
if (!result || !result.metadata) { try {
throw new HTTPException(404, { // Quick lookup from warehouse information schema to get accurate row count and size
message: `Dataset not found: ${request.database}.${request.schema}.${request.name}`, // This is fast (< 1 second) and doesn't require sampling
const quickMetadata = await getQuickTableMetadata(dataSourceId, database, schema, name);
// Determine sample size based on actual row count
// Use up to 100k samples for large tables, scale down for smaller ones
const sampleSize = Math.min(100000, Math.max(10000, Math.floor(quickMetadata.rowCount * 0.1)));
// Map table type to enum
const tableType: 'TABLE' | 'VIEW' | 'MATERIALIZED_VIEW' | 'EXTERNAL_TABLE' | 'TEMPORARY_TABLE' =
quickMetadata.type === 'VIEW'
? 'VIEW'
: quickMetadata.type === 'MATERIALIZED VIEW'
? 'MATERIALIZED_VIEW'
: quickMetadata.type === 'FOREIGN TABLE'
? 'EXTERNAL_TABLE'
: quickMetadata.type === 'LOCAL TEMPORARY'
? 'TEMPORARY_TABLE'
: 'TABLE';
// Trigger the get-table-statistics task with idempotency
// If the same table is requested within 5 minutes, return existing task
const handle = await tasks.trigger(
'get-table-statistics',
{
dataSourceId,
table: {
name,
schema,
database,
rowCount: quickMetadata.rowCount,
sizeBytes: quickMetadata.sizeBytes,
type: tableType,
},
sampleSize,
},
{
idempotencyKey: `metadata-${organizationId}-${dataSourceId}-${database}-${schema}-${name}`,
idempotencyKeyTTL: '5m', // 5 minutes TTL
}
);
// Poll for task completion with timeout
const startTime = Date.now();
const timeout = 120000; // 2 minutes
const pollInterval = 2000; // Poll every 2 seconds
let run: Awaited<ReturnType<typeof runs.retrieve>>;
while (true) {
run = await runs.retrieve(handle.id);
// Check if task completed, failed, or was canceled
if (run.status === 'COMPLETED' || run.status === 'FAILED' || run.status === 'CANCELED') {
break;
}
// Check for timeout
if (Date.now() - startTime > timeout) {
throw new HTTPException(504, {
message: 'Metadata collection took too long to complete. Please try again.',
}); });
} }
// Wait before next poll
await new Promise((resolve) => setTimeout(resolve, pollInterval));
}
// Check task status
if (run.status === 'FAILED' || run.status === 'CANCELED') {
throw new HTTPException(500, {
message: `Metadata collection task ${run.status.toLowerCase()}`,
});
}
// Check if task completed successfully
if (!run.output) {
throw new HTTPException(500, {
message: 'Metadata collection task did not return any output',
});
}
const output = run.output as GetTableStatisticsOutput;
if (!output.success) {
throw new HTTPException(500, {
message: output.error || 'Metadata collection failed',
});
}
// Transform GetTableStatisticsOutput to DatasetMetadata format
const metadata = {
rowCount: output.totalRows,
sizeBytes: quickMetadata.sizeBytes,
sampleSize: output.actualSamples,
samplingMethod: output.samplingMethod,
columnProfiles: output.columnProfiles || [],
introspectedAt: new Date().toISOString(),
};
return { return {
metadata: result.metadata, metadata,
}; };
} catch (error) {
// Re-throw HTTPException as-is
if (error instanceof HTTPException) {
throw error;
}
// Log unexpected errors
console.error('Unexpected error during metadata retrieval:', error);
throw new HTTPException(500, {
message: 'An unexpected error occurred during metadata retrieval',
});
}
} }

View File

@ -35,6 +35,7 @@ describe('retrieve-metadata-execute error handling', () => {
} as Response); } as Response);
const result = await executeHandler({ const result = await executeHandler({
dataSourceId: 'test-ds-id',
database: 'test_db', database: 'test_db',
schema: 'public', schema: 'public',
name: 'users', name: 'users',
@ -42,7 +43,7 @@ describe('retrieve-metadata-execute error handling', () => {
expect(result).toEqual(mockResponse); expect(result).toEqual(mockResponse);
expect(fetch).toHaveBeenCalledWith( expect(fetch).toHaveBeenCalledWith(
'http://localhost:3000/api/v2/tools/metadata?database=test_db&schema=public&name=users', 'http://localhost:3000/api/v2/tools/metadata?dataSourceId=test-ds-id&database=test_db&schema=public&name=users',
expect.objectContaining({ expect.objectContaining({
method: 'GET', method: 'GET',
headers: { headers: {
@ -64,6 +65,7 @@ describe('retrieve-metadata-execute error handling', () => {
await expect( await expect(
executeHandler({ executeHandler({
dataSourceId: 'test-ds-id',
database: 'test_db', database: 'test_db',
schema: 'public', schema: 'public',
name: 'nonexistent_table', name: 'nonexistent_table',
@ -78,6 +80,7 @@ describe('retrieve-metadata-execute error handling', () => {
await expect( await expect(
executeHandler({ executeHandler({
dataSourceId: 'test-ds-id',
database: 'test_db', database: 'test_db',
schema: 'public', schema: 'public',
name: 'users', name: 'users',
@ -97,6 +100,7 @@ describe('retrieve-metadata-execute error handling', () => {
await expect( await expect(
executeHandler({ executeHandler({
dataSourceId: 'test-ds-id',
database: 'test_db', database: 'test_db',
schema: 'restricted', schema: 'restricted',
name: 'sensitive_table', name: 'sensitive_table',
@ -116,6 +120,7 @@ describe('retrieve-metadata-execute error handling', () => {
await expect( await expect(
executeHandler({ executeHandler({
dataSourceId: 'test-ds-id',
database: 'test_db', database: 'test_db',
schema: 'public', schema: 'public',
name: 'users', name: 'users',
@ -137,6 +142,7 @@ describe('retrieve-metadata-execute error handling', () => {
await expect( await expect(
executeHandler({ executeHandler({
dataSourceId: 'test-ds-id',
database: 'test_db', database: 'test_db',
schema: 'public', schema: 'public',
name: 'users', name: 'users',
@ -151,6 +157,7 @@ describe('retrieve-metadata-execute error handling', () => {
await expect( await expect(
executeHandler({ executeHandler({
dataSourceId: 'test-ds-id',
database: 'test_db', database: 'test_db',
schema: 'public', schema: 'public',
name: 'users', name: 'users',

View File

@ -10,6 +10,7 @@ import {
* Retrieve dataset metadata via API endpoint * Retrieve dataset metadata via API endpoint
*/ */
async function executeApiRequest( async function executeApiRequest(
dataSourceId: string,
database: string, database: string,
schema: string, schema: string,
name: string, name: string,
@ -22,6 +23,7 @@ async function executeApiRequest(
try { try {
// Build query string // Build query string
const params = new URLSearchParams({ const params = new URLSearchParams({
dataSourceId,
database, database,
schema, schema,
name, name,
@ -66,10 +68,10 @@ async function executeApiRequest(
export function createRetrieveMetadataExecute(context: RetrieveMetadataContext) { export function createRetrieveMetadataExecute(context: RetrieveMetadataContext) {
return wrapTraced( return wrapTraced(
async (input: RetrieveMetadataInput): Promise<RetrieveMetadataOutput> => { async (input: RetrieveMetadataInput): Promise<RetrieveMetadataOutput> => {
const { database, schema, name } = input; const { dataSourceId, database, schema, name } = input;
// Execute API request // Execute API request
const result = await executeApiRequest(database, schema, name, context); const result = await executeApiRequest(dataSourceId, database, schema, name, context);
if (result.success && result.data) { if (result.success && result.data) {
return result.data; return result.data;

View File

@ -5,6 +5,7 @@ import { createRetrieveMetadataExecute } from './retrieve-metadata-execute';
export const RETRIEVE_METADATA_TOOL_NAME = 'retrieveMetadata'; export const RETRIEVE_METADATA_TOOL_NAME = 'retrieveMetadata';
export const RetrieveMetadataInputSchema = z.object({ export const RetrieveMetadataInputSchema = z.object({
dataSourceId: z.string().min(1).describe('Data source identifier'),
database: z.string().min(1).describe('Database name where the dataset resides'), database: z.string().min(1).describe('Database name where the dataset resides'),
schema: z.string().min(1).describe('Schema name where the dataset resides'), schema: z.string().min(1).describe('Schema name where the dataset resides'),
name: z.string().min(1).describe('Dataset/table name'), name: z.string().min(1).describe('Dataset/table name'),

View File

@ -3,6 +3,7 @@ import { z } from 'zod';
// Request schema for getting dataset metadata // Request schema for getting dataset metadata
export const GetMetadataRequestSchema = z.object({ export const GetMetadataRequestSchema = z.object({
dataSourceId: z.string().min(1, 'Data source ID cannot be empty'),
database: z.string().min(1, 'Database name cannot be empty'), database: z.string().min(1, 'Database name cannot be empty'),
schema: z.string().min(1, 'Schema name cannot be empty'), schema: z.string().min(1, 'Schema name cannot be empty'),
name: z.string().min(1, 'Dataset name cannot be empty'), name: z.string().min(1, 'Dataset name cannot be empty'),
@ -14,3 +15,10 @@ export type GetMetadataRequest = z.infer<typeof GetMetadataRequestSchema>;
export interface GetMetadataResponse { export interface GetMetadataResponse {
metadata: DatasetMetadata; metadata: DatasetMetadata;
} }
// Re-export trigger task types for use in server
export type {
GetTableStatisticsInput,
GetTableStatisticsOutput,
ColumnProfile,
} from './trigger-task-types';

View File

@ -0,0 +1,116 @@
import { z } from 'zod';
/**
* Types for trigger.dev tasks
* These are duplicated from @buster-app/trigger to avoid circular dependencies
* IMPORTANT: Keep in sync with apps/trigger/src/tasks/introspect-data/types/index.ts
*/
// Column profile schema
// Shape of a single entry in the top-value frequency list.
const TopValueEntrySchema = z.object({
  value: z.unknown(),
  count: z.number(),
  percentage: z.number(),
});

// Statistics emitted only for numeric columns.
const NumericStatsSchema = z.object({
  mean: z.number(),
  median: z.number(),
  stdDev: z.number(),
  skewness: z.number(),
  percentiles: z.object({
    p25: z.number(),
    p50: z.number(),
    p75: z.number(),
    p95: z.number(),
    p99: z.number(),
  }),
  outlierRate: z.number().min(0).max(1),
});

// Heuristic classification of the column's role (enum-like, identifier, ...).
const ClassificationSchema = z.object({
  isLikelyEnum: z.boolean(),
  isLikelyIdentifier: z.boolean(),
  identifierType: z
    .enum(['primary_key', 'foreign_key', 'natural_key', 'sequential', 'uuid_like'])
    .optional(),
  enumValues: z.array(z.string()).optional(),
});

// Extra metadata keyed by the detected semantic type; each variant allows
// arbitrary additional fields via passthrough().
const DynamicMetadataSchema = z.union([
  z.object({ type: z.literal('datetime') }).passthrough(),
  z.object({ type: z.literal('numeric') }).passthrough(),
  z.object({ type: z.literal('identifier') }).passthrough(),
  z.object({ type: z.literal('url') }).passthrough(),
  z.object({ type: z.literal('email') }).passthrough(),
  z.object({ type: z.literal('json') }).passthrough(),
]);

export const ColumnProfileSchema = z.object({
  columnName: z.string(),
  dataType: z.string(),

  // Basic Statistics
  nullRate: z.number().min(0).max(1),
  distinctCount: z.number().int().nonnegative(),
  uniquenessRatio: z.number().min(0).max(1),
  emptyStringRate: z.number().min(0).max(1),

  // Distribution
  topValues: z.array(TopValueEntrySchema),
  entropy: z.number(),
  giniCoefficient: z.number().min(0).max(1),

  // Sample values
  sampleValues: z.array(z.unknown()),

  // Numeric-specific
  numericStats: NumericStatsSchema.optional(),

  // Classification
  classification: ClassificationSchema,

  // Dynamic metadata based on detected column semantics
  dynamicMetadata: DynamicMetadataSchema.optional(),
});

export type ColumnProfile = z.infer<typeof ColumnProfileSchema>;
// Get table statistics task input
// Identifying information for the table to sample, plus sizing hints
// (rowCount/sizeBytes come from a quick information_schema lookup).
const TableDescriptorSchema = z.object({
  name: z.string(),
  schema: z.string(),
  database: z.string(),
  rowCount: z.number(),
  sizeBytes: z.number().optional(),
  type: z.enum(['TABLE', 'VIEW', 'MATERIALIZED_VIEW', 'EXTERNAL_TABLE', 'TEMPORARY_TABLE']),
});

export const GetTableStatisticsInputSchema = z.object({
  dataSourceId: z.string().min(1, 'Data source ID is required'),
  table: TableDescriptorSchema,
  sampleSize: z.number().int().positive(),
});

export type GetTableStatisticsInput = z.infer<typeof GetTableStatisticsInputSchema>;
// Get table statistics task output
// Summary of how the sample was taken and how long analysis took.
const TableAnalysisSummarySchema = z.object({
  sampleSize: z.number(),
  totalRows: z.number(),
  samplingRate: z.number(),
  analysisTimeMs: z.number(),
});

export const GetTableStatisticsOutputSchema = z.object({
  success: z.boolean(),
  tableId: z.string(),
  totalRows: z.number(),
  sampleSize: z.number(),
  actualSamples: z.number(),
  samplingMethod: z.string(),

  // Statistical analysis results
  columnProfiles: z.array(ColumnProfileSchema).optional(),
  tableMetadata: TableAnalysisSummarySchema.optional(),

  error: z.string().optional(),
});

export type GetTableStatisticsOutput = z.infer<typeof GetTableStatisticsOutputSchema>;