separate out the processing chunks

This commit is contained in:
dal 2025-03-03 11:44:47 -07:00
parent b2c988527f
commit db619977de
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 181 additions and 112 deletions

View File

@ -682,7 +682,7 @@ fn tool_create_metrics(id: String, content: String) -> Result<Vec<BusterReasonin
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
match parser.process_chunk(id, &content)? { match parser.process_metric_chunk(id, &content)? {
Some(message) => { Some(message) => {
println!( println!(
"MESSAGE_STREAM: StreamingParser produced create metrics message: {:?}", "MESSAGE_STREAM: StreamingParser produced create metrics message: {:?}",
@ -703,7 +703,7 @@ fn tool_modify_metrics(id: String, content: String) -> Result<Vec<BusterReasonin
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
match parser.process_chunk(id, &content)? { match parser.process_metric_chunk(id, &content)? {
Some(message) => { Some(message) => {
println!( println!(
"MESSAGE_STREAM: StreamingParser produced modify metrics message: {:?}", "MESSAGE_STREAM: StreamingParser produced modify metrics message: {:?}",
@ -724,7 +724,7 @@ fn tool_create_dashboards(id: String, content: String) -> Result<Vec<BusterReaso
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
match parser.process_chunk(id, &content)? { match parser.process_dashboard_chunk(id, &content)? {
Some(message) => { Some(message) => {
println!( println!(
"MESSAGE_STREAM: StreamingParser produced create dashboards message: {:?}", "MESSAGE_STREAM: StreamingParser produced create dashboards message: {:?}",
@ -747,7 +747,7 @@ fn tool_modify_dashboards(id: String, content: String) -> Result<Vec<BusterReaso
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
match parser.process_chunk(id, &content)? { match parser.process_dashboard_chunk(id, &content)? {
Some(message) => { Some(message) => {
println!( println!(
"MESSAGE_STREAM: StreamingParser produced modify dashboard message: {:?}", "MESSAGE_STREAM: StreamingParser produced modify dashboard message: {:?}",
@ -769,7 +769,7 @@ fn tool_create_plan(id: String, content: String) -> Result<Vec<BusterReasoningMe
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
// First try to parse as plan data using StreamingParser // First try to parse as plan data using StreamingParser
if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { if let Ok(Some(message)) = parser.process_plan_chunk(id.clone(), &content) {
println!( println!(
"MESSAGE_STREAM: StreamingParser produced create plan message: {:?}", "MESSAGE_STREAM: StreamingParser produced create plan message: {:?}",
message message
@ -808,7 +808,7 @@ fn tool_create_plan(id: String, content: String) -> Result<Vec<BusterReasoningMe
fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> { fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
// First try to parse as search requirements using StreamingParser // First try to parse as search requirements using StreamingParser
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { if let Ok(Some(message)) = parser.process_search_data_catalog_chunk(id.clone(), &content) {
match message { match message {
BusterReasoningMessage::Text(text) => { BusterReasoningMessage::Text(text) => {
// If we successfully parsed search requirements, convert to appropriate format // If we successfully parsed search requirements, convert to appropriate format
@ -935,14 +935,18 @@ fn tool_create_file(id: String, content: String) -> Result<Vec<BusterReasoningMe
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
match parser.process_chunk(id, &content)? { // For now we'll try both metric and dashboard processing since this is a general file function
Some(message) => { if let Ok(Some(message)) = parser.process_metric_chunk(id.clone(), &content) {
println!( return Ok(vec![message]);
"MESSAGE_STREAM: StreamingParser produced create file message: {:?}", }
message
); if let Ok(Some(message)) = parser.process_dashboard_chunk(id.clone(), &content) {
Ok(vec![message]) return Ok(vec![message]);
} }
// If it's neither a metric nor dashboard, parse it as a general file
match parser.process_file_data(id)? {
Some(message) => Ok(vec![message]),
None => { None => {
println!("MESSAGE_STREAM: No valid file data found in content"); println!("MESSAGE_STREAM: No valid file data found in content");
Err(anyhow::anyhow!("Failed to parse file data from content")) Err(anyhow::anyhow!("Failed to parse file data from content"))
@ -956,14 +960,18 @@ fn tool_modify_file(id: String, content: String) -> Result<Vec<BusterReasoningMe
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
match parser.process_chunk(id, &content)? { // For now we'll try both metric and dashboard processing since this is a general file function
Some(message) => { if let Ok(Some(message)) = parser.process_metric_chunk(id.clone(), &content) {
println!( return Ok(vec![message]);
"MESSAGE_STREAM: StreamingParser produced modify file message: {:?}", }
message
); if let Ok(Some(message)) = parser.process_dashboard_chunk(id.clone(), &content) {
Ok(vec![message]) return Ok(vec![message]);
} }
// If it's neither a metric nor dashboard, parse it as a general file
match parser.process_file_data(id)? {
Some(message) => Ok(vec![message]),
None => { None => {
println!("MESSAGE_STREAM: No valid file data found in content"); println!("MESSAGE_STREAM: No valid file data found in content");
Err(anyhow::anyhow!("Failed to parse file data from content")) Err(anyhow::anyhow!("Failed to parse file data from content"))
@ -1064,87 +1072,82 @@ fn assistant_data_catalog_search(
progress: MessageProgress, progress: MessageProgress,
initial: bool, initial: bool,
) -> Result<Vec<BusterReasoningMessage>> { ) -> Result<Vec<BusterReasoningMessage>> {
// Only process complete messages for data catalog search let mut parser = StreamingParser::new();
if matches!(progress, MessageProgress::Complete) {
// First try to parse as search requirements using StreamingParser if let Ok(Some(message)) = parser.process_search_data_catalog_chunk(id.clone(), &content) {
let mut parser = StreamingParser::new(); match message {
if let Ok(Some(message)) = parser.process_chunk(id.clone(), &content) { BusterReasoningMessage::Text(text) => {
match message { // If we successfully parsed search requirements, convert to appropriate format
BusterReasoningMessage::Text(text) => { return Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill {
// If we successfully parsed search requirements, convert to appropriate format id: text.id,
return Ok(vec![BusterReasoningMessage::Pill(BusterReasoningPill { thought_type: "thought".to_string(),
id: text.id, title: "Search Query".to_string(),
thought_type: "thought".to_string(), secondary_title: "Processing search...".to_string(),
title: "Search Query".to_string(), pill_containers: Some(vec![BusterThoughtPillContainer {
secondary_title: "Processing search...".to_string(), title: "Search Requirements".to_string(),
pill_containers: Some(vec![BusterThoughtPillContainer { pills: vec![BusterThoughtPill {
title: "Search Requirements".to_string(), id: Uuid::new_v4().to_string(),
pills: vec![BusterThoughtPill { text: text.message.unwrap_or_default(),
id: Uuid::new_v4().to_string(), thought_file_type: "text".to_string(),
text: text.message.unwrap_or_default(), }],
thought_file_type: "text".to_string(), }]),
}], status: "completed".to_string(),
}]), })]);
status: "completed".to_string(),
})]);
}
_ => unreachable!("Data catalog search should only return Text type"),
} }
_ => unreachable!("Data catalog search should only return Text type"),
} }
}
// Fall back to existing logic for full search results // Fall back to existing logic for full search results
let data_catalog_result = match serde_json::from_str::<SearchDataCatalogOutput>(&content) { let data_catalog_result = match serde_json::from_str::<SearchDataCatalogOutput>(&content) {
Ok(result) => result, Ok(result) => result,
Err(e) => {
println!("Failed to parse SearchDataCatalogOutput: {:?}", e);
return Ok(vec![]);
}
};
let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
let result_count = data_catalog_result.results.len();
let query_params = data_catalog_result.search_requirements.clone();
let thought_pill_containers =
match proccess_data_catalog_search_results(data_catalog_result) {
Ok(containers) => containers,
Err(e) => { Err(e) => {
println!("Failed to parse SearchDataCatalogOutput: {:?}", e); println!("Failed to process data catalog search results: {:?}", e);
return Ok(vec![]); return Ok(vec![]);
} }
}; };
let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; let thought = if result_count > 0 {
let result_count = data_catalog_result.results.len(); BusterReasoningMessage::Pill(BusterReasoningPill {
let query_params = data_catalog_result.search_requirements.clone(); id: id.clone(),
thought_type: "thought".to_string(),
let thought_pill_containers = title: format!("Found {} results", result_count),
match proccess_data_catalog_search_results(data_catalog_result) { secondary_title: format!("{} seconds", duration),
Ok(containers) => containers, pill_containers: Some(thought_pill_containers),
Err(e) => { status: "completed".to_string(),
println!("Failed to process data catalog search results: {:?}", e); })
return Ok(vec![]);
}
};
let thought = if result_count > 0 {
BusterReasoningMessage::Pill(BusterReasoningPill {
id: id.clone(),
thought_type: "thought".to_string(),
title: format!("Found {} results", result_count),
secondary_title: format!("{} seconds", duration),
pill_containers: Some(thought_pill_containers),
status: "completed".to_string(),
})
} else {
BusterReasoningMessage::Pill(BusterReasoningPill {
id: id.clone(),
thought_type: "thought".to_string(),
title: "No data catalog items found".to_string(),
secondary_title: format!("{} seconds", duration),
pill_containers: Some(vec![BusterThoughtPillContainer {
title: "No results found".to_string(),
pills: vec![BusterThoughtPill {
id: "".to_string(),
text: query_params,
thought_file_type: "empty".to_string(),
}],
}]),
status: "completed".to_string(),
})
};
Ok(vec![thought])
} else { } else {
Ok(vec![]) // Skip in-progress messages BusterReasoningMessage::Pill(BusterReasoningPill {
} id: id.clone(),
thought_type: "thought".to_string(),
title: "No data catalog items found".to_string(),
secondary_title: format!("{} seconds", duration),
pill_containers: Some(vec![BusterThoughtPillContainer {
title: "No results found".to_string(),
pills: vec![BusterThoughtPill {
id: "".to_string(),
text: query_params,
thought_file_type: "empty".to_string(),
}],
}]),
status: "completed".to_string(),
})
};
Ok(vec![thought])
} }
fn assistant_create_metrics( fn assistant_create_metrics(
@ -1154,8 +1157,8 @@ fn assistant_create_metrics(
initial: bool, initial: bool,
) -> Result<Vec<BusterReasoningMessage>> { ) -> Result<Vec<BusterReasoningMessage>> {
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
// Process both in-progress and complete messages for metrics
match parser.process_chunk(id.clone(), &content) { match parser.process_metric_chunk(id.clone(), &content) {
Ok(Some(message)) => { Ok(Some(message)) => {
let status = match progress { let status = match progress {
MessageProgress::InProgress => "in_progress", MessageProgress::InProgress => "in_progress",
@ -1184,8 +1187,8 @@ fn assistant_modify_metrics(
initial: bool, initial: bool,
) -> Result<Vec<BusterReasoningMessage>> { ) -> Result<Vec<BusterReasoningMessage>> {
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
// Process both in-progress and complete messages for metrics
match parser.process_chunk(id.clone(), &content) { match parser.process_metric_chunk(id.clone(), &content) {
Ok(Some(message)) => { Ok(Some(message)) => {
let status = match progress { let status = match progress {
MessageProgress::InProgress => "in_progress", MessageProgress::InProgress => "in_progress",
@ -1213,8 +1216,8 @@ fn assistant_create_dashboards(
initial: bool, initial: bool,
) -> Result<Vec<BusterReasoningMessage>> { ) -> Result<Vec<BusterReasoningMessage>> {
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
// Process both in-progress and complete messages for dashboards
match parser.process_chunk(id.clone(), &content) { match parser.process_dashboard_chunk(id.clone(), &content) {
Ok(Some(message)) => { Ok(Some(message)) => {
let status = match progress { let status = match progress {
MessageProgress::InProgress => "in_progress", MessageProgress::InProgress => "in_progress",
@ -1243,8 +1246,8 @@ fn assistant_modify_dashboards(
initial: bool, initial: bool,
) -> Result<Vec<BusterReasoningMessage>> { ) -> Result<Vec<BusterReasoningMessage>> {
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
// Process both in-progress and complete messages for dashboards
match parser.process_chunk(id.clone(), &content) { match parser.process_dashboard_chunk(id.clone(), &content) {
Ok(Some(message)) => { Ok(Some(message)) => {
let status = match progress { let status = match progress {
MessageProgress::InProgress => "in_progress", MessageProgress::InProgress => "in_progress",
@ -1273,8 +1276,9 @@ fn assistant_create_plan(
initial: bool, initial: bool,
) -> Result<Vec<BusterReasoningMessage>> { ) -> Result<Vec<BusterReasoningMessage>> {
let mut parser = StreamingParser::new(); let mut parser = StreamingParser::new();
// Process both in-progress and complete messages for plan creation // Process both in-progress and complete messages for plan creation
match parser.process_chunk(id.clone(), &content) { match parser.process_plan_chunk(id.clone(), &content) {
Ok(Some(message)) => { Ok(Some(message)) => {
let status = match progress { let status = match progress {
MessageProgress::InProgress => "in_progress", MessageProgress::InProgress => "in_progress",

View File

@ -20,27 +20,50 @@ impl StreamingParser {
} }
} }
// Main entry point to process chunks based on type // Clear the buffer - useful when reusing the parser for different content formats
pub fn clear_buffer(&mut self) {
self.buffer.clear();
}
// Main entry point to process chunks based on type (kept for backward compatibility)
pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterReasoningMessage>> { pub fn process_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterReasoningMessage>> {
// Add new chunk to buffer // Add new chunk to buffer
self.buffer.push_str(chunk); self.buffer.push_str(chunk);
// Try to process as a plan first // Try to process as a plan first
if let Some(plan) = self.process_plan_chunk(id.clone())? { let mut plan_parser = StreamingParser::new();
if let Some(plan) = plan_parser.process_plan_chunk(id.clone(), chunk)? {
return Ok(Some(plan)); return Ok(Some(plan));
} }
// Then try to process as search data catalog // Then try to process as search data catalog
if let Some(search) = self.process_search_catalog_chunk(id.clone())? { let mut search_parser = StreamingParser::new();
if let Some(search) = search_parser.process_search_data_catalog_chunk(id.clone(), chunk)? {
return Ok(Some(search)); return Ok(Some(search));
} }
// Finally try to process as a file // Try to process as a metric
self.process_file_chunk(id) let mut metric_parser = StreamingParser::new();
if let Some(metric) = metric_parser.process_metric_chunk(id.clone(), chunk)? {
return Ok(Some(metric));
}
// Try to process as a dashboard
let mut dashboard_parser = StreamingParser::new();
if let Some(dashboard) = dashboard_parser.process_dashboard_chunk(id.clone(), chunk)? {
return Ok(Some(dashboard));
}
// Finally try to process as a general file
self.process_file_data(id)
} }
// Process chunks meant for plan creation // Process chunks meant for plan creation
pub fn process_plan_chunk(&mut self, id: String) -> Result<Option<BusterReasoningMessage>> { pub fn process_plan_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterReasoningMessage>> {
// Clear buffer and add new chunk
self.clear_buffer();
self.buffer.push_str(chunk);
// Complete any incomplete JSON structure // Complete any incomplete JSON structure
let processed_json = self.complete_json_structure(self.buffer.clone()); let processed_json = self.complete_json_structure(self.buffer.clone());
@ -65,7 +88,11 @@ impl StreamingParser {
} }
// Process chunks meant for search data catalog // Process chunks meant for search data catalog
pub fn process_search_catalog_chunk(&mut self, id: String) -> Result<Option<BusterReasoningMessage>> { pub fn process_search_data_catalog_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterReasoningMessage>> {
// Clear buffer and add new chunk
self.clear_buffer();
self.buffer.push_str(chunk);
// Complete any incomplete JSON structure // Complete any incomplete JSON structure
let processed_json = self.complete_json_structure(self.buffer.clone()); let processed_json = self.complete_json_structure(self.buffer.clone());
@ -89,8 +116,46 @@ impl StreamingParser {
Ok(None) Ok(None)
} }
// Process chunks meant for files (original implementation) // Process chunks meant for metric files
pub fn process_file_chunk(&mut self, id: String) -> Result<Option<BusterReasoningMessage>> { pub fn process_metric_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterReasoningMessage>> {
// Clear buffer and add new chunk
self.clear_buffer();
self.buffer.push_str(chunk);
// Process the buffer as a file
let processed_file = self.process_file_data(id.clone())?;
// Check if it's a metric file
if let Some(BusterReasoningMessage::File(file)) = processed_file {
if file.file_type == "metric" || file.file_name.ends_with(".metric.yml") {
return Ok(Some(BusterReasoningMessage::File(file)));
}
}
Ok(None)
}
// Process chunks meant for dashboard files
pub fn process_dashboard_chunk(&mut self, id: String, chunk: &str) -> Result<Option<BusterReasoningMessage>> {
// Clear buffer and add new chunk
self.clear_buffer();
self.buffer.push_str(chunk);
// Process the buffer as a file
let processed_file = self.process_file_data(id.clone())?;
// Check if it's a dashboard file
if let Some(BusterReasoningMessage::File(file)) = processed_file {
if file.file_type == "dashboard" || file.file_name.ends_with(".dashboard.yml") {
return Ok(Some(BusterReasoningMessage::File(file)));
}
}
Ok(None)
}
// Internal function to process file data (shared by metric and dashboard processing)
pub fn process_file_data(&mut self, id: String) -> Result<Option<BusterReasoningMessage>> {
// Extract and replace yml_content with placeholders // Extract and replace yml_content with placeholders
let mut yml_contents = Vec::new(); let mut yml_contents = Vec::new();
let mut positions = Vec::new(); let mut positions = Vec::new();