mirror of https://github.com/buster-so/buster.git
react appropriately
This commit is contained in:
parent
26f196c77d
commit
8221bca8de
|
@ -7,9 +7,7 @@ import {
|
|||
updateMessage,
|
||||
} from '@buster/database';
|
||||
import type { Chat, Message } from '@buster/database';
|
||||
import { tasks } from '@trigger.dev/sdk';
|
||||
import { and, eq, isNull } from 'drizzle-orm';
|
||||
import type { analystAgentTask } from '../analyst-agent-task/analyst-agent-task';
|
||||
|
||||
/**
|
||||
* Create a message for an existing chat
|
||||
|
@ -58,38 +56,10 @@ export async function createMessageForChat({
|
|||
}
|
||||
|
||||
/**
|
||||
* Trigger the analyst agent task for a message
|
||||
* Returns the trigger handle ID for tracking
|
||||
* Main helper to create a message
|
||||
* Now only creates the message, triggering is handled in the main task
|
||||
*/
|
||||
export async function triggerAnalystAgent(messageId: string): Promise<string> {
|
||||
try {
|
||||
// Trigger the analyst agent task
|
||||
const taskHandle = await tasks.trigger<typeof analystAgentTask>('analyst-agent-task', {
|
||||
message_id: messageId,
|
||||
});
|
||||
|
||||
// Verify trigger service received the task
|
||||
if (!taskHandle.id) {
|
||||
throw new Error('Trigger service returned invalid handle');
|
||||
}
|
||||
|
||||
// Update the message with the trigger run ID
|
||||
await updateMessage(messageId, {
|
||||
triggerRunId: taskHandle.id,
|
||||
});
|
||||
|
||||
return taskHandle.id;
|
||||
} catch (error) {
|
||||
console.error('Failed to trigger analyst agent task:', error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Main helper to create a message and trigger analysis
|
||||
* Combines both operations for convenience
|
||||
*/
|
||||
export async function createMessageAndTriggerAnalysis({
|
||||
export async function createMessage({
|
||||
chatId,
|
||||
userId,
|
||||
content,
|
||||
|
@ -97,10 +67,7 @@ export async function createMessageAndTriggerAnalysis({
|
|||
chatId: string;
|
||||
userId: string;
|
||||
content: string;
|
||||
}): Promise<{
|
||||
message: Message;
|
||||
triggerRunId: string;
|
||||
}> {
|
||||
}): Promise<Message> {
|
||||
// Create the message
|
||||
const message = await createMessageForChat({
|
||||
chatId,
|
||||
|
@ -108,13 +75,7 @@ export async function createMessageAndTriggerAnalysis({
|
|||
content,
|
||||
});
|
||||
|
||||
// Trigger the analyst agent task
|
||||
const triggerRunId = await triggerAnalystAgent(message.id);
|
||||
|
||||
return {
|
||||
message,
|
||||
triggerRunId,
|
||||
};
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -184,3 +145,58 @@ export async function getOrganizationSlackIntegration(organizationId: string): P
|
|||
accessToken: vaultSecret.secret,
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter Slack messages to only include non-bot messages after the most recent app mention
|
||||
*
|
||||
* @param messages - Array of Slack messages from a thread
|
||||
* @param botUserId - The bot user ID to identify bot messages
|
||||
* @returns Object containing array of non-bot messages that came after the most recent @mention and the mention message timestamp
|
||||
*/
|
||||
export function filterMessagesAfterLastMention(
|
||||
messages: Array<{
|
||||
ts: string;
|
||||
text?: string | undefined;
|
||||
user?: string | undefined;
|
||||
[key: string]: unknown;
|
||||
}>,
|
||||
botUserId: string
|
||||
): {
|
||||
filteredMessages: Array<{
|
||||
ts: string;
|
||||
text?: string | undefined;
|
||||
user?: string | undefined;
|
||||
[key: string]: unknown;
|
||||
}>;
|
||||
mentionMessageTs: string | null;
|
||||
} {
|
||||
if (!messages || messages.length === 0) {
|
||||
return { filteredMessages: [], mentionMessageTs: null };
|
||||
}
|
||||
|
||||
// Find the most recent message that contains an app mention (@Buster)
|
||||
let lastMentionIndex = -1;
|
||||
let mentionMessageTs: string | null = null;
|
||||
|
||||
// Iterate backwards to find the most recent mention
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const message = messages[i];
|
||||
if (message?.text?.includes(`<@${botUserId}>`)) {
|
||||
lastMentionIndex = i;
|
||||
mentionMessageTs = message.ts;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If no mention found, return empty array
|
||||
if (lastMentionIndex === -1) {
|
||||
return { filteredMessages: [], mentionMessageTs: null };
|
||||
}
|
||||
|
||||
// Filter messages: only non-bot messages after the last mention
|
||||
const filteredMessages = messages
|
||||
.slice(lastMentionIndex + 1) // Get messages after the mention
|
||||
.filter((message) => message.user !== botUserId); // Remove bot messages
|
||||
|
||||
return { filteredMessages, mentionMessageTs };
|
||||
}
|
||||
|
|
|
@ -5,10 +5,12 @@ import {
|
|||
getThreadMessages,
|
||||
removeReaction,
|
||||
} from '@buster/slack';
|
||||
import { type TaskOutput, logger, schemaTask } from '@trigger.dev/sdk';
|
||||
import { type TaskOutput, logger, runs, schemaTask, wait } from '@trigger.dev/sdk';
|
||||
import { z } from 'zod';
|
||||
import { analystAgentTask } from '../analyst-agent-task/analyst-agent-task';
|
||||
import {
|
||||
createMessageAndTriggerAnalysis,
|
||||
createMessage,
|
||||
filterMessagesAfterLastMention,
|
||||
getChatDetails,
|
||||
getOrganizationSlackIntegration,
|
||||
} from './helpers';
|
||||
|
@ -98,13 +100,52 @@ export const slackAgentTask: ReturnType<
|
|||
botUserId: integration.botUserId,
|
||||
});
|
||||
|
||||
// Step 3: Add hourglass reaction (and remove any existing bot reactions)
|
||||
// Step 3: We'll add reactions after we fetch messages and find the mention
|
||||
|
||||
// Step 4: Fetch all needed data concurrently
|
||||
const [slackMessages] = await Promise.all([
|
||||
getThreadMessages({
|
||||
accessToken,
|
||||
channelId: chatDetails.slackChannelId,
|
||||
threadTs: chatDetails.slackThreadTs,
|
||||
}),
|
||||
// Add more concurrent fetches here if needed
|
||||
]);
|
||||
|
||||
logger.log('Retrieved Slack thread messages', {
|
||||
messageCount: slackMessages.length,
|
||||
hasParentMessage: slackMessages.length > 0,
|
||||
});
|
||||
|
||||
// Filter messages to only include non-bot messages after the most recent app mention
|
||||
if (!integration.botUserId) {
|
||||
logger.error('No bot user ID found for Slack integration');
|
||||
throw new Error('Slack integration is missing bot user ID');
|
||||
}
|
||||
|
||||
const { filteredMessages: relevantMessages, mentionMessageTs } =
|
||||
filterMessagesAfterLastMention(slackMessages, integration.botUserId);
|
||||
|
||||
logger.log('Filtered relevant messages', {
|
||||
originalCount: slackMessages.length,
|
||||
filteredCount: relevantMessages.length,
|
||||
botUserId: integration.botUserId,
|
||||
mentionMessageTs,
|
||||
});
|
||||
|
||||
// If no mention was found, we can't proceed
|
||||
if (!mentionMessageTs) {
|
||||
logger.error('No @Buster mention found in thread');
|
||||
throw new Error('No @Buster mention found in the thread');
|
||||
}
|
||||
|
||||
// Step 5: Add hourglass reaction to the message that mentioned @Buster
|
||||
try {
|
||||
// First, get existing reactions to see if we need to clean up
|
||||
const existingReactions = await getReactions({
|
||||
accessToken,
|
||||
channelId: chatDetails.slackChannelId,
|
||||
messageTs: chatDetails.slackThreadTs,
|
||||
messageTs: mentionMessageTs,
|
||||
});
|
||||
|
||||
// Remove any existing reactions from the bot
|
||||
|
@ -119,7 +160,7 @@ export const slackAgentTask: ReturnType<
|
|||
await removeReaction({
|
||||
accessToken,
|
||||
channelId: chatDetails.slackChannelId,
|
||||
messageTs: chatDetails.slackThreadTs,
|
||||
messageTs: mentionMessageTs,
|
||||
emoji: reaction.name,
|
||||
});
|
||||
logger.log('Removed existing bot reaction', { emoji: reaction.name });
|
||||
|
@ -137,11 +178,13 @@ export const slackAgentTask: ReturnType<
|
|||
await addReaction({
|
||||
accessToken,
|
||||
channelId: chatDetails.slackChannelId,
|
||||
messageTs: chatDetails.slackThreadTs,
|
||||
messageTs: mentionMessageTs,
|
||||
emoji: 'hourglass_flowing_sand',
|
||||
});
|
||||
|
||||
logger.log('Added hourglass reaction to Slack thread');
|
||||
logger.log('Added hourglass reaction to message with @Buster mention', {
|
||||
messageTs: mentionMessageTs,
|
||||
});
|
||||
} catch (error) {
|
||||
// Log but don't fail the entire task if reaction handling fails
|
||||
logger.warn('Failed to manage Slack reactions', {
|
||||
|
@ -149,41 +192,181 @@ export const slackAgentTask: ReturnType<
|
|||
});
|
||||
}
|
||||
|
||||
// Step 4: Fetch all needed data concurrently
|
||||
const [slackMessages] = await Promise.all([
|
||||
getThreadMessages({
|
||||
accessToken,
|
||||
channelId: chatDetails.slackChannelId,
|
||||
threadTs: chatDetails.slackThreadTs,
|
||||
}),
|
||||
// Add more concurrent fetches here if needed
|
||||
]);
|
||||
// Generate prompt from the filtered messages
|
||||
const prompt =
|
||||
relevantMessages
|
||||
.map((msg) => msg.text || '')
|
||||
.filter((text) => text.trim() !== '')
|
||||
.join(' ')
|
||||
.trim() || 'who is my top customer?'; // Fallback if no messages found
|
||||
|
||||
logger.log('Retrieved Slack thread messages', {
|
||||
messageCount: slackMessages.length,
|
||||
hasParentMessage: slackMessages.length > 0,
|
||||
});
|
||||
|
||||
// TODO: Process slack messages to generate prompt
|
||||
const prompt = 'TODO: Generate from slack conversation';
|
||||
|
||||
// Step 5: Create message and trigger analyst agent
|
||||
const { message, triggerRunId } = await createMessageAndTriggerAnalysis({
|
||||
// Step 6: Create message
|
||||
const message = await createMessage({
|
||||
chatId: payload.chatId,
|
||||
userId: payload.userId,
|
||||
content: prompt,
|
||||
});
|
||||
|
||||
logger.log('Successfully created message and triggered analyst agent', {
|
||||
logger.log('Successfully created message', {
|
||||
chatId: payload.chatId,
|
||||
messageId: message.id,
|
||||
triggerRunId,
|
||||
});
|
||||
|
||||
// Step 7: Trigger analyst agent task (without waiting)
|
||||
logger.log('Triggering analyst agent task', {
|
||||
messageId: message.id,
|
||||
});
|
||||
|
||||
const analystHandle = await analystAgentTask.trigger({
|
||||
message_id: message.id,
|
||||
});
|
||||
|
||||
logger.log('Analyst agent task triggered', {
|
||||
runId: analystHandle.id,
|
||||
});
|
||||
|
||||
// Step 8: Send initial Slack message immediately after triggering
|
||||
const messagingService = new SlackMessagingService();
|
||||
const busterUrl = process.env.BUSTER_URL || 'https://platform.buster.so';
|
||||
|
||||
try {
|
||||
const progressMessage = {
|
||||
text: `I've started working on your request! You can view it here: ${busterUrl}/app/chats/${payload.chatId}`,
|
||||
thread_ts: chatDetails.slackThreadTs,
|
||||
};
|
||||
|
||||
await messagingService.sendMessage(
|
||||
accessToken,
|
||||
chatDetails.slackChannelId,
|
||||
progressMessage
|
||||
);
|
||||
|
||||
logger.log('Sent progress message to Slack thread');
|
||||
} catch (error) {
|
||||
// Log but don't fail the task if we can't send the progress message
|
||||
logger.warn('Failed to send progress message to Slack', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
|
||||
// Step 9: Poll for analyst task completion
|
||||
let isComplete = false;
|
||||
let analystResult: { ok: boolean; output?: unknown; error?: unknown } | null = null;
|
||||
const maxPollingTime = 30 * 60 * 1000; // 30 minutes
|
||||
const pollingInterval = 5000; // 5 seconds
|
||||
const startTime = Date.now();
|
||||
|
||||
while (!isComplete && Date.now() - startTime < maxPollingTime) {
|
||||
await wait.for({ seconds: pollingInterval / 1000 });
|
||||
|
||||
try {
|
||||
const run = await runs.retrieve(analystHandle.id);
|
||||
|
||||
logger.log('Polling analyst task status', {
|
||||
runId: analystHandle.id,
|
||||
status: run.status,
|
||||
});
|
||||
|
||||
if (run.status === 'COMPLETED') {
|
||||
isComplete = true;
|
||||
analystResult = { ok: true, output: run.output };
|
||||
} else if (
|
||||
run.status === 'SYSTEM_FAILURE' ||
|
||||
run.status === 'CRASHED' ||
|
||||
run.status === 'CANCELED' ||
|
||||
run.status === 'TIMED_OUT' ||
|
||||
run.status === 'INTERRUPTED'
|
||||
) {
|
||||
isComplete = true;
|
||||
analystResult = { ok: false, error: run.error || 'Task failed' };
|
||||
}
|
||||
} catch (error) {
|
||||
logger.warn('Failed to retrieve run status', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Check if we timed out
|
||||
if (!isComplete) {
|
||||
logger.error('Analyst task polling timed out');
|
||||
analystResult = { ok: false, error: 'Task timed out' };
|
||||
}
|
||||
|
||||
// Handle analyst task result
|
||||
if (!analystResult || !analystResult.ok) {
|
||||
// If analyst task failed, notify the user
|
||||
logger.error('Analyst agent task failed', {
|
||||
error: analystResult?.error || 'Unknown error',
|
||||
});
|
||||
|
||||
try {
|
||||
await messagingService.sendMessage(accessToken, chatDetails.slackChannelId, {
|
||||
text: `Sorry, I encountered an error while processing your request. Please try again or contact support if the issue persists.`,
|
||||
thread_ts: chatDetails.slackThreadTs,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.warn('Failed to send error message to Slack', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
|
||||
throw new Error(
|
||||
`Analyst agent task failed: ${JSON.stringify(analystResult?.error || 'Unknown error')}`
|
||||
);
|
||||
}
|
||||
|
||||
// Step 10: Update reactions - remove hourglass, add checkmark on the mention message
|
||||
try {
|
||||
// Remove the hourglass reaction
|
||||
await removeReaction({
|
||||
accessToken,
|
||||
channelId: chatDetails.slackChannelId,
|
||||
messageTs: mentionMessageTs,
|
||||
emoji: 'hourglass_flowing_sand',
|
||||
});
|
||||
|
||||
// Add the checkmark reaction
|
||||
await addReaction({
|
||||
accessToken,
|
||||
channelId: chatDetails.slackChannelId,
|
||||
messageTs: mentionMessageTs,
|
||||
emoji: 'white_check_mark',
|
||||
});
|
||||
|
||||
logger.log('Updated Slack reactions on mention message', {
|
||||
messageTs: mentionMessageTs,
|
||||
});
|
||||
} catch (error) {
|
||||
logger.warn('Failed to update Slack reactions', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
|
||||
// Step 11: Send completion message
|
||||
try {
|
||||
const completionMessage = {
|
||||
text: `I've finished working on your request!`,
|
||||
thread_ts: chatDetails.slackThreadTs,
|
||||
};
|
||||
|
||||
await messagingService.sendMessage(
|
||||
accessToken,
|
||||
chatDetails.slackChannelId,
|
||||
completionMessage
|
||||
);
|
||||
|
||||
logger.log('Sent completion message to Slack thread');
|
||||
} catch (error) {
|
||||
logger.warn('Failed to send completion message to Slack', {
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
messageId: message.id,
|
||||
triggerRunId,
|
||||
triggerRunId: analystHandle.id,
|
||||
};
|
||||
} catch (error) {
|
||||
logger.error('Failed to process Slack agent task', {
|
||||
|
|
Loading…
Reference in New Issue