mirror of https://github.com/buster-so/buster.git
separate out functions for creation and validation
This commit is contained in:
parent
cf45f4eddd
commit
bb34abcd4c
|
@ -1,14 +1,14 @@
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::{anyhow, Result};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use diesel::insert_into;
|
use diesel::insert_into;
|
||||||
use diesel_async::RunQueryDsl;
|
use diesel_async::RunQueryDsl;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
use tracing::debug;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use tracing::{debug, error};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
database_dep::{
|
database_dep::{
|
||||||
|
@ -21,9 +21,9 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
file_types::{dashboard_yml::DashboardYml, file::FileEnum, metric_yml::MetricYml},
|
common::{validate_metric_ids, validate_sql},
|
||||||
|
file_types::{dashboard_yml::DashboardYml, metric_yml::MetricYml},
|
||||||
FileModificationTool,
|
FileModificationTool,
|
||||||
common::{validate_sql, validate_metric_ids},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use litellm::ToolCall;
|
use litellm::ToolCall;
|
||||||
|
@ -64,6 +64,92 @@ impl CreateFilesTool {
|
||||||
|
|
||||||
impl FileModificationTool for CreateFilesTool {}
|
impl FileModificationTool for CreateFilesTool {}
|
||||||
|
|
||||||
|
/// Process a metric file creation request
|
||||||
|
/// Returns Ok((MetricFile, MetricYml)) if successful, or an error message if failed
|
||||||
|
async fn process_metric_file(file: FileParams) -> Result<(MetricFile, MetricYml), String> {
|
||||||
|
debug!("Processing metric file creation: {}", file.name);
|
||||||
|
|
||||||
|
let metric_yml = MetricYml::new(file.yml_content.clone())
|
||||||
|
.map_err(|e| format!("Failed to parse metric YAML: {}", e))?;
|
||||||
|
|
||||||
|
let metric_id = metric_yml.id.ok_or_else(|| {
|
||||||
|
"Metric YML file does not have an id. This should never happen.".to_string()
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Validate SQL
|
||||||
|
if let Err(e) = validate_sql(&metric_yml.sql, &metric_id).await {
|
||||||
|
return Err(format!("SQL validation failed: {}", e));
|
||||||
|
}
|
||||||
|
|
||||||
|
let metric_file = MetricFile {
|
||||||
|
id: metric_id,
|
||||||
|
name: metric_yml.title.clone(),
|
||||||
|
file_name: format!("{}.yml", file.name),
|
||||||
|
content: serde_json::to_value(metric_yml.clone())
|
||||||
|
.map_err(|e| format!("Failed to serialize metric content: {}", e))?,
|
||||||
|
created_by: Uuid::new_v4(),
|
||||||
|
verification: Verification::NotRequested,
|
||||||
|
evaluation_obj: None,
|
||||||
|
evaluation_summary: None,
|
||||||
|
evaluation_score: None,
|
||||||
|
organization_id: Uuid::new_v4(),
|
||||||
|
created_at: Utc::now(),
|
||||||
|
updated_at: Utc::now(),
|
||||||
|
deleted_at: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((metric_file, metric_yml))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process a dashboard file creation request
|
||||||
|
/// Returns Ok((DashboardFile, DashboardYml)) if successful, or an error message if failed
|
||||||
|
async fn process_dashboard_file(file: FileParams) -> Result<(DashboardFile, DashboardYml), String> {
|
||||||
|
debug!("Processing dashboard file creation: {}", file.name);
|
||||||
|
|
||||||
|
let dashboard_yml = DashboardYml::new(file.yml_content.clone())
|
||||||
|
.map_err(|e| format!("Failed to parse dashboard YAML: {}", e))?;
|
||||||
|
|
||||||
|
let dashboard_id = dashboard_yml.id.ok_or_else(|| {
|
||||||
|
"Dashboard YML file does not have an id. This should never happen.".to_string()
|
||||||
|
})?;
|
||||||
|
|
||||||
|
// Collect and validate metric IDs from rows
|
||||||
|
let metric_ids: Vec<Uuid> = dashboard_yml
|
||||||
|
.rows
|
||||||
|
.iter()
|
||||||
|
.flat_map(|row| row.items.iter())
|
||||||
|
.map(|item| item.id)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
if !metric_ids.is_empty() {
|
||||||
|
match validate_metric_ids(&metric_ids).await {
|
||||||
|
Ok(missing_ids) if !missing_ids.is_empty() => {
|
||||||
|
return Err(format!("Referenced metrics not found: {:?}", missing_ids));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
return Err(format!("Failed to validate metric IDs: {}", e));
|
||||||
|
}
|
||||||
|
Ok(_) => (), // All metrics exist
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let dashboard_file = DashboardFile {
|
||||||
|
id: dashboard_id,
|
||||||
|
name: dashboard_yml.name.clone(),
|
||||||
|
file_name: format!("{}.yml", file.name),
|
||||||
|
content: serde_json::to_value(dashboard_yml.clone())
|
||||||
|
.map_err(|e| format!("Failed to serialize dashboard content: {}", e))?,
|
||||||
|
filter: None,
|
||||||
|
organization_id: Uuid::new_v4(),
|
||||||
|
created_by: Uuid::new_v4(),
|
||||||
|
created_at: Utc::now(),
|
||||||
|
updated_at: Utc::now(),
|
||||||
|
deleted_at: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((dashboard_file, dashboard_yml))
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ToolExecutor for CreateFilesTool {
|
impl ToolExecutor for CreateFilesTool {
|
||||||
type Output = CreateFilesOutput;
|
type Output = CreateFilesOutput;
|
||||||
|
@ -84,10 +170,7 @@ impl ToolExecutor for CreateFilesTool {
|
||||||
match serde_json::from_str(&tool_call.function.arguments.clone()) {
|
match serde_json::from_str(&tool_call.function.arguments.clone()) {
|
||||||
Ok(params) => params,
|
Ok(params) => params,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
return Err(anyhow::anyhow!(
|
return Err(anyhow!("Failed to parse create files parameters: {}", e));
|
||||||
"Failed to parse create files parameters: {}",
|
|
||||||
e
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -104,92 +187,32 @@ impl ToolExecutor for CreateFilesTool {
|
||||||
// First pass - validate and prepare all records
|
// First pass - validate and prepare all records
|
||||||
for file in files {
|
for file in files {
|
||||||
match file.file_type.as_str() {
|
match file.file_type.as_str() {
|
||||||
"metric" => {
|
"metric" => match process_metric_file(file.clone()).await {
|
||||||
match MetricYml::new(file.yml_content.clone()) {
|
Ok((metric_file, metric_yml)) => {
|
||||||
Ok(metric_yml) => {
|
metric_records.push(metric_file);
|
||||||
if let Some(metric_id) = &metric_yml.id {
|
metric_ymls.push(metric_yml);
|
||||||
// Validate SQL before creating the record
|
|
||||||
if let Err(e) = validate_sql(&metric_yml.sql, metric_id).await {
|
|
||||||
failed_files.push((file.name, format!("SQL validation failed: {}", e)));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let metric_file = MetricFile {
|
|
||||||
id: metric_id.clone(),
|
|
||||||
name: metric_yml.title.clone(),
|
|
||||||
file_name: format!("{}.yml", file.name),
|
|
||||||
content: serde_json::to_value(metric_yml.clone()).unwrap(),
|
|
||||||
created_by: Uuid::new_v4(),
|
|
||||||
verification: Verification::NotRequested,
|
|
||||||
evaluation_obj: None,
|
|
||||||
evaluation_summary: None,
|
|
||||||
evaluation_score: None,
|
|
||||||
organization_id: Uuid::new_v4(),
|
|
||||||
created_at: Utc::now(),
|
|
||||||
updated_at: Utc::now(),
|
|
||||||
deleted_at: None,
|
|
||||||
};
|
|
||||||
metric_records.push(metric_file);
|
|
||||||
metric_ymls.push(metric_yml);
|
|
||||||
} else {
|
|
||||||
failed_files.push((file.name, "Metric YML file does not have an id. This should never happen.".to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
failed_files.push((file.name, e.to_string()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
Err(e) => {
|
||||||
"dashboard" => {
|
failed_files.push((file.name, e));
|
||||||
match DashboardYml::new(file.yml_content.clone()) {
|
|
||||||
Ok(dashboard_yml) => {
|
|
||||||
if let Some(dashboard_id) = &dashboard_yml.id {
|
|
||||||
// Collect and validate metric IDs from rows
|
|
||||||
let metric_ids: Vec<Uuid> = dashboard_yml.rows
|
|
||||||
.iter()
|
|
||||||
.flat_map(|row| row.items.iter())
|
|
||||||
.map(|item| item.id)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
if !metric_ids.is_empty() {
|
|
||||||
match validate_metric_ids(&metric_ids).await {
|
|
||||||
Ok(missing_ids) if !missing_ids.is_empty() => {
|
|
||||||
failed_files.push((file.name, format!("Referenced metrics not found: {:?}", missing_ids)));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
failed_files.push((file.name, format!("Failed to validate metric IDs: {}", e)));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(_) => (), // All metrics exist
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let dashboard_file = DashboardFile {
|
|
||||||
id: dashboard_id.clone(),
|
|
||||||
name: dashboard_yml.name.clone(),
|
|
||||||
file_name: format!("{}.yml", file.name),
|
|
||||||
content: serde_json::to_value(dashboard_yml.clone()).unwrap(),
|
|
||||||
filter: None,
|
|
||||||
organization_id: Uuid::new_v4(),
|
|
||||||
created_by: Uuid::new_v4(),
|
|
||||||
created_at: Utc::now(),
|
|
||||||
updated_at: Utc::now(),
|
|
||||||
deleted_at: None,
|
|
||||||
};
|
|
||||||
dashboard_records.push(dashboard_file);
|
|
||||||
dashboard_ymls.push(dashboard_yml);
|
|
||||||
} else {
|
|
||||||
failed_files.push((file.name, "Dashboard YML file does not have an id. This should never happen.".to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
failed_files.push((file.name, e.to_string()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
},
|
||||||
|
"dashboard" => match process_dashboard_file(file.clone()).await {
|
||||||
|
Ok((dashboard_file, dashboard_yml)) => {
|
||||||
|
dashboard_records.push(dashboard_file);
|
||||||
|
dashboard_ymls.push(dashboard_yml);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
failed_files.push((file.name, e));
|
||||||
|
}
|
||||||
|
},
|
||||||
_ => {
|
_ => {
|
||||||
failed_files.push((file.name, format!("Invalid file type: {}. Currently only `metric` and `dashboard` types are supported.", file.file_type)));
|
failed_files.push((
|
||||||
|
file.name,
|
||||||
|
format!(
|
||||||
|
"Invalid file type: {}. Currently only `metric` and `dashboard` types are supported.",
|
||||||
|
file.file_type
|
||||||
|
),
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -197,7 +220,7 @@ impl ToolExecutor for CreateFilesTool {
|
||||||
// Second pass - bulk insert records
|
// Second pass - bulk insert records
|
||||||
let mut conn = match get_pg_pool().get().await {
|
let mut conn = match get_pg_pool().get().await {
|
||||||
Ok(conn) => conn,
|
Ok(conn) => conn,
|
||||||
Err(e) => return Err(anyhow::anyhow!(e)),
|
Err(e) => return Err(anyhow!(e)),
|
||||||
};
|
};
|
||||||
|
|
||||||
// Insert metric files
|
// Insert metric files
|
||||||
|
@ -210,7 +233,10 @@ impl ToolExecutor for CreateFilesTool {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
for (i, yml) in metric_ymls.into_iter().enumerate() {
|
for (i, yml) in metric_ymls.into_iter().enumerate() {
|
||||||
created_files.push(CreateFilesFile {
|
created_files.push(CreateFilesFile {
|
||||||
name: metric_records[i].file_name.trim_end_matches(".yml").to_string(),
|
name: metric_records[i]
|
||||||
|
.file_name
|
||||||
|
.trim_end_matches(".yml")
|
||||||
|
.to_string(),
|
||||||
file_type: "metric".to_string(),
|
file_type: "metric".to_string(),
|
||||||
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
|
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
|
||||||
});
|
});
|
||||||
|
@ -237,7 +263,10 @@ impl ToolExecutor for CreateFilesTool {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
for (i, yml) in dashboard_ymls.into_iter().enumerate() {
|
for (i, yml) in dashboard_ymls.into_iter().enumerate() {
|
||||||
created_files.push(CreateFilesFile {
|
created_files.push(CreateFilesFile {
|
||||||
name: dashboard_records[i].file_name.trim_end_matches(".yml").to_string(),
|
name: dashboard_records[i]
|
||||||
|
.file_name
|
||||||
|
.trim_end_matches(".yml")
|
||||||
|
.to_string(),
|
||||||
file_type: "dashboard".to_string(),
|
file_type: "dashboard".to_string(),
|
||||||
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
|
yml_content: serde_yaml::to_string(&yml).unwrap_or_default(),
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue