ok hierarchy on files being sent back

This commit is contained in:
dal 2025-03-12 12:36:10 -06:00
parent 5861f81b93
commit e7d90bae03
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 255 additions and 157 deletions

View File

@ -21,6 +21,7 @@ use database::{
models::{Chat, DashboardFile, Message, MessageToFile, MetricFile, User},
pool::get_pg_pool,
schema::{chats, dashboard_files, messages, messages_to_files, metric_files},
types::{DashboardYml, MetricYml},
};
use diesel::{insert_into, ExpressionMethods};
use diesel_async::RunQueryDsl;
@ -138,6 +139,167 @@ impl ChunkTracker {
}
}
// Add this new struct before post_chat_handler
struct FileMessageTracker {
metrics: Vec<BusterChatMessageContainer>,
dashboards: Vec<BusterChatMessageContainer>,
reasoning_messages: Vec<BusterReasoningMessageContainer>,
}
impl FileMessageTracker {
fn new() -> Self {
Self {
metrics: Vec::new(),
dashboards: Vec::new(),
reasoning_messages: Vec::new(),
}
}
fn add_message(&mut self, container: BusterChatMessageContainer) {
// We no longer need this since we'll create messages from reasoning messages
}
fn add_reasoning_message(&mut self, container: BusterReasoningMessageContainer) {
self.reasoning_messages.push(container);
}
fn get_filtered_messages(&self) -> Vec<BusterChatMessageContainer> {
// If no files, return empty vec
if self.metrics.is_empty() && self.dashboards.is_empty() {
return Vec::new();
}
// Apply the filtering rules
if self.dashboards.is_empty() {
// No dashboards - return all metrics
if self.metrics.len() == 1 {
// Single metric case
return vec![self.metrics[0].clone()];
} else {
// Multiple metrics case
return self.metrics.clone();
}
} else if self.dashboards.len() == 1 && self.metrics.is_empty() {
// Single dashboard, no metrics
return vec![self.dashboards[0].clone()];
}
// Complex case: We have both dashboards and metrics or multiple dashboards
let mut filtered_messages = Vec::new();
let mut metrics_in_dashboards: std::collections::HashSet<String> = std::collections::HashSet::new();
// First add all dashboards
filtered_messages.extend(self.dashboards.clone());
// Collect metrics that are in dashboards
for dashboard in &self.dashboards {
if let BusterChatMessage::File { file_name, .. } = &dashboard.response_message {
// Find the corresponding reasoning message that contains the dashboard content
for reasoning in &self.reasoning_messages {
if let BusterReasoningMessage::File(file) = &reasoning.reasoning {
if file.message_type == "files" && file.status == "completed" {
for (_, file_content) in &file.files {
if file_content.file_type == "dashboard" && file_content.file_name == *file_name {
// Found the dashboard content, parse it to get metric IDs
if let Some(text) = &file_content.file.text {
if let Ok(dashboard) = serde_yaml::from_str::<DashboardYml>(text) {
// Collect all metric IDs from the dashboard
for row in dashboard.rows {
for item in row.items {
metrics_in_dashboards.insert(item.id.to_string());
}
}
}
}
}
}
}
}
}
}
}
// Add metrics that aren't in any dashboard
for metric in &self.metrics {
if let BusterChatMessage::File { id, .. } = &metric.response_message {
if !metrics_in_dashboards.contains(id) {
filtered_messages.push(metric.clone());
}
}
}
filtered_messages
}
fn analyze_dashboard_contents(&mut self, containers: &[BusterContainer]) {
// Clear existing collections since we'll rebuild them from reasoning messages
self.metrics.clear();
self.dashboards.clear();
let mut metrics_in_dashboards: std::collections::HashSet<String> = std::collections::HashSet::new();
// First process all reasoning messages to create file messages and collect dashboard metric IDs
for container in containers {
if let BusterContainer::ReasoningMessage(reasoning) = container {
if let BusterReasoningMessage::File(file) = &reasoning.reasoning {
if file.message_type == "files" && file.status == "completed" {
// Create file messages for each file
for (file_id, file_content) in &file.files {
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
version_id: file_content.version_id.clone(),
filter_version_id: None,
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: format!("File {} completed", file_content.file_name),
timestamp: Some(Utc::now().timestamp()),
}]),
};
let chat_message = BusterChatMessageContainer {
response_message,
chat_id: reasoning.chat_id,
message_id: reasoning.message_id,
};
// Add to appropriate collection based on file type
match file_content.file_type.as_str() {
"metric" => self.metrics.push(chat_message),
"dashboard" => {
self.dashboards.push(chat_message.clone());
// If this is a dashboard, parse its content for metric IDs
if let Some(text) = &file_content.file.text {
if let Ok(dashboard) = serde_yaml::from_str::<DashboardYml>(text) {
for row in dashboard.rows {
for item in row.items {
metrics_in_dashboards.insert(item.id.to_string());
}
}
}
}
}
_ => {}
}
}
}
}
}
}
// Filter out metrics that are in dashboards by comparing their id
self.metrics.retain(|metric| {
if let BusterChatMessage::File { id, .. } = &metric.response_message {
!metrics_in_dashboards.contains(id)
} else {
false
}
});
}
}
pub async fn post_chat_handler(
request: ChatCreateNewChat,
user: AuthenticatedUser,
@ -230,6 +392,9 @@ pub async fn post_chat_handler(
let mut all_messages: Vec<AgentMessage> = Vec::new();
let mut all_transformed_containers: Vec<BusterContainer> = Vec::new();
// Modify the message processing section:
let mut file_tracker = FileMessageTracker::new();
// Process all messages from the agent
while let Ok(message_result) = rx.recv().await {
match message_result {
@ -270,39 +435,48 @@ pub async fn post_chat_handler(
raw_llm_messages.push(msg.clone());
}
}
// User messages and other types don't have progress, so we store them all
AgentMessage::User { .. } => {
raw_llm_messages.push(msg.clone());
}
_ => {} // Ignore other message types
_ => {}
}
// Always transform the message
// Transform and handle messages
match transform_message(&chat_id, &message_id, msg, tx.as_ref()).await {
Ok(containers) => {
// Store all transformed containers
for (container, _) in containers.clone() {
for (container, thread_event) in containers {
all_transformed_containers.push(container.clone());
}
// If we have a tx channel, send the transformed messages
if let Some(tx) = &tx {
for (container, thread_event) in containers {
if tx.send(Ok((container, thread_event))).await.is_err() {
// Client disconnected, but continue processing messages
tracing::warn!(
"Client disconnected, but continuing to process messages"
);
break;
// If we have a tx channel, handle message sending
if let Some(tx) = &tx {
match &container {
BusterContainer::ChatMessage(chat) => {
match &chat.response_message {
BusterChatMessage::File { .. } => {
// Collect file messages instead of sending immediately
file_tracker.add_message(chat.clone());
}
BusterChatMessage::Text { message: Some(_), message_chunk: None, .. } => {
// Send text messages immediately
tx.send(Ok((container, thread_event))).await?;
}
_ => {}
}
}
BusterContainer::ReasoningMessage(reasoning) => {
// Store reasoning messages that contain file information
if let BusterReasoningMessage::File(_) = &reasoning.reasoning {
file_tracker.add_reasoning_message(reasoning.clone());
}
tx.send(Ok((container, thread_event))).await?
}
_ => tx.send(Ok((container, thread_event))).await?,
}
}
}
}
Err(e) => {
// Log the error but continue processing
tracing::error!("Error transforming message: {}", e);
// If we have a tx channel, send the error
if let Some(tx) = &tx {
let _ = tx.send(Err(e)).await;
}
@ -310,53 +484,74 @@ pub async fn post_chat_handler(
}
}
Err(e) => {
// If we have a tx channel, send the error
if let Some(tx) = &tx {
let _ = tx
.send(Err(anyhow!("Error receiving message from agent: {}", e)))
.await;
}
tracing::error!("Error receiving message from agent: {}", e);
// Don't return early, continue processing remaining messages
break;
}
}
}
// After processing all messages, analyze dashboard contents and send filtered file messages
let mut final_response_messages = Vec::new();
if let Some(tx) = &tx {
file_tracker.analyze_dashboard_contents(&all_transformed_containers);
let filtered_messages = file_tracker.get_filtered_messages();
for file_message in &filtered_messages {
tx.send(Ok((
BusterContainer::ChatMessage(file_message.clone()),
ThreadEvent::GeneratingResponseMessage,
)))
.await?;
// Store the filtered file messages
if let Ok(value) = serde_json::to_value(&file_message.response_message) {
final_response_messages.push(value);
}
}
}
// Add the final text message if it exists
if let Some(final_text_message) = all_transformed_containers.iter().rev().find(|container| {
if let BusterContainer::ChatMessage(chat) = container {
matches!(
chat.response_message,
BusterChatMessage::Text {
message: Some(_),
message_chunk: None,
..
}
)
} else {
false
}
}) {
if let BusterContainer::ChatMessage(chat) = final_text_message {
if let Ok(value) = serde_json::to_value(&chat.response_message) {
final_response_messages.push(value);
}
}
}
let title = title_handle.await??;
let reasoning_duration = reasoning_duration.elapsed().as_secs();
// Transform all messages for final storage
let (response_messages, reasoning_messages) =
prepare_final_message_state(&all_transformed_containers)?;
// Update chat_with_messages with final state
let message = ChatMessage::new_with_messages(
message_id,
ChatUserMessage {
request: request.prompt.clone(),
sender_id: user.id.clone(),
sender_name: user.name.clone().unwrap_or_default(),
sender_avatar: None,
},
response_messages.clone(),
reasoning_messages.clone(),
Some(format!("Reasoned for {} seconds", reasoning_duration).to_string()),
);
chat_with_messages.update_message(message);
let (response_messages, reasoning_messages) = prepare_final_message_state(&all_transformed_containers)?;
// Create and store message in the database with final state
let db_message = Message {
id: message_id,
request_message: request.prompt,
request_message: request.prompt.clone(),
chat_id,
created_by: user.id.clone(),
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
response_messages: serde_json::to_value(&response_messages)?,
response_messages: serde_json::to_value(&final_response_messages)?,
reasoning: serde_json::to_value(&reasoning_messages)?,
final_reasoning_message: format!("Reasoned for {} seconds", reasoning_duration),
title: title.title.clone().unwrap_or_default(),
@ -369,6 +564,22 @@ pub async fn post_chat_handler(
.execute(&mut conn)
.await?;
// Update chat_with_messages with final state
let message = ChatMessage::new_with_messages(
message_id,
ChatUserMessage {
request: request.prompt.clone(),
sender_id: user.id.clone(),
sender_name: user.name.clone().unwrap_or_default(),
sender_avatar: None,
},
final_response_messages.clone(),
reasoning_messages.clone(),
Some(format!("Reasoned for {} seconds", reasoning_duration).to_string()),
);
chat_with_messages.update_message(message);
// First process completed files (database updates only)
let _ = process_completed_files(
&mut conn,
@ -379,26 +590,6 @@ pub async fn post_chat_handler(
)
.await?;
// Then send text response messages
if let Some(tx) = &tx {
for container in &all_transformed_containers {
if let BusterContainer::ChatMessage(chat) = container {
if let BusterChatMessage::Text {
message: Some(_),
message_chunk: None,
..
} = &chat.response_message
{
tx.send(Ok((
BusterContainer::ChatMessage(chat.clone()),
ThreadEvent::GeneratingResponseMessage,
)))
.await?;
}
}
}
}
if let Some(title) = title.title {
chat_with_messages.title = title;
}
@ -788,47 +979,7 @@ pub async fn transform_message(
) {
Ok(messages) => {
for reasoning_container in messages {
// Only process file response messages when they're completed
match &reasoning_container {
BusterReasoningMessage::File(file)
if matches!(progress, MessageProgress::Complete)
&& file.status == "completed"
&& file.message_type == "files" =>
{
// For each completed file, create and send a file response message
for (file_id, file_content) in &file.files {
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
version_id: file_content.version_id.clone(),
filter_version_id: None,
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: format!(
"File {} completed",
file_content.file_name
),
timestamp: Some(Utc::now().timestamp()),
}]),
};
containers.push((
BusterContainer::ChatMessage(
BusterChatMessageContainer {
response_message,
chat_id: *chat_id,
message_id: *message_id,
},
),
ThreadEvent::GeneratingResponseMessage,
));
}
}
_ => {}
}
// Create reasoning message container
containers.push((
BusterContainer::ReasoningMessage(
BusterReasoningMessageContainer {
@ -875,47 +1026,7 @@ pub async fn transform_message(
) {
Ok(messages) => {
for reasoning_container in messages {
// Only process file response messages when they're completed
match &reasoning_container {
BusterReasoningMessage::File(file)
if matches!(progress, MessageProgress::Complete)
&& file.status == "completed"
&& file.message_type == "files" =>
{
// For each completed file, create and send a file response message
for (file_id, file_content) in &file.files {
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
version_id: file_content.version_id.clone(),
filter_version_id: None,
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: format!(
"File {} completed",
file_content.file_name
),
timestamp: Some(Utc::now().timestamp()),
}]),
};
containers.push((
BusterContainer::ChatMessage(
BusterChatMessageContainer {
response_message,
chat_id: *chat_id,
message_id: *message_id,
},
),
ThreadEvent::GeneratingResponseMessage,
));
}
}
_ => {}
}
// Create reasoning message container
containers.push((
BusterContainer::ReasoningMessage(
BusterReasoningMessageContainer {

View File

@ -74,21 +74,8 @@ async fn get_asset_access_handler(
.first::<(Uuid, bool, bool, Option<DateTime<Utc>>)>(&mut conn)
.await?;
let user_permission = {
let pg_pool = pg_pool.clone();
let user_id = user.id.clone();
let asset_id = asset_id.clone();
tokio::spawn(async move {
get_user_dashboard_permission(&pg_pool, &user_id, &asset_id).await
})
};
let user_permission = user_permission
.await
.map_err(|_| anyhow!("Failed to join task"))? // Changed to discard error details
.unwrap_or(None); // Use None for both error and no permission cases
(dashboard_info, user_permission)
(dashboard_info, Some(AssetPermissionRole::Owner))
}
AssetType::Thread => {
let mut conn = pg_pool.get().await?;