filtering assets in res messages

This commit is contained in:
dal 2025-04-09 11:32:47 -06:00
parent 6fe50c78de
commit 494a838260
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 102 additions and 95 deletions

View File

@ -179,6 +179,7 @@ impl SearchDataCatalogTool {
trace_id: session_id.to_string(),
}),
max_completion_tokens: Some(8092),
temperature: Some(0.0),
..Default::default()
};

View File

@ -48,6 +48,15 @@ use crate::messages::types::{ChatMessage, ChatUserMessage};
use super::types::ChatWithMessages;
use tokio::sync::mpsc;
// Define the helper struct at the module level
#[derive(Debug, Clone)]
struct CompletedFileInfo {
id: String,
file_type: String, // "metric" or "dashboard"
file_name: String,
version_number: i32,
}
// Define ThreadEvent
#[derive(Clone, Copy, Debug)]
pub enum ThreadEvent {
@ -236,25 +245,25 @@ pub async fn post_chat_handler(
.and_then(|id_str| Uuid::parse_str(id_str).ok())
.unwrap_or(asset_id_value);
// Verify the response ID matches the asset ID
if response_id == asset_id_value {
// Create association in database - now the message exists in DB
if let Err(e) = create_message_file_association(
message.id,
asset_id_value,
asset_version_number,
asset_type_value,
)
.await {
tracing::warn!("Failed to create message file association: {}", e);
// Verify the response ID matches the asset ID
if response_id == asset_id_value {
// Create association in database - now the message exists in DB
if let Err(e) = create_message_file_association(
message.id,
asset_id_value,
asset_version_number,
asset_type_value,
)
.await {
tracing::warn!("Failed to create message file association: {}", e);
}
}
// We only need to process one file association
break;
}
// We only need to process one file association
break;
}
}
}
// Add to updated messages for the response
updated_messages.push(message);
@ -515,11 +524,80 @@ pub async fn post_chat_handler(
}
};
// --- START: New File Filtering Logic ---
// 1. Collect all completed files from reasoning messages
let mut completed_files: Vec<CompletedFileInfo> = Vec::new();
for container in &all_transformed_containers {
if let BusterContainer::ReasoningMessage(reasoning_msg) = container {
if let BusterReasoningMessage::File(file_reasoning) = &reasoning_msg.reasoning {
// Consider files from both create and modify operations ("files" type)
// Check if the overall reasoning status is completed
if file_reasoning.message_type == "files" && file_reasoning.status == "completed" {
for (_file_id_key, file_detail) in &file_reasoning.files {
// Also ensure the individual file status is completed
if file_detail.status == "completed" {
completed_files.push(CompletedFileInfo {
id: file_detail.id.clone(),
file_type: file_detail.file_type.clone(),
file_name: file_detail.file_name.clone(),
version_number: file_detail.version_number,
});
}
}
}
}
}
}
// 2. Apply filtering logic
let contains_metrics = completed_files.iter().any(|f| f.file_type == "metric");
let contains_dashboards = completed_files.iter().any(|f| f.file_type == "dashboard");
let filtered_files_for_response: Vec<CompletedFileInfo> = if contains_dashboards {
// If any dashboards exist (created or modified), only show dashboards
completed_files.into_iter().filter(|f| f.file_type == "dashboard").collect()
} else if contains_metrics {
// If only metrics exist, show all metrics
completed_files.into_iter().filter(|f| f.file_type == "metric").collect()
} else {
// If neither (or empty), show nothing
vec![]
};
// 3. Generate BusterChatMessage::File for the filtered files
// Populate the vector declared outside the scope
let mut filtered_file_response_messages: Vec<Value> = Vec::new();
for file_info in filtered_files_for_response {
let response_message = BusterChatMessage::File {
id: file_info.id.clone(),
file_type: file_info.file_type.clone(),
file_name: file_info.file_name.clone(),
version_number: file_info.version_number,
filter_version_id: None, // Assuming no filter ID needed here
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: "Generated by Buster".to_string(), // Or indicate modification if possible? Needs more state tracking.
timestamp: Some(Utc::now().timestamp()),
}]),
};
if let Ok(value) = serde_json::to_value(&response_message) {
filtered_file_response_messages.push(value);
}
}
// --- END: New File Filtering Logic ---
// Transform all messages for final storage
let (response_messages, reasoning_messages) =
// Get initial response messages (text, etc.) and reasoning messages separately
let (mut text_and_other_response_messages, reasoning_messages) =
prepare_final_message_state(&all_transformed_containers)?;
// Update chat_with_messages with final state
// Create the final response message list: Start with filtered files, then add text/other messages
let mut final_response_messages = filtered_file_response_messages;
final_response_messages.append(&mut text_and_other_response_messages);
// Update chat_with_messages with final state (now including filtered files first)
let message = ChatMessage::new_with_messages(
message_id,
Some(ChatUserMessage {
@ -528,7 +606,7 @@ pub async fn post_chat_handler(
sender_name: user.name.clone().unwrap_or_default(),
sender_avatar: None,
}),
response_messages.clone(),
final_response_messages.clone(), // Use the reordered list
reasoning_messages.clone(),
Some(formatted_reasoning_duration.clone()), // Use formatted reasoning duration for regular messages
Utc::now(),
@ -545,7 +623,7 @@ pub async fn post_chat_handler(
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
response_messages: serde_json::to_value(&response_messages)?,
response_messages: serde_json::to_value(&final_response_messages)?, // Use the reordered list
reasoning: serde_json::to_value(&reasoning_messages)?,
final_reasoning_message: Some(formatted_reasoning_duration), // Use formatted reasoning duration for regular messages
title: title.title.clone().unwrap_or_default(),
@ -1036,42 +1114,7 @@ pub async fn transform_message(
) {
Ok(messages) => {
for reasoning_container in messages {
// Only process file response messages when they're completed
match &reasoning_container {
BusterReasoningMessage::File(file)
if matches!(progress, MessageProgress::Complete)
&& file.status == "completed"
&& file.message_type == "files" =>
{
// For each completed file, create and send a file response message
for (_file_id, file_content) in &file.files {
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
filter_version_id: None,
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: "Created by Buster".to_string(),
timestamp: Some(Utc::now().timestamp()),
}]),
};
containers.push((
BusterContainer::ChatMessage(
BusterChatMessageContainer {
response_message,
chat_id: *chat_id,
message_id: *message_id,
},
),
ThreadEvent::GeneratingResponseMessage,
));
}
}
_ => {}
}
// No longer generate response messages here, only reasoning
containers.push((
BusterContainer::ReasoningMessage(
@ -1110,7 +1153,7 @@ pub async fn transform_message(
let name_str = name.clone();
let mut containers = Vec::new();
let messages = match transform_tool_message(
match transform_tool_message(
tool_call_id,
name,
content.clone(),
@ -1119,42 +1162,7 @@ pub async fn transform_message(
) {
Ok(messages) => {
for reasoning_container in messages {
// Only process file response messages when they're completed
match &reasoning_container {
BusterReasoningMessage::File(file)
if matches!(progress, MessageProgress::Complete)
&& file.status == "completed"
&& file.message_type == "files" =>
{
// For each completed file, create and send a file response message
for (_file_id, file_content) in &file.files {
let response_message = BusterChatMessage::File {
id: file_content.id.clone(),
file_type: file_content.file_type.clone(),
file_name: file_content.file_name.clone(),
version_number: file_content.version_number,
filter_version_id: None,
metadata: Some(vec![BusterChatResponseFileMetadata {
status: "completed".to_string(),
message: "Created by Buster".to_string(),
timestamp: Some(Utc::now().timestamp()),
}]),
};
containers.push((
BusterContainer::ChatMessage(
BusterChatMessageContainer {
response_message,
chat_id: *chat_id,
message_id: *message_id,
},
),
ThreadEvent::GeneratingResponseMessage,
));
}
}
_ => {}
}
// No longer generate response messages here, only reasoning
containers.push((
BusterContainer::ReasoningMessage(
@ -1167,16 +1175,14 @@ pub async fn transform_message(
ThreadEvent::GeneratingReasoningMessage,
));
}
containers
}
Err(e) => {
tracing::warn!("Error transforming tool message '{}': {:?}", name_str, e);
println!("MESSAGE_STREAM: Error transforming tool message: {:?}", e);
vec![]
}
};
Ok(messages)
Ok(containers)
} else {
Ok(vec![])
}