mirror of https://github.com/buster-so/buster.git
Merge pull request #1129 from buster-so/dallin-bus-1930-modifying-metrics-from-an-existing-report
version on report file cache
This commit is contained in:
commit
d0df2ab3db
|
@ -90,20 +90,30 @@ export async function getMetricDataHandler(
|
|||
});
|
||||
}
|
||||
|
||||
// Determine the actual version number we're working with
|
||||
const resolvedVersion = metric.versionNumber;
|
||||
|
||||
// Check R2 cache if report_file_id is provided
|
||||
if (reportFileId) {
|
||||
console.info('Checking R2 cache for metric data', {
|
||||
metricId,
|
||||
reportFileId,
|
||||
organizationId,
|
||||
version: resolvedVersion,
|
||||
});
|
||||
|
||||
try {
|
||||
const cachedData = await getCachedMetricData(organizationId, metricId, reportFileId);
|
||||
const cachedData = await getCachedMetricData(
|
||||
organizationId,
|
||||
metricId,
|
||||
reportFileId,
|
||||
resolvedVersion
|
||||
);
|
||||
if (cachedData) {
|
||||
console.info('Cache hit - returning cached metric data', {
|
||||
metricId,
|
||||
reportFileId,
|
||||
version: resolvedVersion,
|
||||
rowCount: cachedData.data?.length || 0,
|
||||
});
|
||||
return cachedData;
|
||||
|
@ -111,11 +121,13 @@ export async function getMetricDataHandler(
|
|||
console.info('Cache miss - will fetch from data source', {
|
||||
metricId,
|
||||
reportFileId,
|
||||
version: resolvedVersion,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error('Error checking cache, falling back to data source', {
|
||||
metricId,
|
||||
reportFileId,
|
||||
version: resolvedVersion,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
|
@ -164,6 +176,7 @@ export async function getMetricDataHandler(
|
|||
data_metadata: result.dataMetadata,
|
||||
metricId,
|
||||
has_more_records: hasMore || result.hasMoreRecords,
|
||||
...(resolvedVersion !== undefined && { version: resolvedVersion }),
|
||||
};
|
||||
|
||||
// Cache the data if report_file_id is provided (pass-through write)
|
||||
|
@ -172,17 +185,21 @@ export async function getMetricDataHandler(
|
|||
metricId,
|
||||
reportFileId,
|
||||
organizationId,
|
||||
version: resolvedVersion,
|
||||
rowCount: trimmedData.length,
|
||||
});
|
||||
|
||||
// Fire and forget - don't wait for cache write
|
||||
setCachedMetricData(organizationId, metricId, reportFileId, response).catch((error) => {
|
||||
console.error('Failed to cache metric data', {
|
||||
metricId,
|
||||
reportFileId,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
});
|
||||
setCachedMetricData(organizationId, metricId, reportFileId, response, resolvedVersion).catch(
|
||||
(error) => {
|
||||
console.error('Failed to cache metric data', {
|
||||
metricId,
|
||||
reportFileId,
|
||||
version: resolvedVersion,
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
return response;
|
||||
|
|
|
@ -58,20 +58,7 @@ export const cacheReportMetrics: ReturnType<
|
|||
await Promise.all(
|
||||
batch.map(async (metricId) => {
|
||||
try {
|
||||
// Check if already cached
|
||||
const exists = await checkCacheExists(organizationId, metricId, reportId);
|
||||
if (exists) {
|
||||
logger.info('Metric already cached, skipping', { metricId, reportId });
|
||||
cached.push({
|
||||
metricId,
|
||||
success: true,
|
||||
rowCount: 0, // Already cached, don't know the count
|
||||
});
|
||||
successCount++;
|
||||
return;
|
||||
}
|
||||
|
||||
// Fetch metric definition
|
||||
// Fetch metric definition first to get the version
|
||||
const metric = await getMetricWithDataSource({ metricId });
|
||||
if (!metric) {
|
||||
logger.warn('Metric not found', { metricId });
|
||||
|
@ -84,6 +71,31 @@ export const cacheReportMetrics: ReturnType<
|
|||
return;
|
||||
}
|
||||
|
||||
const metricVersion = metric.versionNumber;
|
||||
|
||||
// Check if already cached with version
|
||||
const exists = await checkCacheExists(
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
metricVersion
|
||||
);
|
||||
if (exists) {
|
||||
logger.info('Metric already cached, skipping', {
|
||||
metricId,
|
||||
reportId,
|
||||
version: metricVersion,
|
||||
});
|
||||
cached.push({
|
||||
metricId,
|
||||
success: true,
|
||||
rowCount: 0, // Already cached, don't know the count
|
||||
version: metricVersion,
|
||||
});
|
||||
successCount++;
|
||||
return;
|
||||
}
|
||||
|
||||
// Verify metric belongs to the organization
|
||||
if (metric.organizationId !== organizationId) {
|
||||
logger.warn('Metric belongs to different organization', {
|
||||
|
@ -136,19 +148,27 @@ export const cacheReportMetrics: ReturnType<
|
|||
retryDelays: [1000, 3000],
|
||||
});
|
||||
|
||||
// Cache the data
|
||||
// Cache the data with version
|
||||
const metricData: MetricDataResponse = {
|
||||
data: result.data,
|
||||
data_metadata: result.dataMetadata,
|
||||
metricId,
|
||||
has_more_records: result.hasMoreRecords,
|
||||
...(metricVersion !== undefined && { version: metricVersion }),
|
||||
};
|
||||
|
||||
await setCachedMetricData(organizationId, metricId, reportId, metricData);
|
||||
await setCachedMetricData(
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
metricData,
|
||||
metricVersion
|
||||
);
|
||||
|
||||
logger.info('Successfully cached metric', {
|
||||
metricId,
|
||||
reportId,
|
||||
version: metricVersion,
|
||||
rowCount: result.data.length,
|
||||
});
|
||||
|
||||
|
@ -156,6 +176,7 @@ export const cacheReportMetrics: ReturnType<
|
|||
metricId,
|
||||
success: true,
|
||||
rowCount: result.data.length,
|
||||
version: metricVersion,
|
||||
});
|
||||
successCount++;
|
||||
} catch (error) {
|
||||
|
|
|
@ -24,6 +24,7 @@ export const CacheReportMetricsOutputSchema = z.object({
|
|||
success: z.boolean(),
|
||||
rowCount: z.number().optional(),
|
||||
error: z.string().optional(),
|
||||
version: z.number().optional().describe('The version number of the cached metric'),
|
||||
})
|
||||
),
|
||||
totalMetrics: z.number(),
|
||||
|
|
|
@ -25,6 +25,7 @@ describe('Metric Cache', () => {
|
|||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
vi.restoreAllMocks();
|
||||
|
||||
mockProvider = {
|
||||
upload: vi.fn(),
|
||||
|
@ -264,6 +265,7 @@ describe('Metric Cache', () => {
|
|||
'organization-id': 'org-123',
|
||||
'metric-id': 'metric-456',
|
||||
'report-id': 'report-789',
|
||||
'metric-version': 'unversioned',
|
||||
'row-count': '2',
|
||||
'cached-at': '2024-01-15T12:00:00.000Z',
|
||||
},
|
||||
|
@ -303,6 +305,7 @@ describe('Metric Cache', () => {
|
|||
expect.any(Buffer),
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
'metric-version': 'unversioned',
|
||||
'row-count': '0',
|
||||
}),
|
||||
})
|
||||
|
@ -327,6 +330,7 @@ describe('Metric Cache', () => {
|
|||
expect.any(Buffer),
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
'metric-version': 'unversioned',
|
||||
'row-count': '0',
|
||||
}),
|
||||
})
|
||||
|
@ -388,11 +392,14 @@ describe('Metric Cache', () => {
|
|||
});
|
||||
|
||||
describe('batchCheckCacheExists', () => {
|
||||
it('should check multiple cache entries', async () => {
|
||||
mockProvider.exists
|
||||
.mockResolvedValueOnce(true)
|
||||
.mockResolvedValueOnce(false)
|
||||
.mockResolvedValueOnce(true);
|
||||
it('should check multiple cache entries without version', async () => {
|
||||
// Setup function-based mocking to control return values per key
|
||||
mockProvider.exists.mockImplementation(async (key: string) => {
|
||||
if (key.includes('metric-1')) return true;
|
||||
if (key.includes('metric-2')) return false; // both versioned and legacy return false
|
||||
if (key.includes('metric-3')) return true;
|
||||
return false;
|
||||
});
|
||||
|
||||
const pairs = [
|
||||
{ metricId: 'metric-1', reportId: 'report-1' },
|
||||
|
@ -402,10 +409,11 @@ describe('Metric Cache', () => {
|
|||
|
||||
const results = await batchCheckCacheExists('org-123', pairs);
|
||||
|
||||
expect(results.size).toBe(3);
|
||||
expect(results.get('metric-1-report-1')).toBe(true);
|
||||
expect(results.get('metric-2-report-2')).toBe(false);
|
||||
expect(results.get('metric-3-report-3')).toBe(true);
|
||||
expect(mockProvider.exists).toHaveBeenCalledTimes(3);
|
||||
expect(mockProvider.exists).toHaveBeenCalledTimes(4); // metric-2 will check both primary and legacy
|
||||
});
|
||||
|
||||
it('should process in batches', async () => {
|
||||
|
@ -471,5 +479,90 @@ describe('Metric Cache', () => {
|
|||
expect(results.get('metric.with.dots-report_with_underscores')).toBe(true);
|
||||
expect(results.get('metric-with-dashes-report-123')).toBe(true);
|
||||
});
|
||||
|
||||
it('should handle versioned metrics correctly', async () => {
|
||||
mockProvider.exists
|
||||
.mockResolvedValueOnce(true)
|
||||
.mockResolvedValueOnce(false)
|
||||
.mockResolvedValueOnce(true);
|
||||
|
||||
const pairs = [
|
||||
{ metricId: 'metric-1', reportId: 'report-1', version: 1 },
|
||||
{ metricId: 'metric-2', reportId: 'report-2', version: 2 },
|
||||
{ metricId: 'metric-3', reportId: 'report-3', version: undefined },
|
||||
];
|
||||
|
||||
const results = await batchCheckCacheExists('org-123', pairs);
|
||||
|
||||
expect(results.get('metric-1-report-1-v1')).toBe(true);
|
||||
expect(results.get('metric-2-report-2-v2')).toBe(false);
|
||||
expect(results.get('metric-3-report-3')).toBe(true);
|
||||
expect(mockProvider.exists).toHaveBeenCalledTimes(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe('version-aware caching', () => {
|
||||
it('should check legacy cache when version is provided but not found', async () => {
|
||||
const consoleInfoSpy = vi.spyOn(console, 'info').mockImplementation(() => {});
|
||||
|
||||
mockProvider.download
|
||||
.mockResolvedValueOnce({
|
||||
success: false, // versioned key download fails
|
||||
})
|
||||
.mockResolvedValueOnce({
|
||||
success: true, // legacy key download succeeds
|
||||
data: Buffer.from(JSON.stringify({ metricId: 'metric-123', data: [] })),
|
||||
contentType: 'application/json',
|
||||
size: 100,
|
||||
});
|
||||
|
||||
const result = await getCachedMetricData('org-123', 'metric-123', 'report-456', 2);
|
||||
|
||||
expect(result).toBeTruthy();
|
||||
expect(result?.metricId).toBe('metric-123');
|
||||
expect(mockProvider.download).toHaveBeenCalledTimes(2);
|
||||
expect(mockProvider.download).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
'static-report-assets/org-123/metric-123-report-456-v2.json'
|
||||
);
|
||||
expect(mockProvider.download).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
'static-report-assets/org-123/metric-123-report-456.json'
|
||||
);
|
||||
|
||||
consoleInfoSpy.mockRestore();
|
||||
});
|
||||
|
||||
it('should write cache with version metadata', async () => {
|
||||
mockProvider.upload.mockResolvedValue({
|
||||
success: true,
|
||||
key: 'test-key',
|
||||
size: 100,
|
||||
});
|
||||
|
||||
const dateSpy = vi
|
||||
.spyOn(Date.prototype, 'toISOString')
|
||||
.mockReturnValue('2024-01-15T12:00:00.000Z');
|
||||
|
||||
await setCachedMetricData(
|
||||
'org-123',
|
||||
'metric-456',
|
||||
'report-789',
|
||||
{ metricId: 'metric-456', data: [] },
|
||||
3
|
||||
);
|
||||
|
||||
expect(mockProvider.upload).toHaveBeenCalledWith(
|
||||
'static-report-assets/org-123/metric-456-report-789-v3.json',
|
||||
expect.any(Buffer),
|
||||
expect.objectContaining({
|
||||
metadata: expect.objectContaining({
|
||||
'metric-version': '3',
|
||||
}),
|
||||
})
|
||||
);
|
||||
|
||||
dateSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
@ -4,9 +4,25 @@ import { getProviderForOrganization } from '../storage';
|
|||
const CACHE_PREFIX = 'static-report-assets';
|
||||
|
||||
/**
 * Generate the storage key for a metric's cached data within a report.
 *
 * @param organizationId - Organization segment of the key path.
 * @param metricId - Metric identifier; first part of the file name.
 * @param reportId - Report identifier; second part of the file name.
 * @param version - Optional metric version. When provided, a `-v<version>`
 *   suffix is appended to the file name.
 * @returns `<CACHE_PREFIX>/<organizationId>/<metricId>-<reportId>-v<version>.json`
 *   when `version` is given, otherwise the unversioned (legacy) form
 *   `<CACHE_PREFIX>/<organizationId>/<metricId>-<reportId>.json`.
 */
export function generateCacheKey(
  organizationId: string,
  metricId: string,
  reportId: string,
  version?: number
): string {
  // Compare against undefined (not truthiness) so that version 0 still gets a suffix.
  const versionSuffix = version === undefined ? '' : `-v${version}`;
  return `${CACHE_PREFIX}/${organizationId}/${metricId}-${reportId}${versionSuffix}.json`;
}
|
||||
|
||||
/**
|
||||
* Generate legacy cache key for backward compatibility
|
||||
*/
|
||||
function generateLegacyCacheKey(
|
||||
organizationId: string,
|
||||
metricId: string,
|
||||
reportId: string
|
||||
|
@ -37,13 +53,21 @@ function jsonToData(buffer: Buffer): MetricDataResponse {
|
|||
export async function checkCacheExists(
|
||||
organizationId: string,
|
||||
metricId: string,
|
||||
reportId: string
|
||||
reportId: string,
|
||||
version?: number
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const storageProvider = await getProviderForOrganization(organizationId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId, version);
|
||||
|
||||
const exists = await storageProvider.exists(key);
|
||||
|
||||
// If versioned key doesn't exist and no version was specified, check legacy key
|
||||
if (!exists && version === undefined) {
|
||||
const legacyKey = generateLegacyCacheKey(organizationId, metricId, reportId);
|
||||
return await storageProvider.exists(legacyKey);
|
||||
}
|
||||
|
||||
return exists;
|
||||
} catch (error: unknown) {
|
||||
console.error('[metric-cache] Error checking cache existence:', error);
|
||||
|
@ -57,26 +81,41 @@ export async function checkCacheExists(
|
|||
export async function getCachedMetricData(
|
||||
organizationId: string,
|
||||
metricId: string,
|
||||
reportId: string
|
||||
reportId: string,
|
||||
version?: number
|
||||
): Promise<MetricDataResponse | null> {
|
||||
try {
|
||||
const storageProvider = await getProviderForOrganization(organizationId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId, version);
|
||||
|
||||
console.info('[metric-cache] Fetching cached data', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
version,
|
||||
key,
|
||||
});
|
||||
|
||||
const downloadResult = await storageProvider.download(key);
|
||||
let downloadResult = await storageProvider.download(key);
|
||||
|
||||
// If versioned key doesn't exist and version was provided, try without version for backward compatibility
|
||||
if ((!downloadResult.success || !downloadResult.data) && version !== undefined) {
|
||||
const legacyKey = generateLegacyCacheKey(organizationId, metricId, reportId);
|
||||
console.info('[metric-cache] Trying legacy cache key', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
legacyKey,
|
||||
});
|
||||
downloadResult = await storageProvider.download(legacyKey);
|
||||
}
|
||||
|
||||
if (!downloadResult.success || !downloadResult.data) {
|
||||
console.info('[metric-cache] Cache miss', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
version,
|
||||
});
|
||||
return null;
|
||||
}
|
||||
|
@ -89,6 +128,7 @@ export async function getCachedMetricData(
|
|||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
version,
|
||||
rowCount: data.data?.length || 0,
|
||||
});
|
||||
|
||||
|
@ -106,16 +146,18 @@ export async function setCachedMetricData(
|
|||
organizationId: string,
|
||||
metricId: string,
|
||||
reportId: string,
|
||||
data: MetricDataResponse
|
||||
data: MetricDataResponse,
|
||||
version?: number
|
||||
): Promise<void> {
|
||||
try {
|
||||
const storageProvider = await getProviderForOrganization(organizationId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId, version);
|
||||
|
||||
console.info('[metric-cache] Caching metric data', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
version,
|
||||
key,
|
||||
rowCount: data.data?.length || 0,
|
||||
});
|
||||
|
@ -129,6 +171,7 @@ export async function setCachedMetricData(
|
|||
'organization-id': organizationId,
|
||||
'metric-id': metricId,
|
||||
'report-id': reportId,
|
||||
'metric-version': version !== undefined ? String(version) : 'unversioned',
|
||||
'row-count': String(data.data?.length || 0),
|
||||
'cached-at': new Date().toISOString(),
|
||||
},
|
||||
|
@ -139,6 +182,7 @@ export async function setCachedMetricData(
|
|||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
version,
|
||||
sizeBytes: jsonBuffer.length,
|
||||
});
|
||||
} else {
|
||||
|
@ -155,7 +199,7 @@ export async function setCachedMetricData(
|
|||
*/
|
||||
export async function batchCheckCacheExists(
|
||||
organizationId: string,
|
||||
metricReportPairs: Array<{ metricId: string; reportId: string }>
|
||||
metricReportPairs: Array<{ metricId: string; reportId: string; version?: number }>
|
||||
): Promise<Map<string, boolean>> {
|
||||
const results = new Map<string, boolean>();
|
||||
|
||||
|
@ -164,9 +208,11 @@ export async function batchCheckCacheExists(
|
|||
for (let i = 0; i < metricReportPairs.length; i += BATCH_SIZE) {
|
||||
const batch = metricReportPairs.slice(i, i + BATCH_SIZE);
|
||||
const checks = await Promise.all(
|
||||
batch.map(async ({ metricId, reportId }) => {
|
||||
const exists = await checkCacheExists(organizationId, metricId, reportId);
|
||||
return { key: `${metricId}-${reportId}`, exists };
|
||||
batch.map(async ({ metricId, reportId, version }) => {
|
||||
const exists = await checkCacheExists(organizationId, metricId, reportId, version);
|
||||
const key =
|
||||
version !== undefined ? `${metricId}-${reportId}-v${version}` : `${metricId}-${reportId}`;
|
||||
return { key, exists };
|
||||
})
|
||||
);
|
||||
|
||||
|
|
|
@ -42,7 +42,10 @@ export const MetricWithDataSourceSchema = z.object({
|
|||
versionHistory: z.record(VersionHistoryEntrySchema),
|
||||
secretId: z.string(),
|
||||
dataSourceType: z.string(),
|
||||
versionNumber: z.number().optional(),
|
||||
versionNumber: z
|
||||
.number()
|
||||
.optional()
|
||||
.describe('The version number of the metric content being used'),
|
||||
workspaceSharing: z.enum(['none', 'can_view', 'can_edit', 'full_access']).nullable(),
|
||||
workspaceSharingEnabledBy: z.string().nullable(),
|
||||
workspaceSharingEnabledAt: z.string().nullable(),
|
||||
|
@ -121,6 +124,7 @@ export async function getMetricWithDataSource(
|
|||
let content = parsedContent.data;
|
||||
let versionNumber: number | undefined;
|
||||
|
||||
// If version number was specified, try to use that version
|
||||
if (validated.versionNumber !== undefined && versionHistory) {
|
||||
const versionKey = validated.versionNumber.toString();
|
||||
const versionData = versionHistory[versionKey];
|
||||
|
@ -128,8 +132,13 @@ export async function getMetricWithDataSource(
|
|||
if (versionData?.content) {
|
||||
content = versionData.content;
|
||||
versionNumber = validated.versionNumber;
|
||||
} else {
|
||||
// If specified version not found, use latest version
|
||||
versionNumber = getLatestMetricVersion(versionHistory);
|
||||
}
|
||||
// If version not found, fall back to current content
|
||||
} else {
|
||||
// No version specified, determine the latest version
|
||||
versionNumber = getLatestMetricVersion(versionHistory);
|
||||
}
|
||||
|
||||
// Parse and validate dataMetadata
|
||||
|
@ -168,3 +177,25 @@ export async function getMetricWithDataSource(
|
|||
/**
 * Return the SQL text carried by a metric's content.
 *
 * @param content - Metric content object.
 * @returns The value of `content.sql`, unchanged.
 */
export function extractSqlFromMetricContent(content: MetricContent): string {
  return content.sql;
}
|
||||
|
||||
/**
 * Get the latest version number for a metric.
 *
 * Scans the keys of the version history and returns the highest one that is
 * an integer written in canonical form (its string form equals the key, e.g.
 * `"3"`). Keys such as `"7-draft"`, `"02"` or `" 3"` are ignored, so the
 * returned number, converted back with `String()`, is always an existing key
 * of `versionHistory`.
 *
 * @param versionHistory - Map from version-number string to history entry, or `null`.
 * @returns The highest version number found, or `undefined` when the history
 *   is `null`, empty, or has no integer keys.
 */
export function getLatestMetricVersion(
  versionHistory: Record<string, VersionHistoryEntry> | null
): number | undefined {
  if (!versionHistory) {
    return undefined;
  }

  let latest: number | undefined;

  // Single pass instead of Math.max(...array): avoids spreading an
  // arbitrarily long key list into function arguments.
  for (const key of Object.keys(versionHistory)) {
    const parsed = Number(key);

    // Require an exact round-trip so partially numeric keys are not mistaken
    // for versions (parseInt would read "7-draft" as 7).
    if (!Number.isInteger(parsed) || String(parsed) !== key) {
      continue;
    }

    if (latest === undefined || parsed > latest) {
      latest = parsed;
    }
  }

  return latest;
}
|
||||
|
|
|
@ -14,6 +14,7 @@ export {
|
|||
export {
|
||||
getMetricWithDataSource,
|
||||
extractSqlFromMetricContent,
|
||||
getLatestMetricVersion,
|
||||
// Schemas (Zod-first)
|
||||
GetMetricWithDataSourceInputSchema,
|
||||
MetricContentSchema,
|
||||
|
|
Loading…
Reference in New Issue