From b839e70aa9c5a7a89df57fc6d153a759dbd84128 Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 5 Mar 2025 15:50:53 -0700 Subject: [PATCH] moved around the ids so that the metrics and dashobard align across the board --- api/Cargo.toml | 3 ++- api/libs/agents/src/agent.rs | 4 ++-- .../src/tools/categories/file_tools/common.rs | 21 ++++++++++++++++--- .../file_tools/create_dashboard_files.rs | 11 +++++----- .../file_tools/create_metric_files.rs | 6 +++--- .../file_tools/file_types/metric_yml.rs | 14 +++++-------- .../file_tools/modify_dashboard_files.rs | 2 +- .../file_tools/modify_metric_files.rs | 2 +- .../file_tools/search_data_catalog.rs | 2 +- .../categories/planning_tools/create_plan.rs | 2 +- api/libs/agents/src/tools/executor.rs | 12 +++++------ api/libs/handlers/Cargo.toml | 1 + .../handlers/src/chats/streaming_parser.rs | 15 +++++++------ 13 files changed, 55 insertions(+), 40 deletions(-) diff --git a/api/Cargo.toml b/api/Cargo.toml index cda7d01f7..7fdc18b6f 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -17,7 +17,8 @@ serde_json = { version = "1.0.117", features = ["preserve_order"] } serde_yaml = "0.9.34" tokio = { version = "1.38.0", features = ["full"] } tracing = "0.1.40" -uuid = { version = "1.8", features = ["serde", "v4"] } +uuid = { version = "1.8", features = ["serde", "v4", "v5"] } +sha2 = "0.10.8" diesel = { version = "2", features = [ "uuid", "chrono", diff --git a/api/libs/agents/src/agent.rs b/api/libs/agents/src/agent.rs index b23c05f1b..9fd9fe8f2 100644 --- a/api/libs/agents/src/agent.rs +++ b/api/libs/agents/src/agent.rs @@ -519,7 +519,7 @@ impl Agent { for tool_call in tool_calls { if let Some(tool) = self.tools.read().await.get(&tool_call.function.name) { let params: Value = serde_json::from_str(&tool_call.function.arguments)?; - let result = tool.execute(params).await?; + let result = tool.execute(params, tool_call.id.clone()).await?; let result_str = serde_json::to_string(&result)?; let tool_message = AgentMessage::tool( None, @@ -701,7 +701,7 @@ mod tests { type Output = Value; type Params = Value; - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { self.send_progress( "Fetching weather data...".to_string(), "123".to_string(), diff --git a/api/libs/agents/src/tools/categories/file_tools/common.rs b/api/libs/agents/src/tools/categories/file_tools/common.rs index 4e1eb15ad..9fad0f987 100644 --- a/api/libs/agents/src/tools/categories/file_tools/common.rs +++ b/api/libs/agents/src/tools/categories/file_tools/common.rs @@ -552,6 +552,7 @@ required: /// The string is a message about the number of records returned by the SQL query /// The vector of IndexMap is the results of the SQL query. Returns empty vector if more than 13 records or no results. pub async fn process_metric_file( + tool_call_id: String, file_name: String, yml_content: String, ) -> Result< @@ -568,9 +569,7 @@ pub async fn process_metric_file( let metric_yml = MetricYml::new(yml_content.clone()).map_err(|e| format!("Invalid YAML format: {}", e))?; - let metric_id = metric_yml - .id - .ok_or_else(|| "Missing required field 'id'".to_string())?; + let metric_id = generate_deterministic_uuid(&tool_call_id, &file_name, "metric").unwrap(); // Check if dataset_ids is empty if metric_yml.dataset_ids.is_empty() { @@ -828,6 +827,22 @@ pub async fn process_metric_file_modification( } } +/// Generates a deterministic UUID based on tool call ID, file name, and file type +pub fn generate_deterministic_uuid( + tool_call_id: &str, + file_name: &str, + file_type: &str, +) -> Result { + // Use a fixed namespace for the application + let namespace_uuid = Uuid::NAMESPACE_OID; + + // Combine inputs to create a unique name + let name = format!("{}:{}:{}", tool_call_id, file_name, file_type); + + // Generate v5 UUID (SHA1 based) + Ok(Uuid::new_v5(&namespace_uuid, name.as_bytes())) +} + #[cfg(test)] mod tests { use super::*; diff --git a/api/libs/agents/src/tools/categories/file_tools/create_dashboard_files.rs b/api/libs/agents/src/tools/categories/file_tools/create_dashboard_files.rs index 04344dc9d..b63ee3f3e 100644 --- a/api/libs/agents/src/tools/categories/file_tools/create_dashboard_files.rs +++ b/api/libs/agents/src/tools/categories/file_tools/create_dashboard_files.rs @@ -18,7 +18,7 @@ use crate::{ }; use super::{ - common::validate_metric_ids, + common::{generate_deterministic_uuid, validate_metric_ids}, file_types::{ dashboard_yml::DashboardYml, file::{FileEnum, FileWithId}, @@ -67,6 +67,7 @@ impl FileModificationTool for CreateDashboardFilesTool {} /// Process a dashboard file creation request /// Returns Ok((DashboardFile, DashboardYml)) if successful, or an error message if failed async fn process_dashboard_file( + tool_call_id: String, file: DashboardFileParams, ) -> Result<(DashboardFile, DashboardYml), String> { debug!("Processing dashboard file creation: {}", file.name); @@ -74,9 +75,7 @@ async fn process_dashboard_file( let dashboard_yml = DashboardYml::new(file.yml_content.clone()) .map_err(|e| format!("Invalid YAML format: {}", e))?; - let dashboard_id = dashboard_yml - .id - .ok_or_else(|| "Missing required field 'id'".to_string())?; + let dashboard_id = generate_deterministic_uuid(&tool_call_id, &file.name, "dashboard").unwrap(); // Collect and validate metric IDs from rows let metric_ids: Vec = dashboard_yml @@ -134,7 +133,7 @@ impl ToolExecutor for CreateDashboardFilesTool { } } - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { let start_time = Instant::now(); let files = params.files; @@ -148,7 +147,7 @@ impl ToolExecutor for CreateDashboardFilesTool { // First pass - validate and prepare all records for file in files { - match process_dashboard_file(file.clone()).await { + match process_dashboard_file(tool_call_id.clone(), file.clone()).await { Ok((dashboard_file, dashboard_yml)) => { dashboard_records.push(dashboard_file); dashboard_ymls.push(dashboard_yml); diff --git a/api/libs/agents/src/tools/categories/file_tools/create_metric_files.rs b/api/libs/agents/src/tools/categories/file_tools/create_metric_files.rs index b900374d6..d8acc6d8d 100644 --- a/api/libs/agents/src/tools/categories/file_tools/create_metric_files.rs +++ b/api/libs/agents/src/tools/categories/file_tools/create_metric_files.rs @@ -24,7 +24,7 @@ use crate::{ }, }; -use super::{common::validate_sql, FileModificationTool}; +use super::{common::{validate_sql, generate_deterministic_uuid}, FileModificationTool}; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct MetricFileParams { @@ -81,7 +81,7 @@ impl ToolExecutor for CreateMetricFilesTool { } } - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { let start_time = Instant::now(); let files = params.files; @@ -94,7 +94,7 @@ impl ToolExecutor for CreateMetricFilesTool { let mut results_vec = vec![]; // First pass - validate and prepare all records for file in files { - match process_metric_file(file.name.clone(), file.yml_content.clone()).await { + match process_metric_file(tool_call_id.clone(), file.name.clone(), file.yml_content.clone()).await { Ok((metric_file, metric_yml, message, results)) => { metric_records.push(metric_file); metric_ymls.push(metric_yml); diff --git a/api/libs/agents/src/tools/categories/file_tools/file_types/metric_yml.rs b/api/libs/agents/src/tools/categories/file_tools/file_types/metric_yml.rs index 0b8d085c5..9193f3391 100644 --- a/api/libs/agents/src/tools/categories/file_tools/file_types/metric_yml.rs +++ b/api/libs/agents/src/tools/categories/file_tools/file_types/metric_yml.rs @@ -5,7 +5,9 @@ use uuid::Uuid; #[derive(Debug, Serialize, Deserialize, Clone)] pub struct MetricYml { + #[serde(skip_serializing)] pub id: Option, + #[serde(skip_serializing)] pub updated_at: Option>, #[serde(alias = "name")] pub title: String, @@ -283,17 +285,11 @@ pub struct TableChartConfig { impl MetricYml { pub fn new(yml_content: String) -> Result { - let mut file: MetricYml = match serde_yaml::from_str(&yml_content) { + let file: MetricYml = match serde_yaml::from_str(&yml_content) { Ok(file) => file, Err(e) => return Err(anyhow::anyhow!("Error parsing YAML: {}", e)), }; - if file.id.is_none() { - file.id = Some(Uuid::new_v4()); - } - - file.updated_at = Some(Utc::now()); - match file.validate() { Ok(_) => Ok(file), Err(e) => Err(anyhow::anyhow!("Error compiling file: {}", e)), @@ -356,8 +352,8 @@ mod tests { assert_eq!(metadata[1].data_type, "number"); // Verify auto-generated fields - assert!(metric.id.is_some()); - assert!(metric.updated_at.is_some()); + assert!(metric.id.is_none()); + assert!(metric.updated_at.is_none()); Ok(()) } diff --git a/api/libs/agents/src/tools/categories/file_tools/modify_dashboard_files.rs b/api/libs/agents/src/tools/categories/file_tools/modify_dashboard_files.rs index 7dca119ec..596713d35 100644 --- a/api/libs/agents/src/tools/categories/file_tools/modify_dashboard_files.rs +++ b/api/libs/agents/src/tools/categories/file_tools/modify_dashboard_files.rs @@ -251,7 +251,7 @@ impl ToolExecutor for ModifyDashboardFilesTool { } } - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { let start_time = Instant::now(); debug!("Starting file modification execution"); diff --git a/api/libs/agents/src/tools/categories/file_tools/modify_metric_files.rs b/api/libs/agents/src/tools/categories/file_tools/modify_metric_files.rs index 24c9151c9..42473d9be 100644 --- a/api/libs/agents/src/tools/categories/file_tools/modify_metric_files.rs +++ b/api/libs/agents/src/tools/categories/file_tools/modify_metric_files.rs @@ -262,7 +262,7 @@ impl ToolExecutor for ModifyMetricFilesTool { } } - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { let start_time = Instant::now(); debug!("Starting file modification execution"); diff --git a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs index f29ccec3c..93c590aad 100644 --- a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs +++ b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs @@ -260,7 +260,7 @@ impl ToolExecutor for SearchDataCatalogTool { type Output = SearchDataCatalogOutput; type Params = SearchDataCatalogParams; - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { let start_time = Instant::now(); // Fetch all non-deleted datasets diff --git a/api/libs/agents/src/tools/categories/planning_tools/create_plan.rs b/api/libs/agents/src/tools/categories/planning_tools/create_plan.rs index cf3265e05..308931f0c 100644 --- a/api/libs/agents/src/tools/categories/planning_tools/create_plan.rs +++ b/api/libs/agents/src/tools/categories/planning_tools/create_plan.rs @@ -35,7 +35,7 @@ impl ToolExecutor for CreatePlan { "create_plan".to_string() } - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { self.agent .set_state_value(String::from("plan_available"), Value::Bool(true)) .await; diff --git a/api/libs/agents/src/tools/executor.rs b/api/libs/agents/src/tools/executor.rs index 0327ebd01..b031546c8 100644 --- a/api/libs/agents/src/tools/executor.rs +++ b/api/libs/agents/src/tools/executor.rs @@ -12,8 +12,8 @@ pub trait ToolExecutor: Send + Sync { /// The type of the parameters for this tool type Params: DeserializeOwned + Send; - /// Execute the tool with the given parameters. - async fn execute(&self, params: Self::Params) -> Result; + /// Execute the tool with the given parameters and tool call ID. + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result; /// Get the JSON schema for this tool fn get_schema(&self) -> Value; @@ -53,9 +53,9 @@ where type Output = Value; type Params = Value; - async fn execute(&self, params: Self::Params) -> Result { + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { let params = serde_json::from_value(params)?; - let result = self.inner.execute(params).await?; + let result = self.inner.execute(params, tool_call_id).await?; Ok(serde_json::to_value(result)?) } @@ -78,8 +78,8 @@ impl + Send + Sync> ToolExecutor type Output = Value; type Params = Value; - async fn execute(&self, params: Self::Params) -> Result { - (**self).execute(params).await + async fn execute(&self, params: Self::Params, tool_call_id: String) -> Result { + (**self).execute(params, tool_call_id).await } fn get_schema(&self) -> Value { diff --git a/api/libs/handlers/Cargo.toml b/api/libs/handlers/Cargo.toml index f009d446f..cb120fc9e 100644 --- a/api/libs/handlers/Cargo.toml +++ b/api/libs/handlers/Cargo.toml @@ -22,6 +22,7 @@ indexmap = { workspace = true } async-trait = { workspace = true } once_cell = { workspace = true } base64 = { workspace = true } +sha2 = { workspace = true } # Local dependencies database = { path = "../database" } diff --git a/api/libs/handlers/src/chats/streaming_parser.rs b/api/libs/handlers/src/chats/streaming_parser.rs index d196c60dd..2ede13106 100644 --- a/api/libs/handlers/src/chats/streaming_parser.rs +++ b/api/libs/handlers/src/chats/streaming_parser.rs @@ -1,6 +1,8 @@ use anyhow::Result; use serde_json::Value; use uuid::Uuid; +use sha2::{Sha256, Digest}; +use agents::tools::categories::file_tools::common::generate_deterministic_uuid; use super::post_chat_handler::{ BusterReasoningFile, BusterReasoningMessage, BusterReasoningPill, @@ -126,6 +128,7 @@ impl StreamingParser { // Internal function to process file data (shared by metric and dashboard processing) pub fn process_file_data(&mut self, id: String, file_type: String) -> Result> { + // Extract and replace yml_content with placeholders let mut yml_contents = Vec::new(); let mut positions = Vec::new(); @@ -260,13 +263,13 @@ impl StreamingParser { .and_then(Value::as_str) .unwrap_or(""); - // Generate a consistent UUID based on the file name - let file_id = Uuid::new_v4().to_string(); + // Generate deterministic UUID based on tool call ID, file name, and type + let file_id = generate_deterministic_uuid(&id, name, &file_type)?; let buster_file = BusterFile { - id: file_id.clone(), + id: file_id.to_string(), file_type: file_type.clone(), - file_name: name.clone().to_string(), + file_name: name.to_string(), version_number: 1, version_id: Uuid::new_v4().to_string(), status: "loading".to_string(), @@ -278,8 +281,8 @@ impl StreamingParser { metadata: None, }; - file_ids.push(name.clone().to_string()); - files_map.insert(name.clone().to_string(), buster_file); + file_ids.push(file_id.to_string()); + files_map.insert(file_id.to_string(), buster_file); } } }