slack agent react

This commit is contained in:
dal 2025-07-16 13:58:03 -06:00
parent a20c8dad56
commit 86572acfa4
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
8 changed files with 610 additions and 7 deletions

View File

@ -58,6 +58,7 @@ export class SlackOAuthService {
'mpim:read',
'mpim:write',
'reactions:write',
'reactions:read',
'users:read',
'users:read.email',
],

View File

@ -31,7 +31,8 @@
"ai": "catalog:",
"braintrust": "catalog:",
"vitest": "catalog:",
"zod": "catalog:"
"zod": "catalog:",
"drizzle-orm": "catalog:"
},
"devDependencies": {
"@trigger.dev/build": "catalog:"

View File

@ -0,0 +1,187 @@
import {
chats,
db,
getSecretByName,
messages,
slackIntegrations,
updateMessage,
} from '@buster/database';
import type { Chat, Message } from '@buster/database';
import { tasks } from '@trigger.dev/sdk';
import { and, eq, isNull } from 'drizzle-orm';
import type { analystAgentTask } from '../analyst-agent-task/analyst-agent-task';
/**
* Create a message for an existing chat
* This replicates the logic from the server's createMessage function
* but is self-contained for the trigger app
*/
export async function createMessageForChat({
chatId,
userId,
content,
}: {
chatId: string;
userId: string;
content: string;
}): Promise<Message> {
const messageId = crypto.randomUUID();
// Use transaction to ensure atomicity
const result = await db.transaction(async (tx) => {
const [message] = await tx
.insert(messages)
.values({
id: messageId,
chatId: chatId,
createdBy: userId,
requestMessage: content,
responseMessages: {},
reasoning: {},
title: content.substring(0, 255), // Ensure title fits in database
rawLlmMessages: {},
isCompleted: false,
})
.returning();
if (!message) {
throw new Error('Failed to create message');
}
// Update chat's updated_at timestamp
await tx.update(chats).set({ updatedAt: new Date().toISOString() }).where(eq(chats.id, chatId));
return message;
});
return result;
}
/**
* Trigger the analyst agent task for a message
* Returns the trigger handle ID for tracking
*/
export async function triggerAnalystAgent(messageId: string): Promise<string> {
try {
// Trigger the analyst agent task
const taskHandle = await tasks.trigger<typeof analystAgentTask>('analyst-agent-task', {
message_id: messageId,
});
// Verify trigger service received the task
if (!taskHandle.id) {
throw new Error('Trigger service returned invalid handle');
}
// Update the message with the trigger run ID
await updateMessage(messageId, {
triggerRunId: taskHandle.id,
});
return taskHandle.id;
} catch (error) {
console.error('Failed to trigger analyst agent task:', error);
throw error;
}
}
/**
* Main helper to create a message and trigger analysis
* Combines both operations for convenience
*/
export async function createMessageAndTriggerAnalysis({
chatId,
userId,
content,
}: {
chatId: string;
userId: string;
content: string;
}): Promise<{
message: Message;
triggerRunId: string;
}> {
// Create the message
const message = await createMessageForChat({
chatId,
userId,
content,
});
// Trigger the analyst agent task
const triggerRunId = await triggerAnalystAgent(message.id);
return {
message,
triggerRunId,
};
}
/**
* Fetch chat details including Slack integration information
*/
export async function getChatDetails(chatId: string): Promise<{
chat: Chat;
organizationId: string;
slackThreadTs: string | null;
slackChannelId: string | null;
}> {
const [chatRecord] = await db
.select()
.from(chats)
.where(and(eq(chats.id, chatId), isNull(chats.deletedAt)))
.limit(1);
if (!chatRecord) {
throw new Error(`Chat not found: ${chatId}`);
}
return {
chat: chatRecord,
organizationId: chatRecord.organizationId,
slackThreadTs: chatRecord.slackThreadTs,
slackChannelId: chatRecord.slackChannelId,
};
}
/**
* Get Slack integration and secrets for an organization
*/
export async function getOrganizationSlackIntegration(organizationId: string): Promise<{
integration: typeof slackIntegrations.$inferSelect;
accessToken: string;
}> {
// Get the active Slack integration for the organization
const [integration] = await db
.select()
.from(slackIntegrations)
.where(
and(
eq(slackIntegrations.organizationId, organizationId),
isNull(slackIntegrations.deletedAt),
eq(slackIntegrations.status, 'active')
)
)
.limit(1);
if (!integration) {
throw new Error(`No active Slack integration found for organization: ${organizationId}`);
}
// Get the access token from vault using the tokenVaultKey
if (!integration.tokenVaultKey) {
throw new Error(`No token vault key found for integration: ${integration.id}`);
}
const vaultSecret = await getSecretByName(integration.tokenVaultKey);
if (!vaultSecret) {
throw new Error(`No token found in vault for key: ${integration.tokenVaultKey}`);
}
return {
integration,
accessToken: vaultSecret.secret,
};
}

View File

@ -1,12 +1,22 @@
import { type TaskOutput, schemaTask } from '@trigger.dev/sdk';
import { type TaskOutput, logger, schemaTask } from '@trigger.dev/sdk';
import { z } from 'zod';
import {
createMessageAndTriggerAnalysis,
getChatDetails,
getOrganizationSlackIntegration,
} from './helpers';
import { addReaction, removeReaction, getReactions, getThreadMessages } from '@buster/slack';
const SlackAgentTaskInputSchema = z.object({
chatId: z.string().uuid(),
userId: z.string().uuid(),
});
const SlackAgentTaskOutputSchema = z.object({});
const SlackAgentTaskOutputSchema = z.object({
success: z.boolean(),
messageId: z.string().uuid(),
triggerRunId: z.string(),
});
export type SlackAgentTaskInput = z.infer<typeof SlackAgentTaskInputSchema>;
export type SlackAgentTaskOutput = z.infer<typeof SlackAgentTaskOutputSchema>;
@ -18,8 +28,132 @@ export const slackAgentTask: ReturnType<
schema: SlackAgentTaskInputSchema,
maxDuration: 300, // 300 seconds timeout
run: async (payload: SlackAgentTaskInput): Promise<SlackAgentTaskOutput> => {
return {
message: 'Hello, world!',
};
try {
logger.log('Starting Slack agent task', {
chatId: payload.chatId,
userId: payload.userId,
});
// Step 1: Get chat details first (we need this for everything else)
const chatDetails = await getChatDetails(payload.chatId);
if (!chatDetails.slackChannelId || !chatDetails.slackThreadTs) {
throw new Error('Chat is missing Slack channel or thread information');
}
logger.log('Retrieved chat details', {
chatId: payload.chatId,
organizationId: chatDetails.organizationId,
slackChannelId: chatDetails.slackChannelId,
slackThreadTs: chatDetails.slackThreadTs,
});
// Step 2: Get Slack integration for access token
const { integration, accessToken } = await getOrganizationSlackIntegration(
chatDetails.organizationId
);
logger.log('Retrieved Slack integration', {
organizationId: chatDetails.organizationId,
teamId: integration.teamId,
teamName: integration.teamName,
botUserId: integration.botUserId,
});
// Step 3: Add hourglass reaction (and remove any existing bot reactions)
try {
// First, get existing reactions to see if we need to clean up
const existingReactions = await getReactions({
accessToken,
channelId: chatDetails.slackChannelId,
messageTs: chatDetails.slackThreadTs,
});
// Remove any existing reactions from the bot
if (integration.botUserId && existingReactions.length > 0) {
const botReactions = existingReactions.filter(
reaction => reaction.users.includes(integration.botUserId!)
);
for (const reaction of botReactions) {
try {
await removeReaction({
accessToken,
channelId: chatDetails.slackChannelId,
messageTs: chatDetails.slackThreadTs,
emoji: reaction.name,
});
logger.log('Removed existing bot reaction', { emoji: reaction.name });
} catch (error) {
// Log but don't fail if we can't remove a reaction
logger.warn('Failed to remove bot reaction', {
emoji: reaction.name,
error: error instanceof Error ? error.message : 'Unknown error',
});
}
}
}
// Add the hourglass reaction
await addReaction({
accessToken,
channelId: chatDetails.slackChannelId,
messageTs: chatDetails.slackThreadTs,
emoji: 'hourglass_flowing_sand',
});
logger.log('Added hourglass reaction to Slack thread');
} catch (error) {
// Log but don't fail the entire task if reaction handling fails
logger.warn('Failed to manage Slack reactions', {
error: error instanceof Error ? error.message : 'Unknown error',
});
}
// Step 4: Fetch all needed data concurrently
const [slackMessages] = await Promise.all([
getThreadMessages({
accessToken,
channelId: chatDetails.slackChannelId,
threadTs: chatDetails.slackThreadTs,
}),
// Add more concurrent fetches here if needed
]);
logger.log('Retrieved Slack thread messages', {
messageCount: slackMessages.length,
hasParentMessage: slackMessages.length > 0,
});
// TODO: Process slack messages to generate prompt
const prompt = 'TODO: Generate from slack conversation';
// Step 5: Create message and trigger analyst agent
const { message, triggerRunId } = await createMessageAndTriggerAnalysis({
chatId: payload.chatId,
userId: payload.userId,
content: prompt,
});
logger.log('Successfully created message and triggered analyst agent', {
chatId: payload.chatId,
messageId: message.id,
triggerRunId,
});
return {
success: true,
messageId: message.id,
triggerRunId,
};
} catch (error) {
logger.error('Failed to process Slack agent task', {
chatId: payload.chatId,
userId: payload.userId,
error: error instanceof Error ? error.message : 'Unknown error',
});
throw error;
}
},
});

View File

@ -40,5 +40,17 @@ export * from './utils/validation-helpers';
export * from './utils/message-formatter';
export * from './utils/oauth-helpers';
// Reactions
export { addReaction, removeReaction, getReactions } from './reactions';
// Threads
export {
getThreadMessages,
getMessage,
getThreadReplyCount,
formatThreadMessages,
type SlackMessage,
} from './threads';
// Version
export const VERSION = '1.0.0';

View File

@ -0,0 +1,122 @@
import { WebClient } from '@slack/web-api';
/**
* Add an emoji reaction to a Slack message
* @param accessToken - Slack bot access token
* @param channelId - Slack channel ID
* @param messageTs - Message timestamp
* @param emoji - Emoji name (without colons)
* @returns Promise that resolves when reaction is added
*/
export async function addReaction({
accessToken,
channelId,
messageTs,
emoji,
}: {
accessToken: string;
channelId: string;
messageTs: string;
emoji: string;
}): Promise<void> {
const client = new WebClient(accessToken);
try {
await client.reactions.add({
channel: channelId,
timestamp: messageTs,
name: emoji,
});
} catch (error) {
// If the reaction already exists, that's fine
if (error instanceof Error && error.message.includes('already_reacted')) {
console.info(`Reaction ${emoji} already exists on message`);
return;
}
throw error;
}
}
/**
* Remove an emoji reaction from a Slack message
* @param accessToken - Slack bot access token
* @param channelId - Slack channel ID
* @param messageTs - Message timestamp
* @param emoji - Emoji name (without colons)
* @returns Promise that resolves when reaction is removed
*/
export async function removeReaction({
accessToken,
channelId,
messageTs,
emoji,
}: {
accessToken: string;
channelId: string;
messageTs: string;
emoji: string;
}): Promise<void> {
const client = new WebClient(accessToken);
try {
await client.reactions.remove({
channel: channelId,
timestamp: messageTs,
name: emoji,
});
} catch (error) {
// If the reaction doesn't exist, that's fine
if (error instanceof Error && error.message.includes('no_reaction')) {
console.info(`Reaction ${emoji} doesn't exist on message`);
return;
}
throw error;
}
}
/**
* Get reactions for a message
* @param accessToken - Slack bot access token
* @param channelId - Slack channel ID
* @param messageTs - Message timestamp
* @returns Promise with array of reactions
*/
export async function getReactions({
accessToken,
channelId,
messageTs,
}: {
accessToken: string;
channelId: string;
messageTs: string;
}): Promise<
Array<{
name: string;
users: string[];
count: number;
}>
> {
const client = new WebClient(accessToken);
try {
const result = await client.reactions.get({
channel: channelId,
timestamp: messageTs,
});
if (result.type === 'message' && result.message && 'reactions' in result.message) {
// Map Slack API reactions to our expected format
const reactions = result.message.reactions || [];
return reactions.map((reaction) => ({
name: reaction.name || '',
users: reaction.users || [],
count: reaction.count || 0,
}));
}
return [];
} catch (error) {
console.error('Failed to get reactions:', error);
throw error;
}
}

View File

@ -0,0 +1,144 @@
import { WebClient } from '@slack/web-api';
// Define our own simple types to avoid complex Slack API type issues
interface SlackBlock {
type?: string | undefined;
[key: string]: unknown;
}
interface SlackAttachment {
[key: string]: unknown;
}
export interface SlackMessage {
ts: string;
text?: string | undefined;
user?: string | undefined;
thread_ts?: string | undefined;
blocks?: SlackBlock[] | undefined;
attachments?: SlackAttachment[] | undefined;
[key: string]: unknown; // Allow additional properties from Slack API
}
/**
* Fetch all messages in a Slack thread (parent + replies)
* @param accessToken - Slack bot access token
* @param channelId - Slack channel ID
* @param threadTs - Thread timestamp (parent message timestamp)
* @returns Promise with array of messages
*/
export async function getThreadMessages({
accessToken,
channelId,
threadTs,
}: {
accessToken: string;
channelId: string;
threadTs: string;
}): Promise<SlackMessage[]> {
const client = new WebClient(accessToken);
try {
const result = await client.conversations.replies({
channel: channelId,
ts: threadTs,
inclusive: true, // Include the parent message
});
// Cast the result to our SlackMessage type
const messages = result.messages || [];
return messages as SlackMessage[];
} catch (error) {
console.error('Failed to get thread messages:', error);
throw error;
}
}
/**
* Get a single message from Slack
* @param accessToken - Slack bot access token
* @param channelId - Slack channel ID
* @param messageTs - Message timestamp
* @returns Promise with the message or null if not found
*/
export async function getMessage({
accessToken,
channelId,
messageTs,
}: {
accessToken: string;
channelId: string;
messageTs: string;
}): Promise<SlackMessage | null> {
const client = new WebClient(accessToken);
try {
const result = await client.conversations.history({
channel: channelId,
latest: messageTs,
limit: 1,
inclusive: true,
});
if (result.messages && result.messages.length > 0) {
return result.messages[0] as SlackMessage;
}
return null;
} catch (error) {
console.error('Failed to get message:', error);
throw error;
}
}
/**
* Get thread reply count for a message
* @param accessToken - Slack bot access token
* @param channelId - Slack channel ID
* @param threadTs - Thread timestamp
* @returns Promise with reply count
*/
export async function getThreadReplyCount({
accessToken,
channelId,
threadTs,
}: {
accessToken: string;
channelId: string;
threadTs: string;
}): Promise<number> {
const client = new WebClient(accessToken);
try {
const result = await client.conversations.replies({
channel: channelId,
ts: threadTs,
limit: 1, // We just need the count
});
// The first message is the parent, so subtract 1
const totalMessages = result.messages?.length || 0;
return totalMessages > 0 ? totalMessages - 1 : 0;
} catch (error) {
console.error('Failed to get thread reply count:', error);
throw error;
}
}
/**
* Format Slack messages into a readable string
* @param messages - Array of Slack messages
* @returns Formatted string with message content
*/
export function formatThreadMessages(messages: SlackMessage[]): string {
return messages
.map((msg, index) => {
const isParent = index === 0;
const prefix = isParent ? 'Original message' : `Reply ${index}`;
const user = msg.user ? `<@${msg.user}>` : 'Unknown user';
const text = msg.text || '(no text content)';
return `${prefix} from ${user}:\n${text}`;
})
.join('\n\n');
}

View File

@ -198,6 +198,9 @@ importers:
braintrust:
specifier: 'catalog:'
version: 0.0.209(@aws-sdk/credential-provider-web-identity@3.840.0)(react@18.3.1)(sswr@2.2.0(svelte@5.34.9))(svelte@5.34.9)(vue@3.5.17(typescript@5.8.3))(zod@3.25.1)
drizzle-orm:
specifier: 'catalog:'
version: 0.44.2(@opentelemetry/api@1.9.0)(@types/pg@8.15.4)(mysql2@3.14.1)(pg@8.16.3)(postgres@3.4.7)
vitest:
specifier: 'catalog:'
version: 3.2.4(@edge-runtime/vm@3.2.0)(@types/debug@4.1.12)(@types/node@24.0.10)(@vitest/ui@3.2.4)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(msw@2.10.4(@types/node@24.0.10)(typescript@5.8.3))(sass@1.89.2)(terser@5.43.1)(tsx@4.20.3)(yaml@2.8.0)
@ -5950,7 +5953,6 @@ packages:
bun@1.2.18:
resolution: {integrity: sha512-OR+EpNckoJN4tHMVZPaTPxDj2RgpJgJwLruTIFYbO3bQMguLd0YrmkWKYqsiihcLgm2ehIjF/H1RLfZiRa7+qQ==}
cpu: [arm64, x64, aarch64]
os: [darwin, linux, win32]
hasBin: true