mirror of https://github.com/buster-so/buster.git
fix trigger int tests
This commit is contained in:
parent
78374e7ddf
commit
8d438f25fd
|
@ -6,7 +6,7 @@ import {
|
|||
createTestMessage,
|
||||
createTestUser,
|
||||
} from '@buster/test-utils';
|
||||
import { tasks } from '@trigger.dev/sdk/v3';
|
||||
import { runs, tasks } from '@trigger.dev/sdk';
|
||||
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
|
||||
import type { messagePostProcessingTask } from './message-post-processing';
|
||||
|
||||
|
@ -19,6 +19,33 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', (
|
|||
let testMessageId: string;
|
||||
let testOrgId: string;
|
||||
|
||||
async function triggerAndPollMessagePostProcessing(
|
||||
payload: { messageId: string },
|
||||
pollIntervalMs = 2000,
|
||||
timeoutMs = 60000
|
||||
) {
|
||||
const handle = await tasks.trigger<typeof messagePostProcessingTask>(
|
||||
'message-post-processing',
|
||||
payload
|
||||
);
|
||||
|
||||
const start = Date.now();
|
||||
// poll until terminal state or timeout
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const run = await runs.retrieve(handle.id);
|
||||
if (run.status === 'COMPLETED' || run.status === 'FAILED' || run.status === 'CANCELED') {
|
||||
return run;
|
||||
}
|
||||
|
||||
if (Date.now() - start > timeoutMs) {
|
||||
return run;
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
|
||||
}
|
||||
}
|
||||
|
||||
beforeAll(async () => {
|
||||
// Use specific test user with datasets and permissions
|
||||
testUserId = 'c2dd64cd-f7f3-4884-bc91-d46ae431901e';
|
||||
|
@ -42,11 +69,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', (
|
|||
const messageId = 'a3206f20-35d1-4a6c-84a7-48f8f222c39f';
|
||||
|
||||
// Execute task
|
||||
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
|
||||
'message-post-processing',
|
||||
{ messageId },
|
||||
{ pollIntervalMs: 2000 }
|
||||
);
|
||||
const result = await triggerAndPollMessagePostProcessing({ messageId }, 2000);
|
||||
|
||||
// Verify result structure
|
||||
expect(result).toBeDefined();
|
||||
|
@ -108,10 +131,9 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', (
|
|||
});
|
||||
|
||||
// Execute task for follow-up
|
||||
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
|
||||
'message-post-processing',
|
||||
const result = await triggerAndPollMessagePostProcessing(
|
||||
{ messageId: followUpMessageId },
|
||||
{ pollIntervalMs: 2000 }
|
||||
2000
|
||||
);
|
||||
|
||||
// Verify it's a follow-up result
|
||||
|
@ -128,11 +150,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', (
|
|||
const messageId = '203744bd-439f-4b3c-9ea2-ddfe243c5afe';
|
||||
|
||||
// Execute task
|
||||
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
|
||||
'message-post-processing',
|
||||
{ messageId },
|
||||
{ pollIntervalMs: 2000 }
|
||||
);
|
||||
const result = await triggerAndPollMessagePostProcessing({ messageId }, 2000);
|
||||
|
||||
// Should still process successfully
|
||||
expect(result).toBeDefined();
|
||||
|
@ -151,11 +169,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', (
|
|||
it('should fail gracefully when message does not exist', async () => {
|
||||
const nonExistentId = '00000000-0000-0000-0000-000000000000';
|
||||
|
||||
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
|
||||
'message-post-processing',
|
||||
{ messageId: nonExistentId },
|
||||
{ pollIntervalMs: 2000 }
|
||||
);
|
||||
const result = await triggerAndPollMessagePostProcessing({ messageId: nonExistentId }, 2000);
|
||||
|
||||
expect(result.status).toBe('COMPLETED');
|
||||
expect(result.output?.success).toBe(false);
|
||||
|
@ -168,11 +182,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', (
|
|||
|
||||
const startTime = Date.now();
|
||||
|
||||
await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
|
||||
'message-post-processing',
|
||||
{ messageId },
|
||||
{ pollIntervalMs: 2000 }
|
||||
);
|
||||
await triggerAndPollMessagePostProcessing({ messageId }, 2000);
|
||||
|
||||
const duration = Date.now() - startTime;
|
||||
|
||||
|
@ -203,11 +213,7 @@ describe.skipIf(skipIntegrationTests)('messagePostProcessingTask integration', (
|
|||
});
|
||||
|
||||
// Should still process successfully
|
||||
const result = await tasks.triggerAndPoll<typeof messagePostProcessingTask>(
|
||||
'message-post-processing',
|
||||
{ messageId: largeMessageId },
|
||||
{ pollIntervalMs: 2000 }
|
||||
);
|
||||
const result = await triggerAndPollMessagePostProcessing({ messageId: largeMessageId }, 2000);
|
||||
|
||||
expect(result).toBeDefined();
|
||||
expect(result.status).toBe('COMPLETED');
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { db, eq, messages } from '@buster/database';
|
||||
import { createTestChat, createTestMessage } from '@buster/test-utils';
|
||||
import { tasks } from '@trigger.dev/sdk/v3';
|
||||
import { runs, tasks } from '@trigger.dev/sdk';
|
||||
import { initLogger, wrapTraced } from 'braintrust';
|
||||
import { afterAll, beforeAll, describe, expect, test } from 'vitest';
|
||||
import type { analystAgentTask } from '../../src/tasks/analyst-agent-task';
|
||||
|
@ -33,6 +33,29 @@ describe('Analyst Agent Task Integration Tests', () => {
|
|||
const TEST_ORG_ID = 'bf58d19a-8bb9-4f1d-a257-2d2105e7f1ce';
|
||||
const TEST_MESSAGE_CONTENT = 'who is our top customer';
|
||||
|
||||
async function triggerAndPollAnalystAgent(
|
||||
payload: { message_id: string },
|
||||
pollIntervalMs = 2000,
|
||||
timeoutMs = 30 * 60 * 1000 // align with 30 min test timeout
|
||||
) {
|
||||
const handle = await tasks.trigger<typeof analystAgentTask>('analyst-agent-task', payload);
|
||||
|
||||
const start = Date.now();
|
||||
// eslint-disable-next-line no-constant-condition
|
||||
while (true) {
|
||||
const run = await runs.retrieve(handle.id);
|
||||
if (run.status === 'COMPLETED' || run.status === 'FAILED' || run.status === 'CANCELED') {
|
||||
return run;
|
||||
}
|
||||
|
||||
if (Date.now() - start > timeoutMs) {
|
||||
return run;
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
|
||||
}
|
||||
}
|
||||
|
||||
beforeAll(() => {
|
||||
if (!process.env.BRAINTRUST_KEY) {
|
||||
throw new Error('BRAINTRUST_KEY is required for observability');
|
||||
|
@ -82,13 +105,7 @@ describe('Analyst Agent Task Integration Tests', () => {
|
|||
console.log('Triggering analyst agent task...');
|
||||
|
||||
const tracedTaskTrigger = wrapTraced(
|
||||
async () => {
|
||||
return await tasks.triggerAndPoll<typeof analystAgentTask>(
|
||||
'analyst-agent-task',
|
||||
{ message_id: messageId },
|
||||
{ pollIntervalMs: 5000 } // Poll every 5 seconds
|
||||
);
|
||||
},
|
||||
async () => await triggerAndPollAnalystAgent({ message_id: messageId }, 5000),
|
||||
{
|
||||
name: 'Trigger Analyst Agent Task',
|
||||
}
|
||||
|
@ -161,11 +178,7 @@ describe('Analyst Agent Task Integration Tests', () => {
|
|||
try {
|
||||
console.log('Testing error handling with invalid message ID...');
|
||||
|
||||
const result = await tasks.triggerAndPoll<typeof analystAgentTask>(
|
||||
'analyst-agent-task',
|
||||
{ message_id: invalidMessageId },
|
||||
{ pollIntervalMs: 2000 } // Poll every 2 seconds for error case
|
||||
);
|
||||
const result = await triggerAndPollAnalystAgent({ message_id: invalidMessageId }, 2000);
|
||||
|
||||
// Task should complete but with error result
|
||||
expect(result).toBeDefined();
|
||||
|
@ -193,11 +206,10 @@ describe('Analyst Agent Task Integration Tests', () => {
|
|||
|
||||
// Test with invalid UUID format
|
||||
await expect(
|
||||
tasks.triggerAndPoll<typeof analystAgentTask>(
|
||||
'analyst-agent-task',
|
||||
triggerAndPollAnalystAgent(
|
||||
// Intentionally invalid input to test validation
|
||||
{ message_id: 'not-a-uuid' } as { message_id: string },
|
||||
{ pollIntervalMs: 1000 }
|
||||
1000
|
||||
)
|
||||
).rejects.toThrow();
|
||||
|
||||
|
|
Loading…
Reference in New Issue