mirror of https://github.com/buster-so/buster.git
Enhance S3 integration functionality
- Added bucketName to S3 integration responses and handlers to support bucket-specific operations. - Updated tests to reflect the inclusion of bucketName in integration creation and retrieval processes. - Improved error handling for fetching bucket names from the vault during S3 integration retrieval. These changes improve the S3 integration management by allowing for more granular control and visibility of storage buckets.
This commit is contained in:
parent
f4b8199f21
commit
e97ec9c002
|
@ -60,6 +60,7 @@ describe('createS3IntegrationHandler', () => {
|
||||||
id: 'integration-123',
|
id: 'integration-123',
|
||||||
provider: 's3',
|
provider: 's3',
|
||||||
organizationId: 'org-123',
|
organizationId: 'org-123',
|
||||||
|
bucketName: 'test-bucket',
|
||||||
createdAt: new Date('2024-01-15'),
|
createdAt: new Date('2024-01-15'),
|
||||||
updatedAt: new Date('2024-01-15'),
|
updatedAt: new Date('2024-01-15'),
|
||||||
deletedAt: null,
|
deletedAt: null,
|
||||||
|
|
|
@ -72,6 +72,7 @@ export async function createS3IntegrationHandler(
|
||||||
id: integration.id,
|
id: integration.id,
|
||||||
provider: integration.provider,
|
provider: integration.provider,
|
||||||
organizationId: integration.organizationId,
|
organizationId: integration.organizationId,
|
||||||
|
bucketName: request.bucket,
|
||||||
createdAt: integration.createdAt,
|
createdAt: integration.createdAt,
|
||||||
updatedAt: integration.updatedAt,
|
updatedAt: integration.updatedAt,
|
||||||
deletedAt: integration.deletedAt,
|
deletedAt: integration.deletedAt,
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
import type { User } from '@buster/database';
|
import type { User } from '@buster/database';
|
||||||
import { getS3IntegrationByOrganizationId, getUserOrganizationId } from '@buster/database';
|
import {
|
||||||
import type { GetS3IntegrationResponse } from '@buster/server-shared';
|
getS3IntegrationByOrganizationId,
|
||||||
|
getSecretByName,
|
||||||
|
getUserOrganizationId,
|
||||||
|
} from '@buster/database';
|
||||||
|
import type { CreateS3IntegrationRequest, GetS3IntegrationResponse } from '@buster/server-shared';
|
||||||
import { HTTPException } from 'hono/http-exception';
|
import { HTTPException } from 'hono/http-exception';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -32,10 +36,26 @@ export async function getS3IntegrationHandler(user: User): Promise<GetS3Integrat
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Try to fetch the bucket name from the vault
|
||||||
|
let bucketName: string | undefined;
|
||||||
|
try {
|
||||||
|
const secretName = `s3-integration-${integration.id}`;
|
||||||
|
const secret = await getSecretByName(secretName);
|
||||||
|
|
||||||
|
if (secret) {
|
||||||
|
const secretData = JSON.parse(secret.secret) as CreateS3IntegrationRequest;
|
||||||
|
bucketName = secretData.bucket;
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
// Log but don't fail the request if we can't get the bucket name
|
||||||
|
console.warn('Failed to fetch bucket name from vault:', error);
|
||||||
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
id: integration.id,
|
id: integration.id,
|
||||||
provider: integration.provider,
|
provider: integration.provider,
|
||||||
organizationId: integration.organizationId,
|
organizationId: integration.organizationId,
|
||||||
|
bucketName,
|
||||||
createdAt: integration.createdAt,
|
createdAt: integration.createdAt,
|
||||||
updatedAt: integration.updatedAt,
|
updatedAt: integration.updatedAt,
|
||||||
deletedAt: integration.deletedAt,
|
deletedAt: integration.deletedAt,
|
||||||
|
|
|
@ -245,7 +245,7 @@ export const analystAgentTask: ReturnType<
|
||||||
machine: 'small-2x',
|
machine: 'small-2x',
|
||||||
schema: AnalystAgentTaskInputSchema,
|
schema: AnalystAgentTaskInputSchema,
|
||||||
queue: analystQueue,
|
queue: analystQueue,
|
||||||
maxDuration: 1200, // 15 minutes for complex analysis
|
maxDuration: 1200, // 20 minutes for complex analysis
|
||||||
run: async (payload): Promise<AnalystAgentTaskOutput> => {
|
run: async (payload): Promise<AnalystAgentTaskOutput> => {
|
||||||
const taskStartTime = Date.now();
|
const taskStartTime = Date.now();
|
||||||
const resourceTracker = new ResourceTracker();
|
const resourceTracker = new ResourceTracker();
|
||||||
|
|
|
@ -140,7 +140,7 @@ const StorageConfiguration = React.memo(() => {
|
||||||
</div>
|
</div>
|
||||||
<div className="flex items-center space-x-2">
|
<div className="flex items-center space-x-2">
|
||||||
<Text size="sm" className="text-icon-color">
|
<Text size="sm" className="text-icon-color">
|
||||||
{providerLabels[s3Integration.provider]}
|
{s3Integration.bucketName || providerLabels[s3Integration.provider]}
|
||||||
</Text>
|
</Text>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
|
@ -37,15 +37,6 @@ function initializeSonnet4(): ReturnType<typeof createFallback> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (process.env.ANTHROPIC_API_KEY) {
|
|
||||||
try {
|
|
||||||
models.push(anthropicModel('claude-opus-4-1-20250805'));
|
|
||||||
console.info('Opus41: Anthropic model added to fallback chain');
|
|
||||||
} catch (error) {
|
|
||||||
console.warn('Opus41: Failed to initialize Anthropic model:', error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure we have at least one model
|
// Ensure we have at least one model
|
||||||
if (models.length === 0) {
|
if (models.length === 0) {
|
||||||
throw new Error(
|
throw new Error(
|
||||||
|
@ -53,15 +44,6 @@ function initializeSonnet4(): ReturnType<typeof createFallback> {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (process.env.OPENAI_API_KEY) {
|
|
||||||
try {
|
|
||||||
models.push(openaiModel('gpt-5'));
|
|
||||||
console.info('Sonnet4: OpenAI model added to fallback chain');
|
|
||||||
} catch (error) {
|
|
||||||
console.warn('Sonnet4: Failed to initialize OpenAI model:', error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
console.info(`Sonnet4: Initialized with ${models.length} model(s) in fallback chain`);
|
console.info(`Sonnet4: Initialized with ${models.length} model(s) in fallback chain`);
|
||||||
|
|
||||||
_sonnet4Instance = createFallback({
|
_sonnet4Instance = createFallback({
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
import { updateMessage, updateMessageEntries } from '@buster/database';
|
import { updateChat, updateMessage, updateMessageEntries } from '@buster/database';
|
||||||
import type { ToolCallOptions } from 'ai';
|
import type { ToolCallOptions } from 'ai';
|
||||||
import type { UpdateMessageEntriesParams } from '../../../../../database/src/queries/messages/update-message-entries';
|
import type { UpdateMessageEntriesParams } from '../../../../../database/src/queries/messages/update-message-entries';
|
||||||
import type { DoneToolContext, DoneToolState } from './done-tool';
|
import type { DoneToolContext, DoneToolState } from './done-tool';
|
||||||
import {
|
import {
|
||||||
createFileResponseMessages,
|
createFileResponseMessages,
|
||||||
|
extractAllFilesForChatUpdate,
|
||||||
extractFilesFromToolCalls,
|
extractFilesFromToolCalls,
|
||||||
} from './helpers/done-tool-file-selection';
|
} from './helpers/done-tool-file-selection';
|
||||||
import {
|
import {
|
||||||
|
@ -26,13 +27,24 @@ export function createDoneToolStart(context: DoneToolContext, doneToolState: Don
|
||||||
toolCallId: options.toolCallId,
|
toolCallId: options.toolCallId,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Extract files for response messages (filtered to avoid duplicates)
|
||||||
const extractedFiles = extractFilesFromToolCalls(options.messages);
|
const extractedFiles = extractFilesFromToolCalls(options.messages);
|
||||||
|
|
||||||
|
// Extract ALL files for updating the chat's most recent file (includes reports)
|
||||||
|
const allFilesForChatUpdate = extractAllFilesForChatUpdate(options.messages);
|
||||||
|
|
||||||
console.info('[done-tool-start] Files extracted', {
|
console.info('[done-tool-start] Files extracted', {
|
||||||
extractedCount: extractedFiles.length,
|
extractedCount: extractedFiles.length,
|
||||||
files: extractedFiles.map((f) => ({ id: f.id, type: f.fileType, name: f.fileName })),
|
files: extractedFiles.map((f) => ({ id: f.id, type: f.fileType, name: f.fileName })),
|
||||||
|
allFilesCount: allFilesForChatUpdate.length,
|
||||||
|
allFiles: allFilesForChatUpdate.map((f) => ({
|
||||||
|
id: f.id,
|
||||||
|
type: f.fileType,
|
||||||
|
name: f.fileName,
|
||||||
|
})),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Add extracted files as response messages (these are filtered to avoid duplicates)
|
||||||
if (extractedFiles.length > 0 && context.messageId) {
|
if (extractedFiles.length > 0 && context.messageId) {
|
||||||
const fileResponses = createFileResponseMessages(extractedFiles);
|
const fileResponses = createFileResponseMessages(extractedFiles);
|
||||||
|
|
||||||
|
@ -50,6 +62,41 @@ export function createDoneToolStart(context: DoneToolContext, doneToolState: Don
|
||||||
console.error('[done-tool] Failed to add file response entries:', error);
|
console.error('[done-tool] Failed to add file response entries:', error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the chat with the most recent file (using ALL files, including reports)
|
||||||
|
if (context.chatId && allFilesForChatUpdate.length > 0) {
|
||||||
|
// Sort files by version number (descending) to get the most recent
|
||||||
|
const sortedFiles = allFilesForChatUpdate.sort((a, b) => {
|
||||||
|
const versionA = a.versionNumber || 1;
|
||||||
|
const versionB = b.versionNumber || 1;
|
||||||
|
return versionB - versionA;
|
||||||
|
});
|
||||||
|
|
||||||
|
// Prefer reports over other file types for the chat's most recent file
|
||||||
|
const reportFile = sortedFiles.find((f) => f.fileType === 'report');
|
||||||
|
const mostRecentFile = reportFile || sortedFiles[0];
|
||||||
|
|
||||||
|
if (mostRecentFile) {
|
||||||
|
console.info('[done-tool-start] Updating chat with most recent file', {
|
||||||
|
chatId: context.chatId,
|
||||||
|
fileId: mostRecentFile.id,
|
||||||
|
fileType: mostRecentFile.fileType,
|
||||||
|
fileName: mostRecentFile.fileName,
|
||||||
|
versionNumber: mostRecentFile.versionNumber,
|
||||||
|
isReport: mostRecentFile.fileType === 'report',
|
||||||
|
});
|
||||||
|
|
||||||
|
try {
|
||||||
|
await updateChat(context.chatId, {
|
||||||
|
mostRecentFileId: mostRecentFile.id,
|
||||||
|
mostRecentFileType: mostRecentFile.fileType as 'metric' | 'dashboard' | 'report',
|
||||||
|
mostRecentVersionNumber: mostRecentFile.versionNumber || 1,
|
||||||
|
});
|
||||||
|
} catch (error) {
|
||||||
|
console.error('[done-tool] Failed to update chat with most recent file:', error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const doneToolResponseEntry = createDoneToolResponseMessage(doneToolState, options.toolCallId);
|
const doneToolResponseEntry = createDoneToolResponseMessage(doneToolState, options.toolCallId);
|
||||||
|
|
|
@ -8,11 +8,13 @@ import { createDoneToolStart } from './done-tool-start';
|
||||||
vi.mock('@buster/database', () => ({
|
vi.mock('@buster/database', () => ({
|
||||||
updateMessageEntries: vi.fn().mockResolvedValue({ success: true }),
|
updateMessageEntries: vi.fn().mockResolvedValue({ success: true }),
|
||||||
updateMessage: vi.fn().mockResolvedValue({ success: true }),
|
updateMessage: vi.fn().mockResolvedValue({ success: true }),
|
||||||
|
updateChat: vi.fn().mockResolvedValue({ success: true }),
|
||||||
}));
|
}));
|
||||||
|
|
||||||
describe('Done Tool Streaming Tests', () => {
|
describe('Done Tool Streaming Tests', () => {
|
||||||
const mockContext: DoneToolContext = {
|
const mockContext: DoneToolContext = {
|
||||||
messageId: 'test-message-id-123',
|
messageId: 'test-message-id-123',
|
||||||
|
chatId: 'test-chat-id-456',
|
||||||
workflowStartTime: Date.now(),
|
workflowStartTime: Date.now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -112,6 +114,7 @@ describe('Done Tool Streaming Tests', () => {
|
||||||
test('should handle context without messageId', async () => {
|
test('should handle context without messageId', async () => {
|
||||||
const contextWithoutMessageId: DoneToolContext = {
|
const contextWithoutMessageId: DoneToolContext = {
|
||||||
messageId: '',
|
messageId: '',
|
||||||
|
chatId: 'test-chat-id-456',
|
||||||
workflowStartTime: Date.now(),
|
workflowStartTime: Date.now(),
|
||||||
};
|
};
|
||||||
const state: DoneToolState = {
|
const state: DoneToolState = {
|
||||||
|
@ -364,11 +367,13 @@ The following items were processed:
|
||||||
test('should enforce DoneToolContext type requirements', () => {
|
test('should enforce DoneToolContext type requirements', () => {
|
||||||
const validContext: DoneToolContext = {
|
const validContext: DoneToolContext = {
|
||||||
messageId: 'message-123',
|
messageId: 'message-123',
|
||||||
|
chatId: 'test-chat-id-456',
|
||||||
workflowStartTime: Date.now(),
|
workflowStartTime: Date.now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
const extendedContext = {
|
const extendedContext = {
|
||||||
messageId: 'message-456',
|
messageId: 'message-456',
|
||||||
|
chatId: 'test-chat-id-456',
|
||||||
workflowStartTime: Date.now(),
|
workflowStartTime: Date.now(),
|
||||||
additionalField: 'extra-data',
|
additionalField: 'extra-data',
|
||||||
};
|
};
|
||||||
|
|
|
@ -28,6 +28,7 @@ describe('Done Tool Integration Tests', () => {
|
||||||
|
|
||||||
mockContext = {
|
mockContext = {
|
||||||
messageId: testMessageId,
|
messageId: testMessageId,
|
||||||
|
chatId: testChatId,
|
||||||
workflowStartTime: Date.now(),
|
workflowStartTime: Date.now(),
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
@ -233,6 +234,7 @@ All operations completed successfully.`;
|
||||||
test('should handle database errors gracefully', async () => {
|
test('should handle database errors gracefully', async () => {
|
||||||
const invalidContext: DoneToolContext = {
|
const invalidContext: DoneToolContext = {
|
||||||
messageId: 'non-existent-message-id',
|
messageId: 'non-existent-message-id',
|
||||||
|
chatId: testChatId,
|
||||||
workflowStartTime: Date.now(),
|
workflowStartTime: Date.now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -258,6 +260,7 @@ All operations completed successfully.`;
|
||||||
|
|
||||||
const invalidContext: DoneToolContext = {
|
const invalidContext: DoneToolContext = {
|
||||||
messageId: 'invalid-id',
|
messageId: 'invalid-id',
|
||||||
|
chatId: testChatId,
|
||||||
workflowStartTime: Date.now(),
|
workflowStartTime: Date.now(),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,7 @@ const DoneToolOutputSchema = z.object({
|
||||||
|
|
||||||
const DoneToolContextSchema = z.object({
|
const DoneToolContextSchema = z.object({
|
||||||
messageId: z.string().describe('The message ID of the message that triggered the done tool'),
|
messageId: z.string().describe('The message ID of the message that triggered the done tool'),
|
||||||
|
chatId: z.string().describe('The chat ID that this message belongs to'),
|
||||||
workflowStartTime: z.number().describe('The start time of the workflow'),
|
workflowStartTime: z.number().describe('The start time of the workflow'),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -66,6 +66,106 @@ interface ReportInfo {
|
||||||
operation: 'created' | 'modified';
|
operation: 'created' | 'modified';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Extract ALL files from tool calls for updating the chat's most recent file
|
||||||
|
* This includes reports that would normally be filtered out
|
||||||
|
*/
|
||||||
|
export function extractAllFilesForChatUpdate(messages: ModelMessage[]): ExtractedFile[] {
|
||||||
|
const files: ExtractedFile[] = [];
|
||||||
|
|
||||||
|
console.info('[done-tool-file-selection] Extracting ALL files for chat update', {
|
||||||
|
messageCount: messages.length,
|
||||||
|
});
|
||||||
|
|
||||||
|
// First pass: extract create report content from assistant messages
|
||||||
|
const createReportContents: Map<string, string> = new Map();
|
||||||
|
|
||||||
|
for (const message of messages) {
|
||||||
|
if (message.role === 'assistant' && Array.isArray(message.content)) {
|
||||||
|
for (const content of message.content) {
|
||||||
|
if (
|
||||||
|
content &&
|
||||||
|
typeof content === 'object' &&
|
||||||
|
'type' in content &&
|
||||||
|
content.type === 'tool-call' &&
|
||||||
|
'toolName' in content &&
|
||||||
|
content.toolName === CREATE_REPORTS_TOOL_NAME
|
||||||
|
) {
|
||||||
|
const contentObj = content as { toolCallId?: string; input?: unknown };
|
||||||
|
const toolCallId = contentObj.toolCallId;
|
||||||
|
const input = contentObj.input as {
|
||||||
|
files?: Array<{ yml_content?: string; content?: string }>;
|
||||||
|
};
|
||||||
|
if (toolCallId && input && input.files && Array.isArray(input.files)) {
|
||||||
|
for (const file of input.files) {
|
||||||
|
const reportContent = file.yml_content || file.content;
|
||||||
|
if (reportContent) {
|
||||||
|
createReportContents.set(toolCallId, reportContent);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second pass: process tool results
|
||||||
|
for (const message of messages) {
|
||||||
|
if (message.role === 'tool') {
|
||||||
|
const toolContent = message.content;
|
||||||
|
|
||||||
|
if (Array.isArray(toolContent)) {
|
||||||
|
for (const content of toolContent) {
|
||||||
|
if (content && typeof content === 'object') {
|
||||||
|
if ('type' in content && content.type === 'tool-result') {
|
||||||
|
const toolName = (content as unknown as Record<string, unknown>).toolName;
|
||||||
|
const output = (content as unknown as Record<string, unknown>).output;
|
||||||
|
const contentWithCallId = content as { toolCallId?: string };
|
||||||
|
const toolCallId = contentWithCallId.toolCallId;
|
||||||
|
|
||||||
|
const outputObj = output as Record<string, unknown>;
|
||||||
|
if (outputObj && outputObj.type === 'json' && outputObj.value) {
|
||||||
|
try {
|
||||||
|
const parsedOutput =
|
||||||
|
typeof outputObj.value === 'string'
|
||||||
|
? JSON.parse(outputObj.value)
|
||||||
|
: outputObj.value;
|
||||||
|
|
||||||
|
processToolOutput(
|
||||||
|
toolName as string,
|
||||||
|
parsedOutput,
|
||||||
|
files,
|
||||||
|
toolCallId,
|
||||||
|
createReportContents
|
||||||
|
);
|
||||||
|
} catch (error) {
|
||||||
|
console.warn('[done-tool-file-selection] Failed to parse JSON output', {
|
||||||
|
error,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if ('files' in content || 'file' in content) {
|
||||||
|
processDirectFileContent(content, files);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deduplicate files by ID, keeping highest version
|
||||||
|
const deduplicatedFiles = deduplicateFilesByVersion(files);
|
||||||
|
|
||||||
|
console.info('[done-tool-file-selection] All extracted files for chat update', {
|
||||||
|
totalFiles: deduplicatedFiles.length,
|
||||||
|
metrics: deduplicatedFiles.filter((f) => f.fileType === 'metric').length,
|
||||||
|
dashboards: deduplicatedFiles.filter((f) => f.fileType === 'dashboard').length,
|
||||||
|
reports: deduplicatedFiles.filter((f) => f.fileType === 'report').length,
|
||||||
|
});
|
||||||
|
|
||||||
|
return deduplicatedFiles;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract files from tool call responses in the conversation messages
|
* Extract files from tool call responses in the conversation messages
|
||||||
* Focuses on tool result messages that contain file information
|
* Focuses on tool result messages that contain file information
|
||||||
|
|
|
@ -8,6 +8,7 @@ export const S3IntegrationResponseSchema = z.object({
|
||||||
id: z.string().uuid(),
|
id: z.string().uuid(),
|
||||||
provider: StorageProviderSchema,
|
provider: StorageProviderSchema,
|
||||||
organizationId: z.string().uuid(),
|
organizationId: z.string().uuid(),
|
||||||
|
bucketName: z.string().optional(),
|
||||||
createdAt: z.string(),
|
createdAt: z.string(),
|
||||||
updatedAt: z.string(),
|
updatedAt: z.string(),
|
||||||
deletedAt: z.string().nullable(),
|
deletedAt: z.string().nullable(),
|
||||||
|
|
Loading…
Reference in New Issue