From 6e65bec49936c679e9a4b8f03a49c981ffb9db2e Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 18 Jul 2025 23:01:32 -0600 Subject: [PATCH] Enhance task triggering by adding concurrency key for sequential processing - Updated `createChatHandler` to include a `concurrencyKey` when triggering the `analyst-agent-task`, ensuring tasks are processed sequentially per chat. - Adjusted related test cases to verify the inclusion of the `concurrencyKey`. - Integrated the `concurrencyKey` in the `slackAgentTask` to manage task queuing and notify users when tasks are queued. --- apps/server/src/api/v2/chats/handler.test.ts | 36 ++++++++++---- apps/server/src/api/v2/chats/handler.ts | 12 +++-- apps/trigger/src/queues/analyst-queue.ts | 14 ++++++ .../analyst-agent-task/analyst-agent-task.ts | 2 + .../slack-agent-task/slack-agent-task.ts | 48 +++++++++++++++++-- 5 files changed, 97 insertions(+), 15 deletions(-) create mode 100644 apps/trigger/src/queues/analyst-queue.ts diff --git a/apps/server/src/api/v2/chats/handler.test.ts b/apps/server/src/api/v2/chats/handler.test.ts index 6a24acc9b..4c8e79dff 100644 --- a/apps/server/src/api/v2/chats/handler.test.ts +++ b/apps/server/src/api/v2/chats/handler.test.ts @@ -108,9 +108,15 @@ describe('createChatHandler', () => { mockUser, '550e8400-e29b-41d4-a716-446655440000' ); - expect(tasks.trigger).toHaveBeenCalledWith('analyst-agent-task', { - message_id: 'msg-123', - }); + expect(tasks.trigger).toHaveBeenCalledWith( + 'analyst-agent-task', + { + message_id: 'msg-123', + }, + { + concurrencyKey: 'chat-123', + } + ); expect(result).toEqual(mockChat); }); @@ -131,9 +137,15 @@ describe('createChatHandler', () => { mockUser, mockChat ); - expect(tasks.trigger).toHaveBeenCalledWith('analyst-agent-task', { - message_id: 'msg-123', - }); + expect(tasks.trigger).toHaveBeenCalledWith( + 'analyst-agent-task', + { + message_id: 'msg-123', + }, + { + concurrencyKey: 'chat-123', + } + ); expect(result).toEqual(assetChat); }); @@ -153,9 +165,15 @@ describe('createChatHandler', () => { ); expect(handleAssetChat).not.toHaveBeenCalled(); - expect(tasks.trigger).toHaveBeenCalledWith('analyst-agent-task', { - message_id: 'msg-123', - }); + expect(tasks.trigger).toHaveBeenCalledWith( + 'analyst-agent-task', + { + message_id: 'msg-123', + }, + { + concurrencyKey: 'chat-123', + } + ); expect(result).toEqual(mockChat); }); diff --git a/apps/server/src/api/v2/chats/handler.ts b/apps/server/src/api/v2/chats/handler.ts index 89be85b32..3c89be23b 100644 --- a/apps/server/src/api/v2/chats/handler.ts +++ b/apps/server/src/api/v2/chats/handler.ts @@ -77,9 +77,15 @@ export async function createChatHandler( if (request.prompt || request.asset_id) { try { // Just queue the background job - should be <100ms - const taskHandle = await tasks.trigger('analyst-agent-task', { - message_id: messageId, - }); + const taskHandle = await tasks.trigger( + 'analyst-agent-task', + { + message_id: messageId, + }, + { + concurrencyKey: chatId, // Ensure sequential processing per chat + } + ); // Health check: Verify trigger service received the task // The presence of taskHandle.id confirms Trigger.dev accepted our request diff --git a/apps/trigger/src/queues/analyst-queue.ts b/apps/trigger/src/queues/analyst-queue.ts new file mode 100644 index 000000000..b72f132c1 --- /dev/null +++ b/apps/trigger/src/queues/analyst-queue.ts @@ -0,0 +1,14 @@ +import { queue } from '@trigger.dev/sdk'; + +/** + * Queue configuration for analyst agent tasks. + * This queue ensures that only one analyst task runs at a time per chat, + * while allowing multiple chats to process concurrently. + * + * Usage: When triggering analyst-agent-task, use concurrencyKey with the chatId + * to create separate queue instances for each chat. + */ +export const analystQueue: ReturnType = queue({ + name: 'analyst-agent-queue', + concurrencyLimit: 1, // Only 1 task runs at a time per concurrencyKey (chatId) +}); diff --git a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts index 051ce3297..6476c4187 100644 --- a/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts +++ b/apps/trigger/src/tasks/analyst-agent-task/analyst-agent-task.ts @@ -1,5 +1,6 @@ import { logger, schemaTask, tasks } from '@trigger.dev/sdk'; import { currentSpan, initLogger, wrapTraced } from 'braintrust'; +import { analystQueue } from '../../queues/analyst-queue'; import { AnalystAgentTaskInputSchema, type AnalystAgentTaskOutput } from './types'; // Task 2 & 4: Database helpers (IMPLEMENTED) @@ -276,6 +277,7 @@ export const analystAgentTask: ReturnType< id: 'analyst-agent-task', machine: 'small-2x', schema: AnalystAgentTaskInputSchema, + queue: analystQueue, maxDuration: 600, // 10 minutes for complex analysis run: async (payload): Promise => { const taskStartTime = Date.now(); diff --git a/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts b/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts index 9eef65371..78121ee66 100644 --- a/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts +++ b/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts @@ -288,14 +288,56 @@ export const slackAgentTask: ReturnType< messageId: message.id, }); - const analystHandle = await analystAgentTask.trigger({ - message_id: message.id, - }); + const analystHandle = await analystAgentTask.trigger( + { + message_id: message.id, + }, + { + concurrencyKey: payload.chatId, // Ensure sequential processing per chat + } + ); logger.log('Analyst agent task triggered', { runId: analystHandle.id, }); + // Check if the analyst task is queued (another task is already running for this chat) + try { + const runStatus = await runs.retrieve(analystHandle.id); + + if (runStatus.status === 'QUEUED') { + logger.log('Analyst task is queued, notifying user', { + runId: analystHandle.id, + status: runStatus.status, + }); + + // Send a message to Slack indicating the task is queued + const messagingService = new SlackMessagingService(); + try { + const queuedMessage = { + text: "It looks like I'm still running your previous request. When that finishes I'll start working on this one!", + thread_ts: chatDetails.slackThreadTs, + }; + + await messagingService.sendMessage( + accessToken, + chatDetails.slackChannelId, + queuedMessage + ); + + logger.log('Sent queued message to Slack thread'); + } catch (error) { + logger.warn('Failed to send queued message to Slack', { + error: error instanceof Error ? error.message : 'Unknown error', + }); + } + } + } catch (error) { + logger.warn('Failed to check run status', { + error: error instanceof Error ? error.message : 'Unknown error', + }); + } + // Update the message with the trigger run ID if (!analystHandle.id) { throw new Error('Trigger service returned invalid handle');