diff --git a/api/Cargo.toml b/api/Cargo.toml index 391197450..d48493e05 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -89,6 +89,7 @@ tokio-postgres = "0.7" futures-util = "0.3" rayon = "1.10.0" diesel_migrations = "2.0.0" +serde_yaml = "0.9.34" [profile.release] debug = false diff --git a/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/down.sql b/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/down.sql new file mode 100644 index 000000000..c6c303217 --- /dev/null +++ b/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE datasets +DROP CONSTRAINT datasets_database_name_data_source_id_key; diff --git a/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/up.sql b/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/up.sql new file mode 100644 index 000000000..f7563aea0 --- /dev/null +++ b/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/up.sql @@ -0,0 +1,9 @@ +-- Your SQL goes here + -- First delete any duplicate rows + DELETE FROM datasets a USING datasets b + WHERE a.id > b.id + AND a.database_name = b.database_name + AND a.data_source_id = b.data_source_id; + + ALTER TABLE datasets + ADD CONSTRAINT datasets_database_name_data_source_id_key UNIQUE (database_name, data_source_id); \ No newline at end of file diff --git a/api/src/database/enums.rs b/api/src/database/enums.rs index 4911dd7f8..e3c05b850 100644 --- a/api/src/database/enums.rs +++ b/api/src/database/enums.rs @@ -340,6 +340,14 @@ impl DatasetType { _ => None, } } + + pub fn to_string(&self) -> &'static str { + match *self { + DatasetType::Table => "table", + DatasetType::View => "view", + DatasetType::MaterializedView => "materialized view", + } + } } impl ToSql for DatasetType { diff --git a/api/src/routes/rest/routes/datasets/deploy_datasets.rs index 
706047b90..804c2362c 100644 --- a/api/src/routes/rest/routes/datasets/deploy_datasets.rs +++ b/api/src/routes/rest/routes/datasets/deploy_datasets.rs @@ -4,6 +4,7 @@ use chrono::{DateTime, Utc}; use diesel::{upsert::excluded, ExpressionMethods, QueryDsl, SelectableHelper}; use diesel_async::RunQueryDsl; use serde::{Deserialize, Serialize}; +use serde_yaml; use std::collections::HashSet; use uuid::Uuid; @@ -19,15 +20,26 @@ use crate::{ query_engine::{ credentials::get_data_source_credentials, import_dataset_columns::retrieve_dataset_columns, + write_query_engine::write_query_engine, }, user::user_info::get_user_organization_id, }, }; #[derive(Debug, Deserialize)] -pub struct DeployDatasetsRequest { +#[serde(untagged)] +pub enum DeployDatasetsRequest { + Full(Vec), + Simple { id: Uuid, sql: String, yml: String }, +} + +#[derive(Debug, Deserialize)] +pub struct FullDeployDatasetsRequest { + pub id: Option, pub data_source_name: String, pub env: String, + #[serde(rename = "type")] + pub type_: String, pub name: String, pub model: Option, pub schema: String, @@ -62,11 +74,65 @@ pub struct DeployDatasetsResponse { pub ids: Vec, } +#[derive(Debug, Deserialize)] +pub struct BusterModel { + pub version: i32, + pub models: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct Model { + pub name: String, + pub data_source_name: String, + pub schema: String, + pub env: String, + pub description: String, + pub model: Option, + #[serde(rename = "type")] + pub type_: String, + pub entities: Vec, + pub dimensions: Vec, + pub measures: Vec, +} + +#[derive(Debug, Deserialize)] +pub struct Entity { + pub name: String, + pub expr: String, + #[serde(rename = "type")] + pub entity_type: String, +} + +#[derive(Debug, Deserialize)] +pub struct Dimension { + pub name: String, + pub expr: String, + #[serde(rename = "type")] + pub dimension_type: String, + pub description: String, +} + +#[derive(Debug, Deserialize)] +pub struct Measure { + pub name: String, + pub expr: String, + pub agg: 
String, + pub description: String, +} + pub async fn deploy_datasets( Extension(user): Extension, - Json(request): Json>, + Json(request): Json, ) -> Result, (axum::http::StatusCode, String)> { - let _ = match deploy_datasets_handler(&user.id, request).await { + let requests = match process_deploy_request(request).await { + Ok(requests) => requests, + Err(e) => { + tracing::error!("Error processing deploy request: {:?}", e); + return Err((axum::http::StatusCode::BAD_REQUEST, e.to_string())); + } + }; + + let _ = match deploy_datasets_handler(&user.id, requests).await { Ok(dataset) => dataset, Err(e) => { tracing::error!("Error creating dataset: {:?}", e); @@ -77,7 +143,79 @@ pub async fn deploy_datasets( Ok(ApiResponse::OK) } -async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec) -> Result<()> { +async fn process_deploy_request( + request: DeployDatasetsRequest, +) -> Result> { + match request { + DeployDatasetsRequest::Full(requests) => Ok(requests), + DeployDatasetsRequest::Simple { id, sql, yml } => { + let model: BusterModel = serde_yaml::from_str(&yml)?; + let mut requests = Vec::new(); + + for semantic_model in model.models { + // Create the view in the data source + let mut columns = Vec::new(); + + // Process dimensions + for dim in semantic_model.dimensions { + columns.push(DeployDatasetsColumnsRequest { + name: dim.name, + description: dim.description, + semantic_type: Some(String::from("dimension")), + expr: Some(dim.expr), + type_: Some(dim.dimension_type), + agg: None, + }); + } + + // Process measures + for measure in semantic_model.measures { + columns.push(DeployDatasetsColumnsRequest { + name: measure.name, + description: measure.description, + semantic_type: Some(String::from("measure")), + expr: Some(measure.expr), + type_: None, + agg: Some(measure.agg), + }); + } + + // Process entity relationships + let entity_relationships = semantic_model + .entities + .into_iter() + .map(|entity| DeployDatasetsEntityRelationshipsRequest { + name: 
entity.name, + expr: entity.expr, + type_: entity.entity_type, + }) + .collect(); + + requests.push(FullDeployDatasetsRequest { + id: Some(id), + data_source_name: semantic_model.data_source_name, + env: semantic_model.env, + type_: semantic_model.type_, + name: semantic_model.name, + model: semantic_model.model, + schema: semantic_model.schema, + description: semantic_model.description, + sql_definition: Some(sql.clone()), + entity_relationships: Some(entity_relationships), + columns, + yml_file: Some(yml.clone()), + }); + } + + Ok(requests) + } + } +} + +async fn deploy_datasets_handler( + user_id: &Uuid, + requests: Vec, +) -> Result<()> { // Get the user organization id. let organization_id = get_user_organization_id(&user_id).await?; @@ -124,13 +262,15 @@ async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec = Vec::new(); - println!("inserted_datasets: {:?}", inserted_datasets); - for req in &requests { if let Some(relationships) = &req.entity_relationships { let current_dataset = inserted_datasets @@ -397,6 +539,19 @@ async fn deploy_datasets_handler(user_id: &Uuid, requests: Vec, Json(request): Json, ) -> Result, (axum::http::StatusCode, String)> { - let dataset = match post_dataset_handler(&user.id, &request.data_source_id, &request.name).await { + let dataset = match post_dataset_handler(&user.id, &request.data_source_id, &request.name).await + { Ok(dataset) => dataset, Err(e) => { tracing::error!("Error creating dataset: {:?}", e); @@ -37,7 +38,11 @@ pub async fn post_dataset( Ok(ApiResponse::JsonData(dataset)) } -async fn post_dataset_handler(user_id: &Uuid, data_source_id: &Uuid, name: &str) -> Result { +async fn post_dataset_handler( + user_id: &Uuid, + data_source_id: &Uuid, + name: &str, +) -> Result { // Get the user organization id. 
let organization_id = get_user_organization_id(&user_id).await?; @@ -87,6 +92,8 @@ async fn post_dataset_handler(user_id: &Uuid, data_source_id: &Uuid, name: &str) Err(e) => return Err(anyhow!("Data sources not found: {}", e)), }; + let database_name = name.replace(" ", "_"); + let dataset = Dataset { id: Uuid::new_v4(), name: name.to_string(), @@ -94,7 +101,7 @@ async fn post_dataset_handler(user_id: &Uuid, data_source_id: &Uuid, name: &str) organization_id: organization_id.clone(), created_at: chrono::Utc::now(), updated_at: chrono::Utc::now(), - database_name: String::new(), + database_name, when_to_use: None, when_not_to_use: None, type_: DatasetType::Table, diff --git a/api/src/routes/ws/datasets/post_dataset.rs b/api/src/routes/ws/datasets/post_dataset.rs index 49bc6de04..13a7bcda4 100644 --- a/api/src/routes/ws/datasets/post_dataset.rs +++ b/api/src/routes/ws/datasets/post_dataset.rs @@ -106,30 +106,6 @@ async fn post_dataset_handler( } }; - let name = format!("dataset_index_{}", dataset_id.clone()); - let new_collection_schema = json!({ - "name": name, - "fields": [ - {"name": "id", "type": "string"}, - {"name": "value", "type": "string"}, - {"name": "dataset_id", "type": "string"}, - {"name": "dataset_column_id", "type": "string"}, - {"name": "value_embedding", "type": "float[]", "embed": { - "from": [ - "value" - ], - "model_config": { - "model_name": "ts/jina-embeddings-v2-base-en" - } - }}, - ], - }); - - match typesense::create_collection(new_collection_schema).await { - Ok(_) => (), - Err(e) => return Err(anyhow!("Error creating dataset index: {}", e)), - }; - let dataset_state = match get_dataset_state(&dataset_id, &user_id).await { Ok(dataset_state) => dataset_state, Err(e) => return Err(anyhow!("Error getting dataset state: {}", e)),