ok response streaming well

This commit is contained in:
dal 2025-03-03 12:47:58 -07:00
parent 46ad056524
commit 4bd6243ae6
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed files with 25 additions and 85 deletions

View File

@ -490,32 +490,20 @@ pub fn transform_message(
} => { } => {
if let Some(content) = content { if let Some(content) = content {
let messages = match transform_text_message( let messages = match transform_text_message(
id, id.unwrap_or_else(|| Uuid::new_v4().to_string()),
content, content,
progress, progress,
chat_id.clone(), chat_id.clone(),
message_id.clone(), message_id.clone(),
) { ) {
Ok(messages) => { Ok(messages) => messages
let filtered_messages: Vec<BusterContainer> = messages
.into_iter() .into_iter()
.filter(|msg| msg.response_message.message_chunk.is_none()) // Only include completed messages .map(|msg| BusterContainer::ChatMessage(BusterChatMessageContainer {
.map(BusterContainer::ChatMessage) response_message: msg,
.collect(); chat_id: chat_id.clone(),
message_id: message_id.clone(),
println!( }))
"MESSAGE_STREAM: Transformed text message into {} containers", .collect(),
filtered_messages.len()
);
if !filtered_messages.is_empty() {
println!(
"MESSAGE_STREAM: First container: {:?}",
filtered_messages[0]
);
}
filtered_messages
}
Err(e) => { Err(e) => {
tracing::warn!("Error transforming text message: {:?}", e); tracing::warn!("Error transforming text message: {:?}", e);
println!("MESSAGE_STREAM: Error transforming text message: {:?}", e); println!("MESSAGE_STREAM: Error transforming text message: {:?}", e);
@ -592,12 +580,12 @@ pub fn transform_message(
} }
fn transform_text_message( fn transform_text_message(
id: Option<String>, id: String,
content: String, content: String,
progress: MessageProgress, progress: MessageProgress,
chat_id: Uuid, chat_id: Uuid,
message_id: Uuid, message_id: Uuid,
) -> Result<Vec<BusterChatMessageContainer>> { ) -> Result<Vec<BusterChatMessage>> {
println!( println!(
"MESSAGE_STREAM: transform_text_message called with progress: {:?}", "MESSAGE_STREAM: transform_text_message called with progress: {:?}",
progress progress
@ -605,16 +593,12 @@ fn transform_text_message(
match progress { match progress {
MessageProgress::InProgress => { MessageProgress::InProgress => {
let container = BusterChatMessageContainer { let container = BusterChatMessage {
response_message: BusterChatMessage { id: id.clone(),
id: id.unwrap_or_else(|| Uuid::new_v4().to_string()),
message_type: "text".to_string(), message_type: "text".to_string(),
message: None, message: None,
message_chunk: Some(content), message_chunk: Some(content),
is_final_message: Some(false), is_final_message: Some(false),
},
chat_id,
message_id,
}; };
println!( println!(
"MESSAGE_STREAM: Created in-progress text message: {:?}", "MESSAGE_STREAM: Created in-progress text message: {:?}",
@ -623,16 +607,12 @@ fn transform_text_message(
Ok(vec![container]) Ok(vec![container])
} }
MessageProgress::Complete => { MessageProgress::Complete => {
let container = BusterChatMessageContainer { let container = BusterChatMessage {
response_message: BusterChatMessage { id: id.clone(),
id: id.unwrap_or_else(|| Uuid::new_v4().to_string()),
message_type: "text".to_string(), message_type: "text".to_string(),
message: Some(content), message: Some(content),
message_chunk: None, message_chunk: None,
is_final_message: Some(true), is_final_message: Some(true),
},
chat_id,
message_id,
}; };
println!( println!(
"MESSAGE_STREAM: Created complete text message: {:?}", "MESSAGE_STREAM: Created complete text message: {:?}",
@ -763,46 +743,6 @@ fn tool_modify_dashboards(id: String, content: String) -> Result<Vec<BusterReaso
} }
} }
// Update tool_create_plan to work with BusterReasoningMessage
fn tool_create_plan(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
let mut parser = StreamingParser::new();
// First try to parse as plan data using StreamingParser
if let Ok(Some(message)) = parser.process_plan_chunk(id.clone(), &content) {
println!(
"MESSAGE_STREAM: StreamingParser produced create plan message: {:?}",
message
);
// Convert BusterReasoningMessage to BusterChatContainer
match message {
BusterReasoningMessage::Text(text) => {
// Create a thought pill container for the plan text
let plan_container = BusterThoughtPillContainer {
title: text.title.clone(),
pills: vec![BusterThoughtPill {
id: Uuid::new_v4().to_string(),
text: text.message.unwrap_or_default(),
thought_file_type: "text".to_string(),
}],
};
Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill {
id: text.id,
thought_type: "thought".to_string(),
title: text.title,
secondary_title: text.secondary_title.unwrap_or_default(),
pill_containers: Some(vec![plan_container]),
status: text.status.unwrap_or_else(|| "loading".to_string()),
})])
}
_ => unreachable!("Plan creation should only return Text type"),
}
} else {
println!("MESSAGE_STREAM: No valid plan data found in content");
Err(anyhow::anyhow!("Failed to parse plan data from content"))
}
}
// Restore the original tool_data_catalog_search function // Restore the original tool_data_catalog_search function
fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> { fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
let data_catalog_result = match serde_json::from_str::<SearchDataCatalogOutput>(&content) { let data_catalog_result = match serde_json::from_str::<SearchDataCatalogOutput>(&content) {
@ -828,7 +768,7 @@ fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterRea
let buster_thought = if result_count > 0 { let buster_thought = if result_count > 0 {
BusterReasoningMessage::Pill(BusterReasoningPill { BusterReasoningMessage::Pill(BusterReasoningPill {
id: id.clone(), id: id.clone(),
thought_type: "thought".to_string(), thought_type: "pills".to_string(),
title: format!("Found {} results", result_count), title: format!("Found {} results", result_count),
secondary_title: format!("{} seconds", duration), secondary_title: format!("{} seconds", duration),
pill_containers: Some(thought_pill_containers), pill_containers: Some(thought_pill_containers),
@ -837,7 +777,7 @@ fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterRea
} else { } else {
BusterReasoningMessage::Pill(BusterReasoningPill { BusterReasoningMessage::Pill(BusterReasoningPill {
id: id.clone(), id: id.clone(),
thought_type: "thought".to_string(), thought_type: "pills".to_string(),
title: "No data catalog items found".to_string(), title: "No data catalog items found".to_string(),
secondary_title: format!("{} seconds", duration), secondary_title: format!("{} seconds", duration),
pill_containers: Some(vec![BusterThoughtPillContainer { pill_containers: Some(vec![BusterThoughtPillContainer {