From edbedd84ca058de2b20494adf3fcd29113de0de0 Mon Sep 17 00:00:00 2001 From: dal Date: Thu, 27 Feb 2025 12:43:04 -0700 Subject: [PATCH] modifications for including subsets of data in requests. --- api/src/utils/agent/agents/dashboard_agent.rs | 15 +- api/src/utils/tools/file_tools/common.rs | 333 +++++++++++++++++- .../file_tools/create_dashboard_files.rs | 7 +- .../tools/file_tools/create_metric_files.rs | 74 +--- .../file_tools/file_types/dashboard_yml.rs | 1 + .../utils/tools/file_tools/file_types/file.rs | 7 + ...ashboard_files.rs => filter_dashboards.rs} | 226 ++---------- api/src/utils/tools/file_tools/mod.rs | 4 +- .../file_tools/modify_dashboard_files.rs | 2 + .../tools/file_tools/modify_metric_files.rs | 246 +++---------- .../tools/file_tools/search_data_catalog.rs | 3 + 11 files changed, 443 insertions(+), 475 deletions(-) rename api/src/utils/tools/file_tools/{filter_dashboard_files.rs => filter_dashboards.rs} (76%) diff --git a/api/src/utils/agent/agents/dashboard_agent.rs b/api/src/utils/agent/agents/dashboard_agent.rs index 95c314893..10e7405e1 100644 --- a/api/src/utils/agent/agents/dashboard_agent.rs +++ b/api/src/utils/agent/agents/dashboard_agent.rs @@ -8,7 +8,8 @@ use crate::utils::{ agent::{agent::AgentError, Agent, AgentExt, AgentThread}, tools::{ file_tools::{ - CreateDashboardFilesTool, CreateMetricFilesTool, FilterDashboardFilesTool, ModifyDashboardFilesTool, ModifyMetricFilesTool + CreateDashboardFilesTool, CreateMetricFilesTool, ModifyDashboardFilesTool, + ModifyMetricFilesTool, }, IntoValueTool, ToolExecutor, }, @@ -28,7 +29,6 @@ impl DashboardAgent { let modify_dashboard_tool = ModifyDashboardFilesTool::new(Arc::clone(&self.agent)); let create_metric_tool = CreateMetricFilesTool::new(Arc::clone(&self.agent)); let modify_metric_tool = ModifyMetricFilesTool::new(Arc::clone(&self.agent)); - let filter_dashboard_tool = FilterDashboardFilesTool::new(Arc::clone(&self.agent)); // Add tools to the agent self.agent @@ -55,12 +55,6 @@ impl DashboardAgent { modify_metric_tool.into_value_tool(), ) .await; - self.agent - .add_tool( - filter_dashboard_tool.get_name(), - filter_dashboard_tool.into_value_tool(), - ) - .await; Ok(()) } @@ -82,7 +76,10 @@ impl DashboardAgent { pub async fn from_existing(existing_agent: &Arc) -> Result { // Create a new agent with the same core properties and shared state/stream - let agent = Arc::new(Agent::from_existing(existing_agent, "dashboard_agent".to_string())); + let agent = Arc::new(Agent::from_existing( + existing_agent, + "dashboard_agent".to_string(), + )); let dashboard = Self { agent }; dashboard.load_tools().await?; Ok(dashboard) diff --git a/api/src/utils/tools/file_tools/common.rs b/api/src/utils/tools/file_tools/common.rs index 9bf6841b4..48215c8a8 100644 --- a/api/src/utils/tools/file_tools/common.rs +++ b/api/src/utils/tools/file_tools/common.rs @@ -1,15 +1,32 @@ use anyhow::{anyhow, Result}; -use tracing::debug; +use indexmap::IndexMap; +use tracing::{debug, error}; use uuid::Uuid; +use chrono::{Utc, Duration}; +use serde::{Deserialize, Serialize}; +use serde_json; +use serde_yaml; -use crate::database_dep::{lib::get_pg_pool, schema::metric_files}; +use crate::database_dep::{ + lib::get_pg_pool, + schema::metric_files, + models::MetricFile, + enums::Verification +}; +use crate::utils::data_types::DataType; use crate::utils::query_engine::query_engine::query_engine; +use crate::utils::tools::file_tools::file_types::metric_yml::MetricYml; +use crate::utils::tools::file_tools::file_types::file::FileWithId; use diesel::{ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +// Import the types needed for the modification function +use crate::utils::tools::file_tools::modify_metric_files::{FileModification, ModificationResult}; +use crate::utils::tools::file_tools::modify_metric_files::apply_modifications_to_content; + /// Validates SQL query using existing query engine by attempting to run it -/// Returns Ok(()) if valid, Err with description if invalid -pub async fn validate_sql(sql: &str, dataset_id: &Uuid) -> Result<()> { +/// Returns a tuple with a message about the number of records and the results (if ≤ 13 records) +pub async fn validate_sql(sql: &str, dataset_id: &Uuid) -> Result<(String, Vec>)> { debug!("Validating SQL query for dataset {}", dataset_id); if sql.trim().is_empty() { @@ -17,10 +34,28 @@ pub async fn validate_sql(sql: &str, dataset_id: &Uuid) -> Result<()> { } // Try to execute the query using query_engine - match query_engine(dataset_id, &sql.to_string()).await { - Ok(_) => Ok(()), - Err(e) => Err(anyhow!("SQL validation failed: {}", e)), - } + let results = match query_engine(dataset_id, &sql.to_string()).await { + Ok(results) => results, + Err(e) => return Err(anyhow!("SQL validation failed: {}", e)), + }; + + let num_records = results.len(); + + // Create appropriate message based on number of records + let message = if num_records == 0 { + "No records were found".to_string() + } else { + format!("{} records were returned", num_records) + }; + + // Return records only if there are 13 or fewer + let return_records = if num_records <= 13 { + results + } else { + Vec::new() // Empty vec when more than 13 records + }; + + Ok((message, return_records)) } /// Validates existence of metric IDs in database @@ -52,7 +87,7 @@ pub const METRIC_YML_SCHEMA: &str = r##" # Required top-level fields: # # title: "Your Metric Title" -# dataset_ids: ["uuid1", "uuid2"] # Dataset UUIDs this metric belongs to +# dataset_ids: ["123e4567-e89b-12d3-a456-426614174000"] # Dataset UUIDs (not names) # time_frame: "Last 30 days" # Human-readable time period covered by the query # sql: | # SELECT @@ -103,6 +138,10 @@ properties: dataset_ids: type: array description: "UUIDs of datasets this metric belongs to" + items: + type: string + format: "uuid" + description: "UUID string of the dataset (not the dataset name)" # TIME FRAME time_frame: @@ -437,6 +476,7 @@ pub const DASHBOARD_YML_SCHEMA: &str = r##" # Required fields: # # title: "Your Dashboard Title" +# description: "A description of the dashboard, it's metrics, and its purpose." # rows: # - items: # - id: "metric-uuid-1" # UUIDv4 of an existing metric @@ -460,6 +500,9 @@ properties: title: type: string description: "The title of the dashboard (e.g. 'Sales & Marketing Dashboard')" + description: + type: string + description: "A description of the dashboard, its metrics, and its purpose" rows: type: array description: "Array of row objects, each containing metric items" @@ -488,9 +531,281 @@ properties: - items required: - title + - description - rows "##; +/// Process a metric file creation request +/// Returns Ok((MetricFile, MetricYml, String, Vec))) if successful, or an error message if failed +/// The string is a message about the number of records returned by the SQL query +/// The vector of IndexMap is the results of the SQL query. Returns empty vector if more than 13 records or no results. +pub async fn process_metric_file( + file_name: String, + yml_content: String, +) -> Result< + ( + MetricFile, + MetricYml, + String, + Vec>, + ), + String, +> { + debug!("Processing metric file: {}", file_name); + + let metric_yml = MetricYml::new(yml_content.clone()) + .map_err(|e| format!("Invalid YAML format: {}", e))?; + + let metric_id = metric_yml + .id + .ok_or_else(|| "Missing required field 'id'".to_string())?; + + // Check if dataset_ids is empty + if metric_yml.dataset_ids.is_empty() { + return Err("Missing required field 'dataset_ids'".to_string()); + } + + // Use the first dataset_id for SQL validation + let dataset_id = metric_yml.dataset_ids[0]; + debug!("Validating SQL using dataset_id: {}", dataset_id); + + // Validate SQL with the selected dataset_id and get results + let (message, results) = match validate_sql(&metric_yml.sql, &dataset_id).await { + Ok(results) => results, + Err(e) => return Err(format!("Invalid SQL query: {}", e)), + }; + + let metric_file = MetricFile { + id: metric_id, + name: metric_yml.title.clone(), + file_name: format!("{}.yml", file_name), + content: serde_json::to_value(metric_yml.clone()) + .map_err(|e| format!("Failed to process metric: {}", e))?, + created_by: Uuid::new_v4(), + verification: Verification::NotRequested, + evaluation_obj: None, + evaluation_summary: None, + evaluation_score: None, + organization_id: Uuid::new_v4(), + created_at: Utc::now(), + updated_at: Utc::now(), + deleted_at: None, + }; + + Ok((metric_file, metric_yml, message, results)) +} + +/// Process a metric file modification request +/// Returns Ok((MetricFile, MetricYml, Vec, String, Vec>)) if successful, or an error if failed +/// The string is a message about the number of records returned by the SQL query +/// The vector of IndexMap is the results of the SQL query. Returns empty vector if more than 13 records or no results. +pub async fn process_metric_file_modification( + mut file: MetricFile, + modification: &FileModification, + duration: i64, +) -> Result<(MetricFile, MetricYml, Vec, String, Vec>), anyhow::Error> { + debug!( + file_id = %file.id, + file_name = %modification.file_name, + "Processing metric file modifications" + ); + + let mut results = Vec::new(); + + // Parse existing content + let current_yml: MetricYml = match serde_json::from_value(file.content.clone()) { + Ok(yml) => yml, + Err(e) => { + let error = format!("Failed to parse existing metric YAML: {}", e); + error!( + file_id = %file.id, + file_name = %modification.file_name, + error = %error, + "YAML parsing error" + ); + results.push(ModificationResult { + file_id: file.id, + file_name: modification.file_name.clone(), + success: false, + original_lines: vec![], + adjusted_lines: vec![], + error: Some(error.clone()), + modification_type: "parsing".to_string(), + timestamp: Utc::now(), + duration, + }); + return Err(anyhow!(error)); + } + }; + + // Convert to YAML string for line modifications + let current_content = match serde_yaml::to_string(¤t_yml) { + Ok(content) => content, + Err(e) => { + let error = format!("Failed to serialize metric YAML: {}", e); + error!( + file_id = %file.id, + file_name = %modification.file_name, + error = %error, + "YAML serialization error" + ); + results.push(ModificationResult { + file_id: file.id, + file_name: modification.file_name.clone(), + success: false, + original_lines: vec![], + adjusted_lines: vec![], + error: Some(error.clone()), + modification_type: "serialization".to_string(), + timestamp: Utc::now(), + duration, + }); + return Err(anyhow!(error)); + } + }; + + // Track original line numbers before modifications + let mut original_lines = Vec::new(); + let mut adjusted_lines = Vec::new(); + + // Apply modifications + for m in &modification.modifications { + original_lines.extend(m.line_numbers.clone()); + } + + // Apply modifications and track results + match apply_modifications_to_content( + ¤t_content, + &modification.modifications, + &modification.file_name, + ) { + Ok(modified_content) => { + // Create and validate new YML object + match MetricYml::new(modified_content) { + Ok(new_yml) => { + debug!( + file_id = %file.id, + file_name = %modification.file_name, + "Successfully modified and validated metric file" + ); + + // Check if dataset_ids is empty + if new_yml.dataset_ids.is_empty() { + let error = "Missing required field 'dataset_ids'".to_string(); + error!( + file_id = %file.id, + file_name = %modification.file_name, + error = %error, + "Validation error" + ); + results.push(ModificationResult { + file_id: file.id, + file_name: modification.file_name.clone(), + success: false, + original_lines: original_lines.clone(), + adjusted_lines: adjusted_lines.clone(), + error: Some(error.clone()), + modification_type: "validation".to_string(), + timestamp: Utc::now(), + duration, + }); + return Err(anyhow!(error)); + } + + // Validate SQL with the selected dataset_id and get results + let dataset_id = new_yml.dataset_ids[0]; + let (message, validation_results) = match validate_sql(&new_yml.sql, &dataset_id).await { + Ok(results) => results, + Err(e) => { + let error = format!("Invalid SQL query: {}", e); + error!( + file_id = %file.id, + file_name = %modification.file_name, + error = %error, + "SQL validation error" + ); + results.push(ModificationResult { + file_id: file.id, + file_name: modification.file_name.clone(), + success: false, + original_lines: original_lines.clone(), + adjusted_lines: adjusted_lines.clone(), + error: Some(error.clone()), + modification_type: "sql_validation".to_string(), + timestamp: Utc::now(), + duration, + }); + return Err(anyhow!(error)); + } + }; + + // Update file record + file.content = serde_json::to_value(&new_yml)?; + file.updated_at = Utc::now(); + file.verification = Verification::NotRequested; + + // Track successful modification + results.push(ModificationResult { + file_id: file.id, + file_name: modification.file_name.clone(), + success: true, + original_lines: original_lines.clone(), + adjusted_lines: adjusted_lines.clone(), + error: None, + modification_type: "content".to_string(), + timestamp: Utc::now(), + duration, + }); + + Ok((file, new_yml, results, message, validation_results)) + } + Err(e) => { + let error = format!("Failed to validate modified YAML: {}", e); + error!( + file_id = %file.id, + file_name = %modification.file_name, + error = %error, + "YAML validation error" + ); + results.push(ModificationResult { + file_id: file.id, + file_name: modification.file_name.clone(), + success: false, + original_lines, + adjusted_lines: vec![], + error: Some(error.clone()), + modification_type: "validation".to_string(), + timestamp: Utc::now(), + duration, + }); + Err(anyhow!(error)) + } + } + } + Err(e) => { + let error = format!("Failed to apply modifications: {}", e); + error!( + file_id = %file.id, + file_name = %modification.file_name, + error = %error, + "Modification application error" + ); + results.push(ModificationResult { + file_id: file.id, + file_name: modification.file_name.clone(), + success: false, + original_lines, + adjusted_lines: vec![], + error: Some(error.clone()), + modification_type: "modification".to_string(), + timestamp: Utc::now(), + duration, + }); + Err(anyhow!(error)) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/api/src/utils/tools/file_tools/create_dashboard_files.rs b/api/src/utils/tools/file_tools/create_dashboard_files.rs index c51d09dcd..91153d6bb 100644 --- a/api/src/utils/tools/file_tools/create_dashboard_files.rs +++ b/api/src/utils/tools/file_tools/create_dashboard_files.rs @@ -13,7 +13,10 @@ use uuid::Uuid; use crate::{ database_dep::{lib::get_pg_pool, models::DashboardFile, schema::dashboard_files}, - utils::{agent::Agent, tools::{file_tools::common::DASHBOARD_YML_SCHEMA, ToolExecutor}}, + utils::{ + agent::Agent, + tools::{file_tools::common::DASHBOARD_YML_SCHEMA, ToolExecutor}, + }, }; use super::{ @@ -178,6 +181,8 @@ impl ToolExecutor for CreateDashboardFilesTool { name: dashboard_records[i].name.clone(), file_type: "dashboard".to_string(), yml_content: serde_yaml::to_string(&yml).unwrap_or_default(), + result_message: None, + results: None, }); } } diff --git a/api/src/utils/tools/file_tools/create_metric_files.rs b/api/src/utils/tools/file_tools/create_metric_files.rs index 5068c764b..b709ef83a 100644 --- a/api/src/utils/tools/file_tools/create_metric_files.rs +++ b/api/src/utils/tools/file_tools/create_metric_files.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use chrono::Utc; use diesel::insert_into; use diesel_async::RunQueryDsl; +use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::debug; @@ -15,15 +16,21 @@ use crate::{ database_dep::{ enums::Verification, lib::get_pg_pool, models::MetricFile, schema::metric_files, }, - utils::{agent::Agent, tools::{file_tools::common::METRIC_YML_SCHEMA, ToolExecutor}}, + utils::{ + agent::Agent, + data_types::DataType, + tools::{ + file_tools::{ + common::{METRIC_YML_SCHEMA, process_metric_file}, + file_types::{file::FileWithId, metric_yml::MetricYml}, + }, + ToolExecutor, + }, + }, }; use super::{ common::validate_sql, - file_types::{ - file::{FileWithId}, - metric_yml::MetricYml, - }, FileModificationTool, }; @@ -63,52 +70,6 @@ impl CreateMetricFilesTool { impl FileModificationTool for CreateMetricFilesTool {} -/// Process a metric file creation request -/// Returns Ok((MetricFile, MetricYml)) if successful, or an error message if failed -async fn process_metric_file(file: MetricFileParams) -> Result<(MetricFile, MetricYml), String> { - debug!("Processing metric file creation: {}", file.name); - - let metric_yml = MetricYml::new(file.yml_content.clone()) - .map_err(|e| format!("Invalid YAML format: {}", e))?; - - let metric_id = metric_yml - .id - .ok_or_else(|| "Missing required field 'id'".to_string())?; - - // Check if dataset_ids is empty - if metric_yml.dataset_ids.is_empty() { - return Err("Missing required field 'dataset_ids'".to_string()); - } - - // Use the first dataset_id for SQL validation - let dataset_id = metric_yml.dataset_ids[0]; - debug!("Validating SQL using dataset_id: {}", dataset_id); - - // Validate SQL with the selected dataset_id - if let Err(e) = validate_sql(&metric_yml.sql, &dataset_id).await { - return Err(format!("Invalid SQL query: {}", e)); - } - - let metric_file = MetricFile { - id: metric_id, - name: metric_yml.title.clone(), - file_name: format!("{}.yml", file.name), - content: serde_json::to_value(metric_yml.clone()) - .map_err(|e| format!("Failed to process metric: {}", e))?, - created_by: Uuid::new_v4(), - verification: Verification::NotRequested, - evaluation_obj: None, - evaluation_summary: None, - evaluation_score: None, - organization_id: Uuid::new_v4(), - created_at: Utc::now(), - updated_at: Utc::now(), - deleted_at: None, - }; - - Ok((metric_file, metric_yml)) -} - #[async_trait] impl ToolExecutor for CreateMetricFilesTool { type Output = CreateMetricFilesOutput; @@ -138,13 +99,14 @@ impl ToolExecutor for CreateMetricFilesTool { // Process metric files let mut metric_records = vec![]; let mut metric_ymls = vec![]; - + let mut results_vec = vec![]; // First pass - validate and prepare all records for file in files { - match process_metric_file(file.clone()).await { - Ok((metric_file, metric_yml)) => { + match process_metric_file(file.name.clone(), file.yml_content.clone()).await { + Ok((metric_file, metric_yml, message, results)) => { metric_records.push(metric_file); metric_ymls.push(metric_yml); + results_vec.push((message, results)); } Err(e) => { failed_files.push((file.name, e)); @@ -172,6 +134,8 @@ impl ToolExecutor for CreateMetricFilesTool { name: metric_records[i].name.clone(), file_type: "metric".to_string(), yml_content: serde_yaml::to_string(&yml).unwrap_or_default(), + result_message: Some(results_vec[i].0.clone()), + results: Some(results_vec[i].1.clone()), }); } } @@ -265,4 +229,4 @@ impl ToolExecutor for CreateMetricFilesTool { } }) } -} \ No newline at end of file +} diff --git a/api/src/utils/tools/file_tools/file_types/dashboard_yml.rs b/api/src/utils/tools/file_tools/file_types/dashboard_yml.rs index a1ba35f71..b6a40c0bb 100644 --- a/api/src/utils/tools/file_tools/file_types/dashboard_yml.rs +++ b/api/src/utils/tools/file_tools/file_types/dashboard_yml.rs @@ -9,6 +9,7 @@ pub struct DashboardYml { pub updated_at: Option>, #[serde(alias = "title")] pub name: String, + pub description: Option, pub rows: Vec, } diff --git a/api/src/utils/tools/file_tools/file_types/file.rs b/api/src/utils/tools/file_tools/file_types/file.rs index 84e7a8401..55db422b0 100644 --- a/api/src/utils/tools/file_tools/file_types/file.rs +++ b/api/src/utils/tools/file_tools/file_types/file.rs @@ -1,7 +1,10 @@ use chrono::{DateTime, Utc}; +use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use uuid::Uuid; +use crate::utils::data_types::DataType; + use super::dashboard_yml::DashboardYml; use super::metric_yml::MetricYml; @@ -18,6 +21,10 @@ pub struct FileWithId { pub name: String, pub file_type: String, pub yml_content: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub result_message: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub results: Option>>, } #[derive(Debug, Serialize, Deserialize)] diff --git a/api/src/utils/tools/file_tools/filter_dashboard_files.rs b/api/src/utils/tools/file_tools/filter_dashboards.rs similarity index 76% rename from api/src/utils/tools/file_tools/filter_dashboard_files.rs rename to api/src/utils/tools/file_tools/filter_dashboards.rs index 7cf20e7ba..1d7031c08 100644 --- a/api/src/utils/tools/file_tools/filter_dashboard_files.rs +++ b/api/src/utils/tools/file_tools/filter_dashboards.rs @@ -6,6 +6,7 @@ use async_trait::async_trait; use chrono::Utc; use diesel::{upsert::excluded, ExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; +use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::{debug, error, info, warn}; @@ -27,7 +28,16 @@ use crate::{ models::{DashboardFile, MetricFile}, schema::{dashboard_files, metric_files}, }, - utils::{agent::Agent, tools::ToolExecutor}, + utils::{ + agent::Agent, + data_types::DataType, + tools::{ + file_tools::{ + common::{METRIC_YML_SCHEMA, process_metric_file_modification}, + }, + ToolExecutor, + }, + }, }; use litellm::ToolCall; @@ -77,6 +87,8 @@ struct FileModificationBatch { metric_ymls: Vec, failed_modifications: Vec<(String, String)>, modification_results: Vec, + validation_messages: Vec, + validation_results: Vec>>, } #[derive(Debug)] @@ -232,20 +244,20 @@ pub struct ModifyFilesOutput { files: Vec, } -pub struct FilterDashboardFilesTool { +pub struct FilterDashboardsTool { agent: Arc, } -impl FilterDashboardFilesTool { +impl FilterDashboardsTool { pub fn new(agent: Arc) -> Self { Self { agent } } } -impl FileModificationTool for FilterDashboardFilesTool {} +impl FileModificationTool for FilterDashboardsTool {} #[async_trait] -impl ToolExecutor for FilterDashboardFilesTool { +impl ToolExecutor for FilterDashboardsTool { type Output = ModifyFilesOutput; type Params = ModifyFilesParams; @@ -277,6 +289,8 @@ impl ToolExecutor for FilterDashboardFilesTool { metric_ymls: Vec::new(), failed_modifications: Vec::new(), modification_results: Vec::new(), + validation_messages: Vec::new(), + validation_results: Vec::new(), }; // Collect file IDs and create map @@ -309,17 +323,19 @@ impl ToolExecutor for FilterDashboardFilesTool { Ok(files) => { for file in files { if let Some(modifications) = file_map.get(&file.id) { - match process_metric_file( + match process_metric_file_modification( file, modifications, start_time.elapsed().as_millis() as i64, ) .await { - Ok((metric_file, metric_yml, results)) => { + Ok((metric_file, metric_yml, results, validation_message, validation_results)) => { batch.metric_files.push(metric_file); batch.metric_ymls.push(metric_yml); batch.modification_results.extend(results); + batch.validation_messages.push(validation_message); + batch.validation_results.push(validation_results); } Err(e) => { batch @@ -366,14 +382,17 @@ impl ToolExecutor for FilterDashboardFilesTool { { Ok(_) => { output.files.extend( - batch.metric_files.iter().zip(batch.metric_ymls.iter()).map( - |(file, yml)| FileWithId { + batch.metric_files.iter().enumerate().map(|(i, file)| { + let yml = &batch.metric_ymls[i]; + FileWithId { id: file.id, name: file.name.clone(), file_type: "metric".to_string(), yml_content: serde_yaml::to_string(&yml).unwrap_or_default(), - }, - ), + validation_message: Some(batch.validation_messages[i].clone()), + validation_results: Some(batch.validation_results[i].clone()), + } + }), ); } Err(e) => { @@ -483,189 +502,6 @@ impl ToolExecutor for FilterDashboardFilesTool { } } -async fn process_metric_file( - mut file: MetricFile, - modification: &FileModification, - duration: i64, -) -> Result<(MetricFile, MetricYml, Vec)> { - debug!( - file_id = %file.id, - file_name = %modification.file_name, - "Processing metric file modifications" - ); - - let mut results = Vec::new(); - - // Parse existing content - let current_yml: MetricYml = match serde_json::from_value(file.content.clone()) { - Ok(yml) => yml, - Err(e) => { - let error = format!("Failed to parse existing metric YAML: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "YAML parsing error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines: vec![], - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "parsing".to_string(), - timestamp: Utc::now(), - duration, - }); - return Err(anyhow::anyhow!(error)); - } - }; - - // Convert to YAML string for line modifications - let current_content = match serde_yaml::to_string(¤t_yml) { - Ok(content) => content, - Err(e) => { - let error = format!("Failed to serialize metric YAML: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "YAML serialization error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines: vec![], - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "serialization".to_string(), - timestamp: Utc::now(), - duration, - }); - return Err(anyhow::anyhow!(error)); - } - }; - - // Track original line numbers before modifications - let mut original_lines = Vec::new(); - let mut adjusted_lines = Vec::new(); - - // Apply modifications - for m in &modification.modifications { - original_lines.extend(m.line_numbers.clone()); - } - - // Apply modifications and track results - match apply_modifications_to_content( - ¤t_content, - &modification.modifications, - &modification.file_name, - ) { - Ok(modified_content) => { - // Create and validate new YML object - match MetricYml::new(modified_content) { - Ok(new_yml) => { - debug!( - file_id = %file.id, - file_name = %modification.file_name, - "Successfully modified and validated metric file" - ); - - // Validate SQL if it was modified - let sql_changed = current_yml.sql != new_yml.sql; - if sql_changed { - if let Err(e) = validate_sql(&new_yml.sql, &file.id).await { - let error = format!("SQL validation failed: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "SQL validation error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines: original_lines.clone(), - adjusted_lines: adjusted_lines.clone(), - error: Some(error.clone()), - modification_type: "sql_validation".to_string(), - timestamp: Utc::now(), - duration, - }); - return Err(anyhow::anyhow!(error)); - } - } - - // Update file record - file.content = serde_json::to_value(&new_yml)?; - file.updated_at = Utc::now(); - file.verification = Verification::NotRequested; - - // Track successful modification - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: true, - original_lines: original_lines.clone(), - adjusted_lines: adjusted_lines.clone(), - error: None, - modification_type: "content".to_string(), - timestamp: Utc::now(), - duration, - }); - - Ok((file, new_yml, results)) - } - Err(e) => { - let error = format!("Failed to validate modified YAML: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "YAML validation error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines, - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "validation".to_string(), - timestamp: Utc::now(), - duration, - }); - Err(anyhow::anyhow!(error)) - } - } - } - Err(e) => { - let error = format!("Failed to apply modifications: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "Modification application error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines, - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "modification".to_string(), - timestamp: Utc::now(), - duration, - }); - Err(anyhow::anyhow!(error)) - } - } -} - #[cfg(test)] mod tests { use std::collections::HashMap; @@ -858,7 +694,7 @@ mod tests { #[test] fn test_tool_parameter_validation() { - let tool = FilterDashboardFilesTool { + let tool = FilterDashboardsTool { agent: Arc::new(Agent::new( "o3-mini".to_string(), HashMap::new(), diff --git a/api/src/utils/tools/file_tools/mod.rs b/api/src/utils/tools/file_tools/mod.rs index aa9b9880a..4edc15ceb 100644 --- a/api/src/utils/tools/file_tools/mod.rs +++ b/api/src/utils/tools/file_tools/mod.rs @@ -2,7 +2,7 @@ pub mod common; pub mod create_dashboard_files; pub mod create_metric_files; pub mod file_types; -pub mod filter_dashboard_files; +// pub mod filter_dashboards; pub mod modify_dashboard_files; pub mod modify_metric_files; pub mod open_files; @@ -12,7 +12,7 @@ pub mod send_assets_to_user; pub use create_dashboard_files::CreateDashboardFilesTool; pub use create_metric_files::CreateMetricFilesTool; -pub use filter_dashboard_files::FilterDashboardFilesTool; +// pub use filter_dashboards::FilterDashboardsTool; pub use modify_dashboard_files::ModifyDashboardFilesTool; pub use modify_metric_files::ModifyMetricFilesTool; pub use open_files::OpenFilesTool; diff --git a/api/src/utils/tools/file_tools/modify_dashboard_files.rs b/api/src/utils/tools/file_tools/modify_dashboard_files.rs index 6f55c3594..429d348fe 100644 --- a/api/src/utils/tools/file_tools/modify_dashboard_files.rs +++ b/api/src/utils/tools/file_tools/modify_dashboard_files.rs @@ -363,6 +363,8 @@ impl ToolExecutor for ModifyDashboardFilesTool { name: file.name.clone(), file_type: "dashboard".to_string(), yml_content: serde_yaml::to_string(&yml).unwrap_or_default(), + result_message: None, + results: None, }), ); } diff --git a/api/src/utils/tools/file_tools/modify_metric_files.rs b/api/src/utils/tools/file_tools/modify_metric_files.rs index 9130bb4e4..0b7adf419 100644 --- a/api/src/utils/tools/file_tools/modify_metric_files.rs +++ b/api/src/utils/tools/file_tools/modify_metric_files.rs @@ -10,10 +10,11 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::{debug, error, info, warn}; use uuid::Uuid; +use indexmap::IndexMap; use super::{ common::{validate_metric_ids, validate_sql}, - file_types::{dashboard_yml::DashboardYml, file::{FileEnum, FileWithId}, metric_yml::MetricYml}, + file_types::{dashboard_yml::DashboardYml, file::FileWithId, metric_yml::MetricYml}, FileModificationTool, }; use crate::{ @@ -23,7 +24,16 @@ use crate::{ models::{DashboardFile, MetricFile}, schema::{dashboard_files, metric_files}, }, - utils::{agent::Agent, tools::{file_tools::common::METRIC_YML_SCHEMA, ToolExecutor}}, + utils::{ + agent::Agent, + data_types::DataType, + tools::{ + file_tools::{ + common::{METRIC_YML_SCHEMA, process_metric_file_modification}, + }, + ToolExecutor, + }, + }, }; use litellm::ToolCall; @@ -55,16 +65,16 @@ pub struct ModifyFilesParams { } #[derive(Debug, Serialize)] -struct ModificationResult { - file_id: Uuid, - file_name: String, - success: bool, - original_lines: Vec, - adjusted_lines: Vec, - error: Option, - modification_type: String, - timestamp: chrono::DateTime, - duration: i64, +pub struct ModificationResult { + pub file_id: Uuid, + pub file_name: String, + pub success: bool, + pub original_lines: Vec, + pub adjusted_lines: Vec, + pub error: Option, + pub modification_type: String, + pub timestamp: chrono::DateTime, + pub duration: i64, } #[derive(Debug)] @@ -73,6 +83,8 @@ struct FileModificationBatch { metric_ymls: Vec, failed_modifications: Vec<(String, String)>, modification_results: Vec, + validation_messages: Vec, + validation_results: Vec>>, } #[derive(Debug)] @@ -153,7 +165,7 @@ fn expand_line_range(line_numbers: &[i64]) -> Vec { (start..=end).collect() } -fn apply_modifications_to_content( +pub fn apply_modifications_to_content( content: &str, modifications: &[Modification], file_name: &str, @@ -272,6 +284,8 @@ impl ToolExecutor for ModifyMetricFilesTool { metric_ymls: Vec::new(), failed_modifications: Vec::new(), modification_results: Vec::new(), + validation_messages: Vec::new(), + validation_results: Vec::new(), }; // Collect file IDs and create map @@ -304,17 +318,19 @@ impl ToolExecutor for ModifyMetricFilesTool { Ok(files) => { for file in files { if let Some(modifications) = file_map.get(&file.id) { - match process_metric_file( + match process_metric_file_modification( file, modifications, start_time.elapsed().as_millis() as i64, ) .await { - Ok((metric_file, metric_yml, results)) => { + Ok((metric_file, metric_yml, results, validation_message, validation_results)) => { batch.metric_files.push(metric_file); batch.metric_ymls.push(metric_yml); batch.modification_results.extend(results); + batch.validation_messages.push(validation_message); + batch.validation_results.push(validation_results); } Err(e) => { batch @@ -361,12 +377,17 @@ impl ToolExecutor for ModifyMetricFilesTool { { Ok(_) => { output.files.extend( - batch.metric_files.iter().zip(batch.metric_ymls.iter()).map(|(file, yml)| FileWithId { - id: file.id, - name: file.name.clone(), - file_type: "metric".to_string(), - yml_content: serde_yaml::to_string(&yml).unwrap_or_default(), - }) + batch.metric_files.iter().enumerate().map(|(i, file)| { + let yml = &batch.metric_ymls[i]; + FileWithId { + id: file.id, + name: file.name.clone(), + file_type: "metric".to_string(), + yml_content: serde_yaml::to_string(&yml).unwrap_or_default(), + result_message: Some(batch.validation_messages[i].clone()), + results: Some(batch.validation_results[i].clone()), + } + }), ); } Err(e) => { @@ -471,189 +492,6 @@ impl ToolExecutor for ModifyMetricFilesTool { } } -async fn process_metric_file( - mut file: MetricFile, - modification: &FileModification, - duration: i64, -) -> Result<(MetricFile, MetricYml, Vec)> { - debug!( - file_id = %file.id, - file_name = %modification.file_name, - "Processing metric file modifications" - ); - - let mut results = Vec::new(); - - // Parse existing content - let current_yml: MetricYml = match serde_json::from_value(file.content.clone()) { - Ok(yml) => yml, - Err(e) => { - let error = format!("Failed to parse existing metric YAML: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "YAML parsing error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines: vec![], - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "parsing".to_string(), - timestamp: Utc::now(), - duration, - }); - return Err(anyhow::anyhow!(error)); - } - }; - - // Convert to YAML string for line modifications - let current_content = match serde_yaml::to_string(¤t_yml) { - Ok(content) => content, - Err(e) => { - let error = format!("Failed to serialize metric YAML: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "YAML serialization error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines: vec![], - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "serialization".to_string(), - timestamp: Utc::now(), - duration, - }); - return Err(anyhow::anyhow!(error)); - } - }; - - // Track original line numbers before modifications - let mut original_lines = Vec::new(); - let mut adjusted_lines = Vec::new(); - - // Apply modifications - for m in &modification.modifications { - original_lines.extend(m.line_numbers.clone()); - } - - // Apply modifications and track results - match apply_modifications_to_content( - ¤t_content, - &modification.modifications, - &modification.file_name, - ) { - Ok(modified_content) => { - // Create and validate new YML object - match MetricYml::new(modified_content) { - Ok(new_yml) => { - debug!( - file_id = %file.id, - file_name = %modification.file_name, - "Successfully modified and validated metric file" - ); - - // Validate SQL if it was modified - let sql_changed = current_yml.sql != new_yml.sql; - if sql_changed { - if let Err(e) = validate_sql(&new_yml.sql, &file.id).await { - let error = format!("SQL validation failed: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "SQL validation error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines: original_lines.clone(), - adjusted_lines: adjusted_lines.clone(), - error: Some(error.clone()), - modification_type: "sql_validation".to_string(), - timestamp: Utc::now(), - duration, - }); - return Err(anyhow::anyhow!(error)); - } - } - - // Update file record - file.content = serde_json::to_value(&new_yml)?; - file.updated_at = Utc::now(); - file.verification = Verification::NotRequested; - - // Track successful modification - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: true, - original_lines: original_lines.clone(), - adjusted_lines: adjusted_lines.clone(), - error: None, - modification_type: "content".to_string(), - timestamp: Utc::now(), - duration, - }); - - Ok((file, new_yml, results)) - } - Err(e) => { - let error = format!("Failed to validate modified YAML: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "YAML validation error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines, - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "validation".to_string(), - timestamp: Utc::now(), - duration, - }); - Err(anyhow::anyhow!(error)) - } - } - } - Err(e) => { - let error = format!("Failed to apply modifications: {}", e); - error!( - file_id = %file.id, - file_name = %modification.file_name, - error = %error, - "Modification application error" - ); - results.push(ModificationResult { - file_id: file.id, - file_name: modification.file_name.clone(), - success: false, - original_lines, - adjusted_lines: vec![], - error: Some(error.clone()), - modification_type: "modification".to_string(), - timestamp: Utc::now(), - duration, - }); - Err(anyhow::anyhow!(error)) - } - } -} - #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/api/src/utils/tools/file_tools/search_data_catalog.rs b/api/src/utils/tools/file_tools/search_data_catalog.rs index 68494cf1a..fe66d0bca 100644 --- a/api/src/utils/tools/file_tools/search_data_catalog.rs +++ b/api/src/utils/tools/file_tools/search_data_catalog.rs @@ -78,6 +78,9 @@ Requirements: 4. Each result MUST ONLY include the "id" field containing the UUID string 5. If no datasets meet the relevance criteria, return {"results": []} 6. Exclude datasets that only tangentially relate to the query +7. CRITICAL: Each result MUST contain ONLY a valid UUID string with the key "id" - no other fields are allowed +8. CRITICAL: The "id" value MUST be a valid UUID string (e.g., "550e8400-e29b-41d4-a716-446655440000") +9. Any result without a valid UUID "id" field will be rejected "#; pub struct SearchDataCatalogTool {