mirror of https://github.com/buster-so/buster.git
Implement caching for report snapshots in modify and create reports tools
This commit is contained in:
parent
0ac8921a8a
commit
bb6d932f26
|
@ -2,6 +2,7 @@ import { batchUpdateReport, updateMessageEntries } from '@buster/database';
|
||||||
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
|
import type { ChatMessageResponseMessage } from '@buster/server-shared/chats';
|
||||||
import { wrapTraced } from 'braintrust';
|
import { wrapTraced } from 'braintrust';
|
||||||
import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry';
|
import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry';
|
||||||
|
import { updateCachedSnapshot } from '../report-snapshot-cache';
|
||||||
import type {
|
import type {
|
||||||
CreateReportsContext,
|
CreateReportsContext,
|
||||||
CreateReportsInput,
|
CreateReportsInput,
|
||||||
|
@ -168,6 +169,9 @@ export function createCreateReportsExecute(
|
||||||
versionHistory,
|
versionHistory,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Update cache with the newly created report content
|
||||||
|
updateCachedSnapshot(reportId, content, versionHistory);
|
||||||
|
|
||||||
// Update state to reflect successful update
|
// Update state to reflect successful update
|
||||||
if (!state.files) {
|
if (!state.files) {
|
||||||
state.files = [];
|
state.files = [];
|
||||||
|
|
|
@ -10,6 +10,7 @@ import {
|
||||||
OptimisticJsonParser,
|
OptimisticJsonParser,
|
||||||
getOptimisticValue,
|
getOptimisticValue,
|
||||||
} from '../../../../utils/streaming/optimistic-json-parser';
|
} from '../../../../utils/streaming/optimistic-json-parser';
|
||||||
|
import { getCachedSnapshot, updateCachedSnapshot } from '../report-snapshot-cache';
|
||||||
import {
|
import {
|
||||||
createModifyReportsRawLlmMessageEntry,
|
createModifyReportsRawLlmMessageEntry,
|
||||||
createModifyReportsReasoningEntry,
|
createModifyReportsReasoningEntry,
|
||||||
|
@ -53,19 +54,14 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
|
||||||
if (id && !state.reportId) {
|
if (id && !state.reportId) {
|
||||||
state.reportId = id;
|
state.reportId = id;
|
||||||
|
|
||||||
// Fetch the report snapshot and version history immediately when we get the ID
|
// Check cache first, then fetch from DB if needed
|
||||||
try {
|
try {
|
||||||
const existingReport = await db
|
// Try to get from cache first
|
||||||
.select({
|
const cached = getCachedSnapshot(id);
|
||||||
content: reportFiles.content,
|
|
||||||
versionHistory: reportFiles.versionHistory,
|
|
||||||
})
|
|
||||||
.from(reportFiles)
|
|
||||||
.where(and(eq(reportFiles.id, id), isNull(reportFiles.deletedAt)))
|
|
||||||
.limit(1);
|
|
||||||
|
|
||||||
if (existingReport.length > 0 && existingReport[0]) {
|
if (cached) {
|
||||||
state.snapshotContent = existingReport[0].content;
|
// Use cached snapshot
|
||||||
|
state.snapshotContent = cached.content;
|
||||||
|
|
||||||
type VersionHistoryEntry = {
|
type VersionHistoryEntry = {
|
||||||
content: string;
|
content: string;
|
||||||
|
@ -73,7 +69,7 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
|
||||||
version_number: number;
|
version_number: number;
|
||||||
};
|
};
|
||||||
|
|
||||||
const versionHistory = existingReport[0].versionHistory as Record<
|
const versionHistory = cached.versionHistory as Record<
|
||||||
string,
|
string,
|
||||||
VersionHistoryEntry
|
VersionHistoryEntry
|
||||||
> | null;
|
> | null;
|
||||||
|
@ -89,12 +85,56 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
|
||||||
state.snapshotVersion = 1;
|
state.snapshotVersion = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
console.info('[modify-reports-delta] Fetched report snapshot', {
|
console.info('[modify-reports-delta] Using cached snapshot', {
|
||||||
reportId: id,
|
reportId: id,
|
||||||
version: state.snapshotVersion,
|
version: state.snapshotVersion,
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
console.error('[modify-reports-delta] Report not found', { reportId: id });
|
// Cache miss - fetch from database
|
||||||
|
const existingReport = await db
|
||||||
|
.select({
|
||||||
|
content: reportFiles.content,
|
||||||
|
versionHistory: reportFiles.versionHistory,
|
||||||
|
})
|
||||||
|
.from(reportFiles)
|
||||||
|
.where(and(eq(reportFiles.id, id), isNull(reportFiles.deletedAt)))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (existingReport.length > 0 && existingReport[0]) {
|
||||||
|
state.snapshotContent = existingReport[0].content;
|
||||||
|
|
||||||
|
type VersionHistoryEntry = {
|
||||||
|
content: string;
|
||||||
|
updated_at: string;
|
||||||
|
version_number: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
const versionHistory = existingReport[0].versionHistory as Record<
|
||||||
|
string,
|
||||||
|
VersionHistoryEntry
|
||||||
|
> | null;
|
||||||
|
state.versionHistory = versionHistory || undefined;
|
||||||
|
|
||||||
|
// Extract current version number from version history
|
||||||
|
if (state.versionHistory) {
|
||||||
|
const versionNumbers = Object.values(state.versionHistory).map(
|
||||||
|
(v) => v.version_number
|
||||||
|
);
|
||||||
|
state.snapshotVersion = versionNumbers.length > 0 ? Math.max(...versionNumbers) : 1;
|
||||||
|
} else {
|
||||||
|
state.snapshotVersion = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update cache for next time
|
||||||
|
updateCachedSnapshot(id, existingReport[0].content, versionHistory);
|
||||||
|
|
||||||
|
console.info('[modify-reports-delta] Fetched report snapshot from DB', {
|
||||||
|
reportId: id,
|
||||||
|
version: state.snapshotVersion,
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
console.error('[modify-reports-delta] Report not found', { reportId: id });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('[modify-reports-delta] Error fetching report snapshot:', error);
|
console.error('[modify-reports-delta] Error fetching report snapshot:', error);
|
||||||
|
@ -266,6 +306,9 @@ export function createModifyReportsDelta(context: ModifyReportsContext, state: M
|
||||||
versionHistory,
|
versionHistory,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Update cache with the new content for subsequent modifications
|
||||||
|
updateCachedSnapshot(state.reportId, newContent, versionHistory);
|
||||||
|
|
||||||
// Update state with the final content (but keep snapshot immutable)
|
// Update state with the final content (but keep snapshot immutable)
|
||||||
state.finalContent = newContent;
|
state.finalContent = newContent;
|
||||||
state.versionHistory = versionHistory;
|
state.versionHistory = versionHistory;
|
||||||
|
|
|
@ -84,8 +84,8 @@ describe('modify-reports-execute', () => {
|
||||||
toolCallId: 'tool-call-123',
|
toolCallId: 'tool-call-123',
|
||||||
edits: [],
|
edits: [],
|
||||||
startTime: Date.now(),
|
startTime: Date.now(),
|
||||||
snapshotContent: undefined, // Will be set per test
|
snapshotContent: undefined, // Will be set per test
|
||||||
versionHistory: undefined, // Will be set per test
|
versionHistory: undefined, // Will be set per test
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ import { and, eq, isNull } from 'drizzle-orm';
|
||||||
import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry';
|
import { createRawToolResultEntry } from '../../../shared/create-raw-llm-tool-result-entry';
|
||||||
import { trackFileAssociations } from '../../file-tracking-helper';
|
import { trackFileAssociations } from '../../file-tracking-helper';
|
||||||
import { shouldIncrementVersion, updateVersionHistory } from '../helpers/report-version-helper';
|
import { shouldIncrementVersion, updateVersionHistory } from '../helpers/report-version-helper';
|
||||||
|
import { updateCachedSnapshot } from '../report-snapshot-cache';
|
||||||
import {
|
import {
|
||||||
createModifyReportsRawLlmMessageEntry,
|
createModifyReportsRawLlmMessageEntry,
|
||||||
createModifyReportsReasoningEntry,
|
createModifyReportsReasoningEntry,
|
||||||
|
@ -84,7 +85,7 @@ async function processEditOperations(
|
||||||
// Otherwise fetch from database (for cases where delta didn't run)
|
// Otherwise fetch from database (for cases where delta didn't run)
|
||||||
let baseContent: string;
|
let baseContent: string;
|
||||||
let baseVersionHistory: VersionHistory | null;
|
let baseVersionHistory: VersionHistory | null;
|
||||||
|
|
||||||
if (snapshotContent !== undefined) {
|
if (snapshotContent !== undefined) {
|
||||||
// Use the immutable snapshot from state
|
// Use the immutable snapshot from state
|
||||||
baseContent = snapshotContent;
|
baseContent = snapshotContent;
|
||||||
|
@ -114,7 +115,7 @@ async function processEditOperations(
|
||||||
errors: ['Report not found'],
|
errors: ['Report not found'],
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
baseContent = report.content;
|
baseContent = report.content;
|
||||||
baseVersionHistory = report.versionHistory as VersionHistory | null;
|
baseVersionHistory = report.versionHistory as VersionHistory | null;
|
||||||
}
|
}
|
||||||
|
@ -164,6 +165,9 @@ async function processEditOperations(
|
||||||
versionHistory: newVersionHistory,
|
versionHistory: newVersionHistory,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Update cache with the modified content for future operations
|
||||||
|
updateCachedSnapshot(reportId, currentContent, newVersionHistory);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
success: true,
|
success: true,
|
||||||
finalContent: currentContent,
|
finalContent: currentContent,
|
||||||
|
@ -248,12 +252,12 @@ const modifyReportsFile = wrapTraced(
|
||||||
|
|
||||||
// Process all edit operations using snapshot as source of truth
|
// Process all edit operations using snapshot as source of truth
|
||||||
const editResult = await processEditOperations(
|
const editResult = await processEditOperations(
|
||||||
params.id,
|
params.id,
|
||||||
params.name,
|
params.name,
|
||||||
params.edits,
|
params.edits,
|
||||||
messageId,
|
messageId,
|
||||||
snapshotContent, // Pass immutable snapshot
|
snapshotContent, // Pass immutable snapshot
|
||||||
versionHistory // Pass snapshot version history
|
versionHistory // Pass snapshot version history
|
||||||
);
|
);
|
||||||
|
|
||||||
// Track file associations if this is a new version (not part of same turn)
|
// Track file associations if this is a new version (not part of same turn)
|
||||||
|
@ -328,10 +332,10 @@ export function createModifyReportsExecute(
|
||||||
// Always process using the complete input as source of truth
|
// Always process using the complete input as source of truth
|
||||||
console.info('[modify-reports] Processing modifications from complete input');
|
console.info('[modify-reports] Processing modifications from complete input');
|
||||||
const result = await modifyReportsFile(
|
const result = await modifyReportsFile(
|
||||||
input,
|
input,
|
||||||
context,
|
context,
|
||||||
state.snapshotContent, // Pass immutable snapshot from state
|
state.snapshotContent, // Pass immutable snapshot from state
|
||||||
state.versionHistory // Pass snapshot version history from state
|
state.versionHistory // Pass snapshot version history from state
|
||||||
);
|
);
|
||||||
|
|
||||||
if (!result) {
|
if (!result) {
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
// Simple in-memory cache for report snapshots to avoid repeated DB queries
|
||||||
|
// during sequential report modifications (create → modify → modify pattern)
|
||||||
|
|
||||||
|
type VersionHistoryEntry = {
|
||||||
|
content: string;
|
||||||
|
updated_at: string;
|
||||||
|
version_number: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
type VersionHistory = Record<string, VersionHistoryEntry>;
|
||||||
|
|
||||||
|
type CachedSnapshot = {
|
||||||
|
content: string;
|
||||||
|
versionHistory: VersionHistory | null;
|
||||||
|
timestamp: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Simple in-memory cache - no LRU, just a Map
|
||||||
|
const reportSnapshots = new Map<string, CachedSnapshot>();
|
||||||
|
|
||||||
|
// 5 minute expiry
|
||||||
|
const CACHE_TTL = 5 * 60 * 1000;
|
||||||
|
|
||||||
|
export function getCachedSnapshot(reportId: string): {
|
||||||
|
content: string;
|
||||||
|
versionHistory: VersionHistory | null;
|
||||||
|
} | null {
|
||||||
|
const cached = reportSnapshots.get(reportId);
|
||||||
|
|
||||||
|
// Check if exists and not expired
|
||||||
|
if (cached && Date.now() - cached.timestamp < CACHE_TTL) {
|
||||||
|
console.info('[report-cache] Cache hit', {
|
||||||
|
reportId,
|
||||||
|
age: `${Math.round((Date.now() - cached.timestamp) / 1000)}s`,
|
||||||
|
});
|
||||||
|
return {
|
||||||
|
content: cached.content,
|
||||||
|
versionHistory: cached.versionHistory,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expired or not found
|
||||||
|
if (cached) {
|
||||||
|
console.info('[report-cache] Cache expired, removing', { reportId });
|
||||||
|
reportSnapshots.delete(reportId);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function updateCachedSnapshot(
|
||||||
|
reportId: string,
|
||||||
|
content: string,
|
||||||
|
versionHistory: VersionHistory | null
|
||||||
|
): void {
|
||||||
|
reportSnapshots.set(reportId, {
|
||||||
|
content,
|
||||||
|
versionHistory,
|
||||||
|
timestamp: Date.now(),
|
||||||
|
});
|
||||||
|
console.info('[report-cache] Updated cache', {
|
||||||
|
reportId,
|
||||||
|
contentLength: content.length,
|
||||||
|
cacheSize: reportSnapshots.size,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear old entries periodically to prevent memory bloat
|
||||||
|
const cleanupInterval = setInterval(() => {
|
||||||
|
const now = Date.now();
|
||||||
|
let cleaned = 0;
|
||||||
|
|
||||||
|
for (const [id, data] of reportSnapshots.entries()) {
|
||||||
|
if (now - data.timestamp > CACHE_TTL) {
|
||||||
|
reportSnapshots.delete(id);
|
||||||
|
cleaned++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cleaned > 0) {
|
||||||
|
console.info('[report-cache] Cleanup completed', {
|
||||||
|
entriesRemoved: cleaned,
|
||||||
|
remainingEntries: reportSnapshots.size,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}, CACHE_TTL); // Run cleanup every 5 minutes
|
||||||
|
|
||||||
|
// Prevent the interval from keeping the process alive
|
||||||
|
cleanupInterval.unref?.();
|
||||||
|
|
||||||
|
// Export a function to clear the cache if needed (e.g., for testing)
|
||||||
|
export function clearReportCache(): void {
|
||||||
|
reportSnapshots.clear();
|
||||||
|
console.info('[report-cache] Cache cleared');
|
||||||
|
}
|
Loading…
Reference in New Issue