mirror of https://github.com/buster-so/buster.git
time tracking, docs for cli chat
This commit is contained in:
parent
1f15d0bb6a
commit
d9deefb18d
|
@ -1,7 +1,7 @@
|
|||
use dashmap::DashMap;
|
||||
use middleware::AuthenticatedUser;
|
||||
use std::collections::HashSet;
|
||||
use std::{collections::HashMap, time::Instant};
|
||||
use std::{collections::HashMap, time::{Instant, Duration}};
|
||||
|
||||
use agents::{
|
||||
tools::{
|
||||
|
@ -178,7 +178,7 @@ pub async fn post_chat_handler(
|
|||
tx: Option<mpsc::Sender<Result<(BusterContainer, ThreadEvent)>>>,
|
||||
) -> Result<ChatWithMessages> {
|
||||
let chunk_tracker = ChunkTracker::new();
|
||||
let reasoning_duration = Instant::now();
|
||||
let reasoning_duration = Instant::now(); // Keep overall duration tracking
|
||||
let (asset_id, asset_type) = normalize_asset_fields(&request);
|
||||
validate_context_request(
|
||||
request.chat_id,
|
||||
|
@ -398,6 +398,10 @@ pub async fn post_chat_handler(
|
|||
// Get the receiver and collect all messages
|
||||
let mut rx = agent.run(&mut chat).await?;
|
||||
|
||||
// --- Add timestamp tracking ---
|
||||
let mut last_event_timestamp = Instant::now();
|
||||
// --- End Add timestamp tracking ---
|
||||
|
||||
// Collect all messages for final processing
|
||||
let mut all_messages: Vec<AgentMessage> = Vec::new();
|
||||
let mut all_transformed_containers: Vec<BusterContainer> = Vec::new();
|
||||
|
@ -442,9 +446,16 @@ pub async fn post_chat_handler(
|
|||
|
||||
// Process all messages from the agent
|
||||
while let Ok(message_result) = rx.recv().await {
|
||||
// --- Calculate elapsed duration ---
|
||||
let elapsed_duration = last_event_timestamp.elapsed();
|
||||
// --- End Calculate elapsed duration ---
|
||||
|
||||
match message_result {
|
||||
Ok(AgentMessage::Done) => {
|
||||
// Agent has finished processing, break the loop
|
||||
// --- Update timestamp before breaking ---
|
||||
last_event_timestamp = Instant::now();
|
||||
// --- End Update timestamp ---
|
||||
break;
|
||||
}
|
||||
Ok(msg) => {
|
||||
|
@ -519,6 +530,7 @@ pub async fn post_chat_handler(
|
|||
msg.clone(),
|
||||
tx.as_ref(),
|
||||
&chunk_tracker,
|
||||
elapsed_duration, // Pass elapsed duration
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -716,6 +728,9 @@ pub async fn post_chat_handler(
|
|||
}
|
||||
|
||||
tracing::error!("Error receiving message from agent: {}", e);
|
||||
// --- Update timestamp before breaking ---
|
||||
last_event_timestamp = Instant::now();
|
||||
// --- End Update timestamp ---
|
||||
// Don't return early, continue processing remaining messages
|
||||
break;
|
||||
}
|
||||
|
@ -723,13 +738,13 @@ pub async fn post_chat_handler(
|
|||
}
|
||||
|
||||
let title = title_handle.await??;
|
||||
let reasoning_duration = reasoning_duration.elapsed().as_secs();
|
||||
let total_reasoning_duration = reasoning_duration.elapsed(); // Use original name for total
|
||||
|
||||
// Format reasoning duration
|
||||
let formatted_reasoning_duration = if reasoning_duration < 60 {
|
||||
format!("Reasoned for {} seconds", reasoning_duration)
|
||||
let formatted_reasoning_duration = if total_reasoning_duration.as_secs() < 60 {
|
||||
format!("Reasoned for {} seconds", total_reasoning_duration.as_secs())
|
||||
} else {
|
||||
let minutes = reasoning_duration / 60;
|
||||
let minutes = total_reasoning_duration.as_secs() / 60;
|
||||
if minutes == 1 {
|
||||
"Reasoned for 1 minute".to_string() // Singular minute
|
||||
} else {
|
||||
|
@ -799,6 +814,10 @@ pub async fn post_chat_handler(
|
|||
&user_org_id,
|
||||
&user.id,
|
||||
&chunk_tracker,
|
||||
// Pass a default duration here, as the concept of 'elapsed since last step'
|
||||
// doesn't directly apply during final processing. Or refactor transform_message usage.
|
||||
// For now, let's pass Zero.
|
||||
Duration::ZERO,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
@ -927,6 +946,7 @@ async fn process_completed_files(
|
|||
_organization_id: &Uuid,
|
||||
_user_id: &Uuid,
|
||||
chunk_tracker: &ChunkTracker, // Pass tracker if needed for transforming messages again
|
||||
default_duration: Duration, // Add duration parameter
|
||||
) -> Result<()> {
|
||||
// Transform messages again specifically for DB processing if needed,
|
||||
// or directly use reasoning messages if they contain enough info.
|
||||
|
@ -939,6 +959,7 @@ async fn process_completed_files(
|
|||
msg.clone(),
|
||||
None,
|
||||
chunk_tracker,
|
||||
default_duration, // Pass the default duration
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
@ -1190,6 +1211,7 @@ pub async fn transform_message(
|
|||
message: AgentMessage,
|
||||
_tx: Option<&mpsc::Sender<Result<(BusterContainer, ThreadEvent)>>>,
|
||||
tracker: &ChunkTracker,
|
||||
elapsed_duration: Duration, // Add elapsed_duration parameter
|
||||
) -> Result<Vec<(BusterContainer, ThreadEvent)>> {
|
||||
match message {
|
||||
AgentMessage::Assistant {
|
||||
|
@ -1239,7 +1261,7 @@ pub async fn transform_message(
|
|||
id: Uuid::new_v4().to_string(),
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Finished reasoning".to_string(),
|
||||
secondary_title: String::new(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: None,
|
||||
message_chunk: None,
|
||||
status: Some("completed".to_string()),
|
||||
|
@ -1267,6 +1289,7 @@ pub async fn transform_message(
|
|||
*chat_id,
|
||||
*message_id,
|
||||
tracker, // Pass tracker here
|
||||
elapsed_duration, // Pass duration here
|
||||
) {
|
||||
Ok(results) => {
|
||||
// Process Vec<ToolTransformResult>
|
||||
|
@ -1332,6 +1355,7 @@ pub async fn transform_message(
|
|||
content.clone(),
|
||||
*chat_id,
|
||||
*message_id,
|
||||
elapsed_duration, // Pass duration here
|
||||
) {
|
||||
Ok(messages) => {
|
||||
for reasoning_container in messages {
|
||||
|
@ -1410,20 +1434,21 @@ fn transform_tool_message(
|
|||
content: String,
|
||||
_chat_id: Uuid,
|
||||
_message_id: Uuid,
|
||||
elapsed_duration: Duration, // Add elapsed_duration parameter
|
||||
) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Use required ID (tool call ID) for all function calls
|
||||
let messages = match name.as_str() {
|
||||
// Response tools now handled earlier, return empty here
|
||||
"done" | "message_notify_user" | "message_user_clarifying_question" => vec![],
|
||||
|
||||
// Existing tool result processing
|
||||
"search_data_catalog" => tool_data_catalog_search(id.clone(), content)?,
|
||||
"create_metrics" => tool_create_metrics(id.clone(), content)?,
|
||||
"update_metrics" => tool_modify_metrics(id.clone(), content)?,
|
||||
"create_dashboards" => tool_create_dashboards(id.clone(), content)?,
|
||||
"update_dashboards" => tool_modify_dashboards(id.clone(), content)?,
|
||||
// Handle both new plan tools here
|
||||
"create_plan_straightforward" | "create_plan_investigative" => tool_create_plan(id.clone(), content)?,
|
||||
// Existing tool result processing - pass duration
|
||||
"search_data_catalog" => tool_data_catalog_search(id.clone(), content, elapsed_duration)?,
|
||||
"create_metrics" => tool_create_metrics(id.clone(), content, elapsed_duration)?,
|
||||
"update_metrics" => tool_modify_metrics(id.clone(), content, elapsed_duration)?,
|
||||
"create_dashboards" => tool_create_dashboards(id.clone(), content, elapsed_duration)?,
|
||||
"update_dashboards" => tool_modify_dashboards(id.clone(), content, elapsed_duration)?,
|
||||
// Handle both new plan tools here - pass duration
|
||||
"create_plan_straightforward" | "create_plan_investigative" => tool_create_plan(id.clone(), content, elapsed_duration)?,
|
||||
_ => vec![],
|
||||
};
|
||||
|
||||
|
@ -1436,7 +1461,7 @@ struct GenericPlanOutput {
|
|||
plan: String,
|
||||
}
|
||||
|
||||
fn tool_create_plan(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
|
||||
fn tool_create_plan(id: String, content: String, elapsed_duration: Duration) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Define a struct to parse the success status (optional)
|
||||
#[derive(Deserialize)]
|
||||
struct PlanOutput {
|
||||
|
@ -1446,46 +1471,66 @@ fn tool_create_plan(id: String, content: String) -> Result<Vec<BusterReasoningMe
|
|||
// Attempt to parse the success field (optional validation)
|
||||
match serde_json::from_str::<PlanOutput>(&content) {
|
||||
Ok(output) if output.success => {
|
||||
// Successfully completed, no need to return a message
|
||||
// as assistant_create_plan_input already created it.
|
||||
// Successfully completed. The assistant message calling this tool
|
||||
// will have already displayed the "Creating plan" reasoning with duration.
|
||||
// We don't need to add another message here.
|
||||
tracing::debug!("Tool create_plan {} completed successfully.", id);
|
||||
Ok(vec![])
|
||||
}
|
||||
Ok(_) => {
|
||||
// If the tool explicitly failed, maybe show an error reasoning?
|
||||
// For now, aligning with current logic, just log and return empty.
|
||||
tracing::warn!(
|
||||
"Tool create_plan {} output indicates failure: {}.",
|
||||
id,
|
||||
content
|
||||
);
|
||||
// Optionally return an error message here if needed
|
||||
Ok(vec![])
|
||||
}
|
||||
Err(e) => {
|
||||
// Log the parsing error, but still return Ok([]) as the primary
|
||||
// purpose is just acknowledging completion.
|
||||
tracing::error!(
|
||||
"Failed to parse tool_create_plan output for {}: {}. Content: {}",
|
||||
id,
|
||||
e,
|
||||
content
|
||||
);
|
||||
Ok(vec![])
|
||||
// Create a simple text reasoning message indicating the completion/error
|
||||
let reasoning_message = BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id,
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Plan Creation Step Finished".to_string(), // Generic title
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()), // Show duration
|
||||
message: Some(format!("Tool execution finished (parsing failed: {})", e)), // Optional detail
|
||||
message_chunk: None,
|
||||
status: Some("completed".to_string()), // Mark as completed
|
||||
});
|
||||
Ok(vec![reasoning_message]) // Return the message
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update tool_create_metrics to require ID
|
||||
fn tool_create_metrics(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Update tool_create_metrics to require ID and accept duration
|
||||
fn tool_create_metrics(id: String, content: String, elapsed_duration: Duration) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Parse the CreateMetricFilesOutput from content
|
||||
let create_metrics_result = match serde_json::from_str::<CreateMetricFilesOutput>(&content) {
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
println!("Failed to parse CreateMetricFilesOutput: {:?}", e);
|
||||
return Ok(vec![]);
|
||||
// Return an error reasoning message
|
||||
return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id,
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Error Creating Metrics".to_string(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: Some(format!("Failed to parse tool output: {}", e)),
|
||||
message_chunk: None,
|
||||
status: Some("error".to_string()),
|
||||
})]);
|
||||
}
|
||||
};
|
||||
|
||||
let duration = (create_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
// Remove internal duration calculation
|
||||
// let duration = (create_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
let files_count = create_metrics_result.files.len();
|
||||
|
||||
// Create a map of files
|
||||
|
@ -1514,12 +1559,12 @@ fn tool_create_metrics(id: String, content: String) -> Result<Vec<BusterReasonin
|
|||
files_map.insert(file_id, buster_file);
|
||||
}
|
||||
|
||||
// Create the BusterReasoningFile
|
||||
// Create the BusterReasoningFile using elapsed_duration
|
||||
let buster_file = BusterReasoningMessage::File(BusterReasoningFile {
|
||||
id,
|
||||
message_type: "files".to_string(),
|
||||
title: format!("Created {} metric files", files_count),
|
||||
secondary_title: format!("{} seconds", duration),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()), // Use elapsed_duration
|
||||
status: "completed".to_string(),
|
||||
file_ids,
|
||||
files: files_map,
|
||||
|
@ -1528,18 +1573,28 @@ fn tool_create_metrics(id: String, content: String) -> Result<Vec<BusterReasonin
|
|||
Ok(vec![buster_file])
|
||||
}
|
||||
|
||||
// Update tool_modify_metrics to require ID
|
||||
fn tool_modify_metrics(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Update tool_modify_metrics to require ID and accept duration
|
||||
fn tool_modify_metrics(id: String, content: String, elapsed_duration: Duration) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Parse the ModifyFilesOutput from content
|
||||
let modify_metrics_result = match serde_json::from_str::<ModifyFilesOutput>(&content) {
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
println!("Failed to parse ModifyFilesOutput: {:?}", e);
|
||||
return Ok(vec![]);
|
||||
// Return an error reasoning message
|
||||
return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id,
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Error Modifying Metrics".to_string(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: Some(format!("Failed to parse tool output: {}", e)),
|
||||
message_chunk: None,
|
||||
status: Some("error".to_string()),
|
||||
})]);
|
||||
}
|
||||
};
|
||||
|
||||
let duration = (modify_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
// Remove internal duration calculation
|
||||
// let duration = (modify_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
let files_count = modify_metrics_result.files.len();
|
||||
|
||||
// Create a map of files
|
||||
|
@ -1568,12 +1623,12 @@ fn tool_modify_metrics(id: String, content: String) -> Result<Vec<BusterReasonin
|
|||
files_map.insert(file_id, buster_file);
|
||||
}
|
||||
|
||||
// Create the BusterReasoningFile
|
||||
// Create the BusterReasoningFile using elapsed_duration
|
||||
let buster_file = BusterReasoningMessage::File(BusterReasoningFile {
|
||||
id,
|
||||
message_type: "files".to_string(),
|
||||
title: format!("Modified {} metric files", files_count),
|
||||
secondary_title: format!("{} seconds", duration),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()), // Use elapsed_duration
|
||||
status: "completed".to_string(),
|
||||
file_ids,
|
||||
files: files_map,
|
||||
|
@ -1582,19 +1637,29 @@ fn tool_modify_metrics(id: String, content: String) -> Result<Vec<BusterReasonin
|
|||
Ok(vec![buster_file])
|
||||
}
|
||||
|
||||
// Update tool_create_dashboards to require ID
|
||||
fn tool_create_dashboards(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Update tool_create_dashboards to require ID and accept duration
|
||||
fn tool_create_dashboards(id: String, content: String, elapsed_duration: Duration) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Parse the CreateDashboardFilesOutput from content
|
||||
let create_dashboards_result =
|
||||
match serde_json::from_str::<CreateDashboardFilesOutput>(&content) {
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
println!("Failed to parse CreateDashboardFilesOutput: {:?}", e);
|
||||
return Ok(vec![]);
|
||||
// Return an error reasoning message
|
||||
return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id,
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Error Creating Dashboards".to_string(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: Some(format!("Failed to parse tool output: {}", e)),
|
||||
message_chunk: None,
|
||||
status: Some("error".to_string()),
|
||||
})]);
|
||||
}
|
||||
};
|
||||
|
||||
let duration = (create_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
// Remove internal duration calculation
|
||||
// let duration = (create_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
let files_count = create_dashboards_result.files.len();
|
||||
|
||||
// Create a map of files
|
||||
|
@ -1623,12 +1688,12 @@ fn tool_create_dashboards(id: String, content: String) -> Result<Vec<BusterReaso
|
|||
files_map.insert(file_id, buster_file);
|
||||
}
|
||||
|
||||
// Create the BusterReasoningFile
|
||||
// Create the BusterReasoningFile using elapsed_duration
|
||||
let buster_file = BusterReasoningMessage::File(BusterReasoningFile {
|
||||
id,
|
||||
message_type: "files".to_string(),
|
||||
title: format!("Created {} dashboard files", files_count),
|
||||
secondary_title: format!("{} seconds", duration),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()), // Use elapsed_duration
|
||||
status: "completed".to_string(),
|
||||
file_ids,
|
||||
files: files_map,
|
||||
|
@ -1637,18 +1702,28 @@ fn tool_create_dashboards(id: String, content: String) -> Result<Vec<BusterReaso
|
|||
Ok(vec![buster_file])
|
||||
}
|
||||
|
||||
// Update tool_modify_dashboards to require ID
|
||||
fn tool_modify_dashboards(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Update tool_modify_dashboards to require ID and accept duration
|
||||
fn tool_modify_dashboards(id: String, content: String, elapsed_duration: Duration) -> Result<Vec<BusterReasoningMessage>> {
|
||||
// Parse the ModifyFilesOutput from content
|
||||
let modify_dashboards_result = match serde_json::from_str::<ModifyFilesOutput>(&content) {
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
println!("Failed to parse ModifyFilesOutput: {:?}", e);
|
||||
return Ok(vec![]);
|
||||
// Return an error reasoning message
|
||||
return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id,
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Error Modifying Dashboards".to_string(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: Some(format!("Failed to parse tool output: {}", e)),
|
||||
message_chunk: None,
|
||||
status: Some("error".to_string()),
|
||||
})]);
|
||||
}
|
||||
};
|
||||
|
||||
let duration = (modify_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
// Remove internal duration calculation
|
||||
// let duration = (modify_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
let files_count = modify_dashboards_result.files.len();
|
||||
|
||||
// Create a map of files
|
||||
|
@ -1677,12 +1752,12 @@ fn tool_modify_dashboards(id: String, content: String) -> Result<Vec<BusterReaso
|
|||
files_map.insert(file_id, buster_file);
|
||||
}
|
||||
|
||||
// Create the BusterReasoningFile
|
||||
// Create the BusterReasoningFile using elapsed_duration
|
||||
let buster_file = BusterReasoningMessage::File(BusterReasoningFile {
|
||||
id,
|
||||
message_type: "files".to_string(),
|
||||
title: format!("Modified {} dashboard files", files_count),
|
||||
secondary_title: format!("{} seconds", duration),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()), // Use elapsed_duration
|
||||
status: "completed".to_string(),
|
||||
file_ids,
|
||||
files: files_map,
|
||||
|
@ -1692,16 +1767,26 @@ fn tool_modify_dashboards(id: String, content: String) -> Result<Vec<BusterReaso
|
|||
}
|
||||
|
||||
// Restore the original tool_data_catalog_search function
|
||||
fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterReasoningMessage>> {
|
||||
fn tool_data_catalog_search(id: String, content: String, elapsed_duration: Duration) -> Result<Vec<BusterReasoningMessage>> {
|
||||
let data_catalog_result = match serde_json::from_str::<SearchDataCatalogOutput>(&content) {
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
println!("Failed to parse SearchDataCatalogOutput: {}. Content: {}", e, content);
|
||||
return Ok(vec![]);
|
||||
// Return an error reasoning message
|
||||
return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id,
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Error Searching Data Catalog".to_string(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: Some(format!("Failed to parse tool output: {}", e)),
|
||||
message_chunk: None,
|
||||
status: Some("error".to_string()),
|
||||
})]);
|
||||
}
|
||||
};
|
||||
|
||||
let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
// Remove internal duration calculation
|
||||
// let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0;
|
||||
let result_count = data_catalog_result.results.len();
|
||||
let input_queries = data_catalog_result.queries.join(", "); // Join queries for display
|
||||
|
||||
|
@ -1718,7 +1803,7 @@ fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterRea
|
|||
id: id.clone(),
|
||||
thought_type: "pills".to_string(),
|
||||
title: "Data Catalog Search Results".to_string(), // Updated title
|
||||
secondary_title: format!("Searched for: {}", input_queries), // Display input queries
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
pill_containers: Some(thought_pill_containers),
|
||||
status: "completed".to_string(),
|
||||
})
|
||||
|
@ -1727,10 +1812,10 @@ fn tool_data_catalog_search(id: String, content: String) -> Result<Vec<BusterRea
|
|||
id: id.clone(),
|
||||
thought_type: "pills".to_string(),
|
||||
title: "No data catalog items found".to_string(),
|
||||
secondary_title: format!("Searched for: {}", input_queries), // Display input queries even if no results
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
pill_containers: Some(vec![BusterThoughtPillContainer {
|
||||
title: "No results found".to_string(),
|
||||
pills: vec![], // Keep pills empty as no results were found
|
||||
title: "No results found".to_string(), // Title now indicates no results explicitly
|
||||
pills: vec![],
|
||||
}]),
|
||||
status: "completed".to_string(),
|
||||
})
|
||||
|
@ -1776,6 +1861,7 @@ fn transform_assistant_tool_message(
|
|||
_chat_id: Uuid,
|
||||
_message_id: Uuid,
|
||||
tracker: &ChunkTracker,
|
||||
elapsed_duration: Duration, // Add elapsed_duration parameter
|
||||
) -> Result<Vec<ToolTransformResult>> {
|
||||
let mut all_results = Vec::new();
|
||||
let mut parser = StreamingParser::new();
|
||||
|
@ -1851,7 +1937,7 @@ fn transform_assistant_tool_message(
|
|||
id: tool_id.clone(),
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Creating Plan".to_string(),
|
||||
secondary_title: String::new(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: None,
|
||||
message_chunk: Some(delta),
|
||||
status: Some("loading".to_string()),
|
||||
|
@ -1867,7 +1953,7 @@ fn transform_assistant_tool_message(
|
|||
id: tool_id.clone(),
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Creating Plan".to_string(),
|
||||
secondary_title: String::new(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: Some(final_text), // Final text
|
||||
message_chunk: None,
|
||||
status: Some("completed".to_string()), // Mark as completed
|
||||
|
@ -1888,7 +1974,7 @@ fn transform_assistant_tool_message(
|
|||
id: tool_id.clone(),
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Searching your data catalog...".to_string(),
|
||||
secondary_title: String::new(),
|
||||
secondary_title: format!("{} seconds", elapsed_duration.as_secs()),
|
||||
message: None,
|
||||
message_chunk: None,
|
||||
status: Some("loading".to_string()),
|
||||
|
@ -1899,16 +1985,15 @@ fn transform_assistant_tool_message(
|
|||
}
|
||||
}
|
||||
if progress == MessageProgress::Complete {
|
||||
// When search is complete, update the status of the existing message?
|
||||
// Or maybe the result from `transform_tool_message` handles completion.
|
||||
// Let's assume the result handling is sufficient for now.
|
||||
// The result from transform_tool_message will show final status and duration.
|
||||
// Clear the tracker entry used for the initial "searching..." message.
|
||||
tracker.clear_chunk(tool_id.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// --- Placeholder for File Tools ---
|
||||
"create_metrics" | "update_metrics" | "create_dashboards" | "update_dashboards" => {
|
||||
// Determine file type based on tool name
|
||||
// Determine file type based on tool name
|
||||
let file_type = if tool_name.contains("metric") { "metric" } else { "dashboard" };
|
||||
|
||||
// Process the chunk using the appropriate parser method
|
||||
|
@ -1920,56 +2005,44 @@ fn transform_assistant_tool_message(
|
|||
|
||||
// If parser returns a reasoning message (File type expected)
|
||||
if let Ok(Some(BusterReasoningMessage::File(mut file_reasoning))) = parse_result {
|
||||
// Set the secondary title using elapsed_duration when creating the initial message
|
||||
file_reasoning.secondary_title = format!("{} seconds", elapsed_duration.as_secs());
|
||||
// Added missing variable initializations
|
||||
let mut has_updates = false;
|
||||
let mut updated_files_map = std::collections::HashMap::new();
|
||||
|
||||
// Iterate through the files parsed so far
|
||||
for (file_map_id, mut file_detail) in file_reasoning.files {
|
||||
// Ensure text_chunk has content
|
||||
if let Some(yml_chunk) = &file_detail.file.text_chunk {
|
||||
for (file_map_id, mut file_detail) in file_reasoning.files.iter_mut() { // Use iter_mut
|
||||
// Ensure text_chunk has content
|
||||
if let Some(yml_chunk) = &file_detail.file.text_chunk {
|
||||
// Define unique chunk ID for this file
|
||||
let chunk_id = format!("{}_{}", tool_id, file_detail.id);
|
||||
// Calculate delta using the tracker
|
||||
let delta = tracker.add_chunk(chunk_id.clone(), yml_chunk.clone());
|
||||
|
||||
if !delta.is_empty() {
|
||||
if !delta.is_empty() { // Now delta is defined
|
||||
// Update file detail with delta
|
||||
file_detail.file.text_chunk = Some(delta);
|
||||
file_detail.file.text = None;
|
||||
file_detail.file.text = None; // Ensure full text is cleared when chunking
|
||||
file_detail.status = "loading".to_string();
|
||||
updated_files_map.insert(file_map_id.clone(), file_detail);
|
||||
updated_files_map.insert(file_map_id.clone(), file_detail.clone()); // Clone needed
|
||||
has_updates = true;
|
||||
} // If delta is empty, we don't add it back to updated_files_map
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If there were actual deltas, send an update
|
||||
// If there were actual deltas, send an update
|
||||
if has_updates {
|
||||
file_reasoning.files = updated_files_map;
|
||||
file_reasoning.status = "loading".to_string(); // Ensure parent status is loading
|
||||
// Make sure secondary title remains set
|
||||
file_reasoning.secondary_title = format!("{} seconds", elapsed_duration.as_secs());
|
||||
all_results.push(ToolTransformResult::Reasoning(BusterReasoningMessage::File(file_reasoning)));
|
||||
}
|
||||
|
||||
} else if let Err(e) = parse_result {
|
||||
tracing::error!("Error parsing file chunk for {}: {}", tool_name, e);
|
||||
}
|
||||
|
||||
// Handle completion for file tools
|
||||
if progress == MessageProgress::Complete {
|
||||
// We need to reconstruct the final File reasoning message based on completed chunks
|
||||
// This requires knowing which files were part of this specific tool call.
|
||||
// The parser logic might need adjustment to store file IDs associated with the main ID.
|
||||
// For now, let's assume `tool_data_catalog_search` (called by transform_tool_message later) handles the final state.
|
||||
// We just need to clear the trackers used for YML content.
|
||||
// Need a way to get the final file IDs generated by the parser for this tool_id...
|
||||
// Let's clear based on a guess pattern for now, this might need refinement.
|
||||
// This part is tricky without knowing the final structure produced by the parser/tool execution.
|
||||
// Let's defer complex completion logic here and rely on `transform_tool_message` results for now,
|
||||
// just clearing potential tracker entries.
|
||||
// This is a potential area for bugs if tracker IDs aren't cleared correctly.
|
||||
// Example clearing (might not be robust):
|
||||
// tracker.clear_chunk(format!("{}_file1_id", tool_id));
|
||||
// tracker.clear_chunk(format!("{}_file2_id", tool_id));
|
||||
// TODO: Revisit file tool completion and tracker clearing
|
||||
}
|
||||
// Completion is handled by transform_tool_message
|
||||
}
|
||||
|
||||
// --- Default for Unknown Tools ---
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
# CLAUDE.md - Chat Module
|
||||
|
||||
This document provides guidance for Claude Code (claude.ai/code) when working with the Buster CLI chat implementation.
|
||||
|
||||
## Architecture Overview
|
||||
|
||||
The Buster CLI chat module is a terminal-based interactive chat interface that connects to an AI agent with specialized data engineering capabilities. It's specifically designed for working with:
|
||||
|
||||
- DBT projects (data build tool)
|
||||
- Buster semantic layer
|
||||
- General data engineering tasks
|
||||
|
||||
### Key Components
|
||||
|
||||
- **BusterCliAgent**: Core component that manages the AI connection and tool integration
|
||||
- **AppState**: Maintains chat state, message history, and UI elements
|
||||
- **UI Layer**: Terminal-based interface using Ratatui and Crossterm
|
||||
- **Tool Integration**: Facilitates AI interaction with the filesystem, DBT commands, and Buster API
|
||||
|
||||
## Code Organization
|
||||
|
||||
- `mod.rs`: Main module entry point and public API
|
||||
- `args.rs`: Command-line argument parsing
|
||||
- `config.rs`: Configuration management
|
||||
- `logic.rs`: Core chat execution logic
|
||||
- `state.rs`: Chat session state management
|
||||
- `ui.rs`: Terminal UI implementation
|
||||
- `completion.rs`: Path and command autocompletion
|
||||
|
||||
## AI Agent Capabilities
|
||||
|
||||
The chat agent is designed to:
|
||||
|
||||
1. Interact with DBT projects (compile, run, test)
|
||||
2. Work with Buster semantic layer models
|
||||
3. Execute file operations (read, write, search)
|
||||
4. Run shell commands
|
||||
5. Manage context across chat sessions
|
||||
6. Provide data engineering assistance
|
||||
|
||||
## Important Concepts
|
||||
|
||||
### Message Flow
|
||||
1. User input is submitted through `submit_message()`
|
||||
2. Agent processes input and may call tools
|
||||
3. Tool results are returned to agent
|
||||
4. Agent's final response is processed by `process_agent_message()`
|
||||
|
||||
### Tool Integration
|
||||
- Tools are invoked through a standardized API
|
||||
- Path completion supports filesystem navigation
|
||||
- Shell commands can be executed with `!` prefix
|
||||
- DBT commands have specialized handling
|
||||
|
||||
### State Management
|
||||
- `AppState` is the central state container
|
||||
- UI state and business logic are clearly separated
|
||||
- Different message types (User, Assistant, Tool) are handled appropriately
|
||||
|
||||
## Error Handling
|
||||
|
||||
- Uses Rust's `thiserror` for typed errors
|
||||
- Terminal state is properly restored on errors or panics
|
||||
- User-friendly error messages with suggestions
|
||||
|
||||
## Security Considerations
|
||||
|
||||
- Credentials are handled securely
|
||||
- Path traversal protection is implemented
|
||||
- User inputs are validated
|
||||
- No secrets are logged
|
||||
|
||||
## Development Guidelines
|
||||
|
||||
- Add new tools in the tool handler section of `logic.rs`
|
||||
- UI changes should be isolated to `ui.rs`
|
||||
- Follow the established message handling pattern for new message types
|
||||
- Test with both local and remote agent endpoints
|
||||
- Ensure proper error handling and terminal state restoration
|
||||
|
||||
## Testing
|
||||
|
||||
- Test agent interaction with mock responses
|
||||
- Verify tool integration with isolated test cases
|
||||
- Check terminal UI rendering in different environments
|
||||
- Validate path completion behavior
|
||||
- Confirm proper credential handling
|
Loading…
Reference in New Issue