mirror of https://github.com/buster-so/buster.git
Add data source integration for metric data retrieval
- Added '@buster/data-source' dependency to the server and pnpm lock files. - Enhanced the getMetricDataHandler function to support versioning and data source integration. - Updated the metric data query schema to include an optional version number parameter. - Modified the API endpoint to accommodate the new versioning feature for metric data retrieval.
This commit is contained in:
parent
3c8f1f4615
commit
71efdff64e
|
@ -25,6 +25,7 @@
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@buster/access-controls": "workspace:*",
|
"@buster/access-controls": "workspace:*",
|
||||||
"@buster/ai": "workspace:*",
|
"@buster/ai": "workspace:*",
|
||||||
|
"@buster/data-source": "workspace:*",
|
||||||
"@buster/database": "workspace:*",
|
"@buster/database": "workspace:*",
|
||||||
"@buster/env-utils": "workspace:*",
|
"@buster/env-utils": "workspace:*",
|
||||||
"@buster/github": "workspace:*",
|
"@buster/github": "workspace:*",
|
||||||
|
|
|
@ -1,6 +1,13 @@
|
||||||
import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls';
|
import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls';
|
||||||
|
import { createAdapter } from '@buster/data-source';
|
||||||
|
import type { Credentials } from '@buster/data-source';
|
||||||
import type { User } from '@buster/database';
|
import type { User } from '@buster/database';
|
||||||
import { getUserOrganizationId } from '@buster/database';
|
import {
|
||||||
|
extractSqlFromMetricContent,
|
||||||
|
getDataSourceCredentials,
|
||||||
|
getMetricWithDataSource,
|
||||||
|
getUserOrganizationId,
|
||||||
|
} from '@buster/database';
|
||||||
import type { MetricDataResponse } from '@buster/server-shared/metrics';
|
import type { MetricDataResponse } from '@buster/server-shared/metrics';
|
||||||
import { HTTPException } from 'hono/http-exception';
|
import { HTTPException } from 'hono/http-exception';
|
||||||
|
|
||||||
|
@ -16,14 +23,16 @@ import { HTTPException } from 'hono/http-exception';
|
||||||
* 6. Returns the data with metadata and pagination info
|
* 6. Returns the data with metadata and pagination info
|
||||||
*
|
*
|
||||||
* @param metricId - The ID of the metric to retrieve data for
|
* @param metricId - The ID of the metric to retrieve data for
|
||||||
* @param limit - Maximum number of rows to return (default 5000, max 5000)
|
|
||||||
* @param user - The authenticated user
|
* @param user - The authenticated user
|
||||||
|
* @param limit - Maximum number of rows to return (default 5000, max 5000)
|
||||||
|
* @param versionNumber - Optional version number to retrieve specific metric version
|
||||||
* @returns The metric data with metadata
|
* @returns The metric data with metadata
|
||||||
*/
|
*/
|
||||||
export async function getMetricDataHandler(
|
export async function getMetricDataHandler(
|
||||||
metricId: string,
|
metricId: string,
|
||||||
limit: number = 5000,
|
user: User,
|
||||||
user: User
|
limit = 5000,
|
||||||
|
versionNumber?: number
|
||||||
): Promise<MetricDataResponse> {
|
): Promise<MetricDataResponse> {
|
||||||
// Get user's organization
|
// Get user's organization
|
||||||
const userOrg = await getUserOrganizationId(user.id);
|
const userOrg = await getUserOrganizationId(user.id);
|
||||||
|
@ -56,23 +65,98 @@ export async function getMetricDataHandler(
|
||||||
// Ensure limit is within bounds
|
// Ensure limit is within bounds
|
||||||
const queryLimit = Math.min(Math.max(limit, 1), 5000);
|
const queryLimit = Math.min(Math.max(limit, 1), 5000);
|
||||||
|
|
||||||
// TODO: Implement the following steps in subsequent tickets:
|
// Retrieve metric definition from database with data source info
|
||||||
// 1. Retrieve metric definition from database
|
const metric = await getMetricWithDataSource({ metricId, versionNumber });
|
||||||
// 2. Parse metric content (YAML/JSON) to extract SQL query
|
|
||||||
// 3. Get data source connection details
|
|
||||||
// 4. Execute query using appropriate data source adapter
|
|
||||||
// 5. Process results and build metadata
|
|
||||||
// 6. Check if there are more records beyond the limit
|
|
||||||
|
|
||||||
// Placeholder response for now
|
if (!metric) {
|
||||||
return {
|
throw new HTTPException(404, {
|
||||||
data: [],
|
message: 'Metric not found',
|
||||||
data_metadata: {
|
});
|
||||||
column_count: 0,
|
}
|
||||||
column_metadata: [],
|
|
||||||
row_count: 0,
|
// Verify metric belongs to user's organization
|
||||||
},
|
if (metric.organizationId !== organizationId) {
|
||||||
metricId,
|
throw new HTTPException(403, {
|
||||||
has_more_records: false,
|
message: 'You do not have permission to view this metric',
|
||||||
};
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extract SQL query from metric content
|
||||||
|
const sql = extractSqlFromMetricContent(metric.content);
|
||||||
|
|
||||||
|
// Get data source credentials from vault
|
||||||
|
let credentials: Credentials;
|
||||||
|
try {
|
||||||
|
const rawCredentials = await getDataSourceCredentials({
|
||||||
|
dataSourceId: metric.secretId,
|
||||||
|
});
|
||||||
|
|
||||||
|
// Ensure credentials have the correct type
|
||||||
|
credentials = {
|
||||||
|
...rawCredentials,
|
||||||
|
type: rawCredentials.type || metric.dataSourceType,
|
||||||
|
} as Credentials;
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Failed to retrieve data source credentials:', error);
|
||||||
|
throw new HTTPException(500, {
|
||||||
|
message: 'Failed to access data source',
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create adapter and execute query
|
||||||
|
const adapter = await createAdapter(credentials);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Add 1 to limit to check if there are more records
|
||||||
|
const queryLimitWithCheck = queryLimit + 1;
|
||||||
|
|
||||||
|
// Execute query with timeout (60 seconds)
|
||||||
|
const queryResult = await adapter.query(
|
||||||
|
sql,
|
||||||
|
[], // No parameters for metric queries
|
||||||
|
queryLimitWithCheck,
|
||||||
|
60000 // 60 second timeout
|
||||||
|
);
|
||||||
|
|
||||||
|
// Check if we have more records than the requested limit
|
||||||
|
const hasMoreRecords = queryResult.rows.length > queryLimit;
|
||||||
|
|
||||||
|
// Trim results to requested limit if we have more
|
||||||
|
const data = hasMoreRecords ? queryResult.rows.slice(0, queryLimit) : queryResult.rows;
|
||||||
|
|
||||||
|
// Build metadata from query result
|
||||||
|
const dataMetadata = {
|
||||||
|
column_count: queryResult.fields.length,
|
||||||
|
column_metadata: queryResult.fields.map((field) => ({
|
||||||
|
name: field.name,
|
||||||
|
type: field.type,
|
||||||
|
nullable: field.nullable,
|
||||||
|
})),
|
||||||
|
row_count: data.length,
|
||||||
|
};
|
||||||
|
|
||||||
|
return {
|
||||||
|
data,
|
||||||
|
data_metadata: dataMetadata,
|
||||||
|
metricId,
|
||||||
|
has_more_records: hasMoreRecords,
|
||||||
|
};
|
||||||
|
} catch (error) {
|
||||||
|
console.error('Query execution failed:', error);
|
||||||
|
|
||||||
|
if (error instanceof Error) {
|
||||||
|
throw new HTTPException(500, {
|
||||||
|
message: `Query execution failed: ${error.message}`,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
throw new HTTPException(500, {
|
||||||
|
message: 'Query execution failed',
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
// Always close the adapter connection
|
||||||
|
await adapter.close().catch((err) => {
|
||||||
|
console.error('Failed to close adapter connection:', err);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -22,10 +22,10 @@ const app = new Hono()
|
||||||
zValidator('query', MetricDataQuerySchema),
|
zValidator('query', MetricDataQuerySchema),
|
||||||
async (c) => {
|
async (c) => {
|
||||||
const { id } = c.req.valid('param');
|
const { id } = c.req.valid('param');
|
||||||
const { limit } = c.req.valid('query');
|
const { limit, version_number } = c.req.valid('query');
|
||||||
const user = c.get('busterUser');
|
const user = c.get('busterUser');
|
||||||
|
|
||||||
const response = await getMetricDataHandler(id, limit, user);
|
const response = await getMetricDataHandler(id, user, limit, version_number);
|
||||||
|
|
||||||
return c.json(response);
|
return c.json(response);
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,4 +42,4 @@
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"@trigger.dev/build": "4.0.1"
|
"@trigger.dev/build": "4.0.1"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
import { and, eq, isNull } from 'drizzle-orm';
|
||||||
|
import { z } from 'zod';
|
||||||
|
import { db } from '../../connection';
|
||||||
|
import { dataSources, metricFiles } from '../../schema';
|
||||||
|
|
||||||
|
// Zod-first: Define schemas first, derive types from them
|
||||||
|
export const GetMetricWithDataSourceInputSchema = z.object({
|
||||||
|
metricId: z.string().uuid(),
|
||||||
|
versionNumber: z.number().optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type GetMetricWithDataSourceInput = z.infer<typeof GetMetricWithDataSourceInputSchema>;
|
||||||
|
|
||||||
|
// Zod schema for MetricContent (matches MetricYml from server-shared)
|
||||||
|
export const MetricContentSchema = z.object({
|
||||||
|
name: z.string(),
|
||||||
|
description: z.string().optional(),
|
||||||
|
timeFrame: z.string().optional(),
|
||||||
|
sql: z.string(),
|
||||||
|
chartConfig: z.record(z.unknown()).optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type MetricContent = z.infer<typeof MetricContentSchema>;
|
||||||
|
|
||||||
|
// Zod schema for version history entry
|
||||||
|
export const VersionHistoryEntrySchema = z.object({
|
||||||
|
content: MetricContentSchema,
|
||||||
|
updated_at: z.string(),
|
||||||
|
version_number: z.number(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type VersionHistoryEntry = z.infer<typeof VersionHistoryEntrySchema>;
|
||||||
|
|
||||||
|
// Zod schema for the full metric with data source
|
||||||
|
export const MetricWithDataSourceSchema = z.object({
|
||||||
|
id: z.string(),
|
||||||
|
name: z.string(),
|
||||||
|
content: MetricContentSchema,
|
||||||
|
dataSourceId: z.string(),
|
||||||
|
organizationId: z.string(),
|
||||||
|
dataMetadata: z.record(z.unknown()).nullable(),
|
||||||
|
versionHistory: z.record(VersionHistoryEntrySchema),
|
||||||
|
secretId: z.string(),
|
||||||
|
dataSourceType: z.string(),
|
||||||
|
versionNumber: z.number().optional(),
|
||||||
|
});
|
||||||
|
|
||||||
|
export type MetricWithDataSource = z.infer<typeof MetricWithDataSourceSchema>;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches metric details along with data source information
|
||||||
|
* Supports fetching specific versions from the versionHistory field
|
||||||
|
*/
|
||||||
|
export async function getMetricWithDataSource(
|
||||||
|
input: GetMetricWithDataSourceInput
|
||||||
|
): Promise<MetricWithDataSource | null> {
|
||||||
|
const validated = GetMetricWithDataSourceInputSchema.parse(input);
|
||||||
|
|
||||||
|
// Fetch the metric with its data source
|
||||||
|
const [result] = await db
|
||||||
|
.select({
|
||||||
|
id: metricFiles.id,
|
||||||
|
name: metricFiles.name,
|
||||||
|
content: metricFiles.content,
|
||||||
|
dataSourceId: metricFiles.dataSourceId,
|
||||||
|
organizationId: metricFiles.organizationId,
|
||||||
|
dataMetadata: metricFiles.dataMetadata,
|
||||||
|
versionHistory: metricFiles.versionHistory,
|
||||||
|
secretId: dataSources.secretId,
|
||||||
|
dataSourceType: dataSources.type,
|
||||||
|
})
|
||||||
|
.from(metricFiles)
|
||||||
|
.innerJoin(dataSources, eq(metricFiles.dataSourceId, dataSources.id))
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(metricFiles.id, validated.metricId),
|
||||||
|
isNull(metricFiles.deletedAt),
|
||||||
|
isNull(dataSources.deletedAt)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (!result) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse and validate the content
|
||||||
|
const parsedContent = MetricContentSchema.safeParse(result.content);
|
||||||
|
if (!parsedContent.success) {
|
||||||
|
console.error('Invalid metric content structure:', parsedContent.error);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse version history if it exists
|
||||||
|
const versionHistory: Record<string, VersionHistoryEntry> = {};
|
||||||
|
if (result.versionHistory && typeof result.versionHistory === 'object') {
|
||||||
|
// Validate each version entry
|
||||||
|
for (const [key, value] of Object.entries(result.versionHistory as Record<string, unknown>)) {
|
||||||
|
const parsed = VersionHistoryEntrySchema.safeParse(value);
|
||||||
|
if (parsed.success) {
|
||||||
|
versionHistory[key] = parsed.data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine which content to use (specific version or current)
|
||||||
|
let content = parsedContent.data;
|
||||||
|
let versionNumber: number | undefined;
|
||||||
|
|
||||||
|
if (validated.versionNumber !== undefined && versionHistory) {
|
||||||
|
const versionKey = validated.versionNumber.toString();
|
||||||
|
const versionData = versionHistory[versionKey];
|
||||||
|
|
||||||
|
if (versionData?.content) {
|
||||||
|
content = versionData.content;
|
||||||
|
versionNumber = validated.versionNumber;
|
||||||
|
}
|
||||||
|
// If version not found, fall back to current content
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse and validate dataMetadata
|
||||||
|
const dataMetadata =
|
||||||
|
result.dataMetadata && typeof result.dataMetadata === 'object'
|
||||||
|
? (result.dataMetadata as Record<string, unknown>)
|
||||||
|
: null;
|
||||||
|
|
||||||
|
const metricData: MetricWithDataSource = {
|
||||||
|
id: result.id,
|
||||||
|
name: result.name,
|
||||||
|
content,
|
||||||
|
dataSourceId: result.dataSourceId,
|
||||||
|
organizationId: result.organizationId,
|
||||||
|
dataMetadata,
|
||||||
|
versionHistory,
|
||||||
|
secretId: result.secretId,
|
||||||
|
dataSourceType: result.dataSourceType,
|
||||||
|
...(versionNumber !== undefined && { versionNumber }),
|
||||||
|
};
|
||||||
|
|
||||||
|
return metricData;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extracts SQL query from metric content
|
||||||
|
* The content should follow the MetricYml schema where SQL is a direct field
|
||||||
|
*/
|
||||||
|
export function extractSqlFromMetricContent(content: MetricContent): string {
|
||||||
|
return content.sql;
|
||||||
|
}
|
|
@ -10,3 +10,18 @@ export {
|
||||||
type GetMetricForExportInput,
|
type GetMetricForExportInput,
|
||||||
type MetricForExport,
|
type MetricForExport,
|
||||||
} from './get-metric-for-export';
|
} from './get-metric-for-export';
|
||||||
|
|
||||||
|
export {
|
||||||
|
getMetricWithDataSource,
|
||||||
|
extractSqlFromMetricContent,
|
||||||
|
// Schemas (Zod-first)
|
||||||
|
GetMetricWithDataSourceInputSchema,
|
||||||
|
MetricContentSchema,
|
||||||
|
VersionHistoryEntrySchema,
|
||||||
|
MetricWithDataSourceSchema,
|
||||||
|
// Types (derived from schemas)
|
||||||
|
type GetMetricWithDataSourceInput,
|
||||||
|
type MetricWithDataSource,
|
||||||
|
type MetricContent,
|
||||||
|
type VersionHistoryEntry,
|
||||||
|
} from './get-metric-with-data-source';
|
||||||
|
|
|
@ -54,6 +54,7 @@ export type MetricDataParams = z.infer<typeof MetricDataParamsSchema>;
|
||||||
*/
|
*/
|
||||||
export const MetricDataQuerySchema = z.object({
|
export const MetricDataQuerySchema = z.object({
|
||||||
limit: z.coerce.number().min(1).max(5000).default(5000).optional(),
|
limit: z.coerce.number().min(1).max(5000).default(5000).optional(),
|
||||||
|
version_number: z.coerce.number().optional(),
|
||||||
});
|
});
|
||||||
|
|
||||||
export type MetricDataQuery = z.infer<typeof MetricDataQuerySchema>;
|
export type MetricDataQuery = z.infer<typeof MetricDataQuerySchema>;
|
||||||
|
|
|
@ -163,6 +163,9 @@ importers:
|
||||||
'@buster/ai':
|
'@buster/ai':
|
||||||
specifier: workspace:*
|
specifier: workspace:*
|
||||||
version: link:../../packages/ai
|
version: link:../../packages/ai
|
||||||
|
'@buster/data-source':
|
||||||
|
specifier: workspace:*
|
||||||
|
version: link:../../packages/data-source
|
||||||
'@buster/database':
|
'@buster/database':
|
||||||
specifier: workspace:*
|
specifier: workspace:*
|
||||||
version: link:../../packages/database
|
version: link:../../packages/database
|
||||||
|
|
Loading…
Reference in New Issue