From 0c370b1a101c0b469d3a80b38bc4c41f7fe23eff Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 18 Apr 2025 16:01:48 -0600 Subject: [PATCH] planning tweaks and manually modified --- api/libs/agents/src/agents/modes/planning.rs | 7 +- .../create_plan_investigative.rs | 1 + .../create_plan_straightforward.rs | 1 + api/libs/database/src/types/metric_yml.rs | 8 +- .../src/chats/context_loaders/chat_context.rs | 269 ++++++++++++++++-- .../handlers/src/chats/post_chat_handler.rs | 59 +++- 6 files changed, 318 insertions(+), 27 deletions(-) diff --git a/api/libs/agents/src/agents/modes/planning.rs b/api/libs/agents/src/agents/modes/planning.rs index e7e213a7e..45d78e7a2 100644 --- a/api/libs/agents/src/agents/modes/planning.rs +++ b/api/libs/agents/src/agents/modes/planning.rs @@ -228,9 +228,10 @@ To determine whether to use a Straightforward Plan or an Investigative Plan, con **Important Notes** -- When creating a plan that involves generating assets (visualizations and dashboards), do not include a separate step for delivering these assets, as they are automatically displayed to the user upon creation. -- Assume that all datasets required for the plan are available, as their availability has already been confirmed in the previous step. -- If the user's request includes aspects that are not supported (e.g., specific visualizations, forecasts, etc.), do not include these in the step-by-step plan. Instead, mention them in the note section of the plan, and specify that they should be addressed in the final response to the user. ++- **Dashboard Creation**: If a plan involves creating more than one visualization, these should always be compiled into a dashboard unless the user specifically requests otherwise. + - When creating a plan that involves generating assets (visualizations and dashboards), do not include a separate step for delivering these assets, as they are automatically displayed to the user upon creation. + - Assume that all datasets required for the plan are available, as their availability has already been confirmed in the previous step. + - If the user's request includes aspects that are not supported (e.g., specific visualizations, forecasts, etc.), do not include these in the step-by-step plan. Instead, mention them in the note section of the plan, and specify that they should be addressed in the final response to the user. **Examples** diff --git a/api/libs/agents/src/tools/categories/planning_tools/create_plan_investigative.rs b/api/libs/agents/src/tools/categories/planning_tools/create_plan_investigative.rs index fc401d4a5..8ba01d517 100644 --- a/api/libs/agents/src/tools/categories/planning_tools/create_plan_investigative.rs +++ b/api/libs/agents/src/tools/categories/planning_tools/create_plan_investigative.rs @@ -165,6 +165,7 @@ Add any assumptions, limitations, or clarifications about the analysis and findi - **For Grouped/Stacked Bars**: Explicitly state if it's a `grouped bar chart` or `stacked bar chart` (or `100% stacked`). Clearly name the field used for splitting/stacking (e.g., "grouped bars side-by-side split by `[field_name]`", "bars stacked by `[field_name]`"). - **For Multi-Line Charts**: Explicitly state it's a `multi-line chart`. Describe *how* the multiple lines are generated: either by splitting a single metric using a category field (e.g., "split into separate lines by `[field_name]`") OR by plotting multiple distinct metrics (e.g., "plotting separate lines for `[metric1]` and `[metric2]`"). - **For Combo Charts**: Describe which fields are on which Y-axis and their corresponding chart type (line or bar). +- **Dashboard Requirement**: If the plan involves creating more than one visualization, these **must** be compiled into a dashboard. Unless the user explicitly requests the metrics only. - **Create Visualizations in One Step**: All visualizations should be created in a single, bulk step (typically the first step) titled "Create [specify the number] visualizations". - **Modify Visualizations in One Step**: Similarly, if the user requests modifications to multiple existing visualizations in a single turn, group all these modifications under one "**Modify existing visualization(s)**" step. - **Review**: Always include a review step to ensure accuracy and relevance. diff --git a/api/libs/agents/src/tools/categories/planning_tools/create_plan_straightforward.rs b/api/libs/agents/src/tools/categories/planning_tools/create_plan_straightforward.rs index d60c22e66..d2099553f 100644 --- a/api/libs/agents/src/tools/categories/planning_tools/create_plan_straightforward.rs +++ b/api/libs/agents/src/tools/categories/planning_tools/create_plan_straightforward.rs @@ -161,6 +161,7 @@ Add context like assumptions, limitations, or acknowledge unsupported aspects of - **For Grouped/Stacked Bars**: Explicitly state if it's a `grouped bar chart` or `stacked bar chart` (or `100% stacked`). Clearly name the field used for splitting/stacking (e.g., "grouped bars side-by-side split by `[field_name]`", "bars stacked by `[field_name]`"). - **For Multi-Line Charts**: Explicitly state it's a `multi-line chart`. Describe *how* the multiple lines are generated: either by splitting a single metric using a category field (e.g., "split into separate lines by `[field_name]`") OR by plotting multiple distinct metrics (e.g., "plotting separate lines for `[metric1]` and `[metric2]`"). - **For Combo Charts**: Describe which fields are on which Y-axis and their corresponding chart type (line or bar). +- **Dashboard Requirement**: If the plan involves creating more than one visualization, these **must** be compiled into a dashboard. Unless the user explicitly requests the metrics only. - **Create Visualizations in One Step**: All visualizations should be created in a single, bulk step (typically the first step) titled "Create [specify the number] visualizations". - **Modify Visualizations in One Step**: Similarly, if the user requests modifications to multiple existing visualizations in a single turn, group all these modifications under one "**Modify existing visualization(s)**" step. - **Broad Requests**: For broad or summary requests (e.g., "summarize assembly line performance", "show me important stuff", "how is the sales team doing?"), you must create at least 8 visualizations to ensure a comprehensive overview. Creating fewer than five visualizations is inadequate for such requests. Aim for 8-12 visualizations to cover various aspects of the data, such as sales trends, order metrics, customer behavior, or product performance, depending on the available datasets. Include lots of trends (time-series data), groupings, segments, etc. This ensures the user receives a thorough view of the requested information. diff --git a/api/libs/database/src/types/metric_yml.rs b/api/libs/database/src/types/metric_yml.rs index d3110d3df..393596d32 100644 --- a/api/libs/database/src/types/metric_yml.rs +++ b/api/libs/database/src/types/metric_yml.rs @@ -207,16 +207,16 @@ pub struct BaseChartConfig { pub disable_tooltip: Option, // Updated Axis Configs using defined structs (now optional) #[serde(skip_serializing_if = "Option::is_none")] - #[serde(alias = "y_axis_config")] + #[serde(alias = "y_axis_config", flatten)] pub y_axis_config: Option, #[serde(skip_serializing_if = "Option::is_none")] - #[serde(alias = "x_axis_config")] + #[serde(alias = "x_axis_config", flatten)] pub x_axis_config: Option, #[serde(skip_serializing_if = "Option::is_none")] - #[serde(alias = "category_axis_style_config")] + #[serde(alias = "category_axis_style_config", flatten)] pub category_axis_style_config: Option, #[serde(skip_serializing_if = "Option::is_none")] - #[serde(alias = "y2_axis_config")] + #[serde(alias = "y2_axis_config", flatten)] pub y2_axis_config: Option, } diff --git a/api/libs/handlers/src/chats/context_loaders/chat_context.rs b/api/libs/handlers/src/chats/context_loaders/chat_context.rs index e1544ef3c..0d71ff6a2 100644 --- a/api/libs/handlers/src/chats/context_loaders/chat_context.rs +++ b/api/libs/handlers/src/chats/context_loaders/chat_context.rs @@ -1,21 +1,44 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use agents::{Agent, AgentMessage}; use anyhow::Result; use async_trait::async_trait; +use database::schema::metric_files; use database::{ + models::{DashboardFile, MetricFile}, pool::get_pg_pool, - schema::{chats, messages}, + schema::{chats, dashboard_files, messages}, }; use diesel::prelude::*; use diesel_async::RunQueryDsl; +use litellm::{FunctionCall, ToolCall, MessageProgress}; use middleware::AuthenticatedUser; +use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; use super::ContextLoader; +// --- Structs for Simulated Tool Call (handling multiple files) --- +#[derive(Serialize, Deserialize, Debug, Clone)] +struct UserManuallyModifiedFileParams { + asset_ids: Vec, // Changed to Vec +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +struct ModifiedFileInfo { + asset_id: Uuid, + version_number: i32, + yml_content: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +struct UserManuallyModifiedFileOutput { + updated_files: Vec, // Contains details for all updated files +} +// --- End Structs --- + pub struct ChatContextLoader { pub chat_id: Uuid, } @@ -181,6 +204,186 @@ impl ChatContextLoader { } } } + + // Helper function to check if assets modified by tools in history were updated externally + // Returns a list of simulated AgentMessages representing the updates. + async fn check_external_asset_updates( + agent: &Arc, + messages: &[AgentMessage], + ) -> Result> { + let mut tool_history_versions: HashMap = HashMap::new(); // asset_id -> latest version seen in tool history + + // First pass: Find the latest version mentioned for each asset in tool history + for message in messages { + if let AgentMessage::Tool { + name: Some(tool_name), + content, + .. + } = message + { + if tool_name == "update_metrics" + || tool_name == "update_dashboards" + || tool_name == "create_metrics" + || tool_name == "create_dashboards" + { + // ASSUMPTION: Content is JSON with "files": [{ "id": "...", "version_number": ... }] or similar + // We need to handle both single object responses and array responses + if let Ok(response_val) = serde_json::from_str::(content) { + let files_to_process = if let Some(files_array) = + response_val.get("files").and_then(|f| f.as_array()) + { + // Handle array of files (like create/update tools) + files_array.clone() + } else if response_val.get("id").is_some() + && response_val.get("version_number").is_some() + { + // Handle single file object (potential alternative response format?) + vec![response_val] + } else { + // No recognizable file data + vec![] + }; + + for file_data in files_to_process { + if let (Some(id_val), Some(version_val)) = + (file_data.get("id"), file_data.get("version_number")) + // Look for version_number + { + if let (Some(id_str), Some(version_num)) = + (id_val.as_str(), version_val.as_i64()) + { + if let Ok(asset_id) = Uuid::parse_str(id_str) { + let entry = + tool_history_versions.entry(asset_id).or_insert(0); + *entry = (*entry).max(version_num as i32); + } + } + } + } + } + } + } + } + + if tool_history_versions.is_empty() { + return Ok(vec![]); // No assets modified by tools in history, nothing to check + } + + let mut simulated_messages = Vec::new(); + let pool = get_pg_pool(); + let mut conn = pool.get().await?; + + let asset_ids: Vec = tool_history_versions.keys().cloned().collect(); + + // Query current full records from DB to get version and content + let current_metrics = metric_files::table + .filter(metric_files::id.eq_any(&asset_ids)) + .load::(&mut conn) // Load full MetricFile + .await?; + + let current_dashboards = dashboard_files::table + .filter(dashboard_files::id.eq_any(&asset_ids)) + .load::(&mut conn) // Load full DashboardFile + .await?; + + // Combine results for easier iteration + let all_current_assets: HashMap = current_metrics + .into_iter() + .map(|mf| { + let version = mf.version_history.get_version_number(); + let yml = serde_yaml::to_string(&mf.content).unwrap_or_default(); + (mf.id, (version, yml)) + }) + .chain(current_dashboards.into_iter().map(|df| { + let version = df.version_history.get_version_number(); + let yml = serde_yaml::to_string(&df.content).unwrap_or_default(); + (df.id, (version, yml)) + })) + .collect(); + + // --- Refactored Logic: Collect all modified assets first --- + let mut modified_assets_info: Vec = Vec::new(); + + for (asset_id, tool_version) in &tool_history_versions { + if let Some((db_version, db_yml_content)) = all_current_assets.get(asset_id) { + // Compare DB version with the latest version seen in tool history + if *db_version > *tool_version { + tracing::warn!( + asset_id = %asset_id, + db_version = %db_version, + tool_version = %tool_version, + "Asset updated externally since last tool call in chat history. Adding to simulated update." + ); + modified_assets_info.push(ModifiedFileInfo { + asset_id: *asset_id, + version_number: *db_version, + yml_content: db_yml_content.clone(), + }); + } + } + } + + // --- If any assets were modified, create ONE simulated call/response pair --- + if !modified_assets_info.is_empty() { + let tool_name = "user_manually_modified_file".to_string(); + let modified_ids: Vec = modified_assets_info.iter().map(|i| i.asset_id).collect(); + + // --- Generate Deterministic, LLM-like IDs --- + // Create a namespace UUID (can be any constant UUID) + let namespace_uuid = Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap(); + + // Generate UUID v5 based on asset ID and version for determinism + let call_seed = format!("{}-{}", modified_assets_info[0].asset_id, modified_assets_info[0].version_number); + let deterministic_uuid = Uuid::new_v5(&namespace_uuid, call_seed.as_bytes()); + + // Use the first part of the UUID for the ID string + let id_suffix = deterministic_uuid.simple().to_string()[..27].to_string(); // Adjust length as needed + + // 1. ID for the ToolCall (and Assistant message) + let tool_call_id = format!("call_{}", id_suffix); + // 2. ID for the Tool response message itself (make it slightly different) + let tool_response_msg_id = format!("tool_{}", id_suffix); + // --- End ID Generation --- + + // --- Create Simulated Tool Call (Params) --- + let params = UserManuallyModifiedFileParams { asset_ids: modified_ids }; + let params_json = serde_json::to_string(¶ms)?; + + let assistant_message = AgentMessage::Assistant { + id: Some(tool_call_id.clone()), // Use ToolCall ID for Assistant Message ID + content: None, + tool_calls: Some(vec![ToolCall { + id: tool_call_id.clone(), // Use ID #1 for the ToolCall's ID + call_type: "function".to_string(), + function: FunctionCall { + name: tool_name.clone(), + arguments: params_json, + }, + code_interpreter: None, + retrieval: None, + }]), + name: None, + progress: MessageProgress::Complete, + initial: false, + }; + simulated_messages.push(assistant_message); + + // --- Create Simulated Tool Response (Output) --- + let output = UserManuallyModifiedFileOutput { updated_files: modified_assets_info }; + let output_json = serde_json::to_string(&output)?; + + let tool_message = AgentMessage::Tool { + tool_call_id: tool_call_id, // Use ID #1 for the ToolCall + name: Some(tool_name), + content: output_json, + id: Some(tool_response_msg_id), // Use ID #2 for the Tool message's ID + progress: MessageProgress::Complete, + }; + simulated_messages.push(tool_message); + } + + Ok(simulated_messages) + } } #[async_trait] @@ -213,28 +416,56 @@ impl ContextLoader for ChatContextLoader { Err(e) => return Err(anyhow::anyhow!("Failed to get message: {}", e)), }; - // Track seen message IDs - let mut seen_ids = HashSet::new(); - // Convert messages to AgentMessages + // Convert the single message's history let mut agent_messages = Vec::new(); + let raw_messages = + match serde_json::from_value::>(message.raw_llm_messages) { + Ok(messages) => messages, + Err(e) => { + tracing::error!( + "Failed to parse raw LLM messages for chat {}: {}", + chat.id, + e + ); + Vec::new() // Return empty if parsing fails + } + }; - // Process only the most recent message's raw LLM messages - if let Ok(raw_messages) = - serde_json::from_value::>(message.raw_llm_messages) - { - // Check each message for tool calls and update context - for agent_message in &raw_messages { - Self::update_context_from_tool_calls(agent, agent_message).await; + // Track seen message IDs to avoid duplicates from potential re-parsing/saving issues + let mut seen_ids: HashSet = HashSet::new(); - // Only add messages with new IDs - if let Some(id) = agent_message.get_id() { - if seen_ids.insert(id.to_string()) { - agent_messages.push(agent_message.clone()); - } - } else { - // Messages without IDs are always included + // Process messages to update context flags and collect unique messages + for agent_message in &raw_messages { + Self::update_context_from_tool_calls(agent, agent_message).await; + + if let Some(id) = agent_message.get_id() { + if seen_ids.insert(id.to_string()) { agent_messages.push(agent_message.clone()); } + } else { + agent_messages.push(agent_message.clone()); + } + } + + // Check for external updates and get simulated messages + let simulated_update_messages = + match Self::check_external_asset_updates(agent, &raw_messages).await { + Ok(sim_messages) => sim_messages, + Err(e) => { + tracing::error!("Failed to check for external asset updates: {}", e); + Vec::new() // Don't fail, just log and return no simulated messages + } + }; + + // Append simulated messages, ensuring they haven't been seen before + for sim_message in simulated_update_messages { + if let Some(id) = sim_message.get_id() { + if seen_ids.insert(id.to_string()) { + agent_messages.push(sim_message); + } + } else { + // Should not happen for our simulated messages, but handle defensively + agent_messages.push(sim_message); } } diff --git a/api/libs/handlers/src/chats/post_chat_handler.rs b/api/libs/handlers/src/chats/post_chat_handler.rs index e334f1ee3..08524f84b 100644 --- a/api/libs/handlers/src/chats/post_chat_handler.rs +++ b/api/libs/handlers/src/chats/post_chat_handler.rs @@ -208,7 +208,10 @@ pub async fn post_chat_handler( tracing::info!( "Starting post_chat_handler for chat_id: {}, message_id: {}, organization_id: {}, user_id: {}", - chat_id, message_id, user_org_id, user.id + chat_id, + message_id, + user_org_id, + user.id ); if let Some(tx) = tx.clone() { @@ -2323,6 +2326,60 @@ fn transform_assistant_tool_message( "no_search_needed" | "review_plan" => { // Clear tracker since this tool doesn't use chunking for its reasoning output tracker.clear_chunk(tool_id.clone()); + + // Add reasoning for review_plan + if tool_name == "review_plan" { + if progress == MessageProgress::InProgress { + // Send initial "Reviewing Plan..." only once + let tracker_key = format!("{}_reviewing", tool_id); + if tracker.get_complete_text(tracker_key.clone()).is_none() { + let review_msg = BusterReasoningMessage::Text(BusterReasoningText { + id: tool_id.clone(), + reasoning_type: "text".to_string(), + title: "Reviewing Plan...".to_string(), + secondary_title: "".to_string(), + message: None, + message_chunk: None, + status: Some("loading".to_string()), + }); + all_results.push(ToolTransformResult::Reasoning(review_msg)); + tracker.add_chunk(tracker_key, "reviewing_sent".to_string()); + } + } else if progress == MessageProgress::Complete { + // Send final "Reviewed Plan" message + let elapsed_duration = last_reasoning_completion_time.elapsed(); + let reviewed_msg = BusterReasoningMessage::Text(BusterReasoningText { + id: tool_id.clone(), + reasoning_type: "text".to_string(), + title: "Reviewed plan".to_string(), + secondary_title: format!("{:.2} seconds", elapsed_duration.as_secs_f32()), + message: None, + message_chunk: None, + status: Some("completed".to_string()), + }); + all_results.push(ToolTransformResult::Reasoning(reviewed_msg)); + // Clear the tracker key used for the initial message + let tracker_key = format!("{}_reviewing", tool_id); + tracker.clear_chunk(tracker_key); + // Update completion time + *last_reasoning_completion_time = Instant::now(); + } + } else if tool_name == "no_search_needed" && progress == MessageProgress::Complete { + // Send final "Skipped searching" message + let elapsed_duration = last_reasoning_completion_time.elapsed(); + let skipped_msg = BusterReasoningMessage::Text(BusterReasoningText { + id: tool_id.clone(), + reasoning_type: "text".to_string(), + title: "Skipped searching the data catalog".to_string(), + secondary_title: format!("{:.2} seconds", elapsed_duration.as_secs_f32()), // Show duration it took to decide to skip + message: None, + message_chunk: None, + status: Some("completed".to_string()), + }); + all_results.push(ToolTransformResult::Reasoning(skipped_msg)); + // Update completion time + *last_reasoning_completion_time = Instant::now(); + } } "message_user_clarifying_question" => { // This tool generates a direct response message, not reasoning.