From d9deefb18df0175c0d56629c440e456ec20e5a5d Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 11 Apr 2025 07:34:57 -0600 Subject: [PATCH] time tracking, docs for cli chat --- .../handlers/src/chats/post_chat_handler.rs | 251 +++++++++++------- cli/src/commands/chat/CLAUDE.md | 87 ++++++ 2 files changed, 249 insertions(+), 89 deletions(-) create mode 100644 cli/src/commands/chat/CLAUDE.md diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index 28b3aa30c..b309df389 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -1,7 +1,7 @@ use dashmap::DashMap; use middleware::AuthenticatedUser; use std::collections::HashSet; -use std::{collections::HashMap, time::Instant}; +use std::{collections::HashMap, time::{Instant, Duration}}; use agents::{ tools::{ @@ -178,7 +178,7 @@ pub async fn post_chat_handler( tx: Option>>, ) -> Result { let chunk_tracker = ChunkTracker::new(); - let reasoning_duration = Instant::now(); + let reasoning_duration = Instant::now(); // Keep overall duration tracking let (asset_id, asset_type) = normalize_asset_fields(&request); validate_context_request( request.chat_id, @@ -398,6 +398,10 @@ pub async fn post_chat_handler( // Get the receiver and collect all messages let mut rx = agent.run(&mut chat).await?; + // --- Add timestamp tracking --- + let mut last_event_timestamp = Instant::now(); + // --- End Add timestamp tracking --- + // Collect all messages for final processing let mut all_messages: Vec = Vec::new(); let mut all_transformed_containers: Vec = Vec::new(); @@ -442,9 +446,16 @@ pub async fn post_chat_handler( // Process all messages from the agent while let Ok(message_result) = rx.recv().await { + // --- Calculate elapsed duration --- + let elapsed_duration = last_event_timestamp.elapsed(); + // --- End Calculate elapsed duration --- + match message_result { Ok(AgentMessage::Done) => { // Agent has finished processing, break the loop + // --- Update timestamp before breaking --- + last_event_timestamp = Instant::now(); + // --- End Update timestamp --- break; } Ok(msg) => { @@ -519,6 +530,7 @@ pub async fn post_chat_handler( msg.clone(), tx.as_ref(), &chunk_tracker, + elapsed_duration, // Pass elapsed duration ) .await; @@ -716,6 +728,9 @@ pub async fn post_chat_handler( } tracing::error!("Error receiving message from agent: {}", e); + // --- Update timestamp before breaking --- + last_event_timestamp = Instant::now(); + // --- End Update timestamp --- // Don't return early, continue processing remaining messages break; } @@ -723,13 +738,13 @@ pub async fn post_chat_handler( } let title = title_handle.await??; - let reasoning_duration = reasoning_duration.elapsed().as_secs(); + let total_reasoning_duration = reasoning_duration.elapsed(); // Use original name for total // Format reasoning duration - let formatted_reasoning_duration = if reasoning_duration < 60 { - format!("Reasoned for {} seconds", reasoning_duration) + let formatted_reasoning_duration = if total_reasoning_duration.as_secs() < 60 { + format!("Reasoned for {} seconds", total_reasoning_duration.as_secs()) } else { - let minutes = reasoning_duration / 60; + let minutes = total_reasoning_duration.as_secs() / 60; if minutes == 1 { "Reasoned for 1 minute".to_string() // Singular minute } else { @@ -799,6 +814,10 @@ pub async fn post_chat_handler( &user_org_id, &user.id, &chunk_tracker, + // Pass a default duration here, as the concept of 'elapsed since last step' + // doesn't directly apply during final processing. Or refactor transform_message usage. + // For now, let's pass Zero. + Duration::ZERO, ) .await?; } @@ -927,6 +946,7 @@ async fn process_completed_files( _organization_id: &Uuid, _user_id: &Uuid, chunk_tracker: &ChunkTracker, // Pass tracker if needed for transforming messages again + default_duration: Duration, // Add duration parameter ) -> Result<()> { // Transform messages again specifically for DB processing if needed, // or directly use reasoning messages if they contain enough info. @@ -939,6 +959,7 @@ async fn process_completed_files( msg.clone(), None, chunk_tracker, + default_duration, // Pass the default duration ) .await { @@ -1190,6 +1211,7 @@ pub async fn transform_message( message: AgentMessage, _tx: Option<&mpsc::Sender>>, tracker: &ChunkTracker, + elapsed_duration: Duration, // Add elapsed_duration parameter ) -> Result> { match message { AgentMessage::Assistant { @@ -1239,7 +1261,7 @@ pub async fn transform_message( id: Uuid::new_v4().to_string(), reasoning_type: "text".to_string(), title: "Finished reasoning".to_string(), - secondary_title: String::new(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), message: None, message_chunk: None, status: Some("completed".to_string()), @@ -1267,6 +1289,7 @@ pub async fn transform_message( *chat_id, *message_id, tracker, // Pass tracker here + elapsed_duration, // Pass duration here ) { Ok(results) => { // Process Vec @@ -1332,6 +1355,7 @@ pub async fn transform_message( content.clone(), *chat_id, *message_id, + elapsed_duration, // Pass duration here ) { Ok(messages) => { for reasoning_container in messages { @@ -1410,20 +1434,21 @@ fn transform_tool_message( content: String, _chat_id: Uuid, _message_id: Uuid, + elapsed_duration: Duration, // Add elapsed_duration parameter ) -> Result> { // Use required ID (tool call ID) for all function calls let messages = match name.as_str() { // Response tools now handled earlier, return empty here "done" | "message_notify_user" | "message_user_clarifying_question" => vec![], - // Existing tool result processing - "search_data_catalog" => tool_data_catalog_search(id.clone(), content)?, - "create_metrics" => tool_create_metrics(id.clone(), content)?, - "update_metrics" => tool_modify_metrics(id.clone(), content)?, - "create_dashboards" => tool_create_dashboards(id.clone(), content)?, - "update_dashboards" => tool_modify_dashboards(id.clone(), content)?, - // Handle both new plan tools here - "create_plan_straightforward" | "create_plan_investigative" => tool_create_plan(id.clone(), content)?, + // Existing tool result processing - pass duration + "search_data_catalog" => tool_data_catalog_search(id.clone(), content, elapsed_duration)?, + "create_metrics" => tool_create_metrics(id.clone(), content, elapsed_duration)?, + "update_metrics" => tool_modify_metrics(id.clone(), content, elapsed_duration)?, + "create_dashboards" => tool_create_dashboards(id.clone(), content, elapsed_duration)?, + "update_dashboards" => tool_modify_dashboards(id.clone(), content, elapsed_duration)?, + // Handle both new plan tools here - pass duration + "create_plan_straightforward" | "create_plan_investigative" => tool_create_plan(id.clone(), content, elapsed_duration)?, _ => vec![], }; @@ -1436,7 +1461,7 @@ struct GenericPlanOutput { plan: String, } -fn tool_create_plan(id: String, content: String) -> Result> { +fn tool_create_plan(id: String, content: String, elapsed_duration: Duration) -> Result> { // Define a struct to parse the success status (optional) #[derive(Deserialize)] struct PlanOutput { @@ -1446,46 +1471,66 @@ fn tool_create_plan(id: String, content: String) -> Result(&content) { Ok(output) if output.success => { - // Successfully completed, no need to return a message - // as assistant_create_plan_input already created it. + // Successfully completed. The assistant message calling this tool + // will have already displayed the "Creating plan" reasoning with duration. + // We don't need to add another message here. tracing::debug!("Tool create_plan {} completed successfully.", id); Ok(vec![]) } Ok(_) => { + // If the tool explicitly failed, maybe show an error reasoning? + // For now, aligning with current logic, just log and return empty. tracing::warn!( "Tool create_plan {} output indicates failure: {}.", id, content ); - // Optionally return an error message here if needed Ok(vec![]) } Err(e) => { - // Log the parsing error, but still return Ok([]) as the primary - // purpose is just acknowledging completion. tracing::error!( "Failed to parse tool_create_plan output for {}: {}. Content: {}", id, e, content ); - Ok(vec![]) + // Create a simple text reasoning message indicating the completion/error + let reasoning_message = BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Plan Creation Step Finished".to_string(), // Generic title + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), // Show duration + message: Some(format!("Tool execution finished (parsing failed: {})", e)), // Optional detail + message_chunk: None, + status: Some("completed".to_string()), // Mark as completed + }); + Ok(vec![reasoning_message]) // Return the message } } } -// Update tool_create_metrics to require ID -fn tool_create_metrics(id: String, content: String) -> Result> { +// Update tool_create_metrics to require ID and accept duration +fn tool_create_metrics(id: String, content: String, elapsed_duration: Duration) -> Result> { // Parse the CreateMetricFilesOutput from content let create_metrics_result = match serde_json::from_str::(&content) { Ok(result) => result, Err(e) => { println!("Failed to parse CreateMetricFilesOutput: {:?}", e); - return Ok(vec![]); + // Return an error reasoning message + return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Error Creating Metrics".to_string(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), + message: Some(format!("Failed to parse tool output: {}", e)), + message_chunk: None, + status: Some("error".to_string()), + })]); } }; - let duration = (create_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; + // Remove internal duration calculation + // let duration = (create_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; let files_count = create_metrics_result.files.len(); // Create a map of files @@ -1514,12 +1559,12 @@ fn tool_create_metrics(id: String, content: String) -> Result Result Result> { +// Update tool_modify_metrics to require ID and accept duration +fn tool_modify_metrics(id: String, content: String, elapsed_duration: Duration) -> Result> { // Parse the ModifyFilesOutput from content let modify_metrics_result = match serde_json::from_str::(&content) { Ok(result) => result, Err(e) => { println!("Failed to parse ModifyFilesOutput: {:?}", e); - return Ok(vec![]); + // Return an error reasoning message + return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Error Modifying Metrics".to_string(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), + message: Some(format!("Failed to parse tool output: {}", e)), + message_chunk: None, + status: Some("error".to_string()), + })]); } }; - let duration = (modify_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; + // Remove internal duration calculation + // let duration = (modify_metrics_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; let files_count = modify_metrics_result.files.len(); // Create a map of files @@ -1568,12 +1623,12 @@ fn tool_modify_metrics(id: String, content: String) -> Result Result Result> { +// Update tool_create_dashboards to require ID and accept duration +fn tool_create_dashboards(id: String, content: String, elapsed_duration: Duration) -> Result> { // Parse the CreateDashboardFilesOutput from content let create_dashboards_result = match serde_json::from_str::(&content) { Ok(result) => result, Err(e) => { println!("Failed to parse CreateDashboardFilesOutput: {:?}", e); - return Ok(vec![]); + // Return an error reasoning message + return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Error Creating Dashboards".to_string(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), + message: Some(format!("Failed to parse tool output: {}", e)), + message_chunk: None, + status: Some("error".to_string()), + })]); } }; - let duration = (create_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; + // Remove internal duration calculation + // let duration = (create_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; let files_count = create_dashboards_result.files.len(); // Create a map of files @@ -1623,12 +1688,12 @@ fn tool_create_dashboards(id: String, content: String) -> Result Result Result> { +// Update tool_modify_dashboards to require ID and accept duration +fn tool_modify_dashboards(id: String, content: String, elapsed_duration: Duration) -> Result> { // Parse the ModifyFilesOutput from content let modify_dashboards_result = match serde_json::from_str::(&content) { Ok(result) => result, Err(e) => { println!("Failed to parse ModifyFilesOutput: {:?}", e); - return Ok(vec![]); + // Return an error reasoning message + return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Error Modifying Dashboards".to_string(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), + message: Some(format!("Failed to parse tool output: {}", e)), + message_chunk: None, + status: Some("error".to_string()), + })]); } }; - let duration = (modify_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; + // Remove internal duration calculation + // let duration = (modify_dashboards_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; let files_count = modify_dashboards_result.files.len(); // Create a map of files @@ -1677,12 +1752,12 @@ fn tool_modify_dashboards(id: String, content: String) -> Result Result Result> { +fn tool_data_catalog_search(id: String, content: String, elapsed_duration: Duration) -> Result> { let data_catalog_result = match serde_json::from_str::(&content) { Ok(result) => result, Err(e) => { println!("Failed to parse SearchDataCatalogOutput: {}. Content: {}", e, content); - return Ok(vec![]); + // Return an error reasoning message + return Ok(vec![BusterReasoningMessage::Text(BusterReasoningText { + id, + reasoning_type: "text".to_string(), + title: "Error Searching Data Catalog".to_string(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), + message: Some(format!("Failed to parse tool output: {}", e)), + message_chunk: None, + status: Some("error".to_string()), + })]); } }; - let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; + // Remove internal duration calculation + // let duration = (data_catalog_result.duration as f64 / 1000.0 * 10.0).round() / 10.0; let result_count = data_catalog_result.results.len(); let input_queries = data_catalog_result.queries.join(", "); // Join queries for display @@ -1718,7 +1803,7 @@ fn tool_data_catalog_search(id: String, content: String) -> Result Result Result> { let mut all_results = Vec::new(); let mut parser = StreamingParser::new(); @@ -1851,7 +1937,7 @@ fn transform_assistant_tool_message( id: tool_id.clone(), reasoning_type: "text".to_string(), title: "Creating Plan".to_string(), - secondary_title: String::new(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), message: None, message_chunk: Some(delta), status: Some("loading".to_string()), @@ -1867,7 +1953,7 @@ fn transform_assistant_tool_message( id: tool_id.clone(), reasoning_type: "text".to_string(), title: "Creating Plan".to_string(), - secondary_title: String::new(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), message: Some(final_text), // Final text message_chunk: None, status: Some("completed".to_string()), // Mark as completed @@ -1888,7 +1974,7 @@ fn transform_assistant_tool_message( id: tool_id.clone(), reasoning_type: "text".to_string(), title: "Searching your data catalog...".to_string(), - secondary_title: String::new(), + secondary_title: format!("{} seconds", elapsed_duration.as_secs()), message: None, message_chunk: None, status: Some("loading".to_string()), @@ -1899,16 +1985,15 @@ fn transform_assistant_tool_message( } } if progress == MessageProgress::Complete { - // When search is complete, update the status of the existing message? - // Or maybe the result from `transform_tool_message` handles completion. - // Let's assume the result handling is sufficient for now. + // The result from transform_tool_message will show final status and duration. + // Clear the tracker entry used for the initial "searching..." message. tracker.clear_chunk(tool_id.clone()); } } // --- Placeholder for File Tools --- "create_metrics" | "update_metrics" | "create_dashboards" | "update_dashboards" => { - // Determine file type based on tool name + // Determine file type based on tool name let file_type = if tool_name.contains("metric") { "metric" } else { "dashboard" }; // Process the chunk using the appropriate parser method @@ -1920,56 +2005,44 @@ fn transform_assistant_tool_message( // If parser returns a reasoning message (File type expected) if let Ok(Some(BusterReasoningMessage::File(mut file_reasoning))) = parse_result { + // Set the secondary title using elapsed_duration when creating the initial message + file_reasoning.secondary_title = format!("{} seconds", elapsed_duration.as_secs()); + // Added missing variable initializations let mut has_updates = false; let mut updated_files_map = std::collections::HashMap::new(); // Iterate through the files parsed so far - for (file_map_id, mut file_detail) in file_reasoning.files { - // Ensure text_chunk has content - if let Some(yml_chunk) = &file_detail.file.text_chunk { + for (file_map_id, mut file_detail) in file_reasoning.files.iter_mut() { // Use iter_mut + // Ensure text_chunk has content + if let Some(yml_chunk) = &file_detail.file.text_chunk { // Define unique chunk ID for this file let chunk_id = format!("{}_{}", tool_id, file_detail.id); + // Calculate delta using the tracker let delta = tracker.add_chunk(chunk_id.clone(), yml_chunk.clone()); - if !delta.is_empty() { + if !delta.is_empty() { // Now delta is defined // Update file detail with delta file_detail.file.text_chunk = Some(delta); - file_detail.file.text = None; + file_detail.file.text = None; // Ensure full text is cleared when chunking file_detail.status = "loading".to_string(); - updated_files_map.insert(file_map_id.clone(), file_detail); + updated_files_map.insert(file_map_id.clone(), file_detail.clone()); // Clone needed has_updates = true; - } // If delta is empty, we don't add it back to updated_files_map - } + } + } } - - // If there were actual deltas, send an update + // If there were actual deltas, send an update if has_updates { file_reasoning.files = updated_files_map; file_reasoning.status = "loading".to_string(); // Ensure parent status is loading + // Make sure secondary title remains set + file_reasoning.secondary_title = format!("{} seconds", elapsed_duration.as_secs()); all_results.push(ToolTransformResult::Reasoning(BusterReasoningMessage::File(file_reasoning))); } + } else if let Err(e) = parse_result { tracing::error!("Error parsing file chunk for {}: {}", tool_name, e); } - - // Handle completion for file tools - if progress == MessageProgress::Complete { - // We need to reconstruct the final File reasoning message based on completed chunks - // This requires knowing which files were part of this specific tool call. - // The parser logic might need adjustment to store file IDs associated with the main ID. - // For now, let's assume `tool_data_catalog_search` (called by transform_tool_message later) handles the final state. - // We just need to clear the trackers used for YML content. - // Need a way to get the final file IDs generated by the parser for this tool_id... - // Let's clear based on a guess pattern for now, this might need refinement. - // This part is tricky without knowing the final structure produced by the parser/tool execution. - // Let's defer complex completion logic here and rely on `transform_tool_message` results for now, - // just clearing potential tracker entries. - // This is a potential area for bugs if tracker IDs aren't cleared correctly. - // Example clearing (might not be robust): - // tracker.clear_chunk(format!("{}_file1_id", tool_id)); - // tracker.clear_chunk(format!("{}_file2_id", tool_id)); - // TODO: Revisit file tool completion and tracker clearing - } + // Completion is handled by transform_tool_message } // --- Default for Unknown Tools --- diff --git a/cli/src/commands/chat/CLAUDE.md b/cli/src/commands/chat/CLAUDE.md new file mode 100644 index 000000000..3d009f80a --- /dev/null +++ b/cli/src/commands/chat/CLAUDE.md @@ -0,0 +1,87 @@ +# CLAUDE.md - Chat Module + +This document provides guidance for Claude Code (claude.ai/code) when working with the Buster CLI chat implementation. + +## Architecture Overview + +The Buster CLI chat module is a terminal-based interactive chat interface that connects to an AI agent with specialized data engineering capabilities. It's specifically designed for working with: + +- DBT projects (data build tool) +- Buster semantic layer +- General data engineering tasks + +### Key Components + +- **BusterCliAgent**: Core component that manages the AI connection and tool integration +- **AppState**: Maintains chat state, message history, and UI elements +- **UI Layer**: Terminal-based interface using Ratatui and Crossterm +- **Tool Integration**: Facilitates AI interaction with the filesystem, DBT commands, and Buster API + +## Code Organization + +- `mod.rs`: Main module entry point and public API +- `args.rs`: Command-line argument parsing +- `config.rs`: Configuration management +- `logic.rs`: Core chat execution logic +- `state.rs`: Chat session state management +- `ui.rs`: Terminal UI implementation +- `completion.rs`: Path and command autocompletion + +## AI Agent Capabilities + +The chat agent is designed to: + +1. Interact with DBT projects (compile, run, test) +2. Work with Buster semantic layer models +3. Execute file operations (read, write, search) +4. Run shell commands +5. Manage context across chat sessions +6. Provide data engineering assistance + +## Important Concepts + +### Message Flow +1. User input is submitted through `submit_message()` +2. Agent processes input and may call tools +3. Tool results are returned to agent +4. Agent's final response is processed by `process_agent_message()` + +### Tool Integration +- Tools are invoked through a standardized API +- Path completion supports filesystem navigation +- Shell commands can be executed with `!` prefix +- DBT commands have specialized handling + +### State Management +- `AppState` is the central state container +- UI state and business logic are clearly separated +- Different message types (User, Assistant, Tool) are handled appropriately + +## Error Handling + +- Uses Rust's `thiserror` for typed errors +- Terminal state is properly restored on errors or panics +- User-friendly error messages with suggestions + +## Security Considerations + +- Credentials are handled securely +- Path traversal protection is implemented +- User inputs are validated +- No secrets are logged + +## Development Guidelines + +- Add new tools in the tool handler section of `logic.rs` +- UI changes should be isolated to `ui.rs` +- Follow the established message handling pattern for new message types +- Test with both local and remote agent endpoints +- Ensure proper error handling and terminal state restoration + +## Testing + +- Test agent interaction with mock responses +- Verify tool integration with isolated test cases +- Check terminal UI rendering in different environments +- Validate path completion behavior +- Confirm proper credential handling \ No newline at end of file