From db619977de522bb20d10cbc4fd210adfbf2b1851 Mon Sep 17 00:00:00 2001 From: dal Date: Mon, 3 Mar 2025 11:44:47 -0700 Subject: [PATCH] separate out the processing chunks --- .../handlers/src/chats/post_chat_handler.rs | 210 +++++++++--------- .../handlers/src/chats/streaming_parser.rs | 83 ++++++- 2 files changed, 181 insertions(+), 112 deletions(-) diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 71b042d22..e3464583f 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -682,7 +682,7 @@ fn tool_create_metrics(id: String, content: String) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced create metrics message: {:?}", @@ -703,7 +703,7 @@ fn tool_modify_metrics(id: String, content: String) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced modify metrics message: {:?}", @@ -724,7 +724,7 @@ fn tool_create_dashboards(id: String, content: String) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced create dashboards message: {:?}", @@ -747,7 +747,7 @@ fn tool_modify_dashboards(id: String, content: String) -> Result { println!( "MESSAGE_STREAM: StreamingParser produced modify dashboard message: {:?}", @@ -769,7 +769,7 @@ fn tool_create_plan(id: String, content: String) -> Result Result Result> { // First try to parse as search requirements using StreamingParser let mut parser = StreamingParser::new(); - if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { + if let Ok(Some(message)) = parser.process_search_data_catalog_chunk(id.clone(), &content) { match message { BusterReasoningMessage::Text(text) => { // If we successfully parsed search requirements, convert to appropriate format @@ -935,14 +935,18 @@ fn tool_create_file(id: String, content: String) -> Result { - println!( - "MESSAGE_STREAM: StreamingParser produced create file message: {:?}", - message - ); - Ok(vec![message]) - } + // For now we'll try both metric and dashboard processing since this is a general file function + if let Ok(Some(message)) = parser.process_metric_chunk(id.clone(), &content) { + return Ok(vec![message]); + } + + if let Ok(Some(message)) = parser.process_dashboard_chunk(id.clone(), &content) { + return Ok(vec![message]); + } + + // If it's neither a metric nor dashboard, parse it as a general file + match parser.process_file_data(id)? { + Some(message) => Ok(vec![message]), None => { println!("MESSAGE_STREAM: No valid file data found in content"); Err(anyhow::anyhow!("Failed to parse file data from content")) @@ -956,14 +960,18 @@ fn tool_modify_file(id: String, content: String) -> Result { - println!( - "MESSAGE_STREAM: StreamingParser produced modify file message: {:?}", - message - ); - Ok(vec![message]) - } + // For now we'll try both metric and dashboard processing since this is a general file function + if let Ok(Some(message)) = parser.process_metric_chunk(id.clone(), &content) { + return Ok(vec![message]); + } + + if let Ok(Some(message)) = parser.process_dashboard_chunk(id.clone(), &content) { + return Ok(vec![message]); + } + + // If it's neither a metric nor dashboard, parse it as a general file + match parser.process_file_data(id)? { + Some(message) => Ok(vec![message]), None => { println!("MESSAGE_STREAM: No valid file data found in content"); Err(anyhow::anyhow!("Failed to parse file data from content")) @@ -1064,87 +1072,82 @@ fn assistant_data_catalog_search( progress: MessageProgress, initial: bool, ) -> Result> { - // Only process complete messages for data catalog search - if matches!(progress, MessageProgress::Complete) { - // First try to parse as search requirements using StreamingParser - let mut parser = StreamingParser::new(); - if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { - match message { - BusterReasoningMessage::Text(text) => { - // If we successfully parsed search requirements, convert to appropriate format - return Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill { - id: text.id, - thought_type: "thought".to_string(), - title: "Search Query".to_string(), - secondary_title: "Processing search...".to_string(), - pill_containers: Some(vec![BusterThoughtPillContainer { - title: "Search Requirements".to_string(), - pills: vec![BusterThoughtPill { - id: Uuid::new_v4().to_string(), - text: text.message.unwrap_or_default(), - thought_file_type: "text".to_string(), - }], - }]), - status: "completed".to_string(), - })]); - } - _ => unreachable!("Data catalog search should only return Text type"), + let mut parser = StreamingParser::new(); + + if let Ok(Some(message)) = parser.process_search_data_catalog_chunk(id.clone(), &content) { + match message { + BusterReasoningMessage::Text(text) => { + // If we successfully parsed search requirements, convert to appropriate format + return Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill { + id: text.id, + thought_type: "thought".to_string(), + title: "Search Query".to_string(), + secondary_title: "Processing search...".to_string(), + pill_containers: Some(vec![BusterThoughtPillContainer { + title: "Search Requirements".to_string(), + pills: vec![BusterThoughtPill { + id: Uuid::new_v4().to_string(), + text: text.message.unwrap_or_default(), + thought_file_type: "text".to_string(), + }], + }]), + status: "completed".to_string(), + })]); } + _ => unreachable!("Data catalog search should only return Text type"), } + } - // Fall back to existing logic for full search results - let data_catalog_result = match serde_json::from_str::(&content) { - Ok(result) => result, + // Fall back to existing logic for full search results + let data_catalog_result = match serde_json::from_str::(&content) { + Ok(result) => result, + Err(e) => { + println!("Failed to parse SearchDataCatalogOutput: {:?}", e); + return Ok(vec![]); + } + }; + + let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; + let result_count = data_catalog_result.results.len(); + let query_params = data_catalog_result.search_requirements.clone(); + + let thought_pill_containers = + match proccess_data_catalog_search_results(data_catalog_result) { + Ok(containers) => containers, Err(e) => { - println!("Failed to parse SearchDataCatalogOutput: {:?}", e); + println!("Failed to process data catalog search results: {:?}", e); return Ok(vec![]); } }; - let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; - let result_count = data_catalog_result.results.len(); - let query_params = data_catalog_result.search_requirements.clone(); - - let thought_pill_containers = - match proccess_data_catalog_search_results(data_catalog_result) { - Ok(containers) => containers, - Err(e) => { - println!("Failed to process data catalog search results: {:?}", e); - return Ok(vec![]); - } - }; - - let thought = if result_count > 0 { - BusterReasoningMessage::Pill(BusterReasoningPill { - id: id.clone(), - thought_type: "thought".to_string(), - title: format!("Found {} results", result_count), - secondary_title: format!("{} seconds", duration), - pill_containers: Some(thought_pill_containers), - status: "completed".to_string(), - }) - } else { - BusterReasoningMessage::Pill(BusterReasoningPill { - id: id.clone(), - thought_type: "thought".to_string(), - title: "No data catalog items found".to_string(), - secondary_title: format!("{} seconds", duration), - pill_containers: Some(vec![BusterThoughtPillContainer { - title: "No results found".to_string(), - pills: vec![BusterThoughtPill { - id: "".to_string(), - text: query_params, - thought_file_type: "empty".to_string(), - }], - }]), - status: "completed".to_string(), - }) - }; - - Ok(vec![thought]) + let thought = if result_count > 0 { + BusterReasoningMessage::Pill(BusterReasoningPill { + id: id.clone(), + thought_type: "thought".to_string(), + title: format!("Found {} results", result_count), + secondary_title: format!("{} seconds", duration), + pill_containers: Some(thought_pill_containers), + status: "completed".to_string(), + }) } else { - Ok(vec![]) // Skip in-progress messages - } + BusterReasoningMessage::Pill(BusterReasoningPill { + id: id.clone(), + thought_type: "thought".to_string(), + title: "No data catalog items found".to_string(), + secondary_title: format!("{} seconds", duration), + pill_containers: Some(vec![BusterThoughtPillContainer { + title: "No results found".to_string(), + pills: vec![BusterThoughtPill { + id: "".to_string(), + text: query_params, + thought_file_type: "empty".to_string(), + }], + }]), + status: "completed".to_string(), + }) + }; + + Ok(vec![thought]) } fn assistant_create_metrics( @@ -1154,8 +1157,8 @@ fn assistant_create_metrics( initial: bool, ) -> Result> { let mut parser = StreamingParser::new(); - // Process both in-progress and complete messages for metrics - match parser.process_chunk(id.clone(), &content) { + + match parser.process_metric_chunk(id.clone(), &content) { Ok(Some(message)) => { let status = match progress { MessageProgress::InProgress => "in_progress", @@ -1184,8 +1187,8 @@ fn assistant_modify_metrics( initial: bool, ) -> Result> { let mut parser = StreamingParser::new(); - // Process both in-progress and complete messages for metrics - match parser.process_chunk(id.clone(), &content) { + + match parser.process_metric_chunk(id.clone(), &content) { Ok(Some(message)) => { let status = match progress { MessageProgress::InProgress => "in_progress", @@ -1213,8 +1216,8 @@ fn assistant_create_dashboards( initial: bool, ) -> Result> { let mut parser = StreamingParser::new(); - // Process both in-progress and complete messages for dashboards - match parser.process_chunk(id.clone(), &content) { + + match parser.process_dashboard_chunk(id.clone(), &content) { Ok(Some(message)) => { let status = match progress { MessageProgress::InProgress => "in_progress", @@ -1243,8 +1246,8 @@ fn assistant_modify_dashboards( initial: bool, ) -> Result> { let mut parser = StreamingParser::new(); - // Process both in-progress and complete messages for dashboards - match parser.process_chunk(id.clone(), &content) { + + match parser.process_dashboard_chunk(id.clone(), &content) { Ok(Some(message)) => { let status = match progress { MessageProgress::InProgress => "in_progress", @@ -1273,8 +1276,9 @@ fn assistant_create_plan( initial: bool, ) -> Result> { let mut parser = StreamingParser::new(); + // Process both in-progress and complete messages for plan creation - match parser.process_chunk(id.clone(), &content) { + match parser.process_plan_chunk(id.clone(), &content) { Ok(Some(message)) => { let status = match progress { MessageProgress::InProgress => "in_progress", diff --git a/api/libs/handlers/src/chats/streaming_parser.rs b/api/libs/handlers/src/chats/streaming_parser.rs index fb86bb95f..85d5dbc4c 100644 --- a/api/libs/handlers/src/chats/streaming_parser.rs +++ b/api/libs/handlers/src/chats/streaming_parser.rs @@ -20,27 +20,50 @@ impl StreamingParser { } } - // Main entry point to process chunks based on type + // Clear the buffer - useful when reusing the parser for different content formats + pub fn clear_buffer(&mut self) { + self.buffer.clear(); + } + + // Main entry point to process chunks based on type (kept for backward compatibility) pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result> { // Add new chunk to buffer self.buffer.push_str(chunk); // Try to process as a plan first - if let Some(plan) = self.process_plan_chunk(id.clone())? { + let mut plan_parser = StreamingParser::new(); + if let Some(plan) = plan_parser.process_plan_chunk(id.clone(), chunk)? { return Ok(Some(plan)); } // Then try to process as search data catalog - if let Some(search) = self.process_search_catalog_chunk(id.clone())? { + let mut search_parser = StreamingParser::new(); + if let Some(search) = search_parser.process_search_data_catalog_chunk(id.clone(), chunk)? { return Ok(Some(search)); } - // Finally try to process as a file - self.process_file_chunk(id) + // Try to process as a metric + let mut metric_parser = StreamingParser::new(); + if let Some(metric) = metric_parser.process_metric_chunk(id.clone(), chunk)? { + return Ok(Some(metric)); + } + + // Try to process as a dashboard + let mut dashboard_parser = StreamingParser::new(); + if let Some(dashboard) = dashboard_parser.process_dashboard_chunk(id.clone(), chunk)? { + return Ok(Some(dashboard)); + } + + // Finally try to process as a general file + self.process_file_data(id) } // Process chunks meant for plan creation - pub fn process_plan_chunk(&mut self, id: String) -> Result> { + pub fn process_plan_chunk(&mut self, id: String, chunk: &str) -> Result> { + // Clear buffer and add new chunk + self.clear_buffer(); + self.buffer.push_str(chunk); + // Complete any incomplete JSON structure let processed_json = self.complete_json_structure(self.buffer.clone()); @@ -65,7 +88,11 @@ impl StreamingParser { } // Process chunks meant for search data catalog - pub fn process_search_catalog_chunk(&mut self, id: String) -> Result> { + pub fn process_search_data_catalog_chunk(&mut self, id: String, chunk: &str) -> Result> { + // Clear buffer and add new chunk + self.clear_buffer(); + self.buffer.push_str(chunk); + // Complete any incomplete JSON structure let processed_json = self.complete_json_structure(self.buffer.clone()); @@ -89,8 +116,46 @@ impl StreamingParser { Ok(None) } - // Process chunks meant for files (original implementation) - pub fn process_file_chunk(&mut self, id: String) -> Result> { + // Process chunks meant for metric files + pub fn process_metric_chunk(&mut self, id: String, chunk: &str) -> Result> { + // Clear buffer and add new chunk + self.clear_buffer(); + self.buffer.push_str(chunk); + + // Process the buffer as a file + let processed_file = self.process_file_data(id.clone())?; + + // Check if it's a metric file + if let Some(BusterReasoningMessage::File(file)) = processed_file { + if file.file_type == "metric" || file.file_name.ends_with(".metric.yml") { + return Ok(Some(BusterReasoningMessage::File(file))); + } + } + + Ok(None) + } + + // Process chunks meant for dashboard files + pub fn process_dashboard_chunk(&mut self, id: String, chunk: &str) -> Result> { + // Clear buffer and add new chunk + self.clear_buffer(); + self.buffer.push_str(chunk); + + // Process the buffer as a file + let processed_file = self.process_file_data(id.clone())?; + + // Check if it's a dashboard file + if let Some(BusterReasoningMessage::File(file)) = processed_file { + if file.file_type == "dashboard" || file.file_name.ends_with(".dashboard.yml") { + return Ok(Some(BusterReasoningMessage::File(file))); + } + } + + Ok(None) + } + + // Internal function to process file data (shared by metric and dashboard processing) + pub fn process_file_data(&mut self, id: String) -> Result> { // Extract and replace yml_content with placeholders let mut yml_contents = Vec::new(); let mut positions = Vec::new();