simplify trigger task

This commit is contained in:
Nate Kelley 2025-10-08 15:35:26 -06:00
parent 5412fa5fa3
commit 600f31faff
No known key found for this signature in database
GPG Key ID: FD90372AB8D98B4F
13 changed files with 110 additions and 221 deletions

View File

@ -252,8 +252,6 @@ export async function getDashboardHandler(
dashboardId,
isOnSaveEvent: false,
organizationId: dashboardFile.organizationId,
supabaseCookieKey: c.get('supabaseCookieKey'),
supabaseUser: c.get('supabaseUser'),
accessToken: c.get('accessToken'),
} satisfies TakeDashboardScreenshotTrigger,
{ concurrencyKey: `take-dashboard-screenshot-${dashboardId}-${versionNumber}` }

View File

@ -3,8 +3,9 @@ import type { TakeMetricScreenshotTrigger } from '@buster-app/trigger/task-schem
import { getUserOrganizationId } from '@buster/database/queries';
import { MetricDataParamsSchema, MetricDataQuerySchema } from '@buster/server-shared';
import { zValidator } from '@hono/zod-validator';
import { tasks } from '@trigger.dev/sdk';
import { runs, tasks } from '@trigger.dev/sdk';
import { Hono } from 'hono';
import { id } from 'zod/v4/locales';
import { standardErrorHandler } from '../../../../../utils/response';
import { getMetricDataHandler } from './get-metric-data';
@ -15,7 +16,8 @@ const app = new Hono()
zValidator('query', MetricDataQuerySchema),
async (c) => {
const { id } = c.req.valid('param');
const { limit, version_number, report_file_id, password } = c.req.valid('query');
const { limit, version_number, report_file_id, password, is_screenshot } =
c.req.valid('query');
const user = c.get('busterUser');
const response = await getMetricDataHandler(
@ -27,21 +29,30 @@ const app = new Hono()
password
);
const organizationId =
(await getUserOrganizationId(user.id).then((res) => res?.organizationId)) || '';
const tag = `take-metric-screenshot-${id}-${version_number}`;
const lastTask = await runs
.list({
status: ['EXECUTING', 'QUEUED'],
taskIdentifier: screenshots_task_keys.take_metric_screenshot,
tag,
limit: 1,
})
.then((res) => res.data[0]);
await tasks.trigger(
screenshots_task_keys.take_metric_screenshot,
{
metricId: id,
isOnSaveEvent: false,
supabaseCookieKey: c.get('supabaseCookieKey'),
supabaseUser: c.get('supabaseUser'),
accessToken: c.get('accessToken'),
organizationId,
} satisfies TakeMetricScreenshotTrigger,
{ concurrencyKey: `take-metric-screenshot-${id}-${version_number}` }
);
if (!lastTask && !is_screenshot) {
const organizationId =
(await getUserOrganizationId(user.id).then((res) => res?.organizationId)) || '';
await tasks.trigger(
screenshots_task_keys.take_metric_screenshot,
{
metricId: id,
isOnSaveEvent: false,
accessToken: c.get('accessToken'),
organizationId,
} satisfies TakeMetricScreenshotTrigger,
{ tags: [tag] }
);
}
return c.json(response);
}

View File

@ -95,8 +95,6 @@ const app = new Hono()
{
reportId,
organizationId: (await getUserOrganizationId(user.id))?.organizationId || '',
supabaseCookieKey: c.get('supabaseCookieKey'),
supabaseUser: c.get('supabaseUser'),
accessToken: c.get('accessToken'),
} satisfies TakeReportScreenshotTrigger,
{ concurrencyKey: `take-report-screenshot-${reportId}-${versionNumber}` }

View File

@ -1,177 +0,0 @@
import { getProviderForOrganization } from '@buster/data-source';
import { updateAssetScreenshotBucketKey } from '@buster/database/queries';
import type { AssetType } from '@buster/server-shared/assets';
import { AssetTypeSchema } from '@buster/server-shared/assets';
import {
PutChatScreenshotRequestSchema,
type PutScreenshotResponse,
PutScreenshotResponseSchema,
} from '@buster/server-shared/screenshots';
import z from 'zod';
export const UploadScreenshotParamsSchema = PutChatScreenshotRequestSchema.extend({
assetType: AssetTypeSchema,
assetId: z.string().uuid('Asset ID must be a valid UUID'),
organizationId: z.string().uuid('Organization ID must be a valid UUID'),
});
export type UploadScreenshotParams = z.infer<typeof UploadScreenshotParamsSchema>;
function getExtensionFromContentType(contentType: string): string {
switch (contentType) {
case 'image/jpeg':
case 'image/jpg':
return '.jpg';
case 'image/png':
return '.png';
case 'image/webp':
return '.webp';
default:
return '.png';
}
}
function parseBase64Image(base64Image: string): {
buffer: Buffer;
contentType: string;
extension: string;
} {
const dataUriPattern = /^data:(?<mime>[^;]+);base64,(?<data>.+)$/;
const match = base64Image.match(dataUriPattern);
const contentType = match?.groups?.mime ?? 'image/png';
const base64Data = match?.groups?.data ?? base64Image;
const buffer = Buffer.from(base64Data, 'base64');
if (buffer.length === 0) {
throw new Error('Provided image data is empty');
}
return {
buffer,
contentType,
extension: getExtensionFromContentType(contentType),
};
}
async function parseImageFile(file: File): Promise<{
buffer: Buffer;
contentType: string;
extension: string;
}> {
const arrayBuffer = await file.arrayBuffer();
const buffer = Buffer.from(arrayBuffer);
if (buffer.length === 0) {
throw new Error('Provided image file is empty');
}
return {
buffer,
contentType: file.type,
extension: getExtensionFromContentType(file.type),
};
}
function detectImageTypeFromBuffer(buffer: Buffer): { contentType: string; extension: string } {
// Check PNG signature (89 50 4E 47)
if (
buffer.length >= 4 &&
buffer[0] === 0x89 &&
buffer[1] === 0x50 &&
buffer[2] === 0x4e &&
buffer[3] === 0x47
) {
return { contentType: 'image/png', extension: '.png' };
}
// Check JPEG signature (FF D8 FF)
if (buffer.length >= 3 && buffer[0] === 0xff && buffer[1] === 0xd8 && buffer[2] === 0xff) {
return { contentType: 'image/jpeg', extension: '.jpg' };
}
// Check WebP signature (RIFF ... WEBP)
if (
buffer.length >= 12 &&
buffer[0] === 0x52 &&
buffer[1] === 0x49 &&
buffer[2] === 0x46 &&
buffer[3] === 0x46 &&
buffer[8] === 0x57 &&
buffer[9] === 0x45 &&
buffer[10] === 0x42 &&
buffer[11] === 0x50
) {
return { contentType: 'image/webp', extension: '.webp' };
}
// Default to PNG if unknown
return { contentType: 'image/png', extension: '.png' };
}
function parseBuffer(buffer: Buffer): {
buffer: Buffer;
contentType: string;
extension: string;
} {
const { contentType, extension } = detectImageTypeFromBuffer(buffer);
return {
buffer,
contentType,
extension,
};
}
async function parseImageInput(image: string | File | Buffer): Promise<{
buffer: Buffer;
contentType: string;
extension: string;
}> {
if (Buffer.isBuffer(image)) {
return parseBuffer(image);
}
if (typeof image === 'string') {
return parseBase64Image(image);
}
return parseImageFile(image);
}
function buildScreenshotKey(
assetType: AssetType,
assetId: string,
extension: string,
organizationId: string
): string {
return `screenshots/${organizationId}/${assetType}-${assetId}${extension}`;
}
export async function uploadScreenshotHandler(
params: UploadScreenshotParams
): Promise<PutScreenshotResponse> {
const { assetType, assetId, image, organizationId } = UploadScreenshotParamsSchema.parse(params);
const { buffer, contentType, extension } = await parseImageInput(image);
const targetKey = buildScreenshotKey(assetType, assetId, extension, organizationId);
const provider = await getProviderForOrganization(organizationId);
const result = await provider.upload(targetKey, buffer, {
contentType,
});
if (!result.success) {
throw new Error(result.error ?? 'Failed to upload screenshot');
}
await updateAssetScreenshotBucketKey({
assetId,
assetType,
screenshotBucketKey: result.key,
});
return PutScreenshotResponseSchema.parse({
success: true,
bucketKey: result.key,
});
}

View File

@ -526,9 +526,7 @@ export const analystAgentTask: ReturnType<
dashboardId: payload.message_id,
isOnSaveEvent: false,
organizationId: messageContext.organizationId,
supabaseCookieKey: getSupabaseCookieKey(),
accessToken: payload.access_token,
supabaseUser,
} satisfies TakeDashboardScreenshotTrigger,
{ concurrencyKey: `take-dashboard-screenshot-${payload.message_id}` }
);

View File

@ -31,10 +31,14 @@ export const takeMetricScreenshotHandlerTask: ReturnType<
isOnSaveEvent,
});
logger.info('Should take new screenshot', { shouldTakeNewScreenshot });
if (!shouldTakeNewScreenshot) {
return;
}
logger.info('Getting metric screenshot');
const screenshotBuffer = await getMetricScreenshot(args);
logger.info('Metric screenshot taken', { screenshotBufferLength: screenshotBuffer.length });
@ -65,5 +69,7 @@ const shouldTakenNewScreenshot = async ({
dayjs().subtract(6, 'hours')
);
return !isScreenshotExpired;
logger.info('Is screenshot expired', { isScreenshotExpired });
return isScreenshotExpired;
};

View File

@ -7,6 +7,7 @@ import {
type PutScreenshotResponse,
PutScreenshotResponseSchema,
} from '@buster/server-shared/screenshots';
import { logger } from '@trigger.dev/sdk';
import z from 'zod';
export const UploadScreenshotParamsSchema = PutChatScreenshotRequestSchema.extend({
@ -149,27 +150,40 @@ function buildScreenshotKey(
export async function uploadScreenshotHandler(
params: UploadScreenshotParams
): Promise<PutScreenshotResponse> {
logger.info('Uploading screenshot', { params });
const { assetType, assetId, image, organizationId } = UploadScreenshotParamsSchema.parse(params);
logger.info('Parsing image input', { image });
const { buffer, contentType, extension } = await parseImageInput(image);
logger.info('Building screenshot key', { assetType, assetId, extension, organizationId });
const targetKey = buildScreenshotKey(assetType, assetId, extension, organizationId);
const provider = await getProviderForOrganization(organizationId);
logger.info('Uploading screenshot', { targetKey });
const result = await provider.upload(targetKey, buffer, {
contentType,
});
logger.info('Screenshot uploaded', { result });
if (!result.success) {
throw new Error(result.error ?? 'Failed to upload screenshot');
}
await updateAssetScreenshotBucketKey({
const resultOfUpload = await updateAssetScreenshotBucketKey({
assetId,
assetType,
screenshotBucketKey: result.key,
});
logger.info('Screenshot uploaded', { resultOfUpload });
return PutScreenshotResponseSchema.parse({
success: true,
bucketKey: result.key,

View File

@ -39,9 +39,8 @@ export const getSupabaseUser = async (jwtToken: string): Promise<User | null> =>
return supabaseUser.data.user;
};
export const getSupabaseAccessToken = async (userId: string) => {
const supabase = getSupabaseClient();
const supabaseUser = await supabase.auth.admin.getUserById(userId);
// export const getSupabaseAccessToken = async (userId: string) => {
// const supabase = getSupabaseClient();
//
};
// //
// };

View File

@ -36,7 +36,7 @@ describe('Analyst Agent Task Integration Tests', () => {
const TEST_MESSAGE_CONTENT = 'who is our top customer';
async function triggerAndPollAnalystAgent(
payload: { message_id: string },
payload: { message_id: string; access_token: string | null },
pollIntervalMs = 2000,
timeoutMs = 30 * 60 * 1000 // align with 30 min test timeout
) {
@ -110,7 +110,8 @@ describe('Analyst Agent Task Integration Tests', () => {
console.log('Triggering analyst agent task...');
const tracedTaskTrigger = wrapTraced(
async () => await triggerAndPollAnalystAgent({ message_id: messageId }, 5000),
async () =>
await triggerAndPollAnalystAgent({ message_id: messageId, access_token: '' }, 5000),
{
name: 'Trigger Analyst Agent Task',
}
@ -183,7 +184,10 @@ describe('Analyst Agent Task Integration Tests', () => {
try {
console.log('Testing error handling with invalid message ID...');
const result = await triggerAndPollAnalystAgent({ message_id: invalidMessageId }, 2000);
const result = await triggerAndPollAnalystAgent(
{ message_id: invalidMessageId, access_token: '' },
2000
);
// Task should complete but with error result
expect(result).toBeDefined();
@ -213,7 +217,7 @@ describe('Analyst Agent Task Integration Tests', () => {
await expect(
triggerAndPollAnalystAgent(
// Intentionally invalid input to test validation
{ message_id: 'not-a-uuid' } as { message_id: string },
{ message_id: 'not-a-uuid', access_token: '' },
1000
)
).rejects.toThrow();

View File

@ -46,16 +46,11 @@ export const updateAssetScreenshotBucketKey = async (
throw new Error(`Asset ${assetType} with id ${assetId} not found`);
}
if (existing.screenshotBucketKey === screenshotBucketKey) {
return { updated: false };
}
await db
.update(table)
.set({
screenshotBucketKey,
screenshotTakenAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
})
.where(and(eq(table.id, assetId), isNull(table.deletedAt)));

View File

@ -55,6 +55,7 @@ export const MetricDataQuerySchema = z.object({
version_number: z.coerce.number().int().min(1).optional(),
report_file_id: z.string().uuid().optional(),
password: z.string().min(1).optional(),
is_screenshot: z.boolean().default(false).optional(),
});
export type MetricDataQuery = z.infer<typeof MetricDataQuerySchema>;

View File

@ -1,6 +1,7 @@
import type { User } from '@supabase/supabase-js';
import type { Browser, Page } from 'playwright';
import { z } from 'zod';
import { getSupabaseCookieKey, getSupabaseUser } from '../../supabase/server';
import { DEFAULT_SCREENSHOT_CONFIG } from './screenshot-config';
type BrowserParamsBase<T> = {
@ -11,8 +12,6 @@ type BrowserParamsBase<T> = {
};
export const BrowserParamsContextSchema = z.object({
supabaseUser: z.any() as z.ZodType<User>,
supabaseCookieKey: z.string(),
accessToken: z.string(),
organizationId: z.string(),
});
@ -27,8 +26,6 @@ export const browserLogin = async <T = Buffer<ArrayBufferLike>>({
height = DEFAULT_SCREENSHOT_CONFIG.height,
fullPath,
callback,
supabaseUser,
supabaseCookieKey,
accessToken,
}: BrowserParams<T>) => {
if (!accessToken) {
@ -41,7 +38,13 @@ export const browserLogin = async <T = Buffer<ArrayBufferLike>>({
const jwtPayload = JSON.parse(Buffer.from(accessToken.split('.')[1] || '', 'base64').toString());
if (!supabaseUser || supabaseUser?.is_anonymous) {
const [supabaseUser, chromium] = await Promise.all([
getSupabaseUser(accessToken),
import('playwright').then(({ chromium }) => chromium),
]);
const supabaseCookieKey = getSupabaseCookieKey();
if (!supabaseUser || supabaseUser?.is_anonymous || !supabaseCookieKey) {
throw new Error('User not authenticated');
}
@ -54,7 +57,6 @@ export const browserLogin = async <T = Buffer<ArrayBufferLike>>({
user: supabaseUser,
};
const { chromium } = await import('playwright');
const browser = await chromium.launch();
try {

View File

@ -0,0 +1,40 @@
import { type User, createClient } from '@supabase/supabase-js';
const createSupabaseClient = () => {
const supabaseUrl = process.env.SUPABASE_URL;
if (!supabaseUrl) {
throw new Error('SUPABASE_URL is not set');
}
const supabaseServiceRoleKey = process.env.SUPABASE_SERVICE_ROLE_KEY;
if (!supabaseServiceRoleKey) {
throw new Error('SUPABASE_SERVICE_ROLE_KEY is not set');
}
const supabase = createClient(supabaseUrl, supabaseServiceRoleKey);
return supabase;
};
let globalSupabase: ReturnType<typeof createSupabaseClient> | null = null;
export const getSupabaseClient = () => {
if (!globalSupabase) {
globalSupabase = createSupabaseClient();
}
return globalSupabase;
};
export const getSupabaseCookieKey = (): string => {
const supabase = getSupabaseClient();
const supabaseCookieKey = (supabase as unknown as { storageKey: string }).storageKey;
return supabaseCookieKey;
};
export const getSupabaseUser = async (jwtToken: string): Promise<User | null> => {
const supabase = getSupabaseClient();
const supabaseUser = await supabase.auth.getUser(jwtToken);
return supabaseUser.data.user;
};