mirror of https://github.com/buster-so/buster.git
Refactor S3 integration handlers and storage provider logic
- Simplified S3 integration handler functions by consolidating imports and improving code readability. - Updated error handling in the S3 integration process to enhance clarity and maintainability. - Refactored storage provider functions to utilize a more modular approach, separating concerns for better organization. - Introduced utility functions for common operations, improving code reuse and reducing duplication. These changes enhance the overall structure and maintainability of the S3 integration management features.
This commit is contained in:
parent
5f51dfc459
commit
41ca3c3724
|
@ -1,7 +1,7 @@
|
|||
import { Hono } from 'hono';
|
||||
import type { User } from '@buster/database';
|
||||
import { deleteS3IntegrationHandler } from './delete-s3-integration';
|
||||
import type { DeleteS3IntegrationResponse } from '@buster/server-shared';
|
||||
import { Hono } from 'hono';
|
||||
import { deleteS3IntegrationHandler } from './delete-s3-integration';
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
|
@ -13,4 +13,4 @@ export const deleteS3IntegrationRoute = app.delete('/:id', async (c) => {
|
|||
return c.json(response as DeleteS3IntegrationResponse);
|
||||
});
|
||||
|
||||
export default app;
|
||||
export default app;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import { Hono } from 'hono';
|
||||
import type { User } from '@buster/database';
|
||||
import { getS3IntegrationHandler } from './get-s3-integration';
|
||||
import type { GetS3IntegrationResponse } from '@buster/server-shared';
|
||||
import { Hono } from 'hono';
|
||||
import { getS3IntegrationHandler } from './get-s3-integration';
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
|
@ -12,4 +12,4 @@ export const getS3IntegrationRoute = app.get('/', async (c) => {
|
|||
return c.json(response as GetS3IntegrationResponse);
|
||||
});
|
||||
|
||||
export default app;
|
||||
export default app;
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
import { Hono } from 'hono';
|
||||
import { zValidator } from '@hono/zod-validator';
|
||||
import type { User } from '@buster/database';
|
||||
import {
|
||||
CreateS3IntegrationRequestSchema,
|
||||
type CreateS3IntegrationRequest,
|
||||
CreateS3IntegrationRequestSchema,
|
||||
type CreateS3IntegrationResponse,
|
||||
} from '@buster/server-shared';
|
||||
import type { User } from '@buster/database';
|
||||
import { zValidator } from '@hono/zod-validator';
|
||||
import { Hono } from 'hono';
|
||||
import { createS3IntegrationHandler } from './create-s3-integration';
|
||||
|
||||
const app = new Hono();
|
||||
|
@ -22,4 +22,4 @@ export const createS3IntegrationRoute = app.post(
|
|||
}
|
||||
);
|
||||
|
||||
export default app;
|
||||
export default app;
|
||||
|
|
|
@ -1,20 +1,16 @@
|
|||
import { type StorageConfig, testStorageCredentials } from '@buster/data-source';
|
||||
import type { User } from '@buster/database';
|
||||
import {
|
||||
createS3Integration,
|
||||
getUserOrganizationId,
|
||||
createSecret,
|
||||
} from '@buster/database';
|
||||
import { createS3Integration, createSecret, getUserOrganizationId } from '@buster/database';
|
||||
import type {
|
||||
CreateS3IntegrationRequest,
|
||||
CreateS3IntegrationResponse,
|
||||
} from '@buster/server-shared';
|
||||
import { StorageFactory, type StorageConfig } from '@buster/data-source';
|
||||
import { tasks } from '@trigger.dev/sdk/v3';
|
||||
import { HTTPException } from 'hono/http-exception';
|
||||
|
||||
/**
|
||||
* Handler for creating S3 integrations
|
||||
*
|
||||
*
|
||||
* This handler:
|
||||
* 1. Validates user has access to an organization
|
||||
* 2. Checks if organization already has an active integration
|
||||
|
@ -62,7 +58,7 @@ export async function createS3IntegrationHandler(
|
|||
integrationId: integration.id,
|
||||
organizationId,
|
||||
});
|
||||
|
||||
|
||||
console.info('Migration task triggered for storage integration', {
|
||||
integrationId: integration.id,
|
||||
organizationId,
|
||||
|
@ -94,13 +90,15 @@ export async function createS3IntegrationHandler(
|
|||
});
|
||||
}
|
||||
|
||||
if (error.message.includes('Invalid credentials') ||
|
||||
error.message.includes('Failed to parse GCS') ||
|
||||
error.message.includes('Failed to initialize GCS')) {
|
||||
if (
|
||||
error.message.includes('Invalid credentials') ||
|
||||
error.message.includes('Failed to parse GCS') ||
|
||||
error.message.includes('Failed to initialize GCS')
|
||||
) {
|
||||
throw new HTTPException(400, {
|
||||
message: error.message.includes('parse') ?
|
||||
'Invalid GCS service account key format' :
|
||||
'Invalid storage credentials provided',
|
||||
message: error.message.includes('parse')
|
||||
? 'Invalid GCS service account key format'
|
||||
: 'Invalid storage credentials provided',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -116,9 +114,7 @@ export async function createS3IntegrationHandler(
|
|||
/**
|
||||
* Validate storage credentials by attempting to access the bucket
|
||||
*/
|
||||
async function validateStorageCredentials(
|
||||
request: CreateS3IntegrationRequest
|
||||
): Promise<void> {
|
||||
async function validateStorageCredentials(request: CreateS3IntegrationRequest): Promise<void> {
|
||||
// Build storage config from request
|
||||
let config: StorageConfig;
|
||||
|
||||
|
@ -172,10 +168,10 @@ async function validateStorageCredentials(
|
|||
|
||||
// Test the credentials
|
||||
console.info('Testing credentials for bucket:', config.bucket);
|
||||
const isValid = await StorageFactory.testCredentials(config);
|
||||
const isValid = await testStorageCredentials(config);
|
||||
console.info('Credential test result:', isValid);
|
||||
|
||||
|
||||
if (!isValid) {
|
||||
throw new Error('Invalid credentials: Unable to access the specified bucket');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,16 @@
|
|||
import type { User } from '@buster/database';
|
||||
import {
|
||||
deleteS3IntegrationById,
|
||||
deleteSecret,
|
||||
getS3IntegrationById,
|
||||
getUserOrganizationId,
|
||||
deleteSecret,
|
||||
} from '@buster/database';
|
||||
import type { DeleteS3IntegrationResponse } from '@buster/server-shared';
|
||||
import { HTTPException } from 'hono/http-exception';
|
||||
|
||||
/**
|
||||
* Handler for deleting S3 integrations
|
||||
*
|
||||
*
|
||||
* This handler:
|
||||
* 1. Validates user has access to the organization
|
||||
* 2. Verifies the integration belongs to the user's organization
|
||||
|
@ -91,4 +91,4 @@ export async function deleteS3IntegrationHandler(
|
|||
message: 'Failed to delete storage integration',
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,22 +1,17 @@
|
|||
import type { User } from '@buster/database';
|
||||
import {
|
||||
getS3IntegrationByOrganizationId,
|
||||
getUserOrganizationId,
|
||||
} from '@buster/database';
|
||||
import { getS3IntegrationByOrganizationId, getUserOrganizationId } from '@buster/database';
|
||||
import type { GetS3IntegrationResponse } from '@buster/server-shared';
|
||||
import { HTTPException } from 'hono/http-exception';
|
||||
|
||||
/**
|
||||
* Handler for getting the current S3 integration for an organization
|
||||
*
|
||||
*
|
||||
* This handler:
|
||||
* 1. Validates user has access to an organization
|
||||
* 2. Retrieves the active integration for the organization
|
||||
* 3. Returns null if no active integration exists
|
||||
*/
|
||||
export async function getS3IntegrationHandler(
|
||||
user: User
|
||||
): Promise<GetS3IntegrationResponse> {
|
||||
export async function getS3IntegrationHandler(user: User): Promise<GetS3IntegrationResponse> {
|
||||
// Get user's organization
|
||||
const userOrg = await getUserOrganizationId(user.id);
|
||||
|
||||
|
@ -51,4 +46,4 @@ export async function getS3IntegrationHandler(
|
|||
message: 'Failed to retrieve storage integration',
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import { Hono } from 'hono';
|
||||
import { requireAuth } from '../../../middleware/auth';
|
||||
import { createS3IntegrationRoute } from './POST';
|
||||
import { deleteS3IntegrationRoute } from './DELETE';
|
||||
import { getS3IntegrationRoute } from './GET';
|
||||
import { createS3IntegrationRoute } from './POST';
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
|
@ -14,4 +14,4 @@ app.route('/', createS3IntegrationRoute);
|
|||
app.route('/', deleteS3IntegrationRoute);
|
||||
app.route('/', getS3IntegrationRoute);
|
||||
|
||||
export default app;
|
||||
export default app;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
import { StorageFactory } from '@buster/data-source';
|
||||
import { getProviderForOrganization } from '@buster/data-source';
|
||||
import { logger, task } from '@trigger.dev/sdk';
|
||||
import { CleanupExportFileInputSchema } from './interfaces';
|
||||
|
||||
|
@ -19,9 +19,7 @@ export const cleanupExportFile = task({
|
|||
|
||||
try {
|
||||
// Get storage provider (customer storage or default R2)
|
||||
const storageProvider = await StorageFactory.getProviderForOrganization(
|
||||
validated.organizationId
|
||||
);
|
||||
const storageProvider = await getProviderForOrganization(validated.organizationId);
|
||||
|
||||
logger.log('Cleaning up export file', {
|
||||
key: validated.key,
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { randomBytes } from 'node:crypto';
|
||||
import { type AssetPermissionCheck, checkPermission } from '@buster/access-controls';
|
||||
import { createAdapter, StorageFactory } from '@buster/data-source';
|
||||
import { createAdapter, getProviderForOrganization } from '@buster/data-source';
|
||||
import type { Credentials } from '@buster/data-source';
|
||||
import { getDataSourceCredentials, getMetricForExport } from '@buster/database';
|
||||
import { logger, schemaTask } from '@trigger.dev/sdk';
|
||||
|
@ -200,9 +200,7 @@ export const exportMetricData: ReturnType<
|
|||
const key = `exports/${payload.organizationId}/${payload.metricId}/${timestamp}-${randomId}/${fileName}`;
|
||||
|
||||
// Step 6: Get storage provider (customer storage or default R2)
|
||||
const storageProvider = await StorageFactory.getProviderForOrganization(
|
||||
payload.organizationId
|
||||
);
|
||||
const storageProvider = await getProviderForOrganization(payload.organizationId);
|
||||
|
||||
// Step 7: Upload to storage
|
||||
logger.log('Uploading to storage', { key });
|
||||
|
|
|
@ -1 +1 @@
|
|||
export { migrateStorageAssets } from './migrate-storage-assets';
|
||||
export { migrateStorageAssets } from './migrate-storage-assets';
|
||||
|
|
|
@ -14,13 +14,15 @@ export const MigrateStorageAssetsOutputSchema = z.object({
|
|||
totalAssets: z.number(),
|
||||
migratedAssets: z.number(),
|
||||
failedAssets: z.number(),
|
||||
errors: z.array(
|
||||
z.object({
|
||||
key: z.string(),
|
||||
error: z.string(),
|
||||
})
|
||||
).optional(),
|
||||
errors: z
|
||||
.array(
|
||||
z.object({
|
||||
key: z.string(),
|
||||
error: z.string(),
|
||||
})
|
||||
)
|
||||
.optional(),
|
||||
executionTimeMs: z.number(),
|
||||
});
|
||||
|
||||
export type MigrateStorageAssetsOutput = z.infer<typeof MigrateStorageAssetsOutputSchema>;
|
||||
export type MigrateStorageAssetsOutput = z.infer<typeof MigrateStorageAssetsOutputSchema>;
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
import { StorageFactory } from '@buster/data-source';
|
||||
import { getDefaultProvider, getProviderForOrganization } from '@buster/data-source';
|
||||
import { logger, schemaTask } from '@trigger.dev/sdk/v3';
|
||||
import {
|
||||
MigrateStorageAssetsInputSchema,
|
||||
type MigrateStorageAssetsInput,
|
||||
MigrateStorageAssetsInputSchema,
|
||||
type MigrateStorageAssetsOutput,
|
||||
} from './interfaces';
|
||||
|
||||
/**
|
||||
* Task for migrating existing storage assets to customer's storage integration
|
||||
*
|
||||
*
|
||||
* This task:
|
||||
* 1. Lists all existing assets in default R2 storage for the organization
|
||||
* 2. Downloads each asset from R2
|
||||
|
@ -38,7 +38,7 @@ export const migrateStorageAssets: ReturnType<
|
|||
run: async (payload: MigrateStorageAssetsInput): Promise<MigrateStorageAssetsOutput> => {
|
||||
const startTime = Date.now();
|
||||
const errors: Array<{ key: string; error: string }> = [];
|
||||
|
||||
|
||||
try {
|
||||
logger.log('Starting storage asset migration', {
|
||||
integrationId: payload.integrationId,
|
||||
|
@ -46,12 +46,10 @@ export const migrateStorageAssets: ReturnType<
|
|||
});
|
||||
|
||||
// Get default R2 provider to read from
|
||||
const defaultProvider = StorageFactory.getDefaultProvider();
|
||||
|
||||
const defaultProvider = getDefaultProvider();
|
||||
|
||||
// Get customer's storage provider to write to
|
||||
const customerProvider = await StorageFactory.getProviderForOrganization(
|
||||
payload.organizationId
|
||||
);
|
||||
const customerProvider = await getProviderForOrganization(payload.organizationId);
|
||||
|
||||
// Define prefixes to migrate
|
||||
const prefixesToMigrate = [
|
||||
|
@ -80,13 +78,13 @@ export const migrateStorageAssets: ReturnType<
|
|||
const BATCH_SIZE = 10;
|
||||
for (let i = 0; i < objects.length; i += BATCH_SIZE) {
|
||||
const batch = objects.slice(i, i + BATCH_SIZE);
|
||||
|
||||
|
||||
await Promise.all(
|
||||
batch.map(async (object) => {
|
||||
try {
|
||||
// Download from default storage
|
||||
const downloadResult = await defaultProvider.download(object.key);
|
||||
|
||||
|
||||
if (!downloadResult.success || !downloadResult.data) {
|
||||
throw new Error(`Failed to download: ${downloadResult.error}`);
|
||||
}
|
||||
|
@ -95,7 +93,9 @@ export const migrateStorageAssets: ReturnType<
|
|||
const uploadResult = await customerProvider.upload(
|
||||
object.key,
|
||||
downloadResult.data,
|
||||
downloadResult.contentType ? { contentType: downloadResult.contentType } : undefined
|
||||
downloadResult.contentType
|
||||
? { contentType: downloadResult.contentType }
|
||||
: undefined
|
||||
);
|
||||
|
||||
if (!uploadResult.success) {
|
||||
|
@ -111,12 +111,12 @@ export const migrateStorageAssets: ReturnType<
|
|||
} catch (error) {
|
||||
failedAssets++;
|
||||
const errorMessage = error instanceof Error ? error.message : 'Unknown error';
|
||||
|
||||
|
||||
errors.push({
|
||||
key: object.key,
|
||||
error: errorMessage,
|
||||
});
|
||||
|
||||
|
||||
logger.error('Failed to migrate asset', {
|
||||
key: object.key,
|
||||
error: errorMessage,
|
||||
|
@ -159,7 +159,7 @@ export const migrateStorageAssets: ReturnType<
|
|||
};
|
||||
} catch (error) {
|
||||
const executionTimeMs = Date.now() - startTime;
|
||||
|
||||
|
||||
logger.error('Unexpected error during migration', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
stack: error instanceof Error ? error.stack : undefined,
|
||||
|
@ -170,12 +170,14 @@ export const migrateStorageAssets: ReturnType<
|
|||
totalAssets: 0,
|
||||
migratedAssets: 0,
|
||||
failedAssets: 0,
|
||||
errors: [{
|
||||
key: 'migration',
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
}],
|
||||
errors: [
|
||||
{
|
||||
key: 'migration',
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
},
|
||||
],
|
||||
executionTimeMs,
|
||||
};
|
||||
}
|
||||
},
|
||||
});
|
||||
});
|
||||
|
|
|
@ -4,4 +4,4 @@ export {
|
|||
setCachedMetricData,
|
||||
batchCheckCacheExists,
|
||||
generateCacheKey,
|
||||
} from './r2-metric-cache';
|
||||
} from './metric-cache';
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
import { generateCacheKey } from './r2-metric-cache';
|
||||
import { generateCacheKey } from './metric-cache';
|
||||
|
||||
// Mock AWS SDK
|
||||
vi.mock('@aws-sdk/client-s3', () => ({
|
|
@ -1,5 +1,5 @@
|
|||
import type { MetricDataResponse } from '@buster/server-shared/metrics';
|
||||
import { StorageFactory } from '../storage';
|
||||
import { getProviderForOrganization } from '../storage';
|
||||
|
||||
const CACHE_PREFIX = 'static-report-assets';
|
||||
|
||||
|
@ -40,13 +40,13 @@ export async function checkCacheExists(
|
|||
reportId: string
|
||||
): Promise<boolean> {
|
||||
try {
|
||||
const storageProvider = await StorageFactory.getProviderForOrganization(organizationId);
|
||||
const storageProvider = await getProviderForOrganization(organizationId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId);
|
||||
|
||||
const exists = await storageProvider.exists(key);
|
||||
return exists;
|
||||
} catch (error: unknown) {
|
||||
console.error('[r2-metric-cache] Error checking cache existence:', error);
|
||||
console.error('[metric-cache] Error checking cache existence:', error);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -60,10 +60,10 @@ export async function getCachedMetricData(
|
|||
reportId: string
|
||||
): Promise<MetricDataResponse | null> {
|
||||
try {
|
||||
const storageProvider = await StorageFactory.getProviderForOrganization(organizationId);
|
||||
const storageProvider = await getProviderForOrganization(organizationId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId);
|
||||
|
||||
console.info('[r2-metric-cache] Fetching cached data', {
|
||||
console.info('[metric-cache] Fetching cached data', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
|
@ -73,7 +73,7 @@ export async function getCachedMetricData(
|
|||
const downloadResult = await storageProvider.download(key);
|
||||
|
||||
if (!downloadResult.success || !downloadResult.data) {
|
||||
console.info('[r2-metric-cache] Cache miss', {
|
||||
console.info('[metric-cache] Cache miss', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
|
@ -85,7 +85,7 @@ export async function getCachedMetricData(
|
|||
const data = jsonToData(downloadResult.data);
|
||||
data.metricId = metricId;
|
||||
|
||||
console.info('[r2-metric-cache] Cache hit', {
|
||||
console.info('[metric-cache] Cache hit', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
|
@ -94,7 +94,7 @@ export async function getCachedMetricData(
|
|||
|
||||
return data;
|
||||
} catch (error: unknown) {
|
||||
console.error('[r2-metric-cache] Error fetching cached data:', error);
|
||||
console.error('[metric-cache] Error fetching cached data:', error);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -109,10 +109,10 @@ export async function setCachedMetricData(
|
|||
data: MetricDataResponse
|
||||
): Promise<void> {
|
||||
try {
|
||||
const storageProvider = await StorageFactory.getProviderForOrganization(organizationId);
|
||||
const storageProvider = await getProviderForOrganization(organizationId);
|
||||
const key = generateCacheKey(organizationId, metricId, reportId);
|
||||
|
||||
console.info('[r2-metric-cache] Caching metric data', {
|
||||
console.info('[metric-cache] Caching metric data', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
|
@ -135,17 +135,17 @@ export async function setCachedMetricData(
|
|||
});
|
||||
|
||||
if (uploadResult.success) {
|
||||
console.info('[r2-metric-cache] Successfully cached metric data', {
|
||||
console.info('[metric-cache] Successfully cached metric data', {
|
||||
organizationId,
|
||||
metricId,
|
||||
reportId,
|
||||
sizeBytes: jsonBuffer.length,
|
||||
});
|
||||
} else {
|
||||
console.error('[r2-metric-cache] Failed to cache metric data:', uploadResult.error);
|
||||
console.error('[metric-cache] Failed to cache metric data:', uploadResult.error);
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('[r2-metric-cache] Error caching metric data:', error);
|
||||
console.error('[metric-cache] Error caching metric data:', error);
|
||||
// Don't throw - caching failures shouldn't break the main flow
|
||||
}
|
||||
}
|
|
@ -1,7 +1,16 @@
|
|||
// Storage abstraction layer exports
|
||||
export * from './storage-provider';
|
||||
export * from './providers/s3-provider';
|
||||
export * from './providers/r2-provider';
|
||||
export * from './providers/gcs-provider';
|
||||
export * from './storage-factory';
|
||||
export * from './types';
|
||||
export * from './utils';
|
||||
|
||||
// Storage providers
|
||||
export { createS3Provider } from './providers/s3-provider';
|
||||
export { createR2Provider } from './providers/r2-provider';
|
||||
export { createGCSProvider } from './providers/gcs-provider';
|
||||
|
||||
// Storage factory functions
|
||||
export {
|
||||
createStorageProvider,
|
||||
getProviderForOrganization,
|
||||
getDefaultProvider,
|
||||
testStorageCredentials,
|
||||
} from './storage-factory';
|
||||
|
|
|
@ -1,63 +1,75 @@
|
|||
import { Storage } from '@google-cloud/storage';
|
||||
import { BaseStorageProvider } from '../storage-provider';
|
||||
import type {
|
||||
ConnectionTestResult,
|
||||
DownloadResult,
|
||||
GCSConfig,
|
||||
ListOptions,
|
||||
StorageObject,
|
||||
StorageProvider,
|
||||
UploadOptions,
|
||||
UploadResult,
|
||||
} from '../types';
|
||||
import { parseErrorMessage, sanitizeKey, toBuffer } from '../utils';
|
||||
|
||||
export class GCSProvider extends BaseStorageProvider {
|
||||
private storage: Storage;
|
||||
private bucketInstance: any; // Using any to avoid complex GCS types
|
||||
|
||||
constructor(config: GCSConfig) {
|
||||
super(config.bucket);
|
||||
|
||||
// Parse service account key
|
||||
let credentials;
|
||||
try {
|
||||
credentials = JSON.parse(config.serviceAccountKey);
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to parse GCS service account key: ${error instanceof Error ? error.message : 'Invalid JSON'}`);
|
||||
}
|
||||
|
||||
try {
|
||||
this.storage = new Storage({
|
||||
projectId: config.projectId,
|
||||
credentials,
|
||||
});
|
||||
|
||||
this.bucketInstance = this.storage.bucket(config.bucket);
|
||||
} catch (error) {
|
||||
throw new Error(`Failed to initialize GCS client: ${error instanceof Error ? error.message : 'Unknown error'}`);
|
||||
}
|
||||
/**
|
||||
* Create a Google Cloud Storage provider
|
||||
*/
|
||||
export function createGCSProvider(config: GCSConfig): StorageProvider {
|
||||
// Parse service account key
|
||||
let credentials: Record<string, unknown>;
|
||||
try {
|
||||
credentials = JSON.parse(config.serviceAccountKey);
|
||||
} catch (error) {
|
||||
throw new Error(
|
||||
`Failed to parse GCS service account key: ${
|
||||
error instanceof Error ? error.message : 'Invalid JSON'
|
||||
}`
|
||||
);
|
||||
}
|
||||
|
||||
async upload(key: string, data: Buffer | string, options?: UploadOptions): Promise<UploadResult> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const buffer = this.toBuffer(data);
|
||||
let storage: Storage;
|
||||
// Using a more specific type for bucket instance
|
||||
let bucketInstance: ReturnType<Storage['bucket']>;
|
||||
|
||||
try {
|
||||
storage = new Storage({
|
||||
projectId: config.projectId,
|
||||
credentials,
|
||||
});
|
||||
|
||||
bucketInstance = storage.bucket(config.bucket);
|
||||
} catch (error) {
|
||||
throw new Error(
|
||||
`Failed to initialize GCS client: ${error instanceof Error ? error.message : 'Unknown error'}`
|
||||
);
|
||||
}
|
||||
|
||||
async function upload(
|
||||
key: string,
|
||||
data: Buffer | string,
|
||||
options?: UploadOptions
|
||||
): Promise<UploadResult> {
|
||||
try {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
const buffer = toBuffer(data);
|
||||
|
||||
const file = bucketInstance.file(sanitizedKey);
|
||||
const metadata: Record<string, unknown> = {};
|
||||
if (options?.contentType) metadata.contentType = options.contentType;
|
||||
if (options?.contentDisposition) metadata.contentDisposition = options.contentDisposition;
|
||||
if (options?.metadata) metadata.metadata = options.metadata;
|
||||
|
||||
const file = this.bucketInstance.file(sanitizedKey);
|
||||
const stream = file.createWriteStream({
|
||||
resumable: false,
|
||||
metadata: {
|
||||
contentType: options?.contentType,
|
||||
contentDisposition: options?.contentDisposition,
|
||||
metadata: options?.metadata,
|
||||
},
|
||||
metadata,
|
||||
});
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
stream.on('error', (error: any) => {
|
||||
return new Promise((resolve) => {
|
||||
stream.on('error', (error: unknown) => {
|
||||
resolve({
|
||||
success: false,
|
||||
key,
|
||||
error: error.message,
|
||||
error: parseErrorMessage(error),
|
||||
});
|
||||
});
|
||||
|
||||
|
@ -76,15 +88,15 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
return {
|
||||
success: false,
|
||||
key,
|
||||
error: error instanceof Error ? error.message : 'Upload failed',
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async download(key: string): Promise<DownloadResult> {
|
||||
async function download(key: string): Promise<DownloadResult> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const file = this.bucketInstance.file(sanitizedKey);
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
const file = bucketInstance.file(sanitizedKey);
|
||||
|
||||
const [buffer] = await file.download();
|
||||
const [metadata] = await file.getMetadata();
|
||||
|
@ -98,14 +110,14 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Download failed',
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async getSignedUrl(key: string, expiresIn: number): Promise<string> {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const file = this.bucketInstance.file(sanitizedKey);
|
||||
async function getSignedUrl(key: string, expiresIn: number): Promise<string> {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
const file = bucketInstance.file(sanitizedKey);
|
||||
|
||||
// Convert seconds to milliseconds and add to current time
|
||||
const expires = Date.now() + expiresIn * 1000;
|
||||
|
@ -118,10 +130,10 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
return url;
|
||||
}
|
||||
|
||||
async delete(key: string): Promise<boolean> {
|
||||
async function deleteObject(key: string): Promise<boolean> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const file = this.bucketInstance.file(sanitizedKey);
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
const file = bucketInstance.file(sanitizedKey);
|
||||
|
||||
await file.delete();
|
||||
return true;
|
||||
|
@ -131,10 +143,10 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
}
|
||||
}
|
||||
|
||||
async exists(key: string): Promise<boolean> {
|
||||
async function exists(key: string): Promise<boolean> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const file = this.bucketInstance.file(sanitizedKey);
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
const file = bucketInstance.file(sanitizedKey);
|
||||
|
||||
const [exists] = await file.exists();
|
||||
return exists;
|
||||
|
@ -144,20 +156,37 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
}
|
||||
}
|
||||
|
||||
async list(prefix: string, options?: ListOptions): Promise<StorageObject[]> {
|
||||
async function list(prefix: string, options?: ListOptions): Promise<StorageObject[]> {
|
||||
try {
|
||||
const sanitizedPrefix = this.sanitizeKey(prefix);
|
||||
const sanitizedPrefix = sanitizeKey(prefix);
|
||||
|
||||
const [files] = await this.bucketInstance.getFiles({
|
||||
const getFilesOptions: {
|
||||
prefix: string;
|
||||
maxResults?: number;
|
||||
pageToken?: string;
|
||||
} = {
|
||||
prefix: sanitizedPrefix,
|
||||
maxResults: options?.maxKeys,
|
||||
pageToken: options?.continuationToken,
|
||||
});
|
||||
};
|
||||
if (options?.maxKeys !== undefined) {
|
||||
getFilesOptions.maxResults = options.maxKeys;
|
||||
}
|
||||
if (options?.continuationToken !== undefined) {
|
||||
getFilesOptions.pageToken = options.continuationToken;
|
||||
}
|
||||
|
||||
return files.map((file: any) => ({
|
||||
key: file.name,
|
||||
size: Number.parseInt(file.metadata.size || '0', 10),
|
||||
lastModified: new Date(file.metadata.updated || file.metadata.timeCreated),
|
||||
const [files] = await storage.bucket(config.bucket).getFiles(getFilesOptions);
|
||||
|
||||
return files.map((file) => ({
|
||||
key: file.name || '',
|
||||
size:
|
||||
typeof file.metadata.size === 'string'
|
||||
? Number.parseInt(file.metadata.size, 10)
|
||||
: file.metadata.size || 0,
|
||||
lastModified: file.metadata.updated
|
||||
? new Date(file.metadata.updated)
|
||||
: file.metadata.timeCreated
|
||||
? new Date(file.metadata.timeCreated)
|
||||
: new Date(),
|
||||
etag: file.metadata.etag || '',
|
||||
}));
|
||||
} catch (error) {
|
||||
|
@ -166,16 +195,16 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
}
|
||||
}
|
||||
|
||||
async testConnection(): Promise<ConnectionTestResult> {
|
||||
async function testConnection(): Promise<ConnectionTestResult> {
|
||||
const testKey = `_test_${Date.now()}.txt`;
|
||||
const testData = 'test';
|
||||
|
||||
try {
|
||||
// Test bucket exists
|
||||
console.info('GCS: Testing bucket exists for:', this.bucket);
|
||||
let exists: boolean;
|
||||
console.info('GCS: Testing bucket exists for:', config.bucket);
|
||||
let bucketExists: boolean;
|
||||
try {
|
||||
[exists] = await this.bucketInstance.exists();
|
||||
[bucketExists] = await bucketInstance.exists();
|
||||
} catch (error) {
|
||||
console.error('GCS: Error checking bucket existence:', error);
|
||||
return {
|
||||
|
@ -183,11 +212,11 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
canRead: false,
|
||||
canWrite: false,
|
||||
canDelete: false,
|
||||
error: `Failed to check bucket existence: ${error instanceof Error ? error.message : 'Unknown error'}`,
|
||||
error: `Failed to check bucket existence: ${parseErrorMessage(error)}`,
|
||||
};
|
||||
}
|
||||
|
||||
if (!exists) {
|
||||
|
||||
if (!bucketExists) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: false,
|
||||
|
@ -199,7 +228,7 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
|
||||
console.info('GCS: Bucket exists, testing write permission');
|
||||
// Test write
|
||||
const uploadResult = await this.upload(testKey, testData);
|
||||
const uploadResult = await upload(testKey, testData);
|
||||
if (!uploadResult.success) {
|
||||
return {
|
||||
success: false,
|
||||
|
@ -212,10 +241,10 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
|
||||
console.info('GCS: Write successful, testing read permission');
|
||||
// Test read
|
||||
const downloadResult = await this.download(testKey);
|
||||
const downloadResult = await download(testKey);
|
||||
if (!downloadResult.success) {
|
||||
// Clean up test file
|
||||
await this.delete(testKey);
|
||||
await deleteObject(testKey);
|
||||
return {
|
||||
success: false,
|
||||
canRead: false,
|
||||
|
@ -227,7 +256,7 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
|
||||
console.info('GCS: Read successful, testing delete permission');
|
||||
// Test delete
|
||||
const deleteResult = await this.delete(testKey);
|
||||
const deleteResult = await deleteObject(testKey);
|
||||
if (!deleteResult) {
|
||||
return {
|
||||
success: false,
|
||||
|
@ -252,8 +281,18 @@ export class GCSProvider extends BaseStorageProvider {
|
|||
canRead: false,
|
||||
canWrite: false,
|
||||
canDelete: false,
|
||||
error: error instanceof Error ? error.message : 'Connection test failed',
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
upload,
|
||||
download,
|
||||
getSignedUrl,
|
||||
delete: deleteObject,
|
||||
exists,
|
||||
list,
|
||||
testConnection,
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,37 +1,263 @@
|
|||
import { S3Client } from '@aws-sdk/client-s3';
|
||||
import type { R2Config } from '../types';
|
||||
import { S3Provider } from './s3-provider';
|
||||
import {
|
||||
DeleteObjectCommand,
|
||||
GetObjectCommand,
|
||||
HeadObjectCommand,
|
||||
ListObjectsV2Command,
|
||||
PutObjectCommand,
|
||||
S3Client,
|
||||
} from '@aws-sdk/client-s3';
|
||||
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
|
||||
import type {
|
||||
ConnectionTestResult,
|
||||
DownloadResult,
|
||||
ListOptions,
|
||||
R2Config,
|
||||
StorageObject,
|
||||
StorageProvider,
|
||||
UploadOptions,
|
||||
UploadResult,
|
||||
} from '../types';
|
||||
import { parseErrorMessage, sanitizeKey, toBuffer } from '../utils';
|
||||
|
||||
/**
|
||||
* Cloudflare R2 provider
|
||||
* R2 is S3-compatible, so we extend S3Provider with R2-specific configuration
|
||||
* Create a Cloudflare R2 storage provider
|
||||
* R2 is S3-compatible with specific endpoint configuration
|
||||
*/
|
||||
export class R2Provider extends S3Provider {
|
||||
constructor(config: R2Config) {
|
||||
// Create S3-compatible configuration for R2
|
||||
const s3Config = {
|
||||
provider: 's3' as const,
|
||||
region: 'auto', // R2 uses 'auto' region
|
||||
bucket: config.bucket,
|
||||
export function createR2Provider(config: R2Config): StorageProvider {
|
||||
const client = new S3Client({
|
||||
region: 'auto',
|
||||
endpoint: `https://${config.accountId}.r2.cloudflarestorage.com`,
|
||||
credentials: {
|
||||
accessKeyId: config.accessKeyId,
|
||||
secretAccessKey: config.secretAccessKey,
|
||||
};
|
||||
},
|
||||
forcePathStyle: true, // Required for R2
|
||||
});
|
||||
|
||||
// Call parent constructor with S3 config
|
||||
super(s3Config);
|
||||
const bucket = config.bucket;
|
||||
|
||||
// Override the client with R2-specific endpoint
|
||||
this.client = new S3Client({
|
||||
region: 'auto',
|
||||
endpoint: `https://${config.accountId}.r2.cloudflarestorage.com`,
|
||||
credentials: {
|
||||
accessKeyId: config.accessKeyId,
|
||||
secretAccessKey: config.secretAccessKey,
|
||||
},
|
||||
forcePathStyle: true, // Required for R2
|
||||
});
|
||||
async function upload(
|
||||
key: string,
|
||||
data: Buffer | string,
|
||||
options?: UploadOptions
|
||||
): Promise<UploadResult> {
|
||||
try {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
const buffer = toBuffer(data);
|
||||
|
||||
const command = new PutObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
Body: buffer,
|
||||
ContentType: options?.contentType,
|
||||
ContentDisposition: options?.contentDisposition,
|
||||
Metadata: options?.metadata,
|
||||
});
|
||||
|
||||
const response = await client.send(command);
|
||||
|
||||
const result: UploadResult = {
|
||||
success: true,
|
||||
key: sanitizedKey,
|
||||
size: buffer.length,
|
||||
};
|
||||
if (response.ETag) {
|
||||
result.etag = response.ETag;
|
||||
}
|
||||
return result;
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
key,
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// All methods inherited from S3Provider work with R2
|
||||
// No need to override unless R2-specific behavior is needed
|
||||
async function download(key: string): Promise<DownloadResult> {
|
||||
try {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
const response = await client.send(command);
|
||||
|
||||
if (!response.Body) {
|
||||
return {
|
||||
success: false,
|
||||
error: 'No data returned from R2',
|
||||
};
|
||||
}
|
||||
|
||||
const chunks: Uint8Array[] = [];
|
||||
const stream = response.Body as AsyncIterable<Uint8Array>;
|
||||
for await (const chunk of stream) {
|
||||
chunks.push(chunk);
|
||||
}
|
||||
const buffer = Buffer.concat(chunks);
|
||||
|
||||
const result: DownloadResult = {
|
||||
success: true,
|
||||
data: buffer,
|
||||
size: buffer.length,
|
||||
};
|
||||
if (response.ContentType) {
|
||||
result.contentType = response.ContentType;
|
||||
}
|
||||
return result;
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async function getSignedUrlForDownload(key: string, expiresIn: number): Promise<string> {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
return getSignedUrl(client, command, { expiresIn });
|
||||
}
|
||||
|
||||
async function deleteObject(key: string): Promise<boolean> {
|
||||
try {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new DeleteObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
await client.send(command);
|
||||
return true;
|
||||
} catch (error) {
|
||||
console.error('Error deleting from R2:', parseErrorMessage(error));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function exists(key: string): Promise<boolean> {
|
||||
try {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new HeadObjectCommand({
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
await client.send(command);
|
||||
return true;
|
||||
} catch {
|
||||
// Silently return false for existence check
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function list(prefix: string, options?: ListOptions): Promise<StorageObject[]> {
|
||||
try {
|
||||
const sanitizedPrefix = sanitizeKey(prefix);
|
||||
|
||||
const command = new ListObjectsV2Command({
|
||||
Bucket: bucket,
|
||||
Prefix: sanitizedPrefix,
|
||||
MaxKeys: options?.maxKeys,
|
||||
ContinuationToken: options?.continuationToken,
|
||||
});
|
||||
|
||||
const response = await client.send(command);
|
||||
|
||||
return (
|
||||
response.Contents?.map((obj) => {
|
||||
const item: StorageObject = {
|
||||
key: obj.Key || '',
|
||||
size: obj.Size || 0,
|
||||
lastModified: obj.LastModified || new Date(),
|
||||
};
|
||||
if (obj.ETag) {
|
||||
item.etag = obj.ETag;
|
||||
}
|
||||
return item;
|
||||
}) || []
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('Error listing R2 objects:', parseErrorMessage(error));
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async function testConnection(): Promise<ConnectionTestResult> {
|
||||
const testKey = `_test_${Date.now()}.txt`;
|
||||
const testData = 'test';
|
||||
|
||||
try {
|
||||
// Test write
|
||||
const uploadResult = await upload(testKey, testData);
|
||||
if (!uploadResult.success) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: false,
|
||||
canWrite: false,
|
||||
canDelete: false,
|
||||
error: `Upload failed: ${uploadResult.error}`,
|
||||
};
|
||||
}
|
||||
|
||||
// Test read
|
||||
const downloadResult = await download(testKey);
|
||||
if (!downloadResult.success) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: false,
|
||||
canWrite: true,
|
||||
canDelete: false,
|
||||
error: `Download failed: ${downloadResult.error}`,
|
||||
};
|
||||
}
|
||||
|
||||
// Test delete
|
||||
const deleteResult = await deleteObject(testKey);
|
||||
if (!deleteResult) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: true,
|
||||
canWrite: true,
|
||||
canDelete: false,
|
||||
error: 'Delete failed',
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
canRead: true,
|
||||
canWrite: true,
|
||||
canDelete: true,
|
||||
};
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: false,
|
||||
canWrite: false,
|
||||
canDelete: false,
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
upload,
|
||||
download,
|
||||
getSignedUrl: getSignedUrlForDownload,
|
||||
delete: deleteObject,
|
||||
exists,
|
||||
list,
|
||||
testConnection,
|
||||
};
|
||||
}
|
||||
|
|
|
@ -7,39 +7,43 @@ import {
|
|||
S3Client,
|
||||
} from '@aws-sdk/client-s3';
|
||||
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
|
||||
import { BaseStorageProvider } from '../storage-provider';
|
||||
import type {
|
||||
ConnectionTestResult,
|
||||
DownloadResult,
|
||||
ListOptions,
|
||||
S3Config,
|
||||
StorageObject,
|
||||
StorageProvider,
|
||||
UploadOptions,
|
||||
UploadResult,
|
||||
} from '../types';
|
||||
import { parseErrorMessage, sanitizeKey, toBuffer } from '../utils';
|
||||
|
||||
export class S3Provider extends BaseStorageProvider {
|
||||
protected client: S3Client;
|
||||
/**
|
||||
* Create an S3 storage provider
|
||||
*/
|
||||
export function createS3Provider(config: S3Config): StorageProvider {
|
||||
const client = new S3Client({
|
||||
region: config.region,
|
||||
credentials: {
|
||||
accessKeyId: config.accessKeyId,
|
||||
secretAccessKey: config.secretAccessKey,
|
||||
},
|
||||
});
|
||||
|
||||
constructor(config: S3Config) {
|
||||
super(config.bucket);
|
||||
const bucket = config.bucket;
|
||||
|
||||
this.client = new S3Client({
|
||||
region: config.region,
|
||||
credentials: {
|
||||
accessKeyId: config.accessKeyId,
|
||||
secretAccessKey: config.secretAccessKey,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
async upload(key: string, data: Buffer | string, options?: UploadOptions): Promise<UploadResult> {
|
||||
async function upload(
|
||||
key: string,
|
||||
data: Buffer | string,
|
||||
options?: UploadOptions
|
||||
): Promise<UploadResult> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const buffer = this.toBuffer(data);
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
const buffer = toBuffer(data);
|
||||
|
||||
const command = new PutObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
Body: buffer,
|
||||
ContentType: options?.contentType,
|
||||
|
@ -47,33 +51,36 @@ export class S3Provider extends BaseStorageProvider {
|
|||
Metadata: options?.metadata,
|
||||
});
|
||||
|
||||
const response = await this.client.send(command);
|
||||
const response = await client.send(command);
|
||||
|
||||
return {
|
||||
const result: UploadResult = {
|
||||
success: true,
|
||||
key: sanitizedKey,
|
||||
etag: response.ETag || '',
|
||||
size: buffer.length,
|
||||
};
|
||||
if (response.ETag) {
|
||||
result.etag = response.ETag;
|
||||
}
|
||||
return result;
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
key,
|
||||
error: error instanceof Error ? error.message : 'Upload failed',
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async download(key: string): Promise<DownloadResult> {
|
||||
async function download(key: string): Promise<DownloadResult> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
const response = await this.client.send(command);
|
||||
const response = await client.send(command);
|
||||
|
||||
if (!response.Body) {
|
||||
return {
|
||||
|
@ -82,7 +89,6 @@ export class S3Provider extends BaseStorageProvider {
|
|||
};
|
||||
}
|
||||
|
||||
// Convert stream to buffer
|
||||
const chunks: Uint8Array[] = [];
|
||||
const stream = response.Body as AsyncIterable<Uint8Array>;
|
||||
for await (const chunk of stream) {
|
||||
|
@ -90,130 +96,138 @@ export class S3Provider extends BaseStorageProvider {
|
|||
}
|
||||
const buffer = Buffer.concat(chunks);
|
||||
|
||||
return {
|
||||
const result: DownloadResult = {
|
||||
success: true,
|
||||
data: buffer,
|
||||
contentType: response.ContentType || 'application/octet-stream',
|
||||
size: buffer.length,
|
||||
};
|
||||
if (response.ContentType) {
|
||||
result.contentType = response.ContentType;
|
||||
}
|
||||
return result;
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
error: error instanceof Error ? error.message : 'Download failed',
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async getSignedUrl(key: string, expiresIn: number): Promise<string> {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
async function getSignedUrlForDownload(key: string, expiresIn: number): Promise<string> {
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
return getSignedUrl(this.client, command, { expiresIn });
|
||||
return getSignedUrl(client, command, { expiresIn });
|
||||
}
|
||||
|
||||
async delete(key: string): Promise<boolean> {
|
||||
async function deleteObject(key: string): Promise<boolean> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new DeleteObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
await this.client.send(command);
|
||||
await client.send(command);
|
||||
return true;
|
||||
} catch (error) {
|
||||
console.error('S3 delete error:', error);
|
||||
console.error('Error deleting from S3:', parseErrorMessage(error));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async exists(key: string): Promise<boolean> {
|
||||
async function exists(key: string): Promise<boolean> {
|
||||
try {
|
||||
const sanitizedKey = this.sanitizeKey(key);
|
||||
const sanitizedKey = sanitizeKey(key);
|
||||
|
||||
const command = new HeadObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Bucket: bucket,
|
||||
Key: sanitizedKey,
|
||||
});
|
||||
|
||||
await this.client.send(command);
|
||||
await client.send(command);
|
||||
return true;
|
||||
} catch (error: any) {
|
||||
if (error?.name === 'NotFound' || error?.$metadata?.httpStatusCode === 404) {
|
||||
return false;
|
||||
}
|
||||
throw error;
|
||||
} catch {
|
||||
// Silently return false for existence check
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async list(prefix: string, options?: ListOptions): Promise<StorageObject[]> {
|
||||
async function list(prefix: string, options?: ListOptions): Promise<StorageObject[]> {
|
||||
try {
|
||||
const sanitizedPrefix = this.sanitizeKey(prefix);
|
||||
const sanitizedPrefix = sanitizeKey(prefix);
|
||||
|
||||
const command = new ListObjectsV2Command({
|
||||
Bucket: this.bucket,
|
||||
Bucket: bucket,
|
||||
Prefix: sanitizedPrefix,
|
||||
MaxKeys: options?.maxKeys,
|
||||
ContinuationToken: options?.continuationToken,
|
||||
});
|
||||
|
||||
const response = await this.client.send(command);
|
||||
const response = await client.send(command);
|
||||
|
||||
return (response.Contents || []).map((object) => ({
|
||||
key: object.Key || '',
|
||||
size: object.Size || 0,
|
||||
lastModified: object.LastModified || new Date(),
|
||||
etag: object.ETag || '',
|
||||
}));
|
||||
return (
|
||||
response.Contents?.map((obj) => {
|
||||
const item: StorageObject = {
|
||||
key: obj.Key || '',
|
||||
size: obj.Size || 0,
|
||||
lastModified: obj.LastModified || new Date(),
|
||||
};
|
||||
if (obj.ETag) {
|
||||
item.etag = obj.ETag;
|
||||
}
|
||||
return item;
|
||||
}) || []
|
||||
);
|
||||
} catch (error) {
|
||||
console.error('S3 list error:', error);
|
||||
console.error('Error listing S3 objects:', parseErrorMessage(error));
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
async testConnection(): Promise<ConnectionTestResult> {
|
||||
async function testConnection(): Promise<ConnectionTestResult> {
|
||||
const testKey = `_test_${Date.now()}.txt`;
|
||||
const testData = 'test';
|
||||
|
||||
try {
|
||||
// Test write
|
||||
const uploadResult = await this.upload(testKey, testData);
|
||||
const uploadResult = await upload(testKey, testData);
|
||||
if (!uploadResult.success) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: false,
|
||||
canWrite: false,
|
||||
canDelete: false,
|
||||
error: `Cannot write to bucket: ${uploadResult.error}`,
|
||||
error: `Upload failed: ${uploadResult.error}`,
|
||||
};
|
||||
}
|
||||
|
||||
// Test read
|
||||
const downloadResult = await this.download(testKey);
|
||||
const downloadResult = await download(testKey);
|
||||
if (!downloadResult.success) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: false,
|
||||
canWrite: true,
|
||||
canDelete: false,
|
||||
error: `Cannot read from bucket: ${downloadResult.error}`,
|
||||
error: `Download failed: ${downloadResult.error}`,
|
||||
};
|
||||
}
|
||||
|
||||
// Test delete
|
||||
const deleteResult = await this.delete(testKey);
|
||||
const deleteResult = await deleteObject(testKey);
|
||||
if (!deleteResult) {
|
||||
return {
|
||||
success: false,
|
||||
canRead: true,
|
||||
canWrite: true,
|
||||
canDelete: false,
|
||||
error: 'Cannot delete from bucket',
|
||||
error: 'Delete failed',
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -229,8 +243,18 @@ export class S3Provider extends BaseStorageProvider {
|
|||
canRead: false,
|
||||
canWrite: false,
|
||||
canDelete: false,
|
||||
error: error instanceof Error ? error.message : 'Connection test failed',
|
||||
error: parseErrorMessage(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
upload,
|
||||
download,
|
||||
getSignedUrl: getSignedUrlForDownload,
|
||||
delete: deleteObject,
|
||||
exists,
|
||||
list,
|
||||
testConnection,
|
||||
};
|
||||
}
|
||||
|
|
|
@ -1,130 +1,127 @@
|
|||
import { getS3IntegrationByOrganizationId, getSecretByName } from '@buster/database';
|
||||
import type { CreateS3IntegrationRequest } from '@buster/server-shared';
|
||||
import { GCSProvider } from './providers/gcs-provider';
|
||||
import { R2Provider } from './providers/r2-provider';
|
||||
import { S3Provider } from './providers/s3-provider';
|
||||
import { createGCSProvider } from './providers/gcs-provider';
|
||||
import { createR2Provider } from './providers/r2-provider';
|
||||
import { createS3Provider } from './providers/s3-provider';
|
||||
import type { StorageConfig, StorageProvider } from './types';
|
||||
|
||||
/**
|
||||
* Factory for creating storage providers
|
||||
* Create a storage provider from configuration
|
||||
*/
|
||||
export class StorageFactory {
|
||||
/**
|
||||
* Create a storage provider from configuration
|
||||
*/
|
||||
static createProvider(config: StorageConfig): StorageProvider {
|
||||
switch (config.provider) {
|
||||
case 's3':
|
||||
return new S3Provider(config);
|
||||
case 'r2':
|
||||
return new R2Provider(config);
|
||||
case 'gcs':
|
||||
return new GCSProvider(config);
|
||||
default:
|
||||
throw new Error(`Unsupported storage provider: ${(config as any).provider}`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get storage provider for an organization
|
||||
* Returns customer storage if configured, otherwise returns default R2 storage
|
||||
*/
|
||||
static async getProviderForOrganization(organizationId: string): Promise<StorageProvider> {
|
||||
try {
|
||||
// Check if organization has a storage integration
|
||||
const integration = await getS3IntegrationByOrganizationId(organizationId);
|
||||
|
||||
if (integration) {
|
||||
// Get credentials from vault
|
||||
const secretName = `s3-integration-${integration.id}`;
|
||||
const secret = await getSecretByName(secretName);
|
||||
|
||||
if (secret && secret.secret) {
|
||||
// Parse the stored credentials
|
||||
const credentials = JSON.parse(secret.secret) as CreateS3IntegrationRequest;
|
||||
|
||||
// Create appropriate config based on provider
|
||||
let config: StorageConfig;
|
||||
|
||||
if (credentials.provider === 's3') {
|
||||
config = {
|
||||
provider: 's3',
|
||||
region: credentials.region,
|
||||
bucket: credentials.bucket,
|
||||
accessKeyId: credentials.accessKeyId,
|
||||
secretAccessKey: credentials.secretAccessKey,
|
||||
};
|
||||
} else if (credentials.provider === 'r2') {
|
||||
config = {
|
||||
provider: 'r2',
|
||||
accountId: credentials.accountId,
|
||||
bucket: credentials.bucket,
|
||||
accessKeyId: credentials.accessKeyId,
|
||||
secretAccessKey: credentials.secretAccessKey,
|
||||
};
|
||||
} else if (credentials.provider === 'gcs') {
|
||||
config = {
|
||||
provider: 'gcs',
|
||||
projectId: credentials.projectId,
|
||||
bucket: credentials.bucket,
|
||||
serviceAccountKey: credentials.serviceAccountKey,
|
||||
};
|
||||
} else {
|
||||
throw new Error(`Unknown provider type: ${(credentials as any).provider}`);
|
||||
}
|
||||
|
||||
return StorageFactory.createProvider(config);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error getting customer storage integration:', error);
|
||||
// Fall back to default storage
|
||||
}
|
||||
|
||||
// Return default R2 storage
|
||||
return StorageFactory.getDefaultProvider();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the default R2 storage provider
|
||||
*/
|
||||
static getDefaultProvider(): StorageProvider {
|
||||
const accountId = process.env.R2_ACCOUNT_ID;
|
||||
const accessKeyId = process.env.R2_ACCESS_KEY_ID;
|
||||
const secretAccessKey = process.env.R2_SECRET_ACCESS_KEY;
|
||||
const bucket = process.env.R2_BUCKET || 'metric-exports';
|
||||
|
||||
if (!accountId || !accessKeyId || !secretAccessKey) {
|
||||
throw new Error('Default R2 storage credentials not configured');
|
||||
}
|
||||
|
||||
const config: StorageConfig = {
|
||||
provider: 'r2',
|
||||
accountId,
|
||||
bucket,
|
||||
accessKeyId,
|
||||
secretAccessKey,
|
||||
};
|
||||
|
||||
return StorageFactory.createProvider(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test storage credentials
|
||||
*/
|
||||
static async testCredentials(config: StorageConfig): Promise<boolean> {
|
||||
try {
|
||||
const provider = StorageFactory.createProvider(config);
|
||||
const result = await provider.testConnection();
|
||||
return result.success;
|
||||
} catch (error) {
|
||||
console.error('Credential test failed:', error);
|
||||
// Log more details about the error
|
||||
if (error instanceof Error) {
|
||||
console.error('Error message:', error.message);
|
||||
console.error('Error stack:', error.stack);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
export function createStorageProvider(config: StorageConfig): StorageProvider {
|
||||
switch (config.provider) {
|
||||
case 's3':
|
||||
return createS3Provider(config);
|
||||
case 'r2':
|
||||
return createR2Provider(config);
|
||||
case 'gcs':
|
||||
return createGCSProvider(config);
|
||||
default:
|
||||
// This should never happen as TypeScript ensures the config matches one of the union types
|
||||
throw new Error(`Unsupported storage provider`);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the default R2 storage provider
|
||||
*/
|
||||
export function getDefaultProvider(): StorageProvider {
|
||||
const accountId = process.env.R2_ACCOUNT_ID;
|
||||
const accessKeyId = process.env.R2_ACCESS_KEY_ID;
|
||||
const secretAccessKey = process.env.R2_SECRET_ACCESS_KEY;
|
||||
const bucket = process.env.R2_BUCKET || 'metric-exports';
|
||||
|
||||
if (!accountId || !accessKeyId || !secretAccessKey) {
|
||||
throw new Error('Default R2 storage credentials not configured');
|
||||
}
|
||||
|
||||
const config: StorageConfig = {
|
||||
provider: 'r2',
|
||||
accountId,
|
||||
bucket,
|
||||
accessKeyId,
|
||||
secretAccessKey,
|
||||
};
|
||||
|
||||
return createStorageProvider(config);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get storage provider for an organization
|
||||
* Returns customer storage if configured, otherwise returns default R2 storage
|
||||
*/
|
||||
export async function getProviderForOrganization(organizationId: string): Promise<StorageProvider> {
|
||||
try {
|
||||
// Check if organization has a storage integration
|
||||
const integration = await getS3IntegrationByOrganizationId(organizationId);
|
||||
|
||||
if (integration) {
|
||||
// Get credentials from vault
|
||||
const secretName = `s3-integration-${integration.id}`;
|
||||
const secret = await getSecretByName(secretName);
|
||||
|
||||
if (secret?.secret) {
|
||||
// Parse the stored credentials
|
||||
const credentials = JSON.parse(secret.secret) as CreateS3IntegrationRequest;
|
||||
|
||||
// Create appropriate config based on provider
|
||||
let config: StorageConfig;
|
||||
|
||||
if (credentials.provider === 's3') {
|
||||
config = {
|
||||
provider: 's3',
|
||||
region: credentials.region,
|
||||
bucket: credentials.bucket,
|
||||
accessKeyId: credentials.accessKeyId,
|
||||
secretAccessKey: credentials.secretAccessKey,
|
||||
};
|
||||
} else if (credentials.provider === 'r2') {
|
||||
config = {
|
||||
provider: 'r2',
|
||||
accountId: credentials.accountId,
|
||||
bucket: credentials.bucket,
|
||||
accessKeyId: credentials.accessKeyId,
|
||||
secretAccessKey: credentials.secretAccessKey,
|
||||
};
|
||||
} else if (credentials.provider === 'gcs') {
|
||||
config = {
|
||||
provider: 'gcs',
|
||||
projectId: credentials.projectId,
|
||||
bucket: credentials.bucket,
|
||||
serviceAccountKey: credentials.serviceAccountKey,
|
||||
};
|
||||
} else {
|
||||
// This should never happen as the type is validated above
|
||||
throw new Error(`Unknown provider type`);
|
||||
}
|
||||
|
||||
return createStorageProvider(config);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error getting customer storage integration:', error);
|
||||
// Fall back to default storage
|
||||
}
|
||||
|
||||
// Return default R2 storage
|
||||
return getDefaultProvider();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test storage credentials
|
||||
*/
|
||||
export async function testStorageCredentials(config: StorageConfig): Promise<boolean> {
|
||||
try {
|
||||
const provider = createStorageProvider(config);
|
||||
const result = await provider.testConnection();
|
||||
return result.success;
|
||||
} catch (error) {
|
||||
console.error('Credential test failed:', error);
|
||||
// Log more details about the error
|
||||
if (error instanceof Error) {
|
||||
console.error('Error message:', error.message);
|
||||
console.error('Error stack:', error.stack);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,73 +0,0 @@
|
|||
import type {
|
||||
ConnectionTestResult,
|
||||
DownloadResult,
|
||||
ListOptions,
|
||||
StorageObject,
|
||||
StorageProvider,
|
||||
UploadOptions,
|
||||
UploadResult,
|
||||
} from './types';
|
||||
|
||||
/**
|
||||
* Abstract base class for storage providers
|
||||
* Provides common functionality and error handling
|
||||
*/
|
||||
export abstract class BaseStorageProvider implements StorageProvider {
|
||||
protected bucket: string;
|
||||
|
||||
constructor(bucket: string) {
|
||||
this.bucket = bucket;
|
||||
}
|
||||
|
||||
abstract upload(
|
||||
key: string,
|
||||
data: Buffer | string,
|
||||
options?: UploadOptions
|
||||
): Promise<UploadResult>;
|
||||
|
||||
abstract download(key: string): Promise<DownloadResult>;
|
||||
|
||||
abstract getSignedUrl(key: string, expiresIn: number): Promise<string>;
|
||||
|
||||
abstract delete(key: string): Promise<boolean>;
|
||||
|
||||
abstract exists(key: string): Promise<boolean>;
|
||||
|
||||
abstract list(prefix: string, options?: ListOptions): Promise<StorageObject[]>;
|
||||
|
||||
abstract testConnection(): Promise<ConnectionTestResult>;
|
||||
|
||||
/**
|
||||
* Sanitize storage key to prevent path traversal
|
||||
*/
|
||||
protected sanitizeKey(key: string): string {
|
||||
// Remove leading slashes
|
||||
key = key.replace(/^\/+/, '');
|
||||
|
||||
// Remove any path traversal attempts
|
||||
key = key.replace(/\.\./g, '');
|
||||
|
||||
// Normalize multiple slashes to single slash
|
||||
key = key.replace(/\/+/g, '/');
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert string to Buffer if needed
|
||||
*/
|
||||
protected toBuffer(data: Buffer | string): Buffer {
|
||||
if (typeof data === 'string') {
|
||||
return Buffer.from(data, 'utf-8');
|
||||
}
|
||||
return data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Standard error handler for storage operations
|
||||
*/
|
||||
protected handleError(operation: string, error: unknown): never {
|
||||
const message = error instanceof Error ? error.message : 'Unknown error';
|
||||
throw new Error(`Storage ${operation} failed: ${message}`);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Common utilities for storage providers
|
||||
*/
|
||||
|
||||
/**
|
||||
* Sanitize storage key to prevent path traversal
|
||||
*/
|
||||
export function sanitizeKey(key: string): string {
|
||||
let sanitized = key;
|
||||
|
||||
// Remove leading slashes
|
||||
sanitized = sanitized.replace(/^\/+/, '');
|
||||
|
||||
// Remove any path traversal attempts
|
||||
sanitized = sanitized.replace(/\.\./g, '');
|
||||
|
||||
// Normalize multiple slashes to single slash
|
||||
sanitized = sanitized.replace(/\/+/g, '/');
|
||||
|
||||
return sanitized;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert string or Buffer to Buffer
|
||||
*/
|
||||
export function toBuffer(data: Buffer | string): Buffer {
|
||||
return typeof data === 'string' ? Buffer.from(data) : data;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse error message from unknown error
|
||||
*/
|
||||
export function parseErrorMessage(error: unknown): string {
|
||||
if (error instanceof Error) {
|
||||
return error.message;
|
||||
}
|
||||
if (typeof error === 'string') {
|
||||
return error;
|
||||
}
|
||||
return 'Unknown error occurred';
|
||||
}
|
Loading…
Reference in New Issue