Add support for YAML-based dataset deployment and enhance dataset structures

This commit is contained in:
dal 2025-01-09 11:57:56 -07:00
parent d962af3883
commit 79b3df107d
7 changed files with 196 additions and 37 deletions

View File

@@ -89,6 +89,7 @@ tokio-postgres = "0.7"
futures-util = "0.3"
rayon = "1.10.0"
diesel_migrations = "2.0.0"
serde_yaml = "0.9.34"
[profile.release]
debug = false
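
serde_yaml 0.9 is the dependency doing the parsing work below. A minimal sketch of the round trip it enables (the struct here is illustrative, not one this commit adds):

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct ModelFile {
    version: i32,
}

fn parse(yml: &str) -> Result<ModelFile, serde_yaml::Error> {
    // from_str drives any serde Deserialize impl from a YAML string.
    serde_yaml::from_str(yml)
}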

View File

@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
ALTER TABLE datasets
DROP CONSTRAINT datasets_database_name_data_source_id_key;

View File

@@ -0,0 +1,9 @@
-- Your SQL goes here
-- First delete any duplicate (database_name, data_source_id) rows, keeping the one with the lowest id
DELETE FROM datasets a USING datasets b
WHERE a.id > b.id
AND a.database_name = b.database_name
AND a.data_source_id = b.data_source_id;
-- Include id so the constraint matches the composite ON CONFLICT
-- target used by the deploy upsert.
ALTER TABLE datasets
ADD CONSTRAINT datasets_database_name_data_source_id_key UNIQUE (id, database_name, data_source_id);

View File

@@ -340,6 +340,14 @@ impl DatasetType {
_ => None,
}
}
/// SQL keyword for this dataset type, for use in DDL statements.
pub fn as_str(&self) -> &'static str {
match *self {
DatasetType::Table => "table",
DatasetType::View => "view",
DatasetType::MaterializedView => "materialized view",
}
}
}
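
A quick check of the keyword mapping (a hypothetical test, not part of this commit):

#[test]
fn dataset_type_keywords() {
    assert_eq!(DatasetType::View.as_str(), "view");
    assert_eq!(DatasetType::MaterializedView.as_str(), "materialized view");
}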
impl ToSql<sql_types::DatasetTypeEnum, Pg> for DatasetType {

View File

@@ -4,6 +4,7 @@ use chrono::{DateTime, Utc};
use diesel::{upsert::excluded, ExpressionMethods, QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
use serde::{Deserialize, Serialize};
use serde_yaml;
use std::collections::HashSet;
use uuid::Uuid;
@@ -19,15 +20,26 @@ use crate::{
query_engine::{
credentials::get_data_source_credentials,
import_dataset_columns::retrieve_dataset_columns,
write_query_engine::write_query_engine,
},
user::user_info::get_user_organization_id,
},
};
#[derive(Debug, Deserialize)]
pub struct DeployDatasetsRequest {
#[serde(untagged)]
pub enum DeployDatasetsRequest {
Full(Vec<FullDeployDatasetsRequest>),
Simple { id: Uuid, sql: String, yml: String },
}
#[derive(Debug, Deserialize)]
pub struct FullDeployDatasetsRequest {
pub id: Option<Uuid>,
pub data_source_name: String,
pub env: String,
#[serde(rename = "type")]
pub type_: String,
pub name: String,
pub model: Option<String>,
pub schema: String,
@@ -62,11 +74,65 @@ pub struct DeployDatasetsResponse {
pub ids: Vec<Uuid>,
}
#[derive(Debug, Deserialize)]
pub struct BusterModel {
pub version: i32,
pub models: Vec<Model>,
}
#[derive(Debug, Deserialize)]
pub struct Model {
pub name: String,
pub data_source_name: String,
pub schema: String,
pub env: String,
pub description: String,
pub model: Option<String>,
#[serde(rename = "type")]
pub type_: String,
pub entities: Vec<Entity>,
pub dimensions: Vec<Dimension>,
pub measures: Vec<Measure>,
}
#[derive(Debug, Deserialize)]
pub struct Entity {
pub name: String,
pub expr: String,
#[serde(rename = "type")]
pub entity_type: String,
}
#[derive(Debug, Deserialize)]
pub struct Dimension {
pub name: String,
pub expr: String,
#[serde(rename = "type")]
pub dimension_type: String,
pub description: String,
}
#[derive(Debug, Deserialize)]
pub struct Measure {
pub name: String,
pub expr: String,
pub agg: String,
pub description: String,
}
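
A hedged example of a model file these structs accept; all names and values are illustrative, and every field shown is required since only model is an Option:

fn parse_example() -> Result<BusterModel, serde_yaml::Error> {
    serde_yaml::from_str(
        r#"
version: 1
models:
  - name: orders
    data_source_name: warehouse
    schema: analytics
    env: prod
    description: Order facts
    type: view
    entities:
      - name: customer
        expr: customer_id
        type: foreign
    dimensions:
      - name: status
        expr: status
        type: string
        description: Current order status
    measures:
      - name: total
        expr: sum(amount)
        agg: sum
        description: Total order amount
"#,
    )
}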
pub async fn deploy_datasets(
Extension(user): Extension<User>,
Json(request): Json<Vec<DeployDatasetsRequest>>,
Json(request): Json<DeployDatasetsRequest>,
) -> Result<ApiResponse<DeployDatasetsResponse>, (axum::http::StatusCode, String)> {
let _ = match deploy_datasets_handler(&user.id, request).await {
let requests = match process_deploy_request(request).await {
Ok(requests) => requests,
Err(e) => {
tracing::error!("Error processing deploy request: {:?}", e);
return Err((axum::http::StatusCode::BAD_REQUEST, e.to_string()));
}
};
let _ = match deploy_datasets_handler(&user.id, requests).await {
Ok(dataset) => dataset,
Err(e) => {
tracing::error!("Error creating dataset: {:?}", e);
@@ -77,7 +143,79 @@ pub async fn deploy_datasets(
Ok(ApiResponse::OK)
}
async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec<DeployDatasetsRequest>) -> Result<()> {
async fn process_deploy_request(
request: DeployDatasetsRequest,
) -> Result<Vec<FullDeployDatasetsRequest>> {
match request {
DeployDatasetsRequest::Full(requests) => Ok(requests),
DeployDatasetsRequest::Simple { id, sql, yml } => {
let model: BusterModel = serde_yaml::from_str(&yml)?;
let mut requests = Vec::new();
for semantic_model in model.models {
// Flatten the semantic model into deploy request columns
let mut columns = Vec::new();
// Process dimensions
for dim in semantic_model.dimensions {
columns.push(DeployDatasetsColumnsRequest {
name: dim.name,
description: dim.description,
semantic_type: Some(String::from("dimension")),
expr: Some(dim.expr),
type_: Some(dim.dimension_type),
agg: None,
});
}
// Process measures
for measure in semantic_model.measures {
columns.push(DeployDatasetsColumnsRequest {
name: measure.name,
description: measure.description,
semantic_type: Some(String::from("measure")),
expr: Some(measure.expr),
type_: None,
agg: Some(measure.agg),
});
}
// Process entity relationships
let entity_relationships = semantic_model
.entities
.into_iter()
.map(|entity| DeployDatasetsEntityRelationshipsRequest {
name: entity.name,
expr: entity.expr,
type_: entity.entity_type,
})
.collect();
requests.push(FullDeployDatasetsRequest {
id: Some(id),
data_source_name: semantic_model.data_source_name,
env: semantic_model.env,
type_: semantic_model.type_,
name: semantic_model.name,
model: semantic_model.model,
schema: semantic_model.schema,
description: semantic_model.description,
sql_definition: Some(sql.clone()),
entity_relationships: Some(entity_relationships),
columns,
yml_file: Some(yml.clone()),
});
}
Ok(requests)
}
}
}
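
A sketch of the expansion for a single-model file like the example above: each entry in models becomes one FullDeployDatasetsRequest, and every request carries the caller's id, the raw SQL as sql_definition, and the original YAML as yml_file.

// Assumes the anyhow::Result alias used throughout this handler.
async fn example(id: Uuid, sql: String, yml: String) -> Result<()> {
    let requests =
        process_deploy_request(DeployDatasetsRequest::Simple { id, sql, yml }).await?;
    // One request per entry in `models`.
    assert_eq!(requests.len(), 1);
    Ok(())
}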
async fn deploy_datasets_handler(
user_id: &Uuid,
requests: Vec<FullDeployDatasetsRequest>,
) -> Result<()> {
// Get the user organization id.
let organization_id = get_user_organization_id(&user_id).await?;
@@ -124,13 +262,15 @@ async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec<DeployDatasetsReq
})
.ok_or(anyhow!("Data source not found"))?;
let database_name = req.name.replace(" ", "_");
let dataset = Dataset {
id: Uuid::new_v4(),
id: req.id.unwrap_or_else(Uuid::new_v4),
name: req.name.clone(),
data_source_id: data_source.id,
created_at: Utc::now(),
updated_at: Utc::now(),
database_name: req.name.clone(),
database_name,
when_to_use: Some(req.description.clone()),
when_not_to_use: None,
type_: DatasetType::View,
@@ -146,7 +286,7 @@ async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec<DeployDatasetsReq
imported: false,
organization_id,
model: req.model.clone(),
yml_file: None,
yml_file: req.yml_file.clone(),
};
datasets.push(dataset);
@@ -155,7 +295,11 @@ async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec<DeployDatasetsReq
// Upsert the datasets into the database.
let inserted_datasets = diesel::insert_into(datasets::table)
.values(&datasets)
.on_conflict((datasets::database_name, datasets::data_source_id))
.on_conflict((
datasets::id,
datasets::database_name,
datasets::data_source_id,
))
.do_update()
.set((
datasets::name.eq(excluded(datasets::name)),
@@ -339,8 +483,6 @@ async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec<DeployDatasetsReq
// Handle entity relationships
let mut entity_relationships_to_upsert: Vec<EntityRelationship> = Vec::new();
println!("inserted_datasets: {:?}", inserted_datasets);
for req in &requests {
if let Some(relationships) = &req.entity_relationships {
let current_dataset = inserted_datasets
@@ -397,6 +539,19 @@ async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec<DeployDatasetsReq
.map_err(|e| anyhow!("Failed to upsert entity relationships: {}", e))?;
}
for dataset in inserted_datasets {
let view_name = format!("{}.{}", dataset.schema, dataset.database_name);
let view_sql = format!(
"CREATE {} {} AS {}",
dataset.type_.as_str(),
view_name,
dataset.definition
);
// Write the view to the data source
write_query_engine(&dataset.id, &view_sql).await?;
}
// TODO: Need to send back the updated and inserted objects.
Ok(())
}
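
For illustration, a dataset of type view in schema analytics with database_name orders yields a statement like the one below; Postgres treats keywords case-insensitively, so the lowercase names from as_str are valid DDL.

fn example_ddl() -> String {
    // Produces: CREATE view analytics.orders AS SELECT * FROM raw.orders
    format!(
        "CREATE {} {} AS {}",
        DatasetType::View.as_str(),
        "analytics.orders",
        "SELECT * FROM raw.orders"
    )
}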

View File

@@ -26,7 +26,8 @@ pub async fn post_dataset(
Extension(user): Extension<User>,
Json(request): Json<PostDatasetReq>,
) -> Result<ApiResponse<Dataset>, (axum::http::StatusCode, String)> {
let dataset = match post_dataset_handler(&user.id, &request.data_source_id, &request.name).await {
let dataset = match post_dataset_handler(&user.id, &request.data_source_id, &request.name).await
{
Ok(dataset) => dataset,
Err(e) => {
tracing::error!("Error creating dataset: {:?}", e);
@@ -37,7 +38,11 @@ pub async fn post_dataset(
Ok(ApiResponse::JsonData(dataset))
}
async fn post_dataset_handler(user_id: &Uuid, data_source_id: &Uuid, name: &str) -> Result<Dataset> {
async fn post_dataset_handler(
user_id: &Uuid,
data_source_id: &Uuid,
name: &str,
) -> Result<Dataset> {
// Get the user organization id.
let organization_id = get_user_organization_id(&user_id).await?;
@@ -87,6 +92,8 @@ async fn post_dataset_handler(user_id: &Uuid, data_source_id: &Uuid, name: &str)
Err(e) => return Err(anyhow!("Data sources not found: {}", e)),
};
let database_name = name.replace(" ", "_");
let dataset = Dataset {
id: Uuid::new_v4(),
name: name.to_string(),
@@ -94,7 +101,7 @@ async fn post_dataset_handler(user_id: &Uuid, data_source_id: &Uuid, name: &str)
organization_id: organization_id.clone(),
created_at: chrono::Utc::now(),
updated_at: chrono::Utc::now(),
database_name: String::new(),
database_name,
when_to_use: None,
when_not_to_use: None,
type_: DatasetType::Table,

View File

@@ -106,30 +106,6 @@ async fn post_dataset_handler(
}
};
let name = format!("dataset_index_{}", dataset_id.clone());
let new_collection_schema = json!({
"name": name,
"fields": [
{"name": "id", "type": "string"},
{"name": "value", "type": "string"},
{"name": "dataset_id", "type": "string"},
{"name": "dataset_column_id", "type": "string"},
{"name": "value_embedding", "type": "float[]", "embed": {
"from": [
"value"
],
"model_config": {
"model_name": "ts/jina-embeddings-v2-base-en"
}
}},
],
});
match typesense::create_collection(new_collection_schema).await {
Ok(_) => (),
Err(e) => return Err(anyhow!("Error creating dataset index: {}", e)),
};
let dataset_state = match get_dataset_state(&dataset_id, &user_id).await {
Ok(dataset_state) => dataset_state,
Err(e) => return Err(anyhow!("Error getting dataset state: {}", e)),