mirror of https://github.com/buster-so/buster.git
Merge branch 'evals' of https://github.com/buster-so/buster into evals
This commit is contained in:
commit
08d16d0897
|
@ -228,6 +228,7 @@ To determine whether to use a Straightforward Plan or an Investigative Plan, con
|
|||
|
||||
**Important Notes**
|
||||
|
||||
+- **Dashboard Creation**: If a plan involves creating more than one visualization, these should always be compiled into a dashboard unless the user specifically requests otherwise.
|
||||
- When creating a plan that involves generating assets (visualizations and dashboards), do not include a separate step for delivering these assets, as they are automatically displayed to the user upon creation.
|
||||
- Assume that all datasets required for the plan are available, as their availability has already been confirmed in the previous step.
|
||||
- If the user's request includes aspects that are not supported (e.g., specific visualizations, forecasts, etc.), do not include these in the step-by-step plan. Instead, mention them in the note section of the plan, and specify that they should be addressed in the final response to the user.
|
||||
|
|
|
@ -165,6 +165,7 @@ Add any assumptions, limitations, or clarifications about the analysis and findi
|
|||
- **For Grouped/Stacked Bars**: Explicitly state if it's a `grouped bar chart` or `stacked bar chart` (or `100% stacked`). Clearly name the field used for splitting/stacking (e.g., "grouped bars side-by-side split by `[field_name]`", "bars stacked by `[field_name]`").
|
||||
- **For Multi-Line Charts**: Explicitly state it's a `multi-line chart`. Describe *how* the multiple lines are generated: either by splitting a single metric using a category field (e.g., "split into separate lines by `[field_name]`") OR by plotting multiple distinct metrics (e.g., "plotting separate lines for `[metric1]` and `[metric2]`").
|
||||
- **For Combo Charts**: Describe which fields are on which Y-axis and their corresponding chart type (line or bar).
|
||||
- **Dashboard Requirement**: If the plan involves creating more than one visualization, these **must** be compiled into a dashboard unless the user explicitly requests the metrics only.
|
||||
- **Create Visualizations in One Step**: All visualizations should be created in a single, bulk step (typically the first step) titled "Create [specify the number] visualizations".
|
||||
- **Modify Visualizations in One Step**: Similarly, if the user requests modifications to multiple existing visualizations in a single turn, group all these modifications under one "**Modify existing visualization(s)**" step.
|
||||
- **Review**: Always include a review step to ensure accuracy and relevance.
|
||||
|
|
|
@ -161,6 +161,7 @@ Add context like assumptions, limitations, or acknowledge unsupported aspects of
|
|||
- **For Grouped/Stacked Bars**: Explicitly state if it's a `grouped bar chart` or `stacked bar chart` (or `100% stacked`). Clearly name the field used for splitting/stacking (e.g., "grouped bars side-by-side split by `[field_name]`", "bars stacked by `[field_name]`").
|
||||
- **For Multi-Line Charts**: Explicitly state it's a `multi-line chart`. Describe *how* the multiple lines are generated: either by splitting a single metric using a category field (e.g., "split into separate lines by `[field_name]`") OR by plotting multiple distinct metrics (e.g., "plotting separate lines for `[metric1]` and `[metric2]`").
|
||||
- **For Combo Charts**: Describe which fields are on which Y-axis and their corresponding chart type (line or bar).
|
||||
- **Dashboard Requirement**: If the plan involves creating more than one visualization, these **must** be compiled into a dashboard unless the user explicitly requests the metrics only.
|
||||
- **Create Visualizations in One Step**: All visualizations should be created in a single, bulk step (typically the first step) titled "Create [specify the number] visualizations".
|
||||
- **Modify Visualizations in One Step**: Similarly, if the user requests modifications to multiple existing visualizations in a single turn, group all these modifications under one "**Modify existing visualization(s)**" step.
|
||||
- **Broad Requests**: For broad or summary requests (e.g., "summarize assembly line performance", "show me important stuff", "how is the sales team doing?"), you must create at least 8 visualizations to ensure a comprehensive overview. Creating fewer than 8 visualizations is inadequate for such requests. Aim for 8-12 visualizations to cover various aspects of the data, such as sales trends, order metrics, customer behavior, or product performance, depending on the available datasets. Include lots of trends (time-series data), groupings, segments, etc. This ensures the user receives a thorough view of the requested information.
|
||||
|
|
|
@ -207,16 +207,16 @@ pub struct BaseChartConfig {
|
|||
pub disable_tooltip: Option<bool>,
|
||||
// Updated Axis Configs using defined structs (now optional)
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(alias = "y_axis_config")]
|
||||
#[serde(alias = "y_axis_config", flatten)]
|
||||
pub y_axis_config: Option<YAxisConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(alias = "x_axis_config")]
|
||||
#[serde(alias = "x_axis_config", flatten)]
|
||||
pub x_axis_config: Option<XAxisConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(alias = "category_axis_style_config")]
|
||||
#[serde(alias = "category_axis_style_config", flatten)]
|
||||
pub category_axis_style_config: Option<CategoryAxisStyleConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(alias = "y2_axis_config")]
|
||||
#[serde(alias = "y2_axis_config", flatten)]
|
||||
pub y2_axis_config: Option<Y2AxisConfig>,
|
||||
}
|
||||
|
||||
|
|
|
@ -1,21 +1,44 @@
|
|||
use std::collections::HashSet;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use agents::{Agent, AgentMessage};
|
||||
use anyhow::Result;
|
||||
use async_trait::async_trait;
|
||||
use database::schema::metric_files;
|
||||
use database::{
|
||||
models::{DashboardFile, MetricFile},
|
||||
pool::get_pg_pool,
|
||||
schema::{chats, messages},
|
||||
schema::{chats, dashboard_files, messages},
|
||||
};
|
||||
use diesel::prelude::*;
|
||||
use diesel_async::RunQueryDsl;
|
||||
use litellm::{FunctionCall, ToolCall, MessageProgress};
|
||||
use middleware::AuthenticatedUser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::ContextLoader;
|
||||
|
||||
// --- Structs for Simulated Tool Call (handling multiple files) ---
|
||||
/// Arguments of the simulated `user_manually_modified_file` tool call; serialized to JSON
/// and used as the `arguments` string of the synthetic `FunctionCall`.
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
struct UserManuallyModifiedFileParams {
|
||||
// IDs of every asset reported by the simulated call (one entry per modified asset).
asset_ids: Vec<Uuid>, // a Vec so a single call can cover several assets
|
||||
}
|
||||
|
||||
/// Current state of one asset whose database version is newer than the version last seen
/// in the chat's tool history.
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
struct ModifiedFileInfo {
|
||||
// ID of the metric or dashboard file.
asset_id: Uuid,
|
||||
// Version number currently stored in the database for this asset.
version_number: i32,
|
||||
// The asset's current content serialized as YAML (empty if serialization failed).
yml_content: String,
|
||||
}
|
||||
|
||||
/// Payload of the simulated `user_manually_modified_file` tool response; serialized to JSON
/// and used as the `content` of the synthetic tool message.
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
struct UserManuallyModifiedFileOutput {
|
||||
updated_files: Vec<ModifiedFileInfo>, // Contains details for all updated files
|
||||
}
|
||||
// --- End Structs ---
|
||||
|
||||
/// Context loader bound to a single chat; implements `ContextLoader` further down in this
/// file.
pub struct ChatContextLoader {
|
||||
// ID of the chat this loader is bound to.
pub chat_id: Uuid,
|
||||
}
|
||||
|
@ -181,6 +204,186 @@ impl ChatContextLoader {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to check if assets modified by tools in history were updated externally
|
||||
// Returns a list of simulated AgentMessages representing the updates.
|
||||
async fn check_external_asset_updates(
|
||||
agent: &Arc<Agent>,
|
||||
messages: &[AgentMessage],
|
||||
) -> Result<Vec<AgentMessage>> {
|
||||
let mut tool_history_versions: HashMap<Uuid, i32> = HashMap::new(); // asset_id -> latest version seen in tool history
|
||||
|
||||
// First pass: Find the latest version mentioned for each asset in tool history
|
||||
for message in messages {
|
||||
if let AgentMessage::Tool {
|
||||
name: Some(tool_name),
|
||||
content,
|
||||
..
|
||||
} = message
|
||||
{
|
||||
if tool_name == "update_metrics"
|
||||
|| tool_name == "update_dashboards"
|
||||
|| tool_name == "create_metrics"
|
||||
|| tool_name == "create_dashboards"
|
||||
{
|
||||
// ASSUMPTION: Content is JSON with "files": [{ "id": "...", "version_number": ... }] or similar
|
||||
// We need to handle both single object responses and array responses
|
||||
if let Ok(response_val) = serde_json::from_str::<Value>(content) {
|
||||
let files_to_process = if let Some(files_array) =
|
||||
response_val.get("files").and_then(|f| f.as_array())
|
||||
{
|
||||
// Handle array of files (like create/update tools)
|
||||
files_array.clone()
|
||||
} else if response_val.get("id").is_some()
|
||||
&& response_val.get("version_number").is_some()
|
||||
{
|
||||
// Handle single file object (potential alternative response format?)
|
||||
vec![response_val]
|
||||
} else {
|
||||
// No recognizable file data
|
||||
vec![]
|
||||
};
|
||||
|
||||
for file_data in files_to_process {
|
||||
if let (Some(id_val), Some(version_val)) =
|
||||
(file_data.get("id"), file_data.get("version_number"))
|
||||
// Look for version_number
|
||||
{
|
||||
if let (Some(id_str), Some(version_num)) =
|
||||
(id_val.as_str(), version_val.as_i64())
|
||||
{
|
||||
if let Ok(asset_id) = Uuid::parse_str(id_str) {
|
||||
let entry =
|
||||
tool_history_versions.entry(asset_id).or_insert(0);
|
||||
*entry = (*entry).max(version_num as i32);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tool_history_versions.is_empty() {
|
||||
return Ok(vec![]); // No assets modified by tools in history, nothing to check
|
||||
}
|
||||
|
||||
let mut simulated_messages = Vec::new();
|
||||
let pool = get_pg_pool();
|
||||
let mut conn = pool.get().await?;
|
||||
|
||||
let asset_ids: Vec<Uuid> = tool_history_versions.keys().cloned().collect();
|
||||
|
||||
// Query current full records from DB to get version and content
|
||||
let current_metrics = metric_files::table
|
||||
.filter(metric_files::id.eq_any(&asset_ids))
|
||||
.load::<MetricFile>(&mut conn) // Load full MetricFile
|
||||
.await?;
|
||||
|
||||
let current_dashboards = dashboard_files::table
|
||||
.filter(dashboard_files::id.eq_any(&asset_ids))
|
||||
.load::<DashboardFile>(&mut conn) // Load full DashboardFile
|
||||
.await?;
|
||||
|
||||
// Combine results for easier iteration
|
||||
let all_current_assets: HashMap<Uuid, (i32, String)> = current_metrics
|
||||
.into_iter()
|
||||
.map(|mf| {
|
||||
let version = mf.version_history.get_version_number();
|
||||
let yml = serde_yaml::to_string(&mf.content).unwrap_or_default();
|
||||
(mf.id, (version, yml))
|
||||
})
|
||||
.chain(current_dashboards.into_iter().map(|df| {
|
||||
let version = df.version_history.get_version_number();
|
||||
let yml = serde_yaml::to_string(&df.content).unwrap_or_default();
|
||||
(df.id, (version, yml))
|
||||
}))
|
||||
.collect();
|
||||
|
||||
// --- Refactored Logic: Collect all modified assets first ---
|
||||
let mut modified_assets_info: Vec<ModifiedFileInfo> = Vec::new();
|
||||
|
||||
for (asset_id, tool_version) in &tool_history_versions {
|
||||
if let Some((db_version, db_yml_content)) = all_current_assets.get(asset_id) {
|
||||
// Compare DB version with the latest version seen in tool history
|
||||
if *db_version > *tool_version {
|
||||
tracing::warn!(
|
||||
asset_id = %asset_id,
|
||||
db_version = %db_version,
|
||||
tool_version = %tool_version,
|
||||
"Asset updated externally since last tool call in chat history. Adding to simulated update."
|
||||
);
|
||||
modified_assets_info.push(ModifiedFileInfo {
|
||||
asset_id: *asset_id,
|
||||
version_number: *db_version,
|
||||
yml_content: db_yml_content.clone(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- If any assets were modified, create ONE simulated call/response pair ---
|
||||
if !modified_assets_info.is_empty() {
|
||||
let tool_name = "user_manually_modified_file".to_string();
|
||||
let modified_ids: Vec<Uuid> = modified_assets_info.iter().map(|i| i.asset_id).collect();
|
||||
|
||||
// --- Generate Deterministic, LLM-like IDs ---
|
||||
// Create a namespace UUID (can be any constant UUID)
|
||||
let namespace_uuid = Uuid::parse_str("6ba7b810-9dad-11d1-80b4-00c04fd430c8").unwrap();
|
||||
|
||||
// Generate UUID v5 based on asset ID and version for determinism
|
||||
let call_seed = format!("{}-{}", modified_assets_info[0].asset_id, modified_assets_info[0].version_number);
|
||||
let deterministic_uuid = Uuid::new_v5(&namespace_uuid, call_seed.as_bytes());
|
||||
|
||||
// Use the first part of the UUID for the ID string
|
||||
let id_suffix = deterministic_uuid.simple().to_string()[..27].to_string(); // Adjust length as needed
|
||||
|
||||
// 1. ID for the ToolCall (and Assistant message)
|
||||
let tool_call_id = format!("call_{}", id_suffix);
|
||||
// 2. ID for the Tool response message itself (make it slightly different)
|
||||
let tool_response_msg_id = format!("tool_{}", id_suffix);
|
||||
// --- End ID Generation ---
|
||||
|
||||
// --- Create Simulated Tool Call (Params) ---
|
||||
let params = UserManuallyModifiedFileParams { asset_ids: modified_ids };
|
||||
let params_json = serde_json::to_string(¶ms)?;
|
||||
|
||||
let assistant_message = AgentMessage::Assistant {
|
||||
id: Some(tool_call_id.clone()), // Use ToolCall ID for Assistant Message ID
|
||||
content: None,
|
||||
tool_calls: Some(vec![ToolCall {
|
||||
id: tool_call_id.clone(), // Use ID #1 for the ToolCall's ID
|
||||
call_type: "function".to_string(),
|
||||
function: FunctionCall {
|
||||
name: tool_name.clone(),
|
||||
arguments: params_json,
|
||||
},
|
||||
code_interpreter: None,
|
||||
retrieval: None,
|
||||
}]),
|
||||
name: None,
|
||||
progress: MessageProgress::Complete,
|
||||
initial: false,
|
||||
};
|
||||
simulated_messages.push(assistant_message);
|
||||
|
||||
// --- Create Simulated Tool Response (Output) ---
|
||||
let output = UserManuallyModifiedFileOutput { updated_files: modified_assets_info };
|
||||
let output_json = serde_json::to_string(&output)?;
|
||||
|
||||
let tool_message = AgentMessage::Tool {
|
||||
tool_call_id: tool_call_id, // Use ID #1 for the ToolCall
|
||||
name: Some(tool_name),
|
||||
content: output_json,
|
||||
id: Some(tool_response_msg_id), // Use ID #2 for the Tool message's ID
|
||||
progress: MessageProgress::Complete,
|
||||
};
|
||||
simulated_messages.push(tool_message);
|
||||
}
|
||||
|
||||
Ok(simulated_messages)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -213,29 +416,57 @@ impl ContextLoader for ChatContextLoader {
|
|||
Err(e) => return Err(anyhow::anyhow!("Failed to get message: {}", e)),
|
||||
};
|
||||
|
||||
// Track seen message IDs
|
||||
let mut seen_ids = HashSet::new();
|
||||
// Convert messages to AgentMessages
|
||||
// Convert the single message's history
|
||||
let mut agent_messages = Vec::new();
|
||||
let raw_messages =
|
||||
match serde_json::from_value::<Vec<AgentMessage>>(message.raw_llm_messages) {
|
||||
Ok(messages) => messages,
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to parse raw LLM messages for chat {}: {}",
|
||||
chat.id,
|
||||
e
|
||||
);
|
||||
Vec::new() // Return empty if parsing fails
|
||||
}
|
||||
};
|
||||
|
||||
// Process only the most recent message's raw LLM messages
|
||||
if let Ok(raw_messages) =
|
||||
serde_json::from_value::<Vec<AgentMessage>>(message.raw_llm_messages)
|
||||
{
|
||||
// Check each message for tool calls and update context
|
||||
// Track seen message IDs to avoid duplicates from potential re-parsing/saving issues
|
||||
let mut seen_ids: HashSet<String> = HashSet::new();
|
||||
|
||||
// Process messages to update context flags and collect unique messages
|
||||
for agent_message in &raw_messages {
|
||||
Self::update_context_from_tool_calls(agent, agent_message).await;
|
||||
|
||||
// Only add messages with new IDs
|
||||
if let Some(id) = agent_message.get_id() {
|
||||
if seen_ids.insert(id.to_string()) {
|
||||
agent_messages.push(agent_message.clone());
|
||||
}
|
||||
} else {
|
||||
// Messages without IDs are always included
|
||||
agent_messages.push(agent_message.clone());
|
||||
}
|
||||
}
|
||||
|
||||
// Check for external updates and get simulated messages
|
||||
let simulated_update_messages =
|
||||
match Self::check_external_asset_updates(agent, &raw_messages).await {
|
||||
Ok(sim_messages) => sim_messages,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to check for external asset updates: {}", e);
|
||||
Vec::new() // Don't fail, just log and return no simulated messages
|
||||
}
|
||||
};
|
||||
|
||||
// Append simulated messages, ensuring they haven't been seen before
|
||||
for sim_message in simulated_update_messages {
|
||||
if let Some(id) = sim_message.get_id() {
|
||||
if seen_ids.insert(id.to_string()) {
|
||||
agent_messages.push(sim_message);
|
||||
}
|
||||
} else {
|
||||
// Should not happen for our simulated messages, but handle defensively
|
||||
agent_messages.push(sim_message);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(agent_messages)
|
||||
|
|
|
@ -208,7 +208,10 @@ pub async fn post_chat_handler(
|
|||
|
||||
tracing::info!(
|
||||
"Starting post_chat_handler for chat_id: {}, message_id: {}, organization_id: {}, user_id: {}",
|
||||
chat_id, message_id, user_org_id, user.id
|
||||
chat_id,
|
||||
message_id,
|
||||
user_org_id,
|
||||
user.id
|
||||
);
|
||||
|
||||
if let Some(tx) = tx.clone() {
|
||||
|
@ -2323,6 +2326,60 @@ fn transform_assistant_tool_message(
|
|||
"no_search_needed" | "review_plan" => {
|
||||
// Clear tracker since this tool doesn't use chunking for its reasoning output
|
||||
tracker.clear_chunk(tool_id.clone());
|
||||
|
||||
// Add reasoning for review_plan
|
||||
if tool_name == "review_plan" {
|
||||
if progress == MessageProgress::InProgress {
|
||||
// Send initial "Reviewing Plan..." only once
|
||||
let tracker_key = format!("{}_reviewing", tool_id);
|
||||
if tracker.get_complete_text(tracker_key.clone()).is_none() {
|
||||
let review_msg = BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id: tool_id.clone(),
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Reviewing Plan...".to_string(),
|
||||
secondary_title: "".to_string(),
|
||||
message: None,
|
||||
message_chunk: None,
|
||||
status: Some("loading".to_string()),
|
||||
});
|
||||
all_results.push(ToolTransformResult::Reasoning(review_msg));
|
||||
tracker.add_chunk(tracker_key, "reviewing_sent".to_string());
|
||||
}
|
||||
} else if progress == MessageProgress::Complete {
|
||||
// Send final "Reviewed Plan" message
|
||||
let elapsed_duration = last_reasoning_completion_time.elapsed();
|
||||
let reviewed_msg = BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id: tool_id.clone(),
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Reviewed plan".to_string(),
|
||||
secondary_title: format!("{:.2} seconds", elapsed_duration.as_secs_f32()),
|
||||
message: None,
|
||||
message_chunk: None,
|
||||
status: Some("completed".to_string()),
|
||||
});
|
||||
all_results.push(ToolTransformResult::Reasoning(reviewed_msg));
|
||||
// Clear the tracker key used for the initial message
|
||||
let tracker_key = format!("{}_reviewing", tool_id);
|
||||
tracker.clear_chunk(tracker_key);
|
||||
// Update completion time
|
||||
*last_reasoning_completion_time = Instant::now();
|
||||
}
|
||||
} else if tool_name == "no_search_needed" && progress == MessageProgress::Complete {
|
||||
// Send final "Skipped searching" message
|
||||
let elapsed_duration = last_reasoning_completion_time.elapsed();
|
||||
let skipped_msg = BusterReasoningMessage::Text(BusterReasoningText {
|
||||
id: tool_id.clone(),
|
||||
reasoning_type: "text".to_string(),
|
||||
title: "Skipped searching the data catalog".to_string(),
|
||||
secondary_title: format!("{:.2} seconds", elapsed_duration.as_secs_f32()), // Show duration it took to decide to skip
|
||||
message: None,
|
||||
message_chunk: None,
|
||||
status: Some("completed".to_string()),
|
||||
});
|
||||
all_results.push(ToolTransformResult::Reasoning(skipped_msg));
|
||||
// Update completion time
|
||||
*last_reasoning_completion_time = Instant::now();
|
||||
}
|
||||
}
|
||||
"message_user_clarifying_question" => {
|
||||
// This tool generates a direct response message, not reasoning.
|
||||
|
|
Loading…
Reference in New Issue