concurrent metric validation

This commit is contained in:
dal 2025-04-04 10:25:20 -06:00
parent a810e6261a
commit b51cbc54e1
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 119 additions and 87 deletions

View File

@ -12,6 +12,9 @@ use database::{
}; };
use diesel::insert_into; use diesel::insert_into;
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
use futures::future::join_all;
use indexmap::IndexMap;
use query_engine::data_types::DataType;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
@ -87,27 +90,44 @@ impl ToolExecutor for CreateMetricFilesTool {
let mut created_files = vec![]; let mut created_files = vec![];
let mut failed_files = vec![]; let mut failed_files = vec![];
// Process metric files // Create futures for concurrent processing
let process_futures = files
.into_iter()
.map(|file| {
let tool_call_id_clone = tool_call_id.clone();
let user_id = self.agent.get_user_id();
async move {
let result = process_metric_file(
tool_call_id_clone,
file.name.clone(),
file.yml_content.clone(),
&user_id,
)
.await;
(file.name.clone(), result)
}
})
.collect::<Vec<_>>();
// Wait for all futures to complete
let results = join_all(process_futures).await;
// Process results
let mut metric_records = vec![]; let mut metric_records = vec![];
let mut metric_ymls = vec![]; let mut metric_ymls = vec![];
let mut results_vec = vec![]; let mut results_vec = vec![];
// First pass - validate and prepare all records
for file in files { for (file_name, result) in results {
match process_metric_file( match result {
tool_call_id.clone(),
file.name.clone(),
file.yml_content.clone(),
&self.agent.get_user_id(),
)
.await
{
Ok((metric_file, metric_yml, message, results)) => { Ok((metric_file, metric_yml, message, results)) => {
metric_records.push(metric_file); metric_records.push(metric_file);
metric_ymls.push(metric_yml); metric_ymls.push(metric_yml);
results_vec.push((message, results)); results_vec.push((message, results));
} }
Err(e) => { Err(e) => {
failed_files.push((file.name, e)); failed_files.push((file_name, e));
} }
} }
} }

View File

@ -11,6 +11,7 @@ use database::{
}; };
use diesel::{upsert::excluded, ExpressionMethods, QueryDsl}; use diesel::{upsert::excluded, ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl; use diesel_async::RunQueryDsl;
use futures::future::join_all;
use indexmap::IndexMap; use indexmap::IndexMap;
use query_engine::{data_source_query_routes::query_engine::query_engine, data_types::DataType}; use query_engine::{data_source_query_routes::query_engine::query_engine, data_types::DataType};
use serde_json::Value; use serde_json::Value;
@ -114,92 +115,103 @@ impl ToolExecutor for ModifyMetricFilesTool {
.await .await
{ {
Ok(files) => { Ok(files) => {
for file in files { // Create futures for concurrent processing of file modifications
if let Some(modifications) = file_map.get(&file.id) { let modification_futures = files
match process_metric_file_modification( .into_iter()
file.clone(), .filter_map(|file| {
modifications, let modifications = file_map.get(&file.id)?;
start_time.elapsed().as_millis() as i64, let start_time_elapsed = start_time.elapsed().as_millis() as i64;
)
.await Some(async move {
{ let result = process_metric_file_modification(
Ok(( file.clone(),
mut metric_file, modifications,
metric_yml, start_time_elapsed,
results, ).await;
validation_message,
validation_results, match result {
)) => { Ok((metric_file, metric_yml, results, validation_message, validation_results)) => {
// Calculate next version number from existing version history Ok((metric_file, metric_yml, results, validation_message, validation_results))
let next_version = }
match metric_file.version_history.get_latest_version() { Err(e) => Err((modifications.file_name.clone(), e.to_string())),
Some(version) => version.version_number + 1, }
None => 1, })
}; })
.collect::<Vec<_>>();
// Wait for all futures to complete
let results = join_all(modification_futures).await;
// Process results
for result in results {
match result {
Ok((mut metric_file, metric_yml, results, validation_message, validation_results)) => {
// Calculate next version number from existing version history
let next_version = match metric_file.version_history.get_latest_version() {
Some(version) => version.version_number + 1,
None => 1,
};
// Add new version to history // Add new version to history
metric_file metric_file
.version_history .version_history
.add_version(next_version, metric_yml.clone()); .add_version(next_version, metric_yml.clone());
// Update metadata if SQL has changed // Update metadata if SQL has changed
// The SQL is already validated by process_metric_file_modification // The SQL is already validated by process_metric_file_modification
if results.iter().any(|r| r.modification_type == "content") { if results.iter().any(|r| r.modification_type == "content") {
// Update the name field from the metric_yml // Update the name field from the metric_yml
// This is redundant but ensures the name is set correctly // This is redundant but ensures the name is set correctly
metric_file.name = metric_yml.name.clone(); metric_file.name = metric_yml.name.clone();
// Check if we have a dataset to work with // Check if we have a dataset to work with
if !metric_yml.dataset_ids.is_empty() { if !metric_yml.dataset_ids.is_empty() {
let dataset_id = metric_yml.dataset_ids[0]; let dataset_id = metric_yml.dataset_ids[0];
// Get data source for the dataset // Get data source for the dataset
match datasets::table match datasets::table
.filter(datasets::id.eq(dataset_id)) .filter(datasets::id.eq(dataset_id))
.select(datasets::data_source_id) .select(datasets::data_source_id)
.first::<Uuid>(&mut conn) .first::<Uuid>(&mut conn)
.await
{
Ok(data_source_id) => {
// Execute query to get metadata
match query_engine(
&data_source_id,
&metric_yml.sql,
Some(100),
)
.await .await
{ {
Ok(data_source_id) => { Ok(query_result) => {
// Execute query to get metadata // Update metadata
match query_engine( metric_file.data_metadata =
&data_source_id, Some(query_result.metadata);
&metric_yml.sql, debug!("Updated metadata for metric file {}", metric_file.id);
Some(100), }
) Err(e) => {
.await debug!("Failed to execute SQL for metadata: {}", e);
{ // Continue with the update even if metadata refresh fails
Ok(query_result) => {
// Update metadata
metric_file.data_metadata =
Some(query_result.metadata);
debug!("Updated metadata for metric file {}", metric_file.id);
}
Err(e) => {
debug!("Failed to execute SQL for metadata: {}", e);
// Continue with the update even if metadata refresh fails
}
} }
} }
Err(e) => { }
debug!("Failed to get data source ID: {}", e); Err(e) => {
// Continue with the update even if we can't get data source debug!("Failed to get data source ID: {}", e);
} // Continue with the update even if we can't get data source
} }
} }
} }
}
batch.files.push(metric_file); batch.files.push(metric_file);
batch.ymls.push(metric_yml); batch.ymls.push(metric_yml);
batch.modification_results.extend(results); batch.modification_results.extend(results);
batch.validation_messages.push(validation_message); batch.validation_messages.push(validation_message);
batch.validation_results.push(validation_results); batch.validation_results.push(validation_results);
} }
Err(e) => { Err((file_name, error)) => {
batch batch.failed_modifications.push((file_name, error));
.failed_modifications
.push((modifications.file_name.clone(), e.to_string()));
}
} }
} }
} }