streaming queries

This commit is contained in:
dal 2025-04-24 11:49:47 -06:00
parent 469fd620a3
commit c1f35d6144
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 118 additions and 40 deletions

View File

@ -2281,29 +2281,116 @@ fn transform_assistant_tool_message(
// --- Handle Search Tool (Simple Text) ---
"search_data_catalog" => {
// This tool doesn't stream complex arguments, just signals searching.
// Only send the message once when arguments start appearing.
if let Some(_args) = parser.process_search_data_catalog_chunk(tool_id.clone(), &tool_call.function.arguments) {
if tracker.get_complete_text(tool_id.clone()).is_none() { // Send only once
let search_msg = BusterReasoningMessage::Text(BusterReasoningText {
id: tool_id.clone(),
reasoning_type: "text".to_string(),
title: "Searching your data catalog...".to_string(),
secondary_title: "".to_string(),
message: None,
message_chunk: None,
status: Some("loading".to_string()),
finished_reasoning: false,
});
all_results.push(ToolTransformResult::Reasoning(search_msg));
// Use tracker to mark that we've sent the initial message for this ID
tracker.add_chunk(tool_id.clone(), "searching_sent".to_string());
}
// --- Handle InProgress: Send initial message once ---
if progress == MessageProgress::InProgress {
// Use tracker to ensure this initial message is sent only once per tool call ID
let tracker_key = format!("{}_initial_search", tool_id);
if tracker.get_complete_text(tracker_key.clone()).is_none() {
let initial_search_msg = BusterReasoningMessage::Text(BusterReasoningText {
id: tool_id.clone(), // Use the tool call ID for the message
reasoning_type: "text".to_string(),
title: "Searching data catalog".to_string(),
secondary_title: "".to_string(),
message: None,
message_chunk: None,
status: Some("loading".to_string()),
finished_reasoning: false,
});
all_results.push(ToolTransformResult::Reasoning(initial_search_msg));
// Mark that we've sent the initial message for this ID
tracker.add_chunk(tracker_key, "initial_sent".to_string());
}
}
if progress == MessageProgress::Complete {
// The result from transform_tool_message will show final status and duration.
// Clear the tracker entry used for the initial "searching..." message.
tracker.clear_chunk(tool_id.clone());
// --- Handle Complete: Update message with queries ---
else if progress == MessageProgress::Complete {
// Define struct to parse final arguments
#[derive(Deserialize)]
struct SearchArgs {
specific_queries: Option<Vec<String>>,
exploratory_topics: Option<Vec<String>>,
}
// Parse the complete arguments string
match serde_json::from_str::<SearchArgs>(&tool_call.function.arguments) {
Ok(args) => {
let queries = args.specific_queries.unwrap_or_default();
let topics = args.exploratory_topics.unwrap_or_default();
println!("queries: {:?}", queries);
println!("topics: {:?}", topics);
if !queries.is_empty() || !topics.is_empty() {
let mut message_parts = Vec::new();
if !queries.is_empty() {
message_parts.push("Specific Queries:".to_string());
message_parts.extend(queries.iter().map(|q| format!("- {}", q)));
}
if !topics.is_empty() {
if !message_parts.is_empty() { message_parts.push("".to_string()); } // Add separator
message_parts.push("Exploratory Topics:".to_string());
message_parts.extend(topics.iter().map(|t| format!("- {}", t)));
}
// Create the UPDATED reasoning message with the full query list
// Use the SAME tool_id as the initial message
let mut pill_containers = Vec::new();
if !queries.is_empty() {
pill_containers.push(BusterThoughtPillContainer {
title: "Specific Queries".to_string(),
pills: queries.iter().enumerate().map(|(i, q)| BusterThoughtPill {
// Use a combination of tool_id and index for a unique pill id
id: format!("{}_query_{}", tool_id, i),
text: q.clone(),
thought_file_type: "query".to_string(),
}).collect(),
});
}
if !topics.is_empty() {
pill_containers.push(BusterThoughtPillContainer {
title: "Exploratory Topics".to_string(),
pills: topics.iter().enumerate().map(|(i, t)| BusterThoughtPill {
// Use a combination of tool_id and index for a unique pill id
id: format!("{}_topic_{}", tool_id, i),
text: t.clone(),
thought_file_type: "topic".to_string(),
}).collect(),
});
}
// Create the Pill reasoning message
let updated_search_msg = BusterReasoningMessage::Pill(BusterReasoningPill {
id: tool_id.clone(),
thought_type: "pills".to_string(), // Correct type
title: "Searching data catalog".to_string(), // Keep title simple
secondary_title: "".to_string(), // Duration added by tool result later
pill_containers: Some(pill_containers), // Use the generated containers
status: "loading".to_string(), // Still loading until tool result comes back
});
all_results.push(ToolTransformResult::Reasoning(updated_search_msg));
} else {
// If parsing succeeds but queries/topics are empty,
// we might want to keep the initial simple "Searching..." message.
// Currently, this branch does nothing, leaving the initial message as is.
tracing::debug!("Search arguments parsed but no queries or topics found for tool_id: {}", tool_id);
}
}
Err(e) => {
tracing::warn!(
"Failed to parse final search_data_catalog arguments for {}: {}. Args: {}",
tool_id,
e,
tool_call.function.arguments
);
// If parsing fails, the initial "Searching..." message remains.
// Optionally create an error reasoning message here.
}
}
// Clear the tracker flag used for the initial message
let tracker_key = format!("{}_initial_search", tool_id);
tracker.clear_chunk(tracker_key);
}
}
@ -3085,7 +3172,7 @@ async fn apply_file_filtering_rules(
// return the context dashboard (unmodified info) followed by the other processed assets.
tracing::debug!("Context dashboard {} was NOT modified this turn (only metrics). Prepending context info.", ctx_id);
Ok(vec![context_dashboard_info] // Use the fetched (unmodified) context info
.into_iter()
.into_iter()
.chain(new_filtered_assets.into_iter())
.collect())
}
@ -3166,7 +3253,7 @@ fn process_current_turn_files(
.map_or(false, |uuid| unreferenced_metric_uuids.contains(&uuid))
})
.cloned()
.collect();
.collect();
// Return unreferenced metrics first, then dashboards
let mut combined = unreferenced_metrics;

View File

@ -1,6 +1,7 @@
use agents::tools::categories::file_tools::common::generate_deterministic_uuid;
use anyhow::Result;
use serde_json::Value;
use serde::Deserialize;
use super::post_chat_handler::{
BusterFile, BusterFileContent, BusterReasoningFile, BusterReasoningMessage, BusterReasoningText,
@ -62,22 +63,12 @@ impl StreamingParser {
pub fn process_search_data_catalog_chunk(
&mut self,
_id: String,
chunk: &str,
) -> Option<Value> {
self.clear_buffer();
self.buffer.push_str(chunk);
let processed_json = self.complete_json_structure(self.buffer.clone());
// Try to parse arguments, return Some(Value) if successful and looks like search args
if let Ok(value) = serde_json::from_str::<Value>(&processed_json) {
if value.get("queries").is_some() {
return Some(value);
}
}
// If the start of queries is not detected, return None
None
_chunk: &str,
) -> Result<Option<BusterReasoningMessage>> {
// We no longer process streaming search args here.
// Logic moved to post_chat_handler on message completion.
self.clear_buffer(); // Clear buffer in case it was used
Ok(None) // Always return None during streaming for this tool now
}
// Process chunks meant for metric files