From 71efdff64e6ef5c0ab069619c0c90f95a1d61638 Mon Sep 17 00:00:00 2001 From: dal Date: Thu, 21 Aug 2025 16:57:04 -0600 Subject: [PATCH] Add data source integration for metric data retrieval - Added '@buster/data-source' dependency to the server and pnpm lock files. - Enhanced the getMetricDataHandler function to support versioning and data source integration. - Updated the metric data query schema to include an optional version number parameter. - Modified the API endpoint to accommodate the new versioning feature for metric data retrieval. --- apps/server/package.json | 1 + .../api/v2/metric_files/get-metric-data.ts | 130 ++++++++++++--- apps/server/src/api/v2/metric_files/index.ts | 4 +- apps/trigger/package.json | 2 +- .../metrics/get-metric-with-data-source.ts | 149 ++++++++++++++++++ .../database/src/queries/metrics/index.ts | 15 ++ .../src/metrics/responses.types.ts | 1 + pnpm-lock.yaml | 3 + 8 files changed, 279 insertions(+), 26 deletions(-) create mode 100644 packages/database/src/queries/metrics/get-metric-with-data-source.ts diff --git a/apps/server/package.json b/apps/server/package.json index 9d34f7fca..24183b48d 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -25,6 +25,7 @@ "dependencies": { "@buster/access-controls": "workspace:*", "@buster/ai": "workspace:*", + "@buster/data-source": "workspace:*", "@buster/database": "workspace:*", "@buster/env-utils": "workspace:*", "@buster/github": "workspace:*", diff --git a/apps/server/src/api/v2/metric_files/get-metric-data.ts b/apps/server/src/api/v2/metric_files/get-metric-data.ts index b2a24cb32..32f04afb0 100644 --- a/apps/server/src/api/v2/metric_files/get-metric-data.ts +++ b/apps/server/src/api/v2/metric_files/get-metric-data.ts @@ -1,6 +1,13 @@ import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls'; +import { createAdapter } from '@buster/data-source'; +import type { Credentials } from '@buster/data-source'; import type { User } from '@buster/database'; -import { getUserOrganizationId } from '@buster/database'; +import { + extractSqlFromMetricContent, + getDataSourceCredentials, + getMetricWithDataSource, + getUserOrganizationId, +} from '@buster/database'; import type { MetricDataResponse } from '@buster/server-shared/metrics'; import { HTTPException } from 'hono/http-exception'; @@ -16,14 +23,16 @@ import { HTTPException } from 'hono/http-exception'; * 6. Returns the data with metadata and pagination info * * @param metricId - The ID of the metric to retrieve data for - * @param limit - Maximum number of rows to return (default 5000, max 5000) * @param user - The authenticated user + * @param limit - Maximum number of rows to return (default 5000, max 5000) + * @param versionNumber - Optional version number to retrieve specific metric version * @returns The metric data with metadata */ export async function getMetricDataHandler( metricId: string, - limit: number = 5000, - user: User + user: User, + limit = 5000, + versionNumber?: number ): Promise { // Get user's organization const userOrg = await getUserOrganizationId(user.id); @@ -56,23 +65,98 @@ export async function getMetricDataHandler( // Ensure limit is within bounds const queryLimit = Math.min(Math.max(limit, 1), 5000); - // TODO: Implement the following steps in subsequent tickets: - // 1. Retrieve metric definition from database - // 2. Parse metric content (YAML/JSON) to extract SQL query - // 3. Get data source connection details - // 4. Execute query using appropriate data source adapter - // 5. Process results and build metadata - // 6. Check if there are more records beyond the limit + // Retrieve metric definition from database with data source info + const metric = await getMetricWithDataSource({ metricId, versionNumber }); - // Placeholder response for now - return { - data: [], - data_metadata: { - column_count: 0, - column_metadata: [], - row_count: 0, - }, - metricId, - has_more_records: false, - }; -} \ No newline at end of file + if (!metric) { + throw new HTTPException(404, { + message: 'Metric not found', + }); + } + + // Verify metric belongs to user's organization + if (metric.organizationId !== organizationId) { + throw new HTTPException(403, { + message: 'You do not have permission to view this metric', + }); + } + + // Extract SQL query from metric content + const sql = extractSqlFromMetricContent(metric.content); + + // Get data source credentials from vault + let credentials: Credentials; + try { + const rawCredentials = await getDataSourceCredentials({ + dataSourceId: metric.secretId, + }); + + // Ensure credentials have the correct type + credentials = { + ...rawCredentials, + type: rawCredentials.type || metric.dataSourceType, + } as Credentials; + } catch (error) { + console.error('Failed to retrieve data source credentials:', error); + throw new HTTPException(500, { + message: 'Failed to access data source', + }); + } + + // Create adapter and execute query + const adapter = await createAdapter(credentials); + + try { + // Add 1 to limit to check if there are more records + const queryLimitWithCheck = queryLimit + 1; + + // Execute query with timeout (60 seconds) + const queryResult = await adapter.query( + sql, + [], // No parameters for metric queries + queryLimitWithCheck, + 60000 // 60 second timeout + ); + + // Check if we have more records than the requested limit + const hasMoreRecords = queryResult.rows.length > queryLimit; + + // Trim results to requested limit if we have more + const data = hasMoreRecords ? queryResult.rows.slice(0, queryLimit) : queryResult.rows; + + // Build metadata from query result + const dataMetadata = { + column_count: queryResult.fields.length, + column_metadata: queryResult.fields.map((field) => ({ + name: field.name, + type: field.type, + nullable: field.nullable, + })), + row_count: data.length, + }; + + return { + data, + data_metadata: dataMetadata, + metricId, + has_more_records: hasMoreRecords, + }; + } catch (error) { + console.error('Query execution failed:', error); + + if (error instanceof Error) { + throw new HTTPException(500, { + message: `Query execution failed: ${error.message}`, + }); + } + + throw new HTTPException(500, { + message: 'Query execution failed', + }); + } finally { + // Always close the adapter connection + await adapter.close().catch((err) => { + console.error('Failed to close adapter connection:', err); + }); + } +} diff --git a/apps/server/src/api/v2/metric_files/index.ts b/apps/server/src/api/v2/metric_files/index.ts index a3a628820..fe793c6e1 100644 --- a/apps/server/src/api/v2/metric_files/index.ts +++ b/apps/server/src/api/v2/metric_files/index.ts @@ -22,10 +22,10 @@ const app = new Hono() zValidator('query', MetricDataQuerySchema), async (c) => { const { id } = c.req.valid('param'); - const { limit } = c.req.valid('query'); + const { limit, version_number } = c.req.valid('query'); const user = c.get('busterUser'); - const response = await getMetricDataHandler(id, limit, user); + const response = await getMetricDataHandler(id, user, limit, version_number); return c.json(response); } diff --git a/apps/trigger/package.json b/apps/trigger/package.json index d56874b1f..dfe7856af 100644 --- a/apps/trigger/package.json +++ b/apps/trigger/package.json @@ -42,4 +42,4 @@ "devDependencies": { "@trigger.dev/build": "4.0.1" } -} \ No newline at end of file +} diff --git a/packages/database/src/queries/metrics/get-metric-with-data-source.ts b/packages/database/src/queries/metrics/get-metric-with-data-source.ts new file mode 100644 index 000000000..0417685f3 --- /dev/null +++ b/packages/database/src/queries/metrics/get-metric-with-data-source.ts @@ -0,0 +1,149 @@ +import { and, eq, isNull } from 'drizzle-orm'; +import { z } from 'zod'; +import { db } from '../../connection'; +import { dataSources, metricFiles } from '../../schema'; + +// Zod-first: Define schemas first, derive types from them +export const GetMetricWithDataSourceInputSchema = z.object({ + metricId: z.string().uuid(), + versionNumber: z.number().optional(), +}); + +export type GetMetricWithDataSourceInput = z.infer; + +// Zod schema for MetricContent (matches MetricYml from server-shared) +export const MetricContentSchema = z.object({ + name: z.string(), + description: z.string().optional(), + timeFrame: z.string().optional(), + sql: z.string(), + chartConfig: z.record(z.unknown()).optional(), +}); + +export type MetricContent = z.infer; + +// Zod schema for version history entry +export const VersionHistoryEntrySchema = z.object({ + content: MetricContentSchema, + updated_at: z.string(), + version_number: z.number(), +}); + +export type VersionHistoryEntry = z.infer; + +// Zod schema for the full metric with data source +export const MetricWithDataSourceSchema = z.object({ + id: z.string(), + name: z.string(), + content: MetricContentSchema, + dataSourceId: z.string(), + organizationId: z.string(), + dataMetadata: z.record(z.unknown()).nullable(), + versionHistory: z.record(VersionHistoryEntrySchema), + secretId: z.string(), + dataSourceType: z.string(), + versionNumber: z.number().optional(), +}); + +export type MetricWithDataSource = z.infer; + +/** + * Fetches metric details along with data source information + * Supports fetching specific versions from the versionHistory field + */ +export async function getMetricWithDataSource( + input: GetMetricWithDataSourceInput +): Promise { + const validated = GetMetricWithDataSourceInputSchema.parse(input); + + // Fetch the metric with its data source + const [result] = await db + .select({ + id: metricFiles.id, + name: metricFiles.name, + content: metricFiles.content, + dataSourceId: metricFiles.dataSourceId, + organizationId: metricFiles.organizationId, + dataMetadata: metricFiles.dataMetadata, + versionHistory: metricFiles.versionHistory, + secretId: dataSources.secretId, + dataSourceType: dataSources.type, + }) + .from(metricFiles) + .innerJoin(dataSources, eq(metricFiles.dataSourceId, dataSources.id)) + .where( + and( + eq(metricFiles.id, validated.metricId), + isNull(metricFiles.deletedAt), + isNull(dataSources.deletedAt) + ) + ) + .limit(1); + + if (!result) { + return null; + } + + // Parse and validate the content + const parsedContent = MetricContentSchema.safeParse(result.content); + if (!parsedContent.success) { + console.error('Invalid metric content structure:', parsedContent.error); + return null; + } + + // Parse version history if it exists + const versionHistory: Record = {}; + if (result.versionHistory && typeof result.versionHistory === 'object') { + // Validate each version entry + for (const [key, value] of Object.entries(result.versionHistory as Record)) { + const parsed = VersionHistoryEntrySchema.safeParse(value); + if (parsed.success) { + versionHistory[key] = parsed.data; + } + } + } + + // Determine which content to use (specific version or current) + let content = parsedContent.data; + let versionNumber: number | undefined; + + if (validated.versionNumber !== undefined && versionHistory) { + const versionKey = validated.versionNumber.toString(); + const versionData = versionHistory[versionKey]; + + if (versionData?.content) { + content = versionData.content; + versionNumber = validated.versionNumber; + } + // If version not found, fall back to current content + } + + // Parse and validate dataMetadata + const dataMetadata = + result.dataMetadata && typeof result.dataMetadata === 'object' + ? (result.dataMetadata as Record) + : null; + + const metricData: MetricWithDataSource = { + id: result.id, + name: result.name, + content, + dataSourceId: result.dataSourceId, + organizationId: result.organizationId, + dataMetadata, + versionHistory, + secretId: result.secretId, + dataSourceType: result.dataSourceType, + ...(versionNumber !== undefined && { versionNumber }), + }; + + return metricData; +} + +/** + * Extracts SQL query from metric content + * The content should follow the MetricYml schema where SQL is a direct field + */ +export function extractSqlFromMetricContent(content: MetricContent): string { + return content.sql; +} diff --git a/packages/database/src/queries/metrics/index.ts b/packages/database/src/queries/metrics/index.ts index f84123d17..f84a26101 100644 --- a/packages/database/src/queries/metrics/index.ts +++ b/packages/database/src/queries/metrics/index.ts @@ -10,3 +10,18 @@ export { type GetMetricForExportInput, type MetricForExport, } from './get-metric-for-export'; + +export { + getMetricWithDataSource, + extractSqlFromMetricContent, + // Schemas (Zod-first) + GetMetricWithDataSourceInputSchema, + MetricContentSchema, + VersionHistoryEntrySchema, + MetricWithDataSourceSchema, + // Types (derived from schemas) + type GetMetricWithDataSourceInput, + type MetricWithDataSource, + type MetricContent, + type VersionHistoryEntry, +} from './get-metric-with-data-source'; diff --git a/packages/server-shared/src/metrics/responses.types.ts b/packages/server-shared/src/metrics/responses.types.ts index f8bfa3d86..5b980ddd6 100644 --- a/packages/server-shared/src/metrics/responses.types.ts +++ b/packages/server-shared/src/metrics/responses.types.ts @@ -54,6 +54,7 @@ export type MetricDataParams = z.infer; */ export const MetricDataQuerySchema = z.object({ limit: z.coerce.number().min(1).max(5000).default(5000).optional(), + version_number: z.coerce.number().optional(), }); export type MetricDataQuery = z.infer; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a76b281cf..583edc59c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -163,6 +163,9 @@ importers: '@buster/ai': specifier: workspace:* version: link:../../packages/ai + '@buster/data-source': + specifier: workspace:* + version: link:../../packages/data-source '@buster/database': specifier: workspace:* version: link:../../packages/database