mirror of https://github.com/buster-so/buster.git
added in the data endpoint for a given metric.
This commit is contained in:
parent
d54ab726fe
commit
2575b61fd6
|
@ -18,11 +18,13 @@ diesel-async = { workspace = true }
|
|||
futures = { workspace = true }
|
||||
redis = { workspace = true }
|
||||
regex = { workspace = true }
|
||||
indexmap = { workspace = true }
|
||||
|
||||
# Local dependencies
|
||||
database = { path = "../database" }
|
||||
agents = { path = "../agents" }
|
||||
litellm = { path = "../litellm" }
|
||||
query_engine = { path = "../query_engine" }
|
||||
|
||||
# Add any handler-specific dependencies here
|
||||
|
||||
|
|
|
@ -36,7 +36,6 @@ pub async fn get_metric(metric_id: &Uuid, user_id: &Uuid) -> Result<BusterMetric
|
|||
// Query the metric file
|
||||
let metric_file = metric_files::table
|
||||
.filter(metric_files::id.eq(metric_id))
|
||||
.filter(metric_files::created_by.eq(user_id))
|
||||
.filter(metric_files::deleted_at.is_null())
|
||||
.select((
|
||||
metric_files::id,
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
pub mod messages;
|
||||
pub mod chats;
|
||||
pub mod files;
|
||||
pub mod metrics;
|
||||
|
||||
// Re-export commonly used types and functions
|
||||
pub use chats::types as thread_types;
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
use agents::tools::file_tools::file_types::metric_yml::MetricYml;
|
||||
use anyhow::{anyhow, Result};
|
||||
use database::models::User;
|
||||
use indexmap::IndexMap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::files::metric_files::get_metric;
|
||||
use query_engine::data_types::DataType;
|
||||
use query_engine::data_source_helpers;
|
||||
|
||||
/// Request structure for the get_metric_data handler
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct GetMetricDataRequest {
|
||||
pub metric_id: Uuid,
|
||||
pub limit: Option<i64>,
|
||||
}
|
||||
|
||||
/// Structure for the metric data response
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct MetricDataResponse {
|
||||
pub data: Vec<IndexMap<String, DataType>>,
|
||||
}
|
||||
|
||||
/// Handler to retrieve both the metric definition and its associated data
|
||||
pub async fn get_metric_data_handler(
|
||||
request: GetMetricDataRequest,
|
||||
user: User,
|
||||
) -> Result<MetricDataResponse> {
|
||||
tracing::info!(
|
||||
"Getting metric data for metric_id: {}, user_id: {}",
|
||||
request.metric_id,
|
||||
user.id
|
||||
);
|
||||
|
||||
let user_id = user.id;
|
||||
|
||||
// Retrieve the metric definition
|
||||
let metric = get_metric(&request.metric_id, &user_id).await?;
|
||||
|
||||
// Parse the metric definition from YAML to get SQL and dataset IDs
|
||||
let metric_yml = serde_json::from_str::<MetricYml>(&metric.file)?;
|
||||
let sql = metric_yml.sql;
|
||||
let dataset_ids = metric_yml.dataset_ids;
|
||||
|
||||
if dataset_ids.is_empty() {
|
||||
return Err(anyhow!("No dataset IDs found in metric"));
|
||||
}
|
||||
|
||||
// Get the first dataset ID to use for querying
|
||||
let primary_dataset_id = dataset_ids[0];
|
||||
|
||||
// Get the data source ID for the dataset
|
||||
let dataset_sources = data_source_helpers::get_data_sources_for_datasets(&dataset_ids).await?;
|
||||
|
||||
if dataset_sources.is_empty() {
|
||||
return Err(anyhow!(
|
||||
"Could not find data sources for the specified datasets"
|
||||
));
|
||||
}
|
||||
|
||||
// Find the data source for the primary dataset
|
||||
let data_source = dataset_sources
|
||||
.iter()
|
||||
.find(|ds| ds.dataset_id == primary_dataset_id)
|
||||
.ok_or_else(|| anyhow!("Primary dataset not found"))?;
|
||||
|
||||
tracing::info!(
|
||||
"Querying data for metric. Dataset: {}, Data source: {}",
|
||||
data_source.name,
|
||||
data_source.data_source_id
|
||||
);
|
||||
|
||||
// Execute the query to get the metric data
|
||||
let result = match query_engine::data_source_query_routes::query_engine::query_engine(
|
||||
&data_source.data_source_id,
|
||||
&sql,
|
||||
request.limit,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
tracing::error!("Error executing metric query: {}", e);
|
||||
return Err(anyhow!("Error executing metric query: {}", e));
|
||||
}
|
||||
};
|
||||
|
||||
// Construct and return the response
|
||||
Ok(MetricDataResponse {
|
||||
data: result,
|
||||
})
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
pub mod get_metric_data_handler;
|
||||
|
||||
pub use get_metric_data_handler::*;
|
|
@ -0,0 +1,84 @@
|
|||
use anyhow::{anyhow, Result};
|
||||
use database::{
|
||||
pool::get_pg_pool,
|
||||
schema::datasets,
|
||||
};
|
||||
use diesel::{ExpressionMethods, QueryDsl};
|
||||
use diesel_async::RunQueryDsl;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// Response structure that maps dataset IDs to their data source IDs
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct DatasetWithDataSource {
|
||||
pub dataset_id: Uuid,
|
||||
pub data_source_id: Uuid,
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
/// Helper function to get data source IDs for an array of dataset IDs
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `dataset_ids` - An array of dataset UUIDs to lookup
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A Result containing a Vec of DatasetWithDataSource structs, each mapping a dataset ID to its data source ID
|
||||
pub async fn get_data_sources_for_datasets(dataset_ids: &[Uuid]) -> Result<Vec<DatasetWithDataSource>> {
|
||||
if dataset_ids.is_empty() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let mut conn = match get_pg_pool().get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => return Err(anyhow!("Failed to get database connection: {}", e)),
|
||||
};
|
||||
|
||||
let datasets_result = datasets::table
|
||||
.filter(datasets::id.eq_any(dataset_ids))
|
||||
.filter(datasets::deleted_at.is_null())
|
||||
.select((
|
||||
datasets::id,
|
||||
datasets::data_source_id,
|
||||
datasets::name,
|
||||
))
|
||||
.load::<(Uuid, Uuid, String)>(&mut conn)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Error querying datasets: {}", e))?;
|
||||
|
||||
// Map the results to the response structure
|
||||
let result = datasets_result
|
||||
.into_iter()
|
||||
.map(|(dataset_id, data_source_id, name)| {
|
||||
DatasetWithDataSource {
|
||||
dataset_id,
|
||||
data_source_id,
|
||||
name,
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Helper function that returns a HashMap mapping dataset IDs to their data source IDs
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `dataset_ids` - An array of dataset UUIDs to lookup
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// A Result containing a HashMap where keys are dataset IDs and values are data source IDs
|
||||
pub async fn get_data_source_map_for_datasets(dataset_ids: &[Uuid]) -> Result<HashMap<Uuid, Uuid>> {
|
||||
let datasets = get_data_sources_for_datasets(dataset_ids).await?;
|
||||
|
||||
let map = datasets
|
||||
.into_iter()
|
||||
.map(|ds| (ds.dataset_id, ds.data_source_id))
|
||||
.collect();
|
||||
|
||||
Ok(map)
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
pub mod credentials;
|
||||
pub mod data_source_connections;
|
||||
pub mod data_source_query_routes;
|
||||
pub mod data_source_connections;
|
||||
pub mod data_types;
|
||||
pub mod credentials;
|
||||
pub mod data_source_helpers;
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
use crate::routes::rest::ApiResponse;
|
||||
use axum::extract::{Path, Query};
|
||||
use axum::http::StatusCode;
|
||||
use axum::Extension;
|
||||
use database::models::User;
|
||||
use handlers::metrics::get_metric_data_handler::{GetMetricDataRequest, MetricDataResponse};
|
||||
use serde::Deserialize;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct GetMetricDataParams {
|
||||
pub limit: Option<i64>,
|
||||
}
|
||||
|
||||
pub async fn get_metric_data_rest_handler(
|
||||
Extension(user): Extension<User>,
|
||||
Path(metric_id): Path<Uuid>,
|
||||
Query(params): Query<GetMetricDataParams>,
|
||||
) -> Result<ApiResponse<MetricDataResponse>, (StatusCode, &'static str)> {
|
||||
tracing::info!(
|
||||
"Processing GET request for metric data with ID: {}",
|
||||
metric_id
|
||||
);
|
||||
|
||||
let request = GetMetricDataRequest {
|
||||
metric_id,
|
||||
limit: params.limit,
|
||||
};
|
||||
|
||||
match handlers::metrics::get_metric_data_handler(request, user).await {
|
||||
Ok(response) => Ok(ApiResponse::JsonData(response)),
|
||||
Err(e) => {
|
||||
tracing::error!("Error getting metric data: {}", e);
|
||||
Err((
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"Failed to get metric data",
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,10 +3,12 @@ use axum::{
|
|||
Router,
|
||||
};
|
||||
|
||||
// Placeholder modules that you'll need to create
|
||||
// Import modules
|
||||
mod get_metric;
|
||||
mod get_metric_data;
|
||||
|
||||
pub fn router() -> Router {
|
||||
Router::new()
|
||||
.route("/:id", get(get_metric::get_metric_rest_handler))
|
||||
.route("/:id/data", get(get_metric_data::get_metric_data_rest_handler))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue