From 86572acfa48c3afe910edf3c209c0a6d3f08b0f8 Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 16 Jul 2025 13:58:03 -0600 Subject: [PATCH] slack agent react --- .../v2/slack/services/slack-oauth-service.ts | 1 + apps/trigger/package.json | 3 +- .../src/tasks/slack-agent-task/helpers.ts | 187 ++++++++++++++++++ .../slack-agent-task/slack-agent-task.ts | 144 +++++++++++++- packages/slack/src/index.ts | 12 ++ packages/slack/src/reactions.ts | 122 ++++++++++++ packages/slack/src/threads.ts | 144 ++++++++++++++ pnpm-lock.yaml | 4 +- 8 files changed, 610 insertions(+), 7 deletions(-) create mode 100644 apps/trigger/src/tasks/slack-agent-task/helpers.ts create mode 100644 packages/slack/src/reactions.ts create mode 100644 packages/slack/src/threads.ts diff --git a/apps/server/src/api/v2/slack/services/slack-oauth-service.ts b/apps/server/src/api/v2/slack/services/slack-oauth-service.ts index ebaff3574..b48b95e4a 100644 --- a/apps/server/src/api/v2/slack/services/slack-oauth-service.ts +++ b/apps/server/src/api/v2/slack/services/slack-oauth-service.ts @@ -58,6 +58,7 @@ export class SlackOAuthService { 'mpim:read', 'mpim:write', 'reactions:write', + 'reactions:read', 'users:read', 'users:read.email', ], diff --git a/apps/trigger/package.json b/apps/trigger/package.json index 2305ff5ea..b8ef2bee1 100644 --- a/apps/trigger/package.json +++ b/apps/trigger/package.json @@ -31,7 +31,8 @@ "ai": "catalog:", "braintrust": "catalog:", "vitest": "catalog:", - "zod": "catalog:" + "zod": "catalog:", + "drizzle-orm": "catalog:" }, "devDependencies": { "@trigger.dev/build": "catalog:" diff --git a/apps/trigger/src/tasks/slack-agent-task/helpers.ts b/apps/trigger/src/tasks/slack-agent-task/helpers.ts new file mode 100644 index 000000000..09c00211e --- /dev/null +++ b/apps/trigger/src/tasks/slack-agent-task/helpers.ts @@ -0,0 +1,187 @@ +import { + chats, + db, + getSecretByName, + messages, + slackIntegrations, + updateMessage, +} from '@buster/database'; +import type { Chat, Message } from '@buster/database'; +import { tasks } from '@trigger.dev/sdk'; +import { and, eq, isNull } from 'drizzle-orm'; +import type { analystAgentTask } from '../analyst-agent-task/analyst-agent-task'; + +/** + * Create a message for an existing chat + * This replicates the logic from the server's createMessage function + * but is self-contained for the trigger app + */ +export async function createMessageForChat({ + chatId, + userId, + content, +}: { + chatId: string; + userId: string; + content: string; +}): Promise { + const messageId = crypto.randomUUID(); + + // Use transaction to ensure atomicity + const result = await db.transaction(async (tx) => { + const [message] = await tx + .insert(messages) + .values({ + id: messageId, + chatId: chatId, + createdBy: userId, + requestMessage: content, + responseMessages: {}, + reasoning: {}, + title: content.substring(0, 255), // Ensure title fits in database + rawLlmMessages: {}, + isCompleted: false, + }) + .returning(); + + if (!message) { + throw new Error('Failed to create message'); + } + + // Update chat's updated_at timestamp + await tx.update(chats).set({ updatedAt: new Date().toISOString() }).where(eq(chats.id, chatId)); + + return message; + }); + + return result; +} + +/** + * Trigger the analyst agent task for a message + * Returns the trigger handle ID for tracking + */ +export async function triggerAnalystAgent(messageId: string): Promise { + try { + // Trigger the analyst agent task + const taskHandle = await tasks.trigger('analyst-agent-task', { + message_id: messageId, + }); + + // Verify trigger service received the task + if (!taskHandle.id) { + throw new Error('Trigger service returned invalid handle'); + } + + // Update the message with the trigger run ID + await updateMessage(messageId, { + triggerRunId: taskHandle.id, + }); + + return taskHandle.id; + } catch (error) { + console.error('Failed to trigger analyst agent task:', error); + throw error; + } +} + +/** + * Main helper to create a message and trigger analysis + * Combines both operations for convenience + */ +export async function createMessageAndTriggerAnalysis({ + chatId, + userId, + content, +}: { + chatId: string; + userId: string; + content: string; +}): Promise<{ + message: Message; + triggerRunId: string; +}> { + // Create the message + const message = await createMessageForChat({ + chatId, + userId, + content, + }); + + // Trigger the analyst agent task + const triggerRunId = await triggerAnalystAgent(message.id); + + return { + message, + triggerRunId, + }; +} + +/** + * Fetch chat details including Slack integration information + */ +export async function getChatDetails(chatId: string): Promise<{ + chat: Chat; + organizationId: string; + slackThreadTs: string | null; + slackChannelId: string | null; +}> { + const [chatRecord] = await db + .select() + .from(chats) + .where(and(eq(chats.id, chatId), isNull(chats.deletedAt))) + .limit(1); + + if (!chatRecord) { + throw new Error(`Chat not found: ${chatId}`); + } + + return { + chat: chatRecord, + organizationId: chatRecord.organizationId, + slackThreadTs: chatRecord.slackThreadTs, + slackChannelId: chatRecord.slackChannelId, + }; +} + +/** + * Get Slack integration and secrets for an organization + */ +export async function getOrganizationSlackIntegration(organizationId: string): Promise<{ + integration: typeof slackIntegrations.$inferSelect; + accessToken: string; +}> { + // Get the active Slack integration for the organization + const [integration] = await db + .select() + .from(slackIntegrations) + .where( + and( + eq(slackIntegrations.organizationId, organizationId), + isNull(slackIntegrations.deletedAt), + eq(slackIntegrations.status, 'active') + ) + ) + .limit(1); + + if (!integration) { + throw new Error(`No active Slack integration found for organization: ${organizationId}`); + } + + // Get the access token from vault using the tokenVaultKey + if (!integration.tokenVaultKey) { + throw new Error(`No token vault key found for integration: ${integration.id}`); + } + + const vaultSecret = await getSecretByName(integration.tokenVaultKey); + + if (!vaultSecret) { + throw new Error(`No token found in vault for key: ${integration.tokenVaultKey}`); + } + + return { + integration, + accessToken: vaultSecret.secret, + }; +} + diff --git a/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts b/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts index ce1cfefc6..d4fd0a644 100644 --- a/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts +++ b/apps/trigger/src/tasks/slack-agent-task/slack-agent-task.ts @@ -1,12 +1,22 @@ -import { type TaskOutput, schemaTask } from '@trigger.dev/sdk'; +import { type TaskOutput, logger, schemaTask } from '@trigger.dev/sdk'; import { z } from 'zod'; +import { + createMessageAndTriggerAnalysis, + getChatDetails, + getOrganizationSlackIntegration, +} from './helpers'; +import { addReaction, removeReaction, getReactions, getThreadMessages } from '@buster/slack'; const SlackAgentTaskInputSchema = z.object({ chatId: z.string().uuid(), userId: z.string().uuid(), }); -const SlackAgentTaskOutputSchema = z.object({}); +const SlackAgentTaskOutputSchema = z.object({ + success: z.boolean(), + messageId: z.string().uuid(), + triggerRunId: z.string(), +}); export type SlackAgentTaskInput = z.infer; export type SlackAgentTaskOutput = z.infer; @@ -18,8 +28,132 @@ export const slackAgentTask: ReturnType< schema: SlackAgentTaskInputSchema, maxDuration: 300, // 300 seconds timeout run: async (payload: SlackAgentTaskInput): Promise => { - return { - message: 'Hello, world!', - }; + try { + logger.log('Starting Slack agent task', { + chatId: payload.chatId, + userId: payload.userId, + }); + + // Step 1: Get chat details first (we need this for everything else) + const chatDetails = await getChatDetails(payload.chatId); + + if (!chatDetails.slackChannelId || !chatDetails.slackThreadTs) { + throw new Error('Chat is missing Slack channel or thread information'); + } + + logger.log('Retrieved chat details', { + chatId: payload.chatId, + organizationId: chatDetails.organizationId, + slackChannelId: chatDetails.slackChannelId, + slackThreadTs: chatDetails.slackThreadTs, + }); + + // Step 2: Get Slack integration for access token + const { integration, accessToken } = await getOrganizationSlackIntegration( + chatDetails.organizationId + ); + + logger.log('Retrieved Slack integration', { + organizationId: chatDetails.organizationId, + teamId: integration.teamId, + teamName: integration.teamName, + botUserId: integration.botUserId, + }); + + // Step 3: Add hourglass reaction (and remove any existing bot reactions) + try { + // First, get existing reactions to see if we need to clean up + const existingReactions = await getReactions({ + accessToken, + channelId: chatDetails.slackChannelId, + messageTs: chatDetails.slackThreadTs, + }); + + // Remove any existing reactions from the bot + if (integration.botUserId && existingReactions.length > 0) { + const botReactions = existingReactions.filter( + reaction => reaction.users.includes(integration.botUserId!) + ); + + for (const reaction of botReactions) { + try { + await removeReaction({ + accessToken, + channelId: chatDetails.slackChannelId, + messageTs: chatDetails.slackThreadTs, + emoji: reaction.name, + }); + logger.log('Removed existing bot reaction', { emoji: reaction.name }); + } catch (error) { + // Log but don't fail if we can't remove a reaction + logger.warn('Failed to remove bot reaction', { + emoji: reaction.name, + error: error instanceof Error ? error.message : 'Unknown error', + }); + } + } + } + + // Add the hourglass reaction + await addReaction({ + accessToken, + channelId: chatDetails.slackChannelId, + messageTs: chatDetails.slackThreadTs, + emoji: 'hourglass_flowing_sand', + }); + + logger.log('Added hourglass reaction to Slack thread'); + } catch (error) { + // Log but don't fail the entire task if reaction handling fails + logger.warn('Failed to manage Slack reactions', { + error: error instanceof Error ? error.message : 'Unknown error', + }); + } + + // Step 4: Fetch all needed data concurrently + const [slackMessages] = await Promise.all([ + getThreadMessages({ + accessToken, + channelId: chatDetails.slackChannelId, + threadTs: chatDetails.slackThreadTs, + }), + // Add more concurrent fetches here if needed + ]); + + logger.log('Retrieved Slack thread messages', { + messageCount: slackMessages.length, + hasParentMessage: slackMessages.length > 0, + }); + + // TODO: Process slack messages to generate prompt + const prompt = 'TODO: Generate from slack conversation'; + + // Step 5: Create message and trigger analyst agent + const { message, triggerRunId } = await createMessageAndTriggerAnalysis({ + chatId: payload.chatId, + userId: payload.userId, + content: prompt, + }); + + logger.log('Successfully created message and triggered analyst agent', { + chatId: payload.chatId, + messageId: message.id, + triggerRunId, + }); + + return { + success: true, + messageId: message.id, + triggerRunId, + }; + } catch (error) { + logger.error('Failed to process Slack agent task', { + chatId: payload.chatId, + userId: payload.userId, + error: error instanceof Error ? error.message : 'Unknown error', + }); + + throw error; + } }, }); diff --git a/packages/slack/src/index.ts b/packages/slack/src/index.ts index 4193ee678..f32b62228 100644 --- a/packages/slack/src/index.ts +++ b/packages/slack/src/index.ts @@ -40,5 +40,17 @@ export * from './utils/validation-helpers'; export * from './utils/message-formatter'; export * from './utils/oauth-helpers'; +// Reactions +export { addReaction, removeReaction, getReactions } from './reactions'; + +// Threads +export { + getThreadMessages, + getMessage, + getThreadReplyCount, + formatThreadMessages, + type SlackMessage, +} from './threads'; + // Version export const VERSION = '1.0.0'; diff --git a/packages/slack/src/reactions.ts b/packages/slack/src/reactions.ts new file mode 100644 index 000000000..b7049d8a7 --- /dev/null +++ b/packages/slack/src/reactions.ts @@ -0,0 +1,122 @@ +import { WebClient } from '@slack/web-api'; + +/** + * Add an emoji reaction to a Slack message + * @param accessToken - Slack bot access token + * @param channelId - Slack channel ID + * @param messageTs - Message timestamp + * @param emoji - Emoji name (without colons) + * @returns Promise that resolves when reaction is added + */ +export async function addReaction({ + accessToken, + channelId, + messageTs, + emoji, +}: { + accessToken: string; + channelId: string; + messageTs: string; + emoji: string; +}): Promise { + const client = new WebClient(accessToken); + + try { + await client.reactions.add({ + channel: channelId, + timestamp: messageTs, + name: emoji, + }); + } catch (error) { + // If the reaction already exists, that's fine + if (error instanceof Error && error.message.includes('already_reacted')) { + console.info(`Reaction ${emoji} already exists on message`); + return; + } + throw error; + } +} + +/** + * Remove an emoji reaction from a Slack message + * @param accessToken - Slack bot access token + * @param channelId - Slack channel ID + * @param messageTs - Message timestamp + * @param emoji - Emoji name (without colons) + * @returns Promise that resolves when reaction is removed + */ +export async function removeReaction({ + accessToken, + channelId, + messageTs, + emoji, +}: { + accessToken: string; + channelId: string; + messageTs: string; + emoji: string; +}): Promise { + const client = new WebClient(accessToken); + + try { + await client.reactions.remove({ + channel: channelId, + timestamp: messageTs, + name: emoji, + }); + } catch (error) { + // If the reaction doesn't exist, that's fine + if (error instanceof Error && error.message.includes('no_reaction')) { + console.info(`Reaction ${emoji} doesn't exist on message`); + return; + } + throw error; + } +} + +/** + * Get reactions for a message + * @param accessToken - Slack bot access token + * @param channelId - Slack channel ID + * @param messageTs - Message timestamp + * @returns Promise with array of reactions + */ +export async function getReactions({ + accessToken, + channelId, + messageTs, +}: { + accessToken: string; + channelId: string; + messageTs: string; +}): Promise< + Array<{ + name: string; + users: string[]; + count: number; + }> +> { + const client = new WebClient(accessToken); + + try { + const result = await client.reactions.get({ + channel: channelId, + timestamp: messageTs, + }); + + if (result.type === 'message' && result.message && 'reactions' in result.message) { + // Map Slack API reactions to our expected format + const reactions = result.message.reactions || []; + return reactions.map((reaction) => ({ + name: reaction.name || '', + users: reaction.users || [], + count: reaction.count || 0, + })); + } + + return []; + } catch (error) { + console.error('Failed to get reactions:', error); + throw error; + } +} diff --git a/packages/slack/src/threads.ts b/packages/slack/src/threads.ts new file mode 100644 index 000000000..5f11220fc --- /dev/null +++ b/packages/slack/src/threads.ts @@ -0,0 +1,144 @@ +import { WebClient } from '@slack/web-api'; + +// Define our own simple types to avoid complex Slack API type issues +interface SlackBlock { + type?: string | undefined; + [key: string]: unknown; +} + +interface SlackAttachment { + [key: string]: unknown; +} + +export interface SlackMessage { + ts: string; + text?: string | undefined; + user?: string | undefined; + thread_ts?: string | undefined; + blocks?: SlackBlock[] | undefined; + attachments?: SlackAttachment[] | undefined; + [key: string]: unknown; // Allow additional properties from Slack API +} + +/** + * Fetch all messages in a Slack thread (parent + replies) + * @param accessToken - Slack bot access token + * @param channelId - Slack channel ID + * @param threadTs - Thread timestamp (parent message timestamp) + * @returns Promise with array of messages + */ +export async function getThreadMessages({ + accessToken, + channelId, + threadTs, +}: { + accessToken: string; + channelId: string; + threadTs: string; +}): Promise { + const client = new WebClient(accessToken); + + try { + const result = await client.conversations.replies({ + channel: channelId, + ts: threadTs, + inclusive: true, // Include the parent message + }); + + // Cast the result to our SlackMessage type + const messages = result.messages || []; + return messages as SlackMessage[]; + } catch (error) { + console.error('Failed to get thread messages:', error); + throw error; + } +} + +/** + * Get a single message from Slack + * @param accessToken - Slack bot access token + * @param channelId - Slack channel ID + * @param messageTs - Message timestamp + * @returns Promise with the message or null if not found + */ +export async function getMessage({ + accessToken, + channelId, + messageTs, +}: { + accessToken: string; + channelId: string; + messageTs: string; +}): Promise { + const client = new WebClient(accessToken); + + try { + const result = await client.conversations.history({ + channel: channelId, + latest: messageTs, + limit: 1, + inclusive: true, + }); + + if (result.messages && result.messages.length > 0) { + return result.messages[0] as SlackMessage; + } + + return null; + } catch (error) { + console.error('Failed to get message:', error); + throw error; + } +} + +/** + * Get thread reply count for a message + * @param accessToken - Slack bot access token + * @param channelId - Slack channel ID + * @param threadTs - Thread timestamp + * @returns Promise with reply count + */ +export async function getThreadReplyCount({ + accessToken, + channelId, + threadTs, +}: { + accessToken: string; + channelId: string; + threadTs: string; +}): Promise { + const client = new WebClient(accessToken); + + try { + const result = await client.conversations.replies({ + channel: channelId, + ts: threadTs, + limit: 1, // We just need the count + }); + + // The first message is the parent, so subtract 1 + const totalMessages = result.messages?.length || 0; + return totalMessages > 0 ? totalMessages - 1 : 0; + } catch (error) { + console.error('Failed to get thread reply count:', error); + throw error; + } +} + +/** + * Format Slack messages into a readable string + * @param messages - Array of Slack messages + * @returns Formatted string with message content + */ +export function formatThreadMessages(messages: SlackMessage[]): string { + return messages + .map((msg, index) => { + const isParent = index === 0; + const prefix = isParent ? 'Original message' : `Reply ${index}`; + const user = msg.user ? `<@${msg.user}>` : 'Unknown user'; + const text = msg.text || '(no text content)'; + + return `${prefix} from ${user}:\n${text}`; + }) + .join('\n\n'); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c9a91f2c3..5d1aadd7a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -198,6 +198,9 @@ importers: braintrust: specifier: 'catalog:' version: 0.0.209(@aws-sdk/credential-provider-web-identity@3.840.0)(react@18.3.1)(sswr@2.2.0(svelte@5.34.9))(svelte@5.34.9)(vue@3.5.17(typescript@5.8.3))(zod@3.25.1) + drizzle-orm: + specifier: 'catalog:' + version: 0.44.2(@opentelemetry/api@1.9.0)(@types/pg@8.15.4)(mysql2@3.14.1)(pg@8.16.3)(postgres@3.4.7) vitest: specifier: 'catalog:' version: 3.2.4(@edge-runtime/vm@3.2.0)(@types/debug@4.1.12)(@types/node@24.0.10)(@vitest/ui@3.2.4)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(msw@2.10.4(@types/node@24.0.10)(typescript@5.8.3))(sass@1.89.2)(terser@5.43.1)(tsx@4.20.3)(yaml@2.8.0) @@ -5950,7 +5953,6 @@ packages: bun@1.2.18: resolution: {integrity: sha512-OR+EpNckoJN4tHMVZPaTPxDj2RgpJgJwLruTIFYbO3bQMguLd0YrmkWKYqsiihcLgm2ehIjF/H1RLfZiRa7+qQ==} - cpu: [arm64, x64, aarch64] os: [darwin, linux, win32] hasBin: true