2025-03-04 04:33:32 +08:00
|
|
|
use anyhow::{anyhow, Result};
|
2025-04-03 23:30:59 +08:00
|
|
|
use database::{
|
|
|
|
pool::get_pg_pool,
|
|
|
|
schema::metric_files,
|
|
|
|
types::{MetricYml, data_metadata::DataMetadata},
|
|
|
|
};
|
|
|
|
use diesel::{ExpressionMethods, QueryDsl};
|
|
|
|
use diesel_async::RunQueryDsl;
|
2025-03-04 04:33:32 +08:00
|
|
|
use indexmap::IndexMap;
|
2025-03-11 03:29:22 +08:00
|
|
|
use middleware::AuthenticatedUser;
|
2025-03-04 04:33:32 +08:00
|
|
|
use serde::{Deserialize, Serialize};
|
2025-03-12 09:30:46 +08:00
|
|
|
use uuid::Uuid;
|
2025-03-04 04:33:32 +08:00
|
|
|
|
|
|
|
use query_engine::data_source_helpers;
|
2025-03-04 04:45:53 +08:00
|
|
|
use query_engine::data_types::DataType;
|
2025-03-04 04:33:32 +08:00
|
|
|
|
2025-03-06 04:22:01 +08:00
|
|
|
use crate::metrics::get_metric_handler;
|
|
|
|
|
2025-03-04 04:33:32 +08:00
|
|
|
/// Request structure for the get_metric_data handler
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
|
|
pub struct GetMetricDataRequest {
|
|
|
|
pub metric_id: Uuid,
|
2025-03-25 00:54:26 +08:00
|
|
|
pub version_number: Option<i32>,
|
2025-03-04 04:33:32 +08:00
|
|
|
pub limit: Option<i64>,
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Structure for the metric data response
|
|
|
|
#[derive(Debug, Serialize)]
|
|
|
|
pub struct MetricDataResponse {
|
2025-03-11 03:29:22 +08:00
|
|
|
pub metric_id: Uuid,
|
2025-03-04 04:33:32 +08:00
|
|
|
pub data: Vec<IndexMap<String, DataType>>,
|
2025-04-03 23:30:59 +08:00
|
|
|
pub data_metadata: DataMetadata,
|
2025-03-04 04:33:32 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Handler to retrieve both the metric definition and its associated data
|
|
|
|
pub async fn get_metric_data_handler(
|
|
|
|
request: GetMetricDataRequest,
|
2025-03-07 07:21:26 +08:00
|
|
|
user: AuthenticatedUser,
|
2025-03-04 04:33:32 +08:00
|
|
|
) -> Result<MetricDataResponse> {
|
|
|
|
tracing::info!(
|
|
|
|
"Getting metric data for metric_id: {}, user_id: {}",
|
|
|
|
request.metric_id,
|
|
|
|
user.id
|
|
|
|
);
|
|
|
|
|
2025-03-25 00:54:26 +08:00
|
|
|
// Retrieve the metric definition based on version, if none, use latest.
|
2025-04-08 07:44:41 +08:00
|
|
|
let metric = get_metric_handler(&request.metric_id, &user, request.version_number, None).await?;
|
2025-03-04 04:33:32 +08:00
|
|
|
|
|
|
|
// Parse the metric definition from YAML to get SQL and dataset IDs
|
2025-03-10 23:41:32 +08:00
|
|
|
let metric_yml = serde_yaml::from_str::<MetricYml>(&metric.file)?;
|
2025-03-04 04:33:32 +08:00
|
|
|
let sql = metric_yml.sql;
|
|
|
|
let dataset_ids = metric_yml.dataset_ids;
|
|
|
|
|
|
|
|
if dataset_ids.is_empty() {
|
|
|
|
return Err(anyhow!("No dataset IDs found in metric"));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the first dataset ID to use for querying
|
|
|
|
let primary_dataset_id = dataset_ids[0];
|
|
|
|
|
|
|
|
// Get the data source ID for the dataset
|
|
|
|
let dataset_sources = data_source_helpers::get_data_sources_for_datasets(&dataset_ids).await?;
|
|
|
|
|
|
|
|
if dataset_sources.is_empty() {
|
|
|
|
return Err(anyhow!(
|
|
|
|
"Could not find data sources for the specified datasets"
|
|
|
|
));
|
|
|
|
}
|
|
|
|
|
|
|
|
// Find the data source for the primary dataset
|
|
|
|
let data_source = dataset_sources
|
|
|
|
.iter()
|
|
|
|
.find(|ds| ds.dataset_id == primary_dataset_id)
|
|
|
|
.ok_or_else(|| anyhow!("Primary dataset not found"))?;
|
|
|
|
|
|
|
|
tracing::info!(
|
|
|
|
"Querying data for metric. Dataset: {}, Data source: {}",
|
|
|
|
data_source.name,
|
|
|
|
data_source.data_source_id
|
|
|
|
);
|
|
|
|
|
2025-04-03 23:30:59 +08:00
|
|
|
// Try to get cached metadata first
|
|
|
|
let mut conn = get_pg_pool().get().await?;
|
|
|
|
let cached_metadata = metric_files::table
|
|
|
|
.filter(metric_files::id.eq(request.metric_id))
|
|
|
|
.select(metric_files::data_metadata)
|
|
|
|
.first::<Option<DataMetadata>>(&mut conn)
|
|
|
|
.await
|
|
|
|
.map_err(|e| anyhow!("Error retrieving metadata: {}", e))?;
|
|
|
|
|
2025-03-04 04:33:32 +08:00
|
|
|
// Execute the query to get the metric data
|
2025-04-03 23:30:59 +08:00
|
|
|
let query_result = match query_engine::data_source_query_routes::query_engine::query_engine(
|
2025-03-04 04:33:32 +08:00
|
|
|
&data_source.data_source_id,
|
|
|
|
&sql,
|
|
|
|
request.limit,
|
|
|
|
)
|
|
|
|
.await
|
|
|
|
{
|
2025-04-03 23:30:59 +08:00
|
|
|
Ok(result) => result,
|
2025-03-04 04:33:32 +08:00
|
|
|
Err(e) => {
|
|
|
|
tracing::error!("Error executing metric query: {}", e);
|
|
|
|
return Err(anyhow!("Error executing metric query: {}", e));
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2025-04-03 23:30:59 +08:00
|
|
|
// Determine which metadata to use
|
|
|
|
let metadata = if let Some(metadata) = cached_metadata {
|
|
|
|
// Use cached metadata but update row count if it differs
|
|
|
|
if metadata.row_count != query_result.data.len() as i64 {
|
|
|
|
let mut updated_metadata = metadata.clone();
|
|
|
|
updated_metadata.row_count = query_result.data.len() as i64;
|
|
|
|
updated_metadata
|
|
|
|
} else {
|
|
|
|
metadata
|
2025-03-11 03:29:22 +08:00
|
|
|
}
|
2025-04-03 23:30:59 +08:00
|
|
|
} else {
|
|
|
|
// No cached metadata, use the one from query_result
|
|
|
|
query_result.metadata.clone()
|
|
|
|
};
|
2025-03-11 03:29:22 +08:00
|
|
|
|
2025-03-04 04:33:32 +08:00
|
|
|
// Construct and return the response
|
2025-03-11 03:29:22 +08:00
|
|
|
Ok(MetricDataResponse {
|
|
|
|
metric_id: request.metric_id,
|
2025-04-03 23:30:59 +08:00
|
|
|
data: query_result.data,
|
|
|
|
data_metadata: metadata,
|
2025-03-11 03:29:22 +08:00
|
|
|
})
|
|
|
|
}
|