diff --git a/apps/trigger/src/tasks/message-post-processing/message-post-processing.int.test.ts b/apps/trigger/src/tasks/message-post-processing/message-post-processing.int.test.ts index 0754a9887..3dfe28ae1 100644 --- a/apps/trigger/src/tasks/message-post-processing/message-post-processing.int.test.ts +++ b/apps/trigger/src/tasks/message-post-processing/message-post-processing.int.test.ts @@ -6,7 +6,7 @@ import { createTestMessage, createTestUser, } from '@buster/test-utils'; -import { tasks } from '@trigger.dev/sdk/v3'; +import { runs, tasks } from '@trigger.dev/sdk'; import { afterAll, beforeAll, describe, expect, it } from 'vitest'; import type { messagePostProcessingTask } from './message-post-processing'; @@ -19,6 +19,33 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', ( let testMessageId: string; let testOrgId: string; + async function triggerAndPollMessagePostProcessing( + payload: { messageId: string }, + pollIntervalMs = 2000, + timeoutMs = 60000 + ) { + const handle = await tasks.trigger( + 'message-post-processing', + payload + ); + + const start = Date.now(); + // poll until terminal state or timeout + // eslint-disable-next-line no-constant-condition + while (true) { + const run = await runs.retrieve(handle.id); + if (run.status === 'COMPLETED' || run.status === 'FAILED' || run.status === 'CANCELED') { + return run; + } + + if (Date.now() - start > timeoutMs) { + return run; + } + + await new Promise((resolve) => setTimeout(resolve, pollIntervalMs)); + } + } + beforeAll(async () => { // Use specific test user with datasets and permissions testUserId = 'c2dd64cd-f7f3-4884-bc91-d46ae431901e'; @@ -42,11 +69,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', ( const messageId = 'a3206f20-35d1-4a6c-84a7-48f8f222c39f'; // Execute task - const result = await tasks.triggerAndPoll( - 'message-post-processing', - { messageId }, - { pollIntervalMs: 2000 } - ); + const result = await triggerAndPollMessagePostProcessing({ messageId }, 2000); // Verify result structure expect(result).toBeDefined(); @@ -108,10 +131,9 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', ( }); // Execute task for follow-up - const result = await tasks.triggerAndPoll( - 'message-post-processing', + const result = await triggerAndPollMessagePostProcessing( { messageId: followUpMessageId }, - { pollIntervalMs: 2000 } + 2000 ); // Verify it's a follow-up result @@ -128,11 +150,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', ( const messageId = '203744bd-439f-4b3c-9ea2-ddfe243c5afe'; // Execute task - const result = await tasks.triggerAndPoll( - 'message-post-processing', - { messageId }, - { pollIntervalMs: 2000 } - ); + const result = await triggerAndPollMessagePostProcessing({ messageId }, 2000); // Should still process successfully expect(result).toBeDefined(); @@ -151,11 +169,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', ( it('should fail gracefully when message does not exist', async () => { const nonExistentId = '00000000-0000-0000-0000-000000000000'; - const result = await tasks.triggerAndPoll( - 'message-post-processing', - { messageId: nonExistentId }, - { pollIntervalMs: 2000 } - ); + const result = await triggerAndPollMessagePostProcessing({ messageId: nonExistentId }, 2000); expect(result.status).toBe('COMPLETED'); expect(result.output?.success).toBe(false); @@ -168,11 +182,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', ( const startTime = Date.now(); - await tasks.triggerAndPoll( - 'message-post-processing', - { messageId }, - { pollIntervalMs: 2000 } - ); + await triggerAndPollMessagePostProcessing({ messageId }, 2000); const duration = Date.now() - startTime; @@ -203,11 +213,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', ( }); // Should still process successfully - const result = await tasks.triggerAndPoll( - 'message-post-processing', - { messageId: largeMessageId }, - { pollIntervalMs: 2000 } - ); + const result = await triggerAndPollMessagePostProcessing({ messageId: largeMessageId }, 2000); expect(result).toBeDefined(); expect(result.status).toBe('COMPLETED'); diff --git a/apps/trigger/tests/integration/analyst-agent-task.int.test.ts b/apps/trigger/tests/integration/analyst-agent-task.int.test.ts index 2a5dbacdf..14024b460 100644 --- a/apps/trigger/tests/integration/analyst-agent-task.int.test.ts +++ b/apps/trigger/tests/integration/analyst-agent-task.int.test.ts @@ -1,6 +1,6 @@ import { db, eq, messages } from '@buster/database'; import { createTestChat, createTestMessage } from '@buster/test-utils'; -import { tasks } from '@trigger.dev/sdk/v3'; +import { runs, tasks } from '@trigger.dev/sdk'; import { initLogger, wrapTraced } from 'braintrust'; import { afterAll, beforeAll, describe, expect, test } from 'vitest'; import type { analystAgentTask } from '../../src/tasks/analyst-agent-task'; @@ -33,6 +33,29 @@ describe('Analyst Agent Task Integration Tests', () => { const TEST_ORG_ID = 'bf58d19a-8bb9-4f1d-a257-2d2105e7f1ce'; const TEST_MESSAGE_CONTENT = 'who is our top customer'; + async function triggerAndPollAnalystAgent( + payload: { message_id: string }, + pollIntervalMs = 2000, + timeoutMs = 30 * 60 * 1000 // align with 30 min test timeout + ) { + const handle = await tasks.trigger('analyst-agent-task', payload); + + const start = Date.now(); + // eslint-disable-next-line no-constant-condition + while (true) { + const run = await runs.retrieve(handle.id); + if (run.status === 'COMPLETED' || run.status === 'FAILED' || run.status === 'CANCELED') { + return run; + } + + if (Date.now() - start > timeoutMs) { + return run; + } + + await new Promise((resolve) => setTimeout(resolve, pollIntervalMs)); + } + } + beforeAll(() => { if (!process.env.BRAINTRUST_KEY) { throw new Error('BRAINTRUST_KEY is required for observability'); @@ -82,13 +105,7 @@ describe('Analyst Agent Task Integration Tests', () => { console.log('Triggering analyst agent task...'); const tracedTaskTrigger = wrapTraced( - async () => { - return await tasks.triggerAndPoll( - 'analyst-agent-task', - { message_id: messageId }, - { pollIntervalMs: 5000 } // Poll every 5 seconds - ); - }, + async () => await triggerAndPollAnalystAgent({ message_id: messageId }, 5000), { name: 'Trigger Analyst Agent Task', } @@ -161,11 +178,7 @@ describe('Analyst Agent Task Integration Tests', () => { try { console.log('Testing error handling with invalid message ID...'); - const result = await tasks.triggerAndPoll( - 'analyst-agent-task', - { message_id: invalidMessageId }, - { pollIntervalMs: 2000 } // Poll every 2 seconds for error case - ); + const result = await triggerAndPollAnalystAgent({ message_id: invalidMessageId }, 2000); // Task should complete but with error result expect(result).toBeDefined(); @@ -193,11 +206,10 @@ describe('Analyst Agent Task Integration Tests', () => { // Test with invalid UUID format await expect( - tasks.triggerAndPoll( - 'analyst-agent-task', + triggerAndPollAnalystAgent( // Intentionally invalid input to test validation { message_id: 'not-a-uuid' } as { message_id: string }, - { pollIntervalMs: 1000 } + 1000 ) ).rejects.toThrow();