modifications for including subsets of data in requests.

This commit is contained in:
dal 2025-02-27 12:43:04 -07:00
parent f013295714
commit edbedd84ca
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
11 changed files with 443 additions and 475 deletions

View File

@ -8,7 +8,8 @@ use crate::utils::{
agent::{agent::AgentError, Agent, AgentExt, AgentThread},
tools::{
file_tools::{
CreateDashboardFilesTool, CreateMetricFilesTool, FilterDashboardFilesTool, ModifyDashboardFilesTool, ModifyMetricFilesTool
CreateDashboardFilesTool, CreateMetricFilesTool, ModifyDashboardFilesTool,
ModifyMetricFilesTool,
},
IntoValueTool, ToolExecutor,
},
@ -28,7 +29,6 @@ impl DashboardAgent {
let modify_dashboard_tool = ModifyDashboardFilesTool::new(Arc::clone(&self.agent));
let create_metric_tool = CreateMetricFilesTool::new(Arc::clone(&self.agent));
let modify_metric_tool = ModifyMetricFilesTool::new(Arc::clone(&self.agent));
let filter_dashboard_tool = FilterDashboardFilesTool::new(Arc::clone(&self.agent));
// Add tools to the agent
self.agent
@ -55,12 +55,6 @@ impl DashboardAgent {
modify_metric_tool.into_value_tool(),
)
.await;
self.agent
.add_tool(
filter_dashboard_tool.get_name(),
filter_dashboard_tool.into_value_tool(),
)
.await;
Ok(())
}
@ -82,7 +76,10 @@ impl DashboardAgent {
pub async fn from_existing(existing_agent: &Arc<Agent>) -> Result<Self> {
// Create a new agent with the same core properties and shared state/stream
let agent = Arc::new(Agent::from_existing(existing_agent, "dashboard_agent".to_string()));
let agent = Arc::new(Agent::from_existing(
existing_agent,
"dashboard_agent".to_string(),
));
let dashboard = Self { agent };
dashboard.load_tools().await?;
Ok(dashboard)

View File

@ -1,15 +1,32 @@
use anyhow::{anyhow, Result};
use tracing::debug;
use indexmap::IndexMap;
use tracing::{debug, error};
use uuid::Uuid;
use chrono::{Utc, Duration};
use serde::{Deserialize, Serialize};
use serde_json;
use serde_yaml;
use crate::database_dep::{lib::get_pg_pool, schema::metric_files};
use crate::database_dep::{
lib::get_pg_pool,
schema::metric_files,
models::MetricFile,
enums::Verification
};
use crate::utils::data_types::DataType;
use crate::utils::query_engine::query_engine::query_engine;
use crate::utils::tools::file_tools::file_types::metric_yml::MetricYml;
use crate::utils::tools::file_tools::file_types::file::FileWithId;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
// Import the types needed for the modification function
use crate::utils::tools::file_tools::modify_metric_files::{FileModification, ModificationResult};
use crate::utils::tools::file_tools::modify_metric_files::apply_modifications_to_content;
/// Validates SQL query using existing query engine by attempting to run it
/// Returns Ok(()) if valid, Err with description if invalid
pub async fn validate_sql(sql: &str, dataset_id: &Uuid) -> Result<()> {
/// Returns a tuple with a message about the number of records and the results (if ≤ 13 records)
pub async fn validate_sql(sql: &str, dataset_id: &Uuid) -> Result<(String, Vec<IndexMap<String, DataType>>)> {
debug!("Validating SQL query for dataset {}", dataset_id);
if sql.trim().is_empty() {
@ -17,10 +34,28 @@ pub async fn validate_sql(sql: &str, dataset_id: &Uuid) -> Result<()> {
}
// Try to execute the query using query_engine
match query_engine(dataset_id, &sql.to_string()).await {
Ok(_) => Ok(()),
Err(e) => Err(anyhow!("SQL validation failed: {}", e)),
}
let results = match query_engine(dataset_id, &sql.to_string()).await {
Ok(results) => results,
Err(e) => return Err(anyhow!("SQL validation failed: {}", e)),
};
let num_records = results.len();
// Create appropriate message based on number of records
let message = if num_records == 0 {
"No records were found".to_string()
} else {
format!("{} records were returned", num_records)
};
// Return records only if there are 13 or fewer
let return_records = if num_records <= 13 {
results
} else {
Vec::new() // Empty vec when more than 13 records
};
Ok((message, return_records))
}
/// Validates existence of metric IDs in database
@ -52,7 +87,7 @@ pub const METRIC_YML_SCHEMA: &str = r##"
# Required top-level fields:
#
# title: "Your Metric Title"
# dataset_ids: ["uuid1", "uuid2"] # Dataset UUIDs this metric belongs to
# dataset_ids: ["123e4567-e89b-12d3-a456-426614174000"] # Dataset UUIDs (not names)
# time_frame: "Last 30 days" # Human-readable time period covered by the query
# sql: |
# SELECT
@ -103,6 +138,10 @@ properties:
dataset_ids:
type: array
description: "UUIDs of datasets this metric belongs to"
items:
type: string
format: "uuid"
description: "UUID string of the dataset (not the dataset name)"
# TIME FRAME
time_frame:
@ -437,6 +476,7 @@ pub const DASHBOARD_YML_SCHEMA: &str = r##"
# Required fields:
#
# title: "Your Dashboard Title"
# description: "A description of the dashboard, its metrics, and its purpose."
# rows:
# - items:
# - id: "metric-uuid-1" # UUIDv4 of an existing metric
@ -460,6 +500,9 @@ properties:
title:
type: string
description: "The title of the dashboard (e.g. 'Sales & Marketing Dashboard')"
description:
type: string
description: "A description of the dashboard, its metrics, and its purpose"
rows:
type: array
description: "Array of row objects, each containing metric items"
@ -488,9 +531,281 @@ properties:
- items
required:
- title
- description
- rows
"##;
/// Process a metric file creation request.
///
/// Parses `yml_content` into a [`MetricYml`], checks that it carries an `id`
/// and at least one dataset id, runs the SQL against the first dataset, and
/// builds a fresh [`MetricFile`] record.
///
/// Returns `Ok((MetricFile, MetricYml, String, Vec<IndexMap<String, DataType>>))`
/// on success. The `String` is a message about the number of records returned
/// by the SQL query; the vector holds the query results and is empty when the
/// query returned more than 13 records or no rows (see `validate_sql`).
/// On failure, a human-readable error `String` is returned.
pub async fn process_metric_file(
    file_name: String,
    yml_content: String,
) -> Result<
    (
        MetricFile,
        MetricYml,
        String,
        Vec<IndexMap<String, DataType>>,
    ),
    String,
> {
    debug!("Processing metric file: {}", file_name);

    // Parse and structurally validate the YAML. We own `yml_content`, so it is
    // moved directly into the parser — no clone needed.
    let metric_yml =
        MetricYml::new(yml_content).map_err(|e| format!("Invalid YAML format: {}", e))?;

    let metric_id = metric_yml
        .id
        .ok_or_else(|| "Missing required field 'id'".to_string())?;

    // A metric must reference at least one dataset so we know where to run its SQL.
    if metric_yml.dataset_ids.is_empty() {
        return Err("Missing required field 'dataset_ids'".to_string());
    }

    // Use the first dataset_id for SQL validation.
    let dataset_id = metric_yml.dataset_ids[0];
    debug!("Validating SQL using dataset_id: {}", dataset_id);

    // Execute the SQL to prove it is valid and capture a preview of its results.
    let (message, results) = validate_sql(&metric_yml.sql, &dataset_id)
        .await
        .map_err(|e| format!("Invalid SQL query: {}", e))?;

    let metric_file = MetricFile {
        id: metric_id,
        name: metric_yml.title.clone(),
        file_name: format!("{}.yml", file_name),
        // Serialize by reference — `to_value` only needs `&T: Serialize`, so the
        // full clone of the MetricYml is unnecessary.
        content: serde_json::to_value(&metric_yml)
            .map_err(|e| format!("Failed to process metric: {}", e))?,
        // NOTE(review): random placeholder UUIDs — presumably the caller replaces
        // these with the real user/organization ids before insert; TODO confirm.
        created_by: Uuid::new_v4(),
        verification: Verification::NotRequested,
        evaluation_obj: None,
        evaluation_summary: None,
        evaluation_score: None,
        organization_id: Uuid::new_v4(),
        created_at: Utc::now(),
        updated_at: Utc::now(),
        deleted_at: None,
    };

    Ok((metric_file, metric_yml, message, results))
}
/// Process a metric file modification request
/// Returns Ok((MetricFile, MetricYml, Vec<ModificationResult>, String, Vec<IndexMap<String, DataType>>)) if successful, or an error if failed
/// The string is a message about the number of records returned by the SQL query
/// The vector of IndexMap<String, DataType> is the results of the SQL query. Returns empty vector if more than 13 records or no results.
///
/// `duration` is an elapsed-time value (caller supplies milliseconds) stamped
/// onto every ModificationResult produced here. On failure a ModificationResult
/// describing the failure is pushed into the local `results` vec before Err is
/// returned — note that vec is dropped with the Err, so only the error message
/// reaches the caller on the failure path.
pub async fn process_metric_file_modification(
    mut file: MetricFile,
    modification: &FileModification,
    duration: i64,
) -> Result<(MetricFile, MetricYml, Vec<ModificationResult>, String, Vec<IndexMap<String, DataType>>), anyhow::Error> {
    debug!(
        file_id = %file.id,
        file_name = %modification.file_name,
        "Processing metric file modifications"
    );

    // One ModificationResult per outcome (success or the specific failing step).
    let mut results = Vec::new();

    // Parse existing content
    let current_yml: MetricYml = match serde_json::from_value(file.content.clone()) {
        Ok(yml) => yml,
        Err(e) => {
            let error = format!("Failed to parse existing metric YAML: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "YAML parsing error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines: vec![],
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "parsing".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            return Err(anyhow!(error));
        }
    };

    // Convert to YAML string for line modifications
    // (modifications address lines of the YAML text, not the stored JSON).
    let current_content = match serde_yaml::to_string(&current_yml) {
        Ok(content) => content,
        Err(e) => {
            let error = format!("Failed to serialize metric YAML: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "YAML serialization error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines: vec![],
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "serialization".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            return Err(anyhow!(error));
        }
    };

    // Track original line numbers before modifications
    let mut original_lines = Vec::new();
    // NOTE(review): adjusted_lines is declared but never populated below — every
    // result carries an empty adjusted_lines list; confirm whether intended.
    let mut adjusted_lines = Vec::new();

    // Apply modifications
    for m in &modification.modifications {
        original_lines.extend(m.line_numbers.clone());
    }

    // Apply modifications and track results
    match apply_modifications_to_content(
        &current_content,
        &modification.modifications,
        &modification.file_name,
    ) {
        Ok(modified_content) => {
            // Create and validate new YML object
            match MetricYml::new(modified_content) {
                Ok(new_yml) => {
                    debug!(
                        file_id = %file.id,
                        file_name = %modification.file_name,
                        "Successfully modified and validated metric file"
                    );

                    // Check if dataset_ids is empty
                    if new_yml.dataset_ids.is_empty() {
                        let error = "Missing required field 'dataset_ids'".to_string();
                        error!(
                            file_id = %file.id,
                            file_name = %modification.file_name,
                            error = %error,
                            "Validation error"
                        );
                        results.push(ModificationResult {
                            file_id: file.id,
                            file_name: modification.file_name.clone(),
                            success: false,
                            original_lines: original_lines.clone(),
                            adjusted_lines: adjusted_lines.clone(),
                            error: Some(error.clone()),
                            modification_type: "validation".to_string(),
                            timestamp: Utc::now(),
                            duration,
                        });
                        return Err(anyhow!(error));
                    }

                    // Validate SQL with the selected dataset_id and get results.
                    // SQL is re-validated on every modification, even when the
                    // SQL text itself was not one of the modified lines.
                    let dataset_id = new_yml.dataset_ids[0];
                    let (message, validation_results) = match validate_sql(&new_yml.sql, &dataset_id).await {
                        Ok(results) => results,
                        Err(e) => {
                            let error = format!("Invalid SQL query: {}", e);
                            error!(
                                file_id = %file.id,
                                file_name = %modification.file_name,
                                error = %error,
                                "SQL validation error"
                            );
                            results.push(ModificationResult {
                                file_id: file.id,
                                file_name: modification.file_name.clone(),
                                success: false,
                                original_lines: original_lines.clone(),
                                adjusted_lines: adjusted_lines.clone(),
                                error: Some(error.clone()),
                                modification_type: "sql_validation".to_string(),
                                timestamp: Utc::now(),
                                duration,
                            });
                            return Err(anyhow!(error));
                        }
                    };

                    // Update file record
                    file.content = serde_json::to_value(&new_yml)?;
                    file.updated_at = Utc::now();
                    // Any modification resets the verification status.
                    file.verification = Verification::NotRequested;

                    // Track successful modification
                    results.push(ModificationResult {
                        file_id: file.id,
                        file_name: modification.file_name.clone(),
                        success: true,
                        original_lines: original_lines.clone(),
                        adjusted_lines: adjusted_lines.clone(),
                        error: None,
                        modification_type: "content".to_string(),
                        timestamp: Utc::now(),
                        duration,
                    });

                    Ok((file, new_yml, results, message, validation_results))
                }
                Err(e) => {
                    let error = format!("Failed to validate modified YAML: {}", e);
                    error!(
                        file_id = %file.id,
                        file_name = %modification.file_name,
                        error = %error,
                        "YAML validation error"
                    );
                    results.push(ModificationResult {
                        file_id: file.id,
                        file_name: modification.file_name.clone(),
                        success: false,
                        original_lines,
                        adjusted_lines: vec![],
                        error: Some(error.clone()),
                        modification_type: "validation".to_string(),
                        timestamp: Utc::now(),
                        duration,
                    });
                    Err(anyhow!(error))
                }
            }
        }
        Err(e) => {
            let error = format!("Failed to apply modifications: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "Modification application error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines,
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "modification".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            Err(anyhow!(error))
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -13,7 +13,10 @@ use uuid::Uuid;
use crate::{
database_dep::{lib::get_pg_pool, models::DashboardFile, schema::dashboard_files},
utils::{agent::Agent, tools::{file_tools::common::DASHBOARD_YML_SCHEMA, ToolExecutor}},
utils::{
agent::Agent,
tools::{file_tools::common::DASHBOARD_YML_SCHEMA, ToolExecutor},
},
};
use super::{
@ -178,6 +181,8 @@ impl ToolExecutor for CreateDashboardFilesTool {
name: dashboard_records[i].name.clone(),
file_type: "dashboard".to_string(),
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
result_message: None,
results: None,
});
}
}

View File

@ -6,6 +6,7 @@ use async_trait::async_trait;
use chrono::Utc;
use diesel::insert_into;
use diesel_async::RunQueryDsl;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::debug;
@ -15,15 +16,21 @@ use crate::{
database_dep::{
enums::Verification, lib::get_pg_pool, models::MetricFile, schema::metric_files,
},
utils::{agent::Agent, tools::{file_tools::common::METRIC_YML_SCHEMA, ToolExecutor}},
utils::{
agent::Agent,
data_types::DataType,
tools::{
file_tools::{
common::{METRIC_YML_SCHEMA, process_metric_file},
file_types::{file::FileWithId, metric_yml::MetricYml},
},
ToolExecutor,
},
},
};
use super::{
common::validate_sql,
file_types::{
file::{FileWithId},
metric_yml::MetricYml,
},
FileModificationTool,
};
@ -63,52 +70,6 @@ impl CreateMetricFilesTool {
impl FileModificationTool for CreateMetricFilesTool {}
/// Process a metric file creation request
/// Returns Ok((MetricFile, MetricYml)) if successful, or an error message if failed
async fn process_metric_file(file: MetricFileParams) -> Result<(MetricFile, MetricYml), String> {
    debug!("Processing metric file creation: {}", file.name);

    // Parse and structurally validate the YAML payload.
    let metric_yml = MetricYml::new(file.yml_content.clone())
        .map_err(|e| format!("Invalid YAML format: {}", e))?;

    let metric_id = metric_yml
        .id
        .ok_or_else(|| "Missing required field 'id'".to_string())?;

    // Check if dataset_ids is empty
    if metric_yml.dataset_ids.is_empty() {
        return Err("Missing required field 'dataset_ids'".to_string());
    }

    // Use the first dataset_id for SQL validation
    let dataset_id = metric_yml.dataset_ids[0];
    debug!("Validating SQL using dataset_id: {}", dataset_id);

    // Validate SQL with the selected dataset_id
    if let Err(e) = validate_sql(&metric_yml.sql, &dataset_id).await {
        return Err(format!("Invalid SQL query: {}", e));
    }

    let metric_file = MetricFile {
        id: metric_id,
        name: metric_yml.title.clone(),
        file_name: format!("{}.yml", file.name),
        content: serde_json::to_value(metric_yml.clone())
            .map_err(|e| format!("Failed to process metric: {}", e))?,
        // NOTE(review): random placeholder UUIDs — presumably replaced with the
        // real user/organization ids by the caller before insert; confirm.
        created_by: Uuid::new_v4(),
        verification: Verification::NotRequested,
        evaluation_obj: None,
        evaluation_summary: None,
        evaluation_score: None,
        organization_id: Uuid::new_v4(),
        created_at: Utc::now(),
        updated_at: Utc::now(),
        deleted_at: None,
    };

    Ok((metric_file, metric_yml))
}
#[async_trait]
impl ToolExecutor for CreateMetricFilesTool {
type Output = CreateMetricFilesOutput;
@ -138,13 +99,14 @@ impl ToolExecutor for CreateMetricFilesTool {
// Process metric files
let mut metric_records = vec![];
let mut metric_ymls = vec![];
let mut results_vec = vec![];
// First pass - validate and prepare all records
for file in files {
match process_metric_file(file.clone()).await {
Ok((metric_file, metric_yml)) => {
match process_metric_file(file.name.clone(), file.yml_content.clone()).await {
Ok((metric_file, metric_yml, message, results)) => {
metric_records.push(metric_file);
metric_ymls.push(metric_yml);
results_vec.push((message, results));
}
Err(e) => {
failed_files.push((file.name, e));
@ -172,6 +134,8 @@ impl ToolExecutor for CreateMetricFilesTool {
name: metric_records[i].name.clone(),
file_type: "metric".to_string(),
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
result_message: Some(results_vec[i].0.clone()),
results: Some(results_vec[i].1.clone()),
});
}
}
@ -265,4 +229,4 @@ impl ToolExecutor for CreateMetricFilesTool {
}
})
}
}
}

View File

@ -9,6 +9,7 @@ pub struct DashboardYml {
pub updated_at: Option<DateTime<Utc>>,
#[serde(alias = "title")]
pub name: String,
pub description: Option<String>,
pub rows: Vec<Row>,
}

View File

@ -1,7 +1,10 @@
use chrono::{DateTime, Utc};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::utils::data_types::DataType;
use super::dashboard_yml::DashboardYml;
use super::metric_yml::MetricYml;
@ -18,6 +21,10 @@ pub struct FileWithId {
pub name: String,
pub file_type: String,
pub yml_content: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub result_message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub results: Option<Vec<IndexMap<String, DataType>>>,
}
#[derive(Debug, Serialize, Deserialize)]

View File

@ -6,6 +6,7 @@ use async_trait::async_trait;
use chrono::Utc;
use diesel::{upsert::excluded, ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::{debug, error, info, warn};
@ -27,7 +28,16 @@ use crate::{
models::{DashboardFile, MetricFile},
schema::{dashboard_files, metric_files},
},
utils::{agent::Agent, tools::ToolExecutor},
utils::{
agent::Agent,
data_types::DataType,
tools::{
file_tools::{
common::{METRIC_YML_SCHEMA, process_metric_file_modification},
},
ToolExecutor,
},
},
};
use litellm::ToolCall;
@ -77,6 +87,8 @@ struct FileModificationBatch {
metric_ymls: Vec<MetricYml>,
failed_modifications: Vec<(String, String)>,
modification_results: Vec<ModificationResult>,
validation_messages: Vec<String>,
validation_results: Vec<Vec<IndexMap<String, DataType>>>,
}
#[derive(Debug)]
@ -232,20 +244,20 @@ pub struct ModifyFilesOutput {
files: Vec<FileWithId>,
}
pub struct FilterDashboardFilesTool {
pub struct FilterDashboardsTool {
agent: Arc<Agent>,
}
impl FilterDashboardFilesTool {
impl FilterDashboardsTool {
pub fn new(agent: Arc<Agent>) -> Self {
Self { agent }
}
}
impl FileModificationTool for FilterDashboardFilesTool {}
impl FileModificationTool for FilterDashboardsTool {}
#[async_trait]
impl ToolExecutor for FilterDashboardFilesTool {
impl ToolExecutor for FilterDashboardsTool {
type Output = ModifyFilesOutput;
type Params = ModifyFilesParams;
@ -277,6 +289,8 @@ impl ToolExecutor for FilterDashboardFilesTool {
metric_ymls: Vec::new(),
failed_modifications: Vec::new(),
modification_results: Vec::new(),
validation_messages: Vec::new(),
validation_results: Vec::new(),
};
// Collect file IDs and create map
@ -309,17 +323,19 @@ impl ToolExecutor for FilterDashboardFilesTool {
Ok(files) => {
for file in files {
if let Some(modifications) = file_map.get(&file.id) {
match process_metric_file(
match process_metric_file_modification(
file,
modifications,
start_time.elapsed().as_millis() as i64,
)
.await
{
Ok((metric_file, metric_yml, results)) => {
Ok((metric_file, metric_yml, results, validation_message, validation_results)) => {
batch.metric_files.push(metric_file);
batch.metric_ymls.push(metric_yml);
batch.modification_results.extend(results);
batch.validation_messages.push(validation_message);
batch.validation_results.push(validation_results);
}
Err(e) => {
batch
@ -366,14 +382,17 @@ impl ToolExecutor for FilterDashboardFilesTool {
{
Ok(_) => {
output.files.extend(
batch.metric_files.iter().zip(batch.metric_ymls.iter()).map(
|(file, yml)| FileWithId {
batch.metric_files.iter().enumerate().map(|(i, file)| {
let yml = &batch.metric_ymls[i];
FileWithId {
id: file.id,
name: file.name.clone(),
file_type: "metric".to_string(),
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
},
),
validation_message: Some(batch.validation_messages[i].clone()),
validation_results: Some(batch.validation_results[i].clone()),
}
}),
);
}
Err(e) => {
@ -483,189 +502,6 @@ impl ToolExecutor for FilterDashboardFilesTool {
}
}
/// Apply line-level modifications to an existing metric file, re-validate the
/// resulting YAML, and (only when the SQL text changed) re-run SQL validation.
///
/// Returns the updated MetricFile, the parsed MetricYml, and one
/// ModificationResult per outcome. On failure the failing result is pushed
/// into the local vec before Err is returned, so only the error message
/// reaches the caller on that path. `duration` (ms) is stamped on every result.
async fn process_metric_file(
    mut file: MetricFile,
    modification: &FileModification,
    duration: i64,
) -> Result<(MetricFile, MetricYml, Vec<ModificationResult>)> {
    debug!(
        file_id = %file.id,
        file_name = %modification.file_name,
        "Processing metric file modifications"
    );

    let mut results = Vec::new();

    // Parse existing content
    let current_yml: MetricYml = match serde_json::from_value(file.content.clone()) {
        Ok(yml) => yml,
        Err(e) => {
            let error = format!("Failed to parse existing metric YAML: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "YAML parsing error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines: vec![],
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "parsing".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            return Err(anyhow::anyhow!(error));
        }
    };

    // Convert to YAML string for line modifications
    let current_content = match serde_yaml::to_string(&current_yml) {
        Ok(content) => content,
        Err(e) => {
            let error = format!("Failed to serialize metric YAML: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "YAML serialization error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines: vec![],
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "serialization".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            return Err(anyhow::anyhow!(error));
        }
    };

    // Track original line numbers before modifications
    let mut original_lines = Vec::new();
    // NOTE(review): adjusted_lines is never populated — results always carry an
    // empty adjusted_lines list; confirm whether that is intended.
    let mut adjusted_lines = Vec::new();

    // Apply modifications
    for m in &modification.modifications {
        original_lines.extend(m.line_numbers.clone());
    }

    // Apply modifications and track results
    match apply_modifications_to_content(
        &current_content,
        &modification.modifications,
        &modification.file_name,
    ) {
        Ok(modified_content) => {
            // Create and validate new YML object
            match MetricYml::new(modified_content) {
                Ok(new_yml) => {
                    debug!(
                        file_id = %file.id,
                        file_name = %modification.file_name,
                        "Successfully modified and validated metric file"
                    );

                    // Validate SQL if it was modified
                    let sql_changed = current_yml.sql != new_yml.sql;
                    if sql_changed {
                        // NOTE(review): validate_sql expects a *dataset* id, but
                        // this passes the metric file's own id (file.id) — looks
                        // like a bug; the common.rs variant uses
                        // new_yml.dataset_ids[0] instead.
                        if let Err(e) = validate_sql(&new_yml.sql, &file.id).await {
                            let error = format!("SQL validation failed: {}", e);
                            error!(
                                file_id = %file.id,
                                file_name = %modification.file_name,
                                error = %error,
                                "SQL validation error"
                            );
                            results.push(ModificationResult {
                                file_id: file.id,
                                file_name: modification.file_name.clone(),
                                success: false,
                                original_lines: original_lines.clone(),
                                adjusted_lines: adjusted_lines.clone(),
                                error: Some(error.clone()),
                                modification_type: "sql_validation".to_string(),
                                timestamp: Utc::now(),
                                duration,
                            });
                            return Err(anyhow::anyhow!(error));
                        }
                    }

                    // Update file record
                    file.content = serde_json::to_value(&new_yml)?;
                    file.updated_at = Utc::now();
                    // Any modification resets the verification status.
                    file.verification = Verification::NotRequested;

                    // Track successful modification
                    results.push(ModificationResult {
                        file_id: file.id,
                        file_name: modification.file_name.clone(),
                        success: true,
                        original_lines: original_lines.clone(),
                        adjusted_lines: adjusted_lines.clone(),
                        error: None,
                        modification_type: "content".to_string(),
                        timestamp: Utc::now(),
                        duration,
                    });

                    Ok((file, new_yml, results))
                }
                Err(e) => {
                    let error = format!("Failed to validate modified YAML: {}", e);
                    error!(
                        file_id = %file.id,
                        file_name = %modification.file_name,
                        error = %error,
                        "YAML validation error"
                    );
                    results.push(ModificationResult {
                        file_id: file.id,
                        file_name: modification.file_name.clone(),
                        success: false,
                        original_lines,
                        adjusted_lines: vec![],
                        error: Some(error.clone()),
                        modification_type: "validation".to_string(),
                        timestamp: Utc::now(),
                        duration,
                    });
                    Err(anyhow::anyhow!(error))
                }
            }
        }
        Err(e) => {
            let error = format!("Failed to apply modifications: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "Modification application error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines,
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "modification".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            Err(anyhow::anyhow!(error))
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
@ -858,7 +694,7 @@ mod tests {
#[test]
fn test_tool_parameter_validation() {
let tool = FilterDashboardFilesTool {
let tool = FilterDashboardsTool {
agent: Arc::new(Agent::new(
"o3-mini".to_string(),
HashMap::new(),

View File

@ -2,7 +2,7 @@ pub mod common;
pub mod create_dashboard_files;
pub mod create_metric_files;
pub mod file_types;
pub mod filter_dashboard_files;
// pub mod filter_dashboards;
pub mod modify_dashboard_files;
pub mod modify_metric_files;
pub mod open_files;
@ -12,7 +12,7 @@ pub mod send_assets_to_user;
pub use create_dashboard_files::CreateDashboardFilesTool;
pub use create_metric_files::CreateMetricFilesTool;
pub use filter_dashboard_files::FilterDashboardFilesTool;
// pub use filter_dashboards::FilterDashboardsTool;
pub use modify_dashboard_files::ModifyDashboardFilesTool;
pub use modify_metric_files::ModifyMetricFilesTool;
pub use open_files::OpenFilesTool;

View File

@ -363,6 +363,8 @@ impl ToolExecutor for ModifyDashboardFilesTool {
name: file.name.clone(),
file_type: "dashboard".to_string(),
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
result_message: None,
results: None,
}),
);
}

View File

@ -10,10 +10,11 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use indexmap::IndexMap;
use super::{
common::{validate_metric_ids, validate_sql},
file_types::{dashboard_yml::DashboardYml, file::{FileEnum, FileWithId}, metric_yml::MetricYml},
file_types::{dashboard_yml::DashboardYml, file::FileWithId, metric_yml::MetricYml},
FileModificationTool,
};
use crate::{
@ -23,7 +24,16 @@ use crate::{
models::{DashboardFile, MetricFile},
schema::{dashboard_files, metric_files},
},
utils::{agent::Agent, tools::{file_tools::common::METRIC_YML_SCHEMA, ToolExecutor}},
utils::{
agent::Agent,
data_types::DataType,
tools::{
file_tools::{
common::{METRIC_YML_SCHEMA, process_metric_file_modification},
},
ToolExecutor,
},
},
};
use litellm::ToolCall;
@ -55,16 +65,16 @@ pub struct ModifyFilesParams {
}
#[derive(Debug, Serialize)]
struct ModificationResult {
file_id: Uuid,
file_name: String,
success: bool,
original_lines: Vec<i64>,
adjusted_lines: Vec<i64>,
error: Option<String>,
modification_type: String,
timestamp: chrono::DateTime<Utc>,
duration: i64,
pub struct ModificationResult {
pub file_id: Uuid,
pub file_name: String,
pub success: bool,
pub original_lines: Vec<i64>,
pub adjusted_lines: Vec<i64>,
pub error: Option<String>,
pub modification_type: String,
pub timestamp: chrono::DateTime<Utc>,
pub duration: i64,
}
#[derive(Debug)]
@ -73,6 +83,8 @@ struct FileModificationBatch {
metric_ymls: Vec<MetricYml>,
failed_modifications: Vec<(String, String)>,
modification_results: Vec<ModificationResult>,
validation_messages: Vec<String>,
validation_results: Vec<Vec<IndexMap<String, DataType>>>,
}
#[derive(Debug)]
@ -153,7 +165,7 @@ fn expand_line_range(line_numbers: &[i64]) -> Vec<i64> {
(start..=end).collect()
}
fn apply_modifications_to_content(
pub fn apply_modifications_to_content(
content: &str,
modifications: &[Modification],
file_name: &str,
@ -272,6 +284,8 @@ impl ToolExecutor for ModifyMetricFilesTool {
metric_ymls: Vec::new(),
failed_modifications: Vec::new(),
modification_results: Vec::new(),
validation_messages: Vec::new(),
validation_results: Vec::new(),
};
// Collect file IDs and create map
@ -304,17 +318,19 @@ impl ToolExecutor for ModifyMetricFilesTool {
Ok(files) => {
for file in files {
if let Some(modifications) = file_map.get(&file.id) {
match process_metric_file(
match process_metric_file_modification(
file,
modifications,
start_time.elapsed().as_millis() as i64,
)
.await
{
Ok((metric_file, metric_yml, results)) => {
Ok((metric_file, metric_yml, results, validation_message, validation_results)) => {
batch.metric_files.push(metric_file);
batch.metric_ymls.push(metric_yml);
batch.modification_results.extend(results);
batch.validation_messages.push(validation_message);
batch.validation_results.push(validation_results);
}
Err(e) => {
batch
@ -361,12 +377,17 @@ impl ToolExecutor for ModifyMetricFilesTool {
{
Ok(_) => {
output.files.extend(
batch.metric_files.iter().zip(batch.metric_ymls.iter()).map(|(file, yml)| FileWithId {
id: file.id,
name: file.name.clone(),
file_type: "metric".to_string(),
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
})
batch.metric_files.iter().enumerate().map(|(i, file)| {
let yml = &batch.metric_ymls[i];
FileWithId {
id: file.id,
name: file.name.clone(),
file_type: "metric".to_string(),
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
result_message: Some(batch.validation_messages[i].clone()),
results: Some(batch.validation_results[i].clone()),
}
}),
);
}
Err(e) => {
@ -471,189 +492,6 @@ impl ToolExecutor for ModifyMetricFilesTool {
}
}
/// Apply line-level modifications to an existing metric file, re-validate the
/// resulting YAML, and (only when the SQL text changed) re-run SQL validation.
///
/// Returns the updated MetricFile, the parsed MetricYml, and one
/// ModificationResult per outcome. On failure the failing result is pushed
/// into the local vec before Err is returned, so only the error message
/// reaches the caller on that path. `duration` (ms) is stamped on every result.
async fn process_metric_file(
    mut file: MetricFile,
    modification: &FileModification,
    duration: i64,
) -> Result<(MetricFile, MetricYml, Vec<ModificationResult>)> {
    debug!(
        file_id = %file.id,
        file_name = %modification.file_name,
        "Processing metric file modifications"
    );

    let mut results = Vec::new();

    // Parse existing content
    let current_yml: MetricYml = match serde_json::from_value(file.content.clone()) {
        Ok(yml) => yml,
        Err(e) => {
            let error = format!("Failed to parse existing metric YAML: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "YAML parsing error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines: vec![],
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "parsing".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            return Err(anyhow::anyhow!(error));
        }
    };

    // Convert to YAML string for line modifications
    let current_content = match serde_yaml::to_string(&current_yml) {
        Ok(content) => content,
        Err(e) => {
            let error = format!("Failed to serialize metric YAML: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "YAML serialization error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines: vec![],
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "serialization".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            return Err(anyhow::anyhow!(error));
        }
    };

    // Track original line numbers before modifications
    let mut original_lines = Vec::new();
    // NOTE(review): adjusted_lines is never populated — results always carry an
    // empty adjusted_lines list; confirm whether that is intended.
    let mut adjusted_lines = Vec::new();

    // Apply modifications
    for m in &modification.modifications {
        original_lines.extend(m.line_numbers.clone());
    }

    // Apply modifications and track results
    match apply_modifications_to_content(
        &current_content,
        &modification.modifications,
        &modification.file_name,
    ) {
        Ok(modified_content) => {
            // Create and validate new YML object
            match MetricYml::new(modified_content) {
                Ok(new_yml) => {
                    debug!(
                        file_id = %file.id,
                        file_name = %modification.file_name,
                        "Successfully modified and validated metric file"
                    );

                    // Validate SQL if it was modified
                    let sql_changed = current_yml.sql != new_yml.sql;
                    if sql_changed {
                        // NOTE(review): validate_sql expects a *dataset* id, but
                        // this passes the metric file's own id (file.id) — looks
                        // like a bug; the common.rs variant uses
                        // new_yml.dataset_ids[0] instead.
                        if let Err(e) = validate_sql(&new_yml.sql, &file.id).await {
                            let error = format!("SQL validation failed: {}", e);
                            error!(
                                file_id = %file.id,
                                file_name = %modification.file_name,
                                error = %error,
                                "SQL validation error"
                            );
                            results.push(ModificationResult {
                                file_id: file.id,
                                file_name: modification.file_name.clone(),
                                success: false,
                                original_lines: original_lines.clone(),
                                adjusted_lines: adjusted_lines.clone(),
                                error: Some(error.clone()),
                                modification_type: "sql_validation".to_string(),
                                timestamp: Utc::now(),
                                duration,
                            });
                            return Err(anyhow::anyhow!(error));
                        }
                    }

                    // Update file record
                    file.content = serde_json::to_value(&new_yml)?;
                    file.updated_at = Utc::now();
                    // Any modification resets the verification status.
                    file.verification = Verification::NotRequested;

                    // Track successful modification
                    results.push(ModificationResult {
                        file_id: file.id,
                        file_name: modification.file_name.clone(),
                        success: true,
                        original_lines: original_lines.clone(),
                        adjusted_lines: adjusted_lines.clone(),
                        error: None,
                        modification_type: "content".to_string(),
                        timestamp: Utc::now(),
                        duration,
                    });

                    Ok((file, new_yml, results))
                }
                Err(e) => {
                    let error = format!("Failed to validate modified YAML: {}", e);
                    error!(
                        file_id = %file.id,
                        file_name = %modification.file_name,
                        error = %error,
                        "YAML validation error"
                    );
                    results.push(ModificationResult {
                        file_id: file.id,
                        file_name: modification.file_name.clone(),
                        success: false,
                        original_lines,
                        adjusted_lines: vec![],
                        error: Some(error.clone()),
                        modification_type: "validation".to_string(),
                        timestamp: Utc::now(),
                        duration,
                    });
                    Err(anyhow::anyhow!(error))
                }
            }
        }
        Err(e) => {
            let error = format!("Failed to apply modifications: {}", e);
            error!(
                file_id = %file.id,
                file_name = %modification.file_name,
                error = %error,
                "Modification application error"
            );
            results.push(ModificationResult {
                file_id: file.id,
                file_name: modification.file_name.clone(),
                success: false,
                original_lines,
                adjusted_lines: vec![],
                error: Some(error.clone()),
                modification_type: "modification".to_string(),
                timestamp: Utc::now(),
                duration,
            });
            Err(anyhow::anyhow!(error))
        }
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;

View File

@ -78,6 +78,9 @@ Requirements:
4. Each result MUST ONLY include the "id" field containing the UUID string
5. If no datasets meet the relevance criteria, return {"results": []}
6. Exclude datasets that only tangentially relate to the query
7. CRITICAL: Each result MUST contain ONLY a valid UUID string with the key "id" - no other fields are allowed
8. CRITICAL: The "id" value MUST be a valid UUID string (e.g., "550e8400-e29b-41d4-a716-446655440000")
9. Any result without a valid UUID "id" field will be rejected
"#;
pub struct SearchDataCatalogTool {