mirror of https://github.com/buster-so/buster.git
Enhance task triggering by adding concurrency key for sequential processing
- Updated `createChatHandler` to include a `concurrencyKey` when triggering the `analyst-agent-task`, ensuring tasks are processed sequentially per chat. - Adjusted related test cases to verify the inclusion of the `concurrencyKey`. - Integrated the `concurrencyKey` in the `slackAgentTask` to manage task queuing and notify users when tasks are queued.
This commit is contained in:
parent
8daceae2e3
commit
6e65bec499
|
@ -108,9 +108,15 @@ describe('createChatHandler', () => {
|
|||
mockUser,
|
||||
'550e8400-e29b-41d4-a716-446655440000'
|
||||
);
|
||||
expect(tasks.trigger).toHaveBeenCalledWith('analyst-agent-task', {
|
||||
expect(tasks.trigger).toHaveBeenCalledWith(
|
||||
'analyst-agent-task',
|
||||
{
|
||||
message_id: 'msg-123',
|
||||
});
|
||||
},
|
||||
{
|
||||
concurrencyKey: 'chat-123',
|
||||
}
|
||||
);
|
||||
expect(result).toEqual(mockChat);
|
||||
});
|
||||
|
||||
|
@ -131,9 +137,15 @@ describe('createChatHandler', () => {
|
|||
mockUser,
|
||||
mockChat
|
||||
);
|
||||
expect(tasks.trigger).toHaveBeenCalledWith('analyst-agent-task', {
|
||||
expect(tasks.trigger).toHaveBeenCalledWith(
|
||||
'analyst-agent-task',
|
||||
{
|
||||
message_id: 'msg-123',
|
||||
});
|
||||
},
|
||||
{
|
||||
concurrencyKey: 'chat-123',
|
||||
}
|
||||
);
|
||||
expect(result).toEqual(assetChat);
|
||||
});
|
||||
|
||||
|
@ -153,9 +165,15 @@ describe('createChatHandler', () => {
|
|||
);
|
||||
|
||||
expect(handleAssetChat).not.toHaveBeenCalled();
|
||||
expect(tasks.trigger).toHaveBeenCalledWith('analyst-agent-task', {
|
||||
expect(tasks.trigger).toHaveBeenCalledWith(
|
||||
'analyst-agent-task',
|
||||
{
|
||||
message_id: 'msg-123',
|
||||
});
|
||||
},
|
||||
{
|
||||
concurrencyKey: 'chat-123',
|
||||
}
|
||||
);
|
||||
expect(result).toEqual(mockChat);
|
||||
});
|
||||
|
||||
|
|
|
@ -77,9 +77,15 @@ export async function createChatHandler(
|
|||
if (request.prompt || request.asset_id) {
|
||||
try {
|
||||
// Just queue the background job - should be <100ms
|
||||
const taskHandle = await tasks.trigger('analyst-agent-task', {
|
||||
const taskHandle = await tasks.trigger(
|
||||
'analyst-agent-task',
|
||||
{
|
||||
message_id: messageId,
|
||||
});
|
||||
},
|
||||
{
|
||||
concurrencyKey: chatId, // Ensure sequential processing per chat
|
||||
}
|
||||
);
|
||||
|
||||
// Health check: Verify trigger service received the task
|
||||
// The presence of taskHandle.id confirms Trigger.dev accepted our request
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
import { queue } from '@trigger.dev/sdk';
|
||||
|
||||
/**
|
||||
* Queue configuration for analyst agent tasks.
|
||||
* This queue ensures that only one analyst task runs at a time per chat,
|
||||
* while allowing multiple chats to process concurrently.
|
||||
*
|
||||
* Usage: When triggering analyst-agent-task, use concurrencyKey with the chatId
|
||||
* to create separate queue instances for each chat.
|
||||
*/
|
||||
export const analystQueue: ReturnType<typeof queue> = queue({
|
||||
name: 'analyst-agent-queue',
|
||||
concurrencyLimit: 1, // Only 1 task runs at a time per concurrencyKey (chatId)
|
||||
});
|
|
@ -1,5 +1,6 @@
|
|||
import { logger, schemaTask, tasks } from '@trigger.dev/sdk';
|
||||
import { currentSpan, initLogger, wrapTraced } from 'braintrust';
|
||||
import { analystQueue } from '../../queues/analyst-queue';
|
||||
import { AnalystAgentTaskInputSchema, type AnalystAgentTaskOutput } from './types';
|
||||
|
||||
// Task 2 & 4: Database helpers (IMPLEMENTED)
|
||||
|
@ -276,6 +277,7 @@ export const analystAgentTask: ReturnType<
|
|||
id: 'analyst-agent-task',
|
||||
machine: 'small-2x',
|
||||
schema: AnalystAgentTaskInputSchema,
|
||||
queue: analystQueue,
|
||||
maxDuration: 600, // 10 minutes for complex analysis
|
||||
run: async (payload): Promise<AnalystAgentTaskOutput> => {
|
||||
const taskStartTime = Date.now();
|
||||
|
|
|
@ -288,14 +288,56 @@ export const slackAgentTask: ReturnType<
|
|||
messageId: message.id,
|
||||
});
|
||||
|
||||
const analystHandle = await analystAgentTask.trigger({
|
||||
const analystHandle = await analystAgentTask.trigger(
|
||||
{
|
||||
message_id: message.id,
|
||||
});
|
||||
},
|
||||
{
|
||||
concurrencyKey: payload.chatId, // Ensure sequential processing per chat
|
||||
}
|
||||
);
|
||||
|
||||
logger.log('Analyst agent task triggered', {
|
||||
runId: analystHandle.id,
|
||||
});
|
||||
|
||||
// Check if the analyst task is queued (another task is already running for this chat)
|
||||
try {
|
||||
const runStatus = await runs.retrieve(analystHandle.id);
|
||||
|
||||
if (runStatus.status === 'QUEUED') {
|
||||
logger.log('Analyst task is queued, notifying user', {
|
||||
runId: analystHandle.id,
|
||||
status: runStatus.status,
|
||||
});
|
||||
|
||||
// Send a message to Slack indicating the task is queued
|
||||
const messagingService = new SlackMessagingService();
|
||||
try {
|
||||
const queuedMessage = {
|
||||
text: "It looks like I'm still running your previous request. When that finishes I'll start working on this one!",
|
||||
thread_ts: chatDetails.slackThreadTs,
|
||||
};
|
||||
|
||||
await messagingService.sendMessage(
|
||||
accessToken,
|
||||
chatDetails.slackChannelId,
|
||||
queuedMessage
|
||||
);
|
||||
|
||||
logger.log('Sent queued message to Slack thread');
|
||||
} catch (error) {
|
||||
logger.warn('Failed to send queued message to Slack', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn('Failed to check run status', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
|
||||
// Update the message with the trigger run ID
|
||||
if (!analystHandle.id) {
|
||||
throw new Error('Trigger service returned invalid handle');
|
||||
|
|
Loading…
Reference in New Issue