diff --git a/apps/server/src/api/v2/s3-integrations/DELETE.ts b/apps/server/src/api/v2/s3-integrations/DELETE.ts index 6d03669ea..61bdbdff3 100644 --- a/apps/server/src/api/v2/s3-integrations/DELETE.ts +++ b/apps/server/src/api/v2/s3-integrations/DELETE.ts @@ -1,7 +1,7 @@ -import { Hono } from 'hono'; import type { User } from '@buster/database'; -import { deleteS3IntegrationHandler } from './delete-s3-integration'; import type { DeleteS3IntegrationResponse } from '@buster/server-shared'; +import { Hono } from 'hono'; +import { deleteS3IntegrationHandler } from './delete-s3-integration'; const app = new Hono(); @@ -13,4 +13,4 @@ export const deleteS3IntegrationRoute = app.delete('/:id', async (c) => { return c.json(response as DeleteS3IntegrationResponse); }); -export default app; \ No newline at end of file +export default app; diff --git a/apps/server/src/api/v2/s3-integrations/GET.ts b/apps/server/src/api/v2/s3-integrations/GET.ts index dbb940c3d..373e93a31 100644 --- a/apps/server/src/api/v2/s3-integrations/GET.ts +++ b/apps/server/src/api/v2/s3-integrations/GET.ts @@ -1,7 +1,7 @@ -import { Hono } from 'hono'; import type { User } from '@buster/database'; -import { getS3IntegrationHandler } from './get-s3-integration'; import type { GetS3IntegrationResponse } from '@buster/server-shared'; +import { Hono } from 'hono'; +import { getS3IntegrationHandler } from './get-s3-integration'; const app = new Hono(); @@ -12,4 +12,4 @@ export const getS3IntegrationRoute = app.get('/', async (c) => { return c.json(response as GetS3IntegrationResponse); }); -export default app; \ No newline at end of file +export default app; diff --git a/apps/server/src/api/v2/s3-integrations/POST.ts b/apps/server/src/api/v2/s3-integrations/POST.ts index 9efced04f..37b0f72e4 100644 --- a/apps/server/src/api/v2/s3-integrations/POST.ts +++ b/apps/server/src/api/v2/s3-integrations/POST.ts @@ -1,11 +1,11 @@ -import { Hono } from 'hono'; -import { zValidator } from '@hono/zod-validator'; +import type { User } from '@buster/database'; import { - CreateS3IntegrationRequestSchema, type CreateS3IntegrationRequest, + CreateS3IntegrationRequestSchema, type CreateS3IntegrationResponse, } from '@buster/server-shared'; -import type { User } from '@buster/database'; +import { zValidator } from '@hono/zod-validator'; +import { Hono } from 'hono'; import { createS3IntegrationHandler } from './create-s3-integration'; const app = new Hono(); @@ -22,4 +22,4 @@ export const createS3IntegrationRoute = app.post( } ); -export default app; \ No newline at end of file +export default app; diff --git a/apps/server/src/api/v2/s3-integrations/create-s3-integration.ts b/apps/server/src/api/v2/s3-integrations/create-s3-integration.ts index fa84b892b..0802014dc 100644 --- a/apps/server/src/api/v2/s3-integrations/create-s3-integration.ts +++ b/apps/server/src/api/v2/s3-integrations/create-s3-integration.ts @@ -1,20 +1,16 @@ +import { type StorageConfig, testStorageCredentials } from '@buster/data-source'; import type { User } from '@buster/database'; -import { - createS3Integration, - getUserOrganizationId, - createSecret, -} from '@buster/database'; +import { createS3Integration, createSecret, getUserOrganizationId } from '@buster/database'; import type { CreateS3IntegrationRequest, CreateS3IntegrationResponse, } from '@buster/server-shared'; -import { StorageFactory, type StorageConfig } from '@buster/data-source'; import { tasks } from '@trigger.dev/sdk/v3'; import { HTTPException } from 'hono/http-exception'; /** * Handler for creating S3 integrations - * + * * This handler: * 1. Validates user has access to an organization * 2. Checks if organization already has an active integration @@ -62,7 +58,7 @@ export async function createS3IntegrationHandler( integrationId: integration.id, organizationId, }); - + console.info('Migration task triggered for storage integration', { integrationId: integration.id, organizationId, @@ -94,13 +90,15 @@ export async function createS3IntegrationHandler( }); } - if (error.message.includes('Invalid credentials') || - error.message.includes('Failed to parse GCS') || - error.message.includes('Failed to initialize GCS')) { + if ( + error.message.includes('Invalid credentials') || + error.message.includes('Failed to parse GCS') || + error.message.includes('Failed to initialize GCS') + ) { throw new HTTPException(400, { - message: error.message.includes('parse') ? - 'Invalid GCS service account key format' : - 'Invalid storage credentials provided', + message: error.message.includes('parse') + ? 'Invalid GCS service account key format' + : 'Invalid storage credentials provided', }); } } @@ -116,9 +114,7 @@ export async function createS3IntegrationHandler( /** * Validate storage credentials by attempting to access the bucket */ -async function validateStorageCredentials( - request: CreateS3IntegrationRequest -): Promise { +async function validateStorageCredentials(request: CreateS3IntegrationRequest): Promise { // Build storage config from request let config: StorageConfig; @@ -172,10 +168,10 @@ async function validateStorageCredentials( // Test the credentials console.info('Testing credentials for bucket:', config.bucket); - const isValid = await StorageFactory.testCredentials(config); + const isValid = await testStorageCredentials(config); console.info('Credential test result:', isValid); - + if (!isValid) { throw new Error('Invalid credentials: Unable to access the specified bucket'); } -} \ No newline at end of file +} diff --git a/apps/server/src/api/v2/s3-integrations/delete-s3-integration.ts b/apps/server/src/api/v2/s3-integrations/delete-s3-integration.ts index 2842101b9..3114a1870 100644 --- a/apps/server/src/api/v2/s3-integrations/delete-s3-integration.ts +++ b/apps/server/src/api/v2/s3-integrations/delete-s3-integration.ts @@ -1,16 +1,16 @@ import type { User } from '@buster/database'; import { deleteS3IntegrationById, + deleteSecret, getS3IntegrationById, getUserOrganizationId, - deleteSecret, } from '@buster/database'; import type { DeleteS3IntegrationResponse } from '@buster/server-shared'; import { HTTPException } from 'hono/http-exception'; /** * Handler for deleting S3 integrations - * + * * This handler: * 1. Validates user has access to the organization * 2. Verifies the integration belongs to the user's organization @@ -91,4 +91,4 @@ export async function deleteS3IntegrationHandler( message: 'Failed to delete storage integration', }); } -} \ No newline at end of file +} diff --git a/apps/server/src/api/v2/s3-integrations/get-s3-integration.ts b/apps/server/src/api/v2/s3-integrations/get-s3-integration.ts index 8657ca289..586250840 100644 --- a/apps/server/src/api/v2/s3-integrations/get-s3-integration.ts +++ b/apps/server/src/api/v2/s3-integrations/get-s3-integration.ts @@ -1,22 +1,17 @@ import type { User } from '@buster/database'; -import { - getS3IntegrationByOrganizationId, - getUserOrganizationId, -} from '@buster/database'; +import { getS3IntegrationByOrganizationId, getUserOrganizationId } from '@buster/database'; import type { GetS3IntegrationResponse } from '@buster/server-shared'; import { HTTPException } from 'hono/http-exception'; /** * Handler for getting the current S3 integration for an organization - * + * * This handler: * 1. Validates user has access to an organization * 2. Retrieves the active integration for the organization * 3. Returns null if no active integration exists */ -export async function getS3IntegrationHandler( - user: User -): Promise { +export async function getS3IntegrationHandler(user: User): Promise { // Get user's organization const userOrg = await getUserOrganizationId(user.id); @@ -51,4 +46,4 @@ export async function getS3IntegrationHandler( message: 'Failed to retrieve storage integration', }); } -} \ No newline at end of file +} diff --git a/apps/server/src/api/v2/s3-integrations/index.ts b/apps/server/src/api/v2/s3-integrations/index.ts index 2749bff4a..0cbcb353b 100644 --- a/apps/server/src/api/v2/s3-integrations/index.ts +++ b/apps/server/src/api/v2/s3-integrations/index.ts @@ -1,8 +1,8 @@ import { Hono } from 'hono'; import { requireAuth } from '../../../middleware/auth'; -import { createS3IntegrationRoute } from './POST'; import { deleteS3IntegrationRoute } from './DELETE'; import { getS3IntegrationRoute } from './GET'; +import { createS3IntegrationRoute } from './POST'; const app = new Hono(); @@ -14,4 +14,4 @@ app.route('/', createS3IntegrationRoute); app.route('/', deleteS3IntegrationRoute); app.route('/', getS3IntegrationRoute); -export default app; \ No newline at end of file +export default app; diff --git a/apps/trigger/src/tasks/export-metric-data/cleanup-export-file.ts b/apps/trigger/src/tasks/export-metric-data/cleanup-export-file.ts index 0b24861b2..46c4f2a9a 100644 --- a/apps/trigger/src/tasks/export-metric-data/cleanup-export-file.ts +++ b/apps/trigger/src/tasks/export-metric-data/cleanup-export-file.ts @@ -1,4 +1,4 @@ -import { StorageFactory } from '@buster/data-source'; +import { getProviderForOrganization } from '@buster/data-source'; import { logger, task } from '@trigger.dev/sdk'; import { CleanupExportFileInputSchema } from './interfaces'; @@ -19,9 +19,7 @@ export const cleanupExportFile = task({ try { // Get storage provider (customer storage or default R2) - const storageProvider = await StorageFactory.getProviderForOrganization( - validated.organizationId - ); + const storageProvider = await getProviderForOrganization(validated.organizationId); logger.log('Cleaning up export file', { key: validated.key, diff --git a/apps/trigger/src/tasks/export-metric-data/export-metric-data.ts b/apps/trigger/src/tasks/export-metric-data/export-metric-data.ts index e7543fa64..addb4e2ea 100644 --- a/apps/trigger/src/tasks/export-metric-data/export-metric-data.ts +++ b/apps/trigger/src/tasks/export-metric-data/export-metric-data.ts @@ -1,6 +1,6 @@ import { randomBytes } from 'node:crypto'; import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls'; -import { createAdapter, StorageFactory } from '@buster/data-source'; +import { createAdapter, getProviderForOrganization } from '@buster/data-source'; import type { Credentials } from '@buster/data-source'; import { getDataSourceCredentials, getMetricForExport } from '@buster/database'; import { logger, schemaTask } from '@trigger.dev/sdk'; @@ -200,9 +200,7 @@ export const exportMetricData: ReturnType< const key = `exports/${payload.organizationId}/${payload.metricId}/${timestamp}-${randomId}/${fileName}`; // Step 6: Get storage provider (customer storage or default R2) - const storageProvider = await StorageFactory.getProviderForOrganization( - payload.organizationId - ); + const storageProvider = await getProviderForOrganization(payload.organizationId); // Step 7: Upload to storage logger.log('Uploading to storage', { key }); diff --git a/apps/trigger/src/tasks/migrate-storage-assets/index.ts b/apps/trigger/src/tasks/migrate-storage-assets/index.ts index db7e90492..f5f6c4a43 100644 --- a/apps/trigger/src/tasks/migrate-storage-assets/index.ts +++ b/apps/trigger/src/tasks/migrate-storage-assets/index.ts @@ -1 +1 @@ -export { migrateStorageAssets } from './migrate-storage-assets'; \ No newline at end of file +export { migrateStorageAssets } from './migrate-storage-assets'; diff --git a/apps/trigger/src/tasks/migrate-storage-assets/interfaces.ts b/apps/trigger/src/tasks/migrate-storage-assets/interfaces.ts index 021955ac8..2a5312676 100644 --- a/apps/trigger/src/tasks/migrate-storage-assets/interfaces.ts +++ b/apps/trigger/src/tasks/migrate-storage-assets/interfaces.ts @@ -14,13 +14,15 @@ export const MigrateStorageAssetsOutputSchema = z.object({ totalAssets: z.number(), migratedAssets: z.number(), failedAssets: z.number(), - errors: z.array( - z.object({ - key: z.string(), - error: z.string(), - }) - ).optional(), + errors: z + .array( + z.object({ + key: z.string(), + error: z.string(), + }) + ) + .optional(), executionTimeMs: z.number(), }); -export type MigrateStorageAssetsOutput = z.infer; \ No newline at end of file +export type MigrateStorageAssetsOutput = z.infer; diff --git a/apps/trigger/src/tasks/migrate-storage-assets/migrate-storage-assets.ts b/apps/trigger/src/tasks/migrate-storage-assets/migrate-storage-assets.ts index 8710f911f..6701a4662 100644 --- a/apps/trigger/src/tasks/migrate-storage-assets/migrate-storage-assets.ts +++ b/apps/trigger/src/tasks/migrate-storage-assets/migrate-storage-assets.ts @@ -1,14 +1,14 @@ -import { StorageFactory } from '@buster/data-source'; +import { getDefaultProvider, getProviderForOrganization } from '@buster/data-source'; import { logger, schemaTask } from '@trigger.dev/sdk/v3'; import { - MigrateStorageAssetsInputSchema, type MigrateStorageAssetsInput, + MigrateStorageAssetsInputSchema, type MigrateStorageAssetsOutput, } from './interfaces'; /** * Task for migrating existing storage assets to customer's storage integration - * + * * This task: * 1. Lists all existing assets in default R2 storage for the organization * 2. Downloads each asset from R2 @@ -38,7 +38,7 @@ export const migrateStorageAssets: ReturnType< run: async (payload: MigrateStorageAssetsInput): Promise => { const startTime = Date.now(); const errors: Array<{ key: string; error: string }> = []; - + try { logger.log('Starting storage asset migration', { integrationId: payload.integrationId, @@ -46,12 +46,10 @@ export const migrateStorageAssets: ReturnType< }); // Get default R2 provider to read from - const defaultProvider = StorageFactory.getDefaultProvider(); - + const defaultProvider = getDefaultProvider(); + // Get customer's storage provider to write to - const customerProvider = await StorageFactory.getProviderForOrganization( - payload.organizationId - ); + const customerProvider = await getProviderForOrganization(payload.organizationId); // Define prefixes to migrate const prefixesToMigrate = [ @@ -80,13 +78,13 @@ export const migrateStorageAssets: ReturnType< const BATCH_SIZE = 10; for (let i = 0; i < objects.length; i += BATCH_SIZE) { const batch = objects.slice(i, i + BATCH_SIZE); - + await Promise.all( batch.map(async (object) => { try { // Download from default storage const downloadResult = await defaultProvider.download(object.key); - + if (!downloadResult.success || !downloadResult.data) { throw new Error(`Failed to download: ${downloadResult.error}`); } @@ -95,7 +93,9 @@ export const migrateStorageAssets: ReturnType< const uploadResult = await customerProvider.upload( object.key, downloadResult.data, - downloadResult.contentType ? { contentType: downloadResult.contentType } : undefined + downloadResult.contentType + ? { contentType: downloadResult.contentType } + : undefined ); if (!uploadResult.success) { @@ -111,12 +111,12 @@ export const migrateStorageAssets: ReturnType< } catch (error) { failedAssets++; const errorMessage = error instanceof Error ? error.message : 'Unknown error'; - + errors.push({ key: object.key, error: errorMessage, }); - + logger.error('Failed to migrate asset', { key: object.key, error: errorMessage, @@ -159,7 +159,7 @@ export const migrateStorageAssets: ReturnType< }; } catch (error) { const executionTimeMs = Date.now() - startTime; - + logger.error('Unexpected error during migration', { error: error instanceof Error ? error.message : 'Unknown error', stack: error instanceof Error ? error.stack : undefined, @@ -170,12 +170,14 @@ export const migrateStorageAssets: ReturnType< totalAssets: 0, migratedAssets: 0, failedAssets: 0, - errors: [{ - key: 'migration', - error: error instanceof Error ? error.message : 'Unknown error', - }], + errors: [ + { + key: 'migration', + error: error instanceof Error ? error.message : 'Unknown error', + }, + ], executionTimeMs, }; } }, -}); \ No newline at end of file +}); diff --git a/packages/data-source/src/cache/index.ts b/packages/data-source/src/cache/index.ts index 4adb0f057..97331d77e 100644 --- a/packages/data-source/src/cache/index.ts +++ b/packages/data-source/src/cache/index.ts @@ -4,4 +4,4 @@ export { setCachedMetricData, batchCheckCacheExists, generateCacheKey, -} from './r2-metric-cache'; +} from './metric-cache'; diff --git a/packages/data-source/src/cache/r2-metric-cache.test.ts b/packages/data-source/src/cache/metric-cache.test.ts similarity index 98% rename from packages/data-source/src/cache/r2-metric-cache.test.ts rename to packages/data-source/src/cache/metric-cache.test.ts index 0cb40d66d..696ba057d 100644 --- a/packages/data-source/src/cache/r2-metric-cache.test.ts +++ b/packages/data-source/src/cache/metric-cache.test.ts @@ -1,5 +1,5 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; -import { generateCacheKey } from './r2-metric-cache'; +import { generateCacheKey } from './metric-cache'; // Mock AWS SDK vi.mock('@aws-sdk/client-s3', () => ({ diff --git a/packages/data-source/src/cache/r2-metric-cache.ts b/packages/data-source/src/cache/metric-cache.ts similarity index 80% rename from packages/data-source/src/cache/r2-metric-cache.ts rename to packages/data-source/src/cache/metric-cache.ts index 86036ff71..a797cf88c 100644 --- a/packages/data-source/src/cache/r2-metric-cache.ts +++ b/packages/data-source/src/cache/metric-cache.ts @@ -1,5 +1,5 @@ import type { MetricDataResponse } from '@buster/server-shared/metrics'; -import { StorageFactory } from '../storage'; +import { getProviderForOrganization } from '../storage'; const CACHE_PREFIX = 'static-report-assets'; @@ -40,13 +40,13 @@ export async function checkCacheExists( reportId: string ): Promise { try { - const storageProvider = await StorageFactory.getProviderForOrganization(organizationId); + const storageProvider = await getProviderForOrganization(organizationId); const key = generateCacheKey(organizationId, metricId, reportId); const exists = await storageProvider.exists(key); return exists; } catch (error: unknown) { - console.error('[r2-metric-cache] Error checking cache existence:', error); + console.error('[metric-cache] Error checking cache existence:', error); return false; } } @@ -60,10 +60,10 @@ export async function getCachedMetricData( reportId: string ): Promise { try { - const storageProvider = await StorageFactory.getProviderForOrganization(organizationId); + const storageProvider = await getProviderForOrganization(organizationId); const key = generateCacheKey(organizationId, metricId, reportId); - console.info('[r2-metric-cache] Fetching cached data', { + console.info('[metric-cache] Fetching cached data', { organizationId, metricId, reportId, @@ -73,7 +73,7 @@ export async function getCachedMetricData( const downloadResult = await storageProvider.download(key); if (!downloadResult.success || !downloadResult.data) { - console.info('[r2-metric-cache] Cache miss', { + console.info('[metric-cache] Cache miss', { organizationId, metricId, reportId, @@ -85,7 +85,7 @@ export async function getCachedMetricData( const data = jsonToData(downloadResult.data); data.metricId = metricId; - console.info('[r2-metric-cache] Cache hit', { + console.info('[metric-cache] Cache hit', { organizationId, metricId, reportId, @@ -94,7 +94,7 @@ export async function getCachedMetricData( return data; } catch (error: unknown) { - console.error('[r2-metric-cache] Error fetching cached data:', error); + console.error('[metric-cache] Error fetching cached data:', error); return null; } } @@ -109,10 +109,10 @@ export async function setCachedMetricData( data: MetricDataResponse ): Promise { try { - const storageProvider = await StorageFactory.getProviderForOrganization(organizationId); + const storageProvider = await getProviderForOrganization(organizationId); const key = generateCacheKey(organizationId, metricId, reportId); - console.info('[r2-metric-cache] Caching metric data', { + console.info('[metric-cache] Caching metric data', { organizationId, metricId, reportId, @@ -135,17 +135,17 @@ export async function setCachedMetricData( }); if (uploadResult.success) { - console.info('[r2-metric-cache] Successfully cached metric data', { + console.info('[metric-cache] Successfully cached metric data', { organizationId, metricId, reportId, sizeBytes: jsonBuffer.length, }); } else { - console.error('[r2-metric-cache] Failed to cache metric data:', uploadResult.error); + console.error('[metric-cache] Failed to cache metric data:', uploadResult.error); } } catch (error) { - console.error('[r2-metric-cache] Error caching metric data:', error); + console.error('[metric-cache] Error caching metric data:', error); // Don't throw - caching failures shouldn't break the main flow } } diff --git a/packages/data-source/src/storage/index.ts b/packages/data-source/src/storage/index.ts index 4baf39797..12bb48880 100644 --- a/packages/data-source/src/storage/index.ts +++ b/packages/data-source/src/storage/index.ts @@ -1,7 +1,16 @@ // Storage abstraction layer exports -export * from './storage-provider'; -export * from './providers/s3-provider'; -export * from './providers/r2-provider'; -export * from './providers/gcs-provider'; -export * from './storage-factory'; export * from './types'; +export * from './utils'; + +// Storage providers +export { createS3Provider } from './providers/s3-provider'; +export { createR2Provider } from './providers/r2-provider'; +export { createGCSProvider } from './providers/gcs-provider'; + +// Storage factory functions +export { + createStorageProvider, + getProviderForOrganization, + getDefaultProvider, + testStorageCredentials, +} from './storage-factory'; diff --git a/packages/data-source/src/storage/providers/gcs-provider.ts b/packages/data-source/src/storage/providers/gcs-provider.ts index 0457d46ff..bac689be7 100644 --- a/packages/data-source/src/storage/providers/gcs-provider.ts +++ b/packages/data-source/src/storage/providers/gcs-provider.ts @@ -1,63 +1,75 @@ import { Storage } from '@google-cloud/storage'; -import { BaseStorageProvider } from '../storage-provider'; import type { ConnectionTestResult, DownloadResult, GCSConfig, ListOptions, StorageObject, + StorageProvider, UploadOptions, UploadResult, } from '../types'; +import { parseErrorMessage, sanitizeKey, toBuffer } from '../utils'; -export class GCSProvider extends BaseStorageProvider { - private storage: Storage; - private bucketInstance: any; // Using any to avoid complex GCS types - - constructor(config: GCSConfig) { - super(config.bucket); - - // Parse service account key - let credentials; - try { - credentials = JSON.parse(config.serviceAccountKey); - } catch (error) { - throw new Error(`Failed to parse GCS service account key: ${error instanceof Error ? error.message : 'Invalid JSON'}`); - } - - try { - this.storage = new Storage({ - projectId: config.projectId, - credentials, - }); - - this.bucketInstance = this.storage.bucket(config.bucket); - } catch (error) { - throw new Error(`Failed to initialize GCS client: ${error instanceof Error ? error.message : 'Unknown error'}`); - } +/** + * Create a Google Cloud Storage provider + */ +export function createGCSProvider(config: GCSConfig): StorageProvider { + // Parse service account key + let credentials: Record; + try { + credentials = JSON.parse(config.serviceAccountKey); + } catch (error) { + throw new Error( + `Failed to parse GCS service account key: ${ + error instanceof Error ? error.message : 'Invalid JSON' + }` + ); } - async upload(key: string, data: Buffer | string, options?: UploadOptions): Promise { - try { - const sanitizedKey = this.sanitizeKey(key); - const buffer = this.toBuffer(data); + let storage: Storage; + // Using a more specific type for bucket instance + let bucketInstance: ReturnType; + + try { + storage = new Storage({ + projectId: config.projectId, + credentials, + }); + + bucketInstance = storage.bucket(config.bucket); + } catch (error) { + throw new Error( + `Failed to initialize GCS client: ${error instanceof Error ? error.message : 'Unknown error'}` + ); + } + + async function upload( + key: string, + data: Buffer | string, + options?: UploadOptions + ): Promise { + try { + const sanitizedKey = sanitizeKey(key); + const buffer = toBuffer(data); + + const file = bucketInstance.file(sanitizedKey); + const metadata: Record = {}; + if (options?.contentType) metadata.contentType = options.contentType; + if (options?.contentDisposition) metadata.contentDisposition = options.contentDisposition; + if (options?.metadata) metadata.metadata = options.metadata; - const file = this.bucketInstance.file(sanitizedKey); const stream = file.createWriteStream({ resumable: false, - metadata: { - contentType: options?.contentType, - contentDisposition: options?.contentDisposition, - metadata: options?.metadata, - }, + metadata, }); - return new Promise((resolve, reject) => { - stream.on('error', (error: any) => { + return new Promise((resolve) => { + stream.on('error', (error: unknown) => { resolve({ success: false, key, - error: error.message, + error: parseErrorMessage(error), }); }); @@ -76,15 +88,15 @@ export class GCSProvider extends BaseStorageProvider { return { success: false, key, - error: error instanceof Error ? error.message : 'Upload failed', + error: parseErrorMessage(error), }; } } - async download(key: string): Promise { + async function download(key: string): Promise { try { - const sanitizedKey = this.sanitizeKey(key); - const file = this.bucketInstance.file(sanitizedKey); + const sanitizedKey = sanitizeKey(key); + const file = bucketInstance.file(sanitizedKey); const [buffer] = await file.download(); const [metadata] = await file.getMetadata(); @@ -98,14 +110,14 @@ export class GCSProvider extends BaseStorageProvider { } catch (error) { return { success: false, - error: error instanceof Error ? error.message : 'Download failed', + error: parseErrorMessage(error), }; } } - async getSignedUrl(key: string, expiresIn: number): Promise { - const sanitizedKey = this.sanitizeKey(key); - const file = this.bucketInstance.file(sanitizedKey); + async function getSignedUrl(key: string, expiresIn: number): Promise { + const sanitizedKey = sanitizeKey(key); + const file = bucketInstance.file(sanitizedKey); // Convert seconds to milliseconds and add to current time const expires = Date.now() + expiresIn * 1000; @@ -118,10 +130,10 @@ export class GCSProvider extends BaseStorageProvider { return url; } - async delete(key: string): Promise { + async function deleteObject(key: string): Promise { try { - const sanitizedKey = this.sanitizeKey(key); - const file = this.bucketInstance.file(sanitizedKey); + const sanitizedKey = sanitizeKey(key); + const file = bucketInstance.file(sanitizedKey); await file.delete(); return true; @@ -131,10 +143,10 @@ export class GCSProvider extends BaseStorageProvider { } } - async exists(key: string): Promise { + async function exists(key: string): Promise { try { - const sanitizedKey = this.sanitizeKey(key); - const file = this.bucketInstance.file(sanitizedKey); + const sanitizedKey = sanitizeKey(key); + const file = bucketInstance.file(sanitizedKey); const [exists] = await file.exists(); return exists; @@ -144,20 +156,37 @@ export class GCSProvider extends BaseStorageProvider { } } - async list(prefix: string, options?: ListOptions): Promise { + async function list(prefix: string, options?: ListOptions): Promise { try { - const sanitizedPrefix = this.sanitizeKey(prefix); + const sanitizedPrefix = sanitizeKey(prefix); - const [files] = await this.bucketInstance.getFiles({ + const getFilesOptions: { + prefix: string; + maxResults?: number; + pageToken?: string; + } = { prefix: sanitizedPrefix, - maxResults: options?.maxKeys, - pageToken: options?.continuationToken, - }); + }; + if (options?.maxKeys !== undefined) { + getFilesOptions.maxResults = options.maxKeys; + } + if (options?.continuationToken !== undefined) { + getFilesOptions.pageToken = options.continuationToken; + } - return files.map((file: any) => ({ - key: file.name, - size: Number.parseInt(file.metadata.size || '0', 10), - lastModified: new Date(file.metadata.updated || file.metadata.timeCreated), + const [files] = await storage.bucket(config.bucket).getFiles(getFilesOptions); + + return files.map((file) => ({ + key: file.name || '', + size: + typeof file.metadata.size === 'string' + ? Number.parseInt(file.metadata.size, 10) + : file.metadata.size || 0, + lastModified: file.metadata.updated + ? new Date(file.metadata.updated) + : file.metadata.timeCreated + ? new Date(file.metadata.timeCreated) + : new Date(), etag: file.metadata.etag || '', })); } catch (error) { @@ -166,16 +195,16 @@ export class GCSProvider extends BaseStorageProvider { } } - async testConnection(): Promise { + async function testConnection(): Promise { const testKey = `_test_${Date.now()}.txt`; const testData = 'test'; try { // Test bucket exists - console.info('GCS: Testing bucket exists for:', this.bucket); - let exists: boolean; + console.info('GCS: Testing bucket exists for:', config.bucket); + let bucketExists: boolean; try { - [exists] = await this.bucketInstance.exists(); + [bucketExists] = await bucketInstance.exists(); } catch (error) { console.error('GCS: Error checking bucket existence:', error); return { @@ -183,11 +212,11 @@ export class GCSProvider extends BaseStorageProvider { canRead: false, canWrite: false, canDelete: false, - error: `Failed to check bucket existence: ${error instanceof Error ? error.message : 'Unknown error'}`, + error: `Failed to check bucket existence: ${parseErrorMessage(error)}`, }; } - - if (!exists) { + + if (!bucketExists) { return { success: false, canRead: false, @@ -199,7 +228,7 @@ export class GCSProvider extends BaseStorageProvider { console.info('GCS: Bucket exists, testing write permission'); // Test write - const uploadResult = await this.upload(testKey, testData); + const uploadResult = await upload(testKey, testData); if (!uploadResult.success) { return { success: false, @@ -212,10 +241,10 @@ export class GCSProvider extends BaseStorageProvider { console.info('GCS: Write successful, testing read permission'); // Test read - const downloadResult = await this.download(testKey); + const downloadResult = await download(testKey); if (!downloadResult.success) { // Clean up test file - await this.delete(testKey); + await deleteObject(testKey); return { success: false, canRead: false, @@ -227,7 +256,7 @@ export class GCSProvider extends BaseStorageProvider { console.info('GCS: Read successful, testing delete permission'); // Test delete - const deleteResult = await this.delete(testKey); + const deleteResult = await deleteObject(testKey); if (!deleteResult) { return { success: false, @@ -252,8 +281,18 @@ export class GCSProvider extends BaseStorageProvider { canRead: false, canWrite: false, canDelete: false, - error: error instanceof Error ? error.message : 'Connection test failed', + error: parseErrorMessage(error), }; } } + + return { + upload, + download, + getSignedUrl, + delete: deleteObject, + exists, + list, + testConnection, + }; } diff --git a/packages/data-source/src/storage/providers/r2-provider.ts b/packages/data-source/src/storage/providers/r2-provider.ts index df197d408..e03bf67b3 100644 --- a/packages/data-source/src/storage/providers/r2-provider.ts +++ b/packages/data-source/src/storage/providers/r2-provider.ts @@ -1,37 +1,263 @@ -import { S3Client } from '@aws-sdk/client-s3'; -import type { R2Config } from '../types'; -import { S3Provider } from './s3-provider'; +import { + DeleteObjectCommand, + GetObjectCommand, + HeadObjectCommand, + ListObjectsV2Command, + PutObjectCommand, + S3Client, +} from '@aws-sdk/client-s3'; +import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; +import type { + ConnectionTestResult, + DownloadResult, + ListOptions, + R2Config, + StorageObject, + StorageProvider, + UploadOptions, + UploadResult, +} from '../types'; +import { parseErrorMessage, sanitizeKey, toBuffer } from '../utils'; /** - * Cloudflare R2 provider - * R2 is S3-compatible, so we extend S3Provider with R2-specific configuration + * Create a Cloudflare R2 storage provider + * R2 is S3-compatible with specific endpoint configuration */ -export class R2Provider extends S3Provider { - constructor(config: R2Config) { - // Create S3-compatible configuration for R2 - const s3Config = { - provider: 's3' as const, - region: 'auto', // R2 uses 'auto' region - bucket: config.bucket, +export function createR2Provider(config: R2Config): StorageProvider { + const client = new S3Client({ + region: 'auto', + endpoint: `https://${config.accountId}.r2.cloudflarestorage.com`, + credentials: { accessKeyId: config.accessKeyId, secretAccessKey: config.secretAccessKey, - }; + }, + forcePathStyle: true, // Required for R2 + }); - // Call parent constructor with S3 config - super(s3Config); + const bucket = config.bucket; - // Override the client with R2-specific endpoint - this.client = new S3Client({ - region: 'auto', - endpoint: `https://${config.accountId}.r2.cloudflarestorage.com`, - credentials: { - accessKeyId: config.accessKeyId, - secretAccessKey: config.secretAccessKey, - }, - forcePathStyle: true, // Required for R2 - }); + async function upload( + key: string, + data: Buffer | string, + options?: UploadOptions + ): Promise { + try { + const sanitizedKey = sanitizeKey(key); + const buffer = toBuffer(data); + + const command = new PutObjectCommand({ + Bucket: bucket, + Key: sanitizedKey, + Body: buffer, + ContentType: options?.contentType, + ContentDisposition: options?.contentDisposition, + Metadata: options?.metadata, + }); + + const response = await client.send(command); + + const result: UploadResult = { + success: true, + key: sanitizedKey, + size: buffer.length, + }; + if (response.ETag) { + result.etag = response.ETag; + } + return result; + } catch (error) { + return { + success: false, + key, + error: parseErrorMessage(error), + }; + } } - // All methods inherited from S3Provider work with R2 - // No need to override unless R2-specific behavior is needed + async function download(key: string): Promise { + try { + const sanitizedKey = sanitizeKey(key); + + const command = new GetObjectCommand({ + Bucket: bucket, + Key: sanitizedKey, + }); + + const response = await client.send(command); + + if (!response.Body) { + return { + success: false, + error: 'No data returned from R2', + }; + } + + const chunks: Uint8Array[] = []; + const stream = response.Body as AsyncIterable; + for await (const chunk of stream) { + chunks.push(chunk); + } + const buffer = Buffer.concat(chunks); + + const result: DownloadResult = { + success: true, + data: buffer, + size: buffer.length, + }; + if (response.ContentType) { + result.contentType = response.ContentType; + } + return result; + } catch (error) { + return { + success: false, + error: parseErrorMessage(error), + }; + } + } + + async function getSignedUrlForDownload(key: string, expiresIn: number): Promise { + const sanitizedKey = sanitizeKey(key); + + const command = new GetObjectCommand({ + Bucket: bucket, + Key: sanitizedKey, + }); + + return getSignedUrl(client, command, { expiresIn }); + } + + async function deleteObject(key: string): Promise { + try { + const sanitizedKey = sanitizeKey(key); + + const command = new DeleteObjectCommand({ + Bucket: bucket, + Key: sanitizedKey, + }); + + await client.send(command); + return true; + } catch (error) { + console.error('Error deleting from R2:', parseErrorMessage(error)); + return false; + } + } + + async function exists(key: string): Promise { + try { + const sanitizedKey = sanitizeKey(key); + + const command = new HeadObjectCommand({ + Bucket: bucket, + Key: sanitizedKey, + }); + + await client.send(command); + return true; + } catch { + // Silently return false for existence check + return false; + } + } + + async function list(prefix: string, options?: ListOptions): Promise { + try { + const sanitizedPrefix = sanitizeKey(prefix); + + const command = new ListObjectsV2Command({ + Bucket: bucket, + Prefix: sanitizedPrefix, + MaxKeys: options?.maxKeys, + ContinuationToken: options?.continuationToken, + }); + + const response = await client.send(command); + + return ( + response.Contents?.map((obj) => { + const item: StorageObject = { + key: obj.Key || '', + size: obj.Size || 0, + lastModified: obj.LastModified || new Date(), + }; + if (obj.ETag) { + item.etag = obj.ETag; + } + return item; + }) || [] + ); + } catch (error) { + console.error('Error listing R2 objects:', parseErrorMessage(error)); + return []; + } + } + + async function testConnection(): Promise { + const testKey = `_test_${Date.now()}.txt`; + const testData = 'test'; + + try { + // Test write + const uploadResult = await upload(testKey, testData); + if (!uploadResult.success) { + return { + success: false, + canRead: false, + canWrite: false, + canDelete: false, + error: `Upload failed: ${uploadResult.error}`, + }; + } + + // Test read + const downloadResult = await download(testKey); + if (!downloadResult.success) { + return { + success: false, + canRead: false, + canWrite: true, + canDelete: false, + error: `Download failed: ${downloadResult.error}`, + }; + } + + // Test delete + const deleteResult = await deleteObject(testKey); + if (!deleteResult) { + return { + success: false, + canRead: true, + canWrite: true, + canDelete: false, + error: 'Delete failed', + }; + } + + return { + success: true, + canRead: true, + canWrite: true, + canDelete: true, + }; + } catch (error) { + return { + success: false, + canRead: false, + canWrite: false, + canDelete: false, + error: parseErrorMessage(error), + }; + } + } + + return { + upload, + download, + getSignedUrl: getSignedUrlForDownload, + delete: deleteObject, + exists, + list, + testConnection, + }; } diff --git a/packages/data-source/src/storage/providers/s3-provider.ts b/packages/data-source/src/storage/providers/s3-provider.ts index 90c15a11c..0d5076f47 100644 --- a/packages/data-source/src/storage/providers/s3-provider.ts +++ b/packages/data-source/src/storage/providers/s3-provider.ts @@ -7,39 +7,43 @@ import { S3Client, } from '@aws-sdk/client-s3'; import { getSignedUrl } from '@aws-sdk/s3-request-presigner'; -import { BaseStorageProvider } from '../storage-provider'; import type { ConnectionTestResult, DownloadResult, ListOptions, S3Config, StorageObject, + StorageProvider, UploadOptions, UploadResult, } from '../types'; +import { parseErrorMessage, sanitizeKey, toBuffer } from '../utils'; -export class S3Provider extends BaseStorageProvider { - protected client: S3Client; +/** + * Create an S3 storage provider + */ +export function createS3Provider(config: S3Config): StorageProvider { + const client = new S3Client({ + region: config.region, + credentials: { + accessKeyId: config.accessKeyId, + secretAccessKey: config.secretAccessKey, + }, + }); - constructor(config: S3Config) { - super(config.bucket); + const bucket = config.bucket; - this.client = new S3Client({ - region: config.region, - credentials: { - accessKeyId: config.accessKeyId, - secretAccessKey: config.secretAccessKey, - }, - }); - } - - async upload(key: string, data: Buffer | string, options?: UploadOptions): Promise { + async function upload( + key: string, + data: Buffer | string, + options?: UploadOptions + ): Promise { try { - const sanitizedKey = this.sanitizeKey(key); - const buffer = this.toBuffer(data); + const sanitizedKey = sanitizeKey(key); + const buffer = toBuffer(data); const command = new PutObjectCommand({ - Bucket: this.bucket, + Bucket: bucket, Key: sanitizedKey, Body: buffer, ContentType: options?.contentType, @@ -47,33 +51,36 @@ export class S3Provider extends BaseStorageProvider { Metadata: options?.metadata, }); - const response = await this.client.send(command); + const response = await client.send(command); - return { + const result: UploadResult = { success: true, key: sanitizedKey, - etag: response.ETag || '', size: buffer.length, }; + if (response.ETag) { + result.etag = response.ETag; + } + return result; } catch (error) { return { success: false, key, - error: error instanceof Error ? error.message : 'Upload failed', + error: parseErrorMessage(error), }; } } - async download(key: string): Promise { + async function download(key: string): Promise { try { - const sanitizedKey = this.sanitizeKey(key); + const sanitizedKey = sanitizeKey(key); const command = new GetObjectCommand({ - Bucket: this.bucket, + Bucket: bucket, Key: sanitizedKey, }); - const response = await this.client.send(command); + const response = await client.send(command); if (!response.Body) { return { @@ -82,7 +89,6 @@ export class S3Provider extends BaseStorageProvider { }; } - // Convert stream to buffer const chunks: Uint8Array[] = []; const stream = response.Body as AsyncIterable; for await (const chunk of stream) { @@ -90,130 +96,138 @@ export class S3Provider extends BaseStorageProvider { } const buffer = Buffer.concat(chunks); - return { + const result: DownloadResult = { success: true, data: buffer, - contentType: response.ContentType || 'application/octet-stream', size: buffer.length, }; + if (response.ContentType) { + result.contentType = response.ContentType; + } + return result; } catch (error) { return { success: false, - error: error instanceof Error ? error.message : 'Download failed', + error: parseErrorMessage(error), }; } } - async getSignedUrl(key: string, expiresIn: number): Promise { - const sanitizedKey = this.sanitizeKey(key); + async function getSignedUrlForDownload(key: string, expiresIn: number): Promise { + const sanitizedKey = sanitizeKey(key); const command = new GetObjectCommand({ - Bucket: this.bucket, + Bucket: bucket, Key: sanitizedKey, }); - return getSignedUrl(this.client, command, { expiresIn }); + return getSignedUrl(client, command, { expiresIn }); } - async delete(key: string): Promise { + async function deleteObject(key: string): Promise { try { - const sanitizedKey = this.sanitizeKey(key); + const sanitizedKey = sanitizeKey(key); const command = new DeleteObjectCommand({ - Bucket: this.bucket, + Bucket: bucket, Key: sanitizedKey, }); - await this.client.send(command); + await client.send(command); return true; } catch (error) { - console.error('S3 delete error:', error); + console.error('Error deleting from S3:', parseErrorMessage(error)); return false; } } - async exists(key: string): Promise { + async function exists(key: string): Promise { try { - const sanitizedKey = this.sanitizeKey(key); + const sanitizedKey = sanitizeKey(key); const command = new HeadObjectCommand({ - Bucket: this.bucket, + Bucket: bucket, Key: sanitizedKey, }); - await this.client.send(command); + await client.send(command); return true; - } catch (error: any) { - if (error?.name === 'NotFound' || error?.$metadata?.httpStatusCode === 404) { - return false; - } - throw error; + } catch { + // Silently return false for existence check + return false; } } - async list(prefix: string, options?: ListOptions): Promise { + async function list(prefix: string, options?: ListOptions): Promise { try { - const sanitizedPrefix = this.sanitizeKey(prefix); + const sanitizedPrefix = sanitizeKey(prefix); const command = new ListObjectsV2Command({ - Bucket: this.bucket, + Bucket: bucket, Prefix: sanitizedPrefix, MaxKeys: options?.maxKeys, ContinuationToken: options?.continuationToken, }); - const response = await this.client.send(command); + const response = await client.send(command); - return (response.Contents || []).map((object) => ({ - key: object.Key || '', - size: object.Size || 0, - lastModified: object.LastModified || new Date(), - etag: object.ETag || '', - })); + return ( + response.Contents?.map((obj) => { + const item: StorageObject = { + key: obj.Key || '', + size: obj.Size || 0, + lastModified: obj.LastModified || new Date(), + }; + if (obj.ETag) { + item.etag = obj.ETag; + } + return item; + }) || [] + ); } catch (error) { - console.error('S3 list error:', error); + console.error('Error listing S3 objects:', parseErrorMessage(error)); return []; } } - async testConnection(): Promise { + async function testConnection(): Promise { const testKey = `_test_${Date.now()}.txt`; const testData = 'test'; try { // Test write - const uploadResult = await this.upload(testKey, testData); + const uploadResult = await upload(testKey, testData); if (!uploadResult.success) { return { success: false, canRead: false, canWrite: false, canDelete: false, - error: `Cannot write to bucket: ${uploadResult.error}`, + error: `Upload failed: ${uploadResult.error}`, }; } // Test read - const downloadResult = await this.download(testKey); + const downloadResult = await download(testKey); if (!downloadResult.success) { return { success: false, canRead: false, canWrite: true, canDelete: false, - error: `Cannot read from bucket: ${downloadResult.error}`, + error: `Download failed: ${downloadResult.error}`, }; } // Test delete - const deleteResult = await this.delete(testKey); + const deleteResult = await deleteObject(testKey); if (!deleteResult) { return { success: false, canRead: true, canWrite: true, canDelete: false, - error: 'Cannot delete from bucket', + error: 'Delete failed', }; } @@ -229,8 +243,18 @@ export class S3Provider extends BaseStorageProvider { canRead: false, canWrite: false, canDelete: false, - error: error instanceof Error ? error.message : 'Connection test failed', + error: parseErrorMessage(error), }; } } + + return { + upload, + download, + getSignedUrl: getSignedUrlForDownload, + delete: deleteObject, + exists, + list, + testConnection, + }; } diff --git a/packages/data-source/src/storage/storage-factory.ts b/packages/data-source/src/storage/storage-factory.ts index 73467e39f..46423e69b 100644 --- a/packages/data-source/src/storage/storage-factory.ts +++ b/packages/data-source/src/storage/storage-factory.ts @@ -1,130 +1,127 @@ import { getS3IntegrationByOrganizationId, getSecretByName } from '@buster/database'; import type { CreateS3IntegrationRequest } from '@buster/server-shared'; -import { GCSProvider } from './providers/gcs-provider'; -import { R2Provider } from './providers/r2-provider'; -import { S3Provider } from './providers/s3-provider'; +import { createGCSProvider } from './providers/gcs-provider'; +import { createR2Provider } from './providers/r2-provider'; +import { createS3Provider } from './providers/s3-provider'; import type { StorageConfig, StorageProvider } from './types'; /** - * Factory for creating storage providers + * Create a storage provider from configuration */ -export class StorageFactory { - /** - * Create a storage provider from configuration - */ - static createProvider(config: StorageConfig): StorageProvider { - switch (config.provider) { - case 's3': - return new S3Provider(config); - case 'r2': - return new R2Provider(config); - case 'gcs': - return new GCSProvider(config); - default: - throw new Error(`Unsupported storage provider: ${(config as any).provider}`); - } - } - - /** - * Get storage provider for an organization - * Returns customer storage if configured, otherwise returns default R2 storage - */ - static async getProviderForOrganization(organizationId: string): Promise { - try { - // Check if organization has a storage integration - const integration = await getS3IntegrationByOrganizationId(organizationId); - - if (integration) { - // Get credentials from vault - const secretName = `s3-integration-${integration.id}`; - const secret = await getSecretByName(secretName); - - if (secret && secret.secret) { - // Parse the stored credentials - const credentials = JSON.parse(secret.secret) as CreateS3IntegrationRequest; - - // Create appropriate config based on provider - let config: StorageConfig; - - if (credentials.provider === 's3') { - config = { - provider: 's3', - region: credentials.region, - bucket: credentials.bucket, - accessKeyId: credentials.accessKeyId, - secretAccessKey: credentials.secretAccessKey, - }; - } else if (credentials.provider === 'r2') { - config = { - provider: 'r2', - accountId: credentials.accountId, - bucket: credentials.bucket, - accessKeyId: credentials.accessKeyId, - secretAccessKey: credentials.secretAccessKey, - }; - } else if (credentials.provider === 'gcs') { - config = { - provider: 'gcs', - projectId: credentials.projectId, - bucket: credentials.bucket, - serviceAccountKey: credentials.serviceAccountKey, - }; - } else { - throw new Error(`Unknown provider type: ${(credentials as any).provider}`); - } - - return StorageFactory.createProvider(config); - } - } - } catch (error) { - console.error('Error getting customer storage integration:', error); - // Fall back to default storage - } - - // Return default R2 storage - return StorageFactory.getDefaultProvider(); - } - - /** - * Get the default R2 storage provider - */ - static getDefaultProvider(): StorageProvider { - const accountId = process.env.R2_ACCOUNT_ID; - const accessKeyId = process.env.R2_ACCESS_KEY_ID; - const secretAccessKey = process.env.R2_SECRET_ACCESS_KEY; - const bucket = process.env.R2_BUCKET || 'metric-exports'; - - if (!accountId || !accessKeyId || !secretAccessKey) { - throw new Error('Default R2 storage credentials not configured'); - } - - const config: StorageConfig = { - provider: 'r2', - accountId, - bucket, - accessKeyId, - secretAccessKey, - }; - - return StorageFactory.createProvider(config); - } - - /** - * Test storage credentials - */ - static async testCredentials(config: StorageConfig): Promise { - try { - const provider = StorageFactory.createProvider(config); - const result = await provider.testConnection(); - return result.success; - } catch (error) { - console.error('Credential test failed:', error); - // Log more details about the error - if (error instanceof Error) { - console.error('Error message:', error.message); - console.error('Error stack:', error.stack); - } - return false; - } +export function createStorageProvider(config: StorageConfig): StorageProvider { + switch (config.provider) { + case 's3': + return createS3Provider(config); + case 'r2': + return createR2Provider(config); + case 'gcs': + return createGCSProvider(config); + default: + // This should never happen as TypeScript ensures the config matches one of the union types + throw new Error(`Unsupported storage provider`); + } +} + +/** + * Get the default R2 storage provider + */ +export function getDefaultProvider(): StorageProvider { + const accountId = process.env.R2_ACCOUNT_ID; + const accessKeyId = process.env.R2_ACCESS_KEY_ID; + const secretAccessKey = process.env.R2_SECRET_ACCESS_KEY; + const bucket = process.env.R2_BUCKET || 'metric-exports'; + + if (!accountId || !accessKeyId || !secretAccessKey) { + throw new Error('Default R2 storage credentials not configured'); + } + + const config: StorageConfig = { + provider: 'r2', + accountId, + bucket, + accessKeyId, + secretAccessKey, + }; + + return createStorageProvider(config); +} + +/** + * Get storage provider for an organization + * Returns customer storage if configured, otherwise returns default R2 storage + */ +export async function getProviderForOrganization(organizationId: string): Promise { + try { + // Check if organization has a storage integration + const integration = await getS3IntegrationByOrganizationId(organizationId); + + if (integration) { + // Get credentials from vault + const secretName = `s3-integration-${integration.id}`; + const secret = await getSecretByName(secretName); + + if (secret?.secret) { + // Parse the stored credentials + const credentials = JSON.parse(secret.secret) as CreateS3IntegrationRequest; + + // Create appropriate config based on provider + let config: StorageConfig; + + if (credentials.provider === 's3') { + config = { + provider: 's3', + region: credentials.region, + bucket: credentials.bucket, + accessKeyId: credentials.accessKeyId, + secretAccessKey: credentials.secretAccessKey, + }; + } else if (credentials.provider === 'r2') { + config = { + provider: 'r2', + accountId: credentials.accountId, + bucket: credentials.bucket, + accessKeyId: credentials.accessKeyId, + secretAccessKey: credentials.secretAccessKey, + }; + } else if (credentials.provider === 'gcs') { + config = { + provider: 'gcs', + projectId: credentials.projectId, + bucket: credentials.bucket, + serviceAccountKey: credentials.serviceAccountKey, + }; + } else { + // This should never happen as the type is validated above + throw new Error(`Unknown provider type`); + } + + return createStorageProvider(config); + } + } + } catch (error) { + console.error('Error getting customer storage integration:', error); + // Fall back to default storage + } + + // Return default R2 storage + return getDefaultProvider(); +} + +/** + * Test storage credentials + */ +export async function testStorageCredentials(config: StorageConfig): Promise { + try { + const provider = createStorageProvider(config); + const result = await provider.testConnection(); + return result.success; + } catch (error) { + console.error('Credential test failed:', error); + // Log more details about the error + if (error instanceof Error) { + console.error('Error message:', error.message); + console.error('Error stack:', error.stack); + } + return false; } } diff --git a/packages/data-source/src/storage/storage-provider.ts b/packages/data-source/src/storage/storage-provider.ts deleted file mode 100644 index 1308508a9..000000000 --- a/packages/data-source/src/storage/storage-provider.ts +++ /dev/null @@ -1,73 +0,0 @@ -import type { - ConnectionTestResult, - DownloadResult, - ListOptions, - StorageObject, - StorageProvider, - UploadOptions, - UploadResult, -} from './types'; - -/** - * Abstract base class for storage providers - * Provides common functionality and error handling - */ -export abstract class BaseStorageProvider implements StorageProvider { - protected bucket: string; - - constructor(bucket: string) { - this.bucket = bucket; - } - - abstract upload( - key: string, - data: Buffer | string, - options?: UploadOptions - ): Promise; - - abstract download(key: string): Promise; - - abstract getSignedUrl(key: string, expiresIn: number): Promise; - - abstract delete(key: string): Promise; - - abstract exists(key: string): Promise; - - abstract list(prefix: string, options?: ListOptions): Promise; - - abstract testConnection(): Promise; - - /** - * Sanitize storage key to prevent path traversal - */ - protected sanitizeKey(key: string): string { - // Remove leading slashes - key = key.replace(/^\/+/, ''); - - // Remove any path traversal attempts - key = key.replace(/\.\./g, ''); - - // Normalize multiple slashes to single slash - key = key.replace(/\/+/g, '/'); - - return key; - } - - /** - * Convert string to Buffer if needed - */ - protected toBuffer(data: Buffer | string): Buffer { - if (typeof data === 'string') { - return Buffer.from(data, 'utf-8'); - } - return data; - } - - /** - * Standard error handler for storage operations - */ - protected handleError(operation: string, error: unknown): never { - const message = error instanceof Error ? error.message : 'Unknown error'; - throw new Error(`Storage ${operation} failed: ${message}`); - } -} diff --git a/packages/data-source/src/storage/utils.ts b/packages/data-source/src/storage/utils.ts new file mode 100644 index 000000000..cf52a311c --- /dev/null +++ b/packages/data-source/src/storage/utils.ts @@ -0,0 +1,41 @@ +/** + * Common utilities for storage providers + */ + +/** + * Sanitize storage key to prevent path traversal + */ +export function sanitizeKey(key: string): string { + let sanitized = key; + + // Remove leading slashes + sanitized = sanitized.replace(/^\/+/, ''); + + // Remove any path traversal attempts + sanitized = sanitized.replace(/\.\./g, ''); + + // Normalize multiple slashes to single slash + sanitized = sanitized.replace(/\/+/g, '/'); + + return sanitized; +} + +/** + * Convert string or Buffer to Buffer + */ +export function toBuffer(data: Buffer | string): Buffer { + return typeof data === 'string' ? Buffer.from(data) : data; +} + +/** + * Parse error message from unknown error + */ +export function parseErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + if (typeof error === 'string') { + return error; + } + return 'Unknown error occurred'; +}