Refactor dataset deployment logic and enhance request structure

- Introduced a new `is_simple` flag in the `deploy_datasets` function to differentiate between full and simple dataset deployments.
- Updated `deploy_datasets_handler` to accept an `is_simple` parameter, so that view creation for newly inserted datasets only runs on simple deployments.
- Modified the `DeployDatasetsRequest` struct to include an optional `id` field and a `type_` field (serialized as `type`), making the request more flexible.
- Made the `yml_file` field in the `DeployDatasetsRequest` struct optional.
- Updated the `process_batch` function to map "USER-DEFINED" column types to text, alongside the existing `TEXT` and `VARCHAR` arms.

These changes give the dataset deployment process more granular control over how different dataset types are handled: simple deployments now create their backing views automatically, and requests can carry an explicit `id` and `type`.
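The `Full`/`Simple` match added in `deploy_datasets` implies the server-side `DeployDatasetsRequest` is now a two-variant enum. That definition is not part of this diff; the following is a minimal sketch of its shape, assuming an untagged serde representation and placeholder payloads:

```rust
use serde::Deserialize;

// Placeholder payload; the real FullDeployDatasetsRequest has many more fields.
#[derive(Debug, Deserialize)]
struct FullDeployDatasetsRequest {
    name: String,
}

// Sketch only: the variant names come from the diff, the payloads are assumptions.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum DeployDatasetsRequest {
    Full(Vec<FullDeployDatasetsRequest>),
    Simple { data_source_name: String, sql: String },
}

fn main() {
    let request: DeployDatasetsRequest =
        serde_json::from_str(r#"{"data_source_name": "pg", "sql": "select 1"}"#).unwrap();

    // Equivalent to the two-arm match added in this commit.
    let is_simple = matches!(request, DeployDatasetsRequest::Simple { .. });
    assert!(is_simple);
}
```

If the extra explicitness isn't wanted, `matches!` is a one-line equivalent of the added two-arm match.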
dal committed 2025-01-11 15:17:01 -07:00
parent 4b64458938, commit 1f1df4a7bb
GPG Key ID: 16F4B0E1E9F61122 (no known key found for this signature in database)
4 changed files with 28 additions and 16 deletions

View File

@@ -124,6 +124,11 @@ pub async fn deploy_datasets(
     Extension(user): Extension<User>,
     Json(request): Json<DeployDatasetsRequest>,
 ) -> Result<ApiResponse<DeployDatasetsResponse>, (axum::http::StatusCode, String)> {
+    let is_simple = match request {
+        DeployDatasetsRequest::Full(_) => false,
+        DeployDatasetsRequest::Simple { .. } => true,
+    };
+
     let requests = match process_deploy_request(request).await {
         Ok(requests) => requests,
         Err(e) => {
@@ -132,7 +137,7 @@ pub async fn deploy_datasets(
         }
     };
 
-    let _ = match deploy_datasets_handler(&user.id, requests).await {
+    let _ = match deploy_datasets_handler(&user.id, requests, is_simple).await {
         Ok(dataset) => dataset,
         Err(e) => {
             tracing::error!("Error creating dataset: {:?}", e);
@@ -215,6 +220,7 @@ async fn process_deploy_request(
 async fn deploy_datasets_handler(
     user_id: &Uuid,
     requests: Vec<FullDeployDatasetsRequest>,
+    is_simple: bool,
 ) -> Result<()> {
     // Get the user organization id.
     let organization_id = get_user_organization_id(&user_id).await?;
@@ -452,8 +458,6 @@ async fn deploy_datasets_handler(
         }
     }
 
-    println!("columns_to_upsert: {:?}", columns_to_upsert);
-
     // Dedupe columns based on dataset_id and name
     let columns_to_upsert: Vec<DatasetColumn> = {
         let mut seen = HashSet::new();
@@ -565,17 +569,19 @@ async fn deploy_datasets_handler(
             .map_err(|e| anyhow!("Failed to upsert entity relationships: {}", e))?;
     }
 
-    for dataset in inserted_datasets {
-        let view_name = format!("{}.{}", dataset.schema, dataset.database_name);
-        let view_sql = format!(
-            "CREATE {} {} AS {}",
-            dataset.type_.to_string(),
-            view_name,
-            dataset.definition
-        );
+    if is_simple {
+        for dataset in inserted_datasets {
+            let view_name = format!("{}.{}", dataset.schema, dataset.database_name);
+            let view_sql = format!(
+                "CREATE {} {} AS {}",
+                dataset.type_.to_string(),
+                view_name,
+                dataset.definition
+            );
 
-        // Write the view to the data source
-        write_query_engine(&dataset.id, &view_sql).await?;
+            // Write the view to the data source
+            write_query_engine(&dataset.id, &view_sql).await?;
+        }
     }
 
     // TODO: Need to send back the updated and inserated objects.
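To make the gated view-creation step concrete, here is the statement it would render for one inserted dataset, using the same format strings as the handler (the schema, name, and definition are invented for illustration):

```rust
fn main() {
    // Hypothetical values for a single inserted dataset.
    let (schema, database_name) = ("analytics", "orders");
    let type_ = "view";
    let definition = "SELECT id, status FROM raw.orders";

    // Same construction as deploy_datasets_handler above.
    let view_name = format!("{}.{}", schema, database_name);
    let view_sql = format!("CREATE {} {} AS {}", type_, view_name, definition);

    assert_eq!(
        view_sql,
        "CREATE view analytics.orders AS SELECT id, status FROM raw.orders"
    );
}
```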

View File

@@ -122,7 +122,7 @@ async fn process_batch(
         "INT8" => DataType::Int8(row.try_get::<i64, _>(i).ok()),
         "INT4" => DataType::Int4(row.try_get::<i32, _>(i).ok()),
         "INT2" => DataType::Int2(row.try_get::<i16, _>(i).ok()),
-        "TEXT" | "VARCHAR" => DataType::Text(row.try_get::<String, _>(i).ok()),
+        "TEXT" | "VARCHAR" | "USER-DEFINED" => DataType::Text(row.try_get::<String, _>(i).ok()),
         "FLOAT4" => DataType::Float4(row.try_get::<f32, _>(i).ok()),
         "FLOAT8" => DataType::Float8(row.try_get::<f64, _>(i).ok()),
         "NUMERIC" => {

View File

@@ -1,4 +1,5 @@
 use serde::{Deserialize, Serialize};
+use uuid::Uuid;
 
 use crate::utils::profiles::Credential;
 
@@ -22,8 +23,11 @@ pub struct PostDataSourcesRequest {
 
 #[derive(Debug, Serialize)]
 pub struct DeployDatasetsRequest {
+    pub id: Option<Uuid>,
     pub data_source_name: String,
     pub env: String,
+    #[serde(rename = "type")]
+    pub type_: String,
     pub name: String,
     pub model: Option<String>,
     pub schema: String,
@@ -31,7 +35,7 @@ pub struct DeployDatasetsRequest {
     pub sql_definition: Option<String>,
     pub entity_relationships: Option<Vec<DeployDatasetsEntityRelationshipsRequest>>,
     pub columns: Vec<DeployDatasetsColumnsRequest>,
-    pub yml_file: String,
+    pub yml_file: Option<String>,
 }
 
 #[derive(Debug, Serialize)]
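A reduced serialization sketch (not the full struct, which has more fields than this diff shows) of how the new fields land on the wire: `type_` is renamed to `type`, and `None` serializes as `null` since no `skip_serializing_if` attribute is set:

```rust
use serde::Serialize;
use uuid::Uuid; // assumes the uuid crate's "serde" feature

// Reduced to the fields touched by this commit.
#[derive(Debug, Serialize)]
struct DeployDatasetsRequestSketch {
    id: Option<Uuid>,
    #[serde(rename = "type")]
    type_: String,
    yml_file: Option<String>,
}

fn main() {
    let req = DeployDatasetsRequestSketch {
        id: None,
        type_: String::from("view"),
        yml_file: Some(String::from("version: 2\nmodels: ...")),
    };
    println!("{}", serde_json::to_string_pretty(&req).unwrap());
    // {
    //   "id": null,
    //   "type": "view",
    //   "yml_file": "version: 2\nmodels: ..."
    // }
}
```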

View File

@@ -162,7 +162,9 @@ pub async fn upload_model_files(
             sql_definition: Some(model.sql_definition.clone()),
             entity_relationships: Some(entity_relationships),
             columns,
-            yml_file: model.yml_content.clone(),
+            yml_file: Some(model.yml_content.clone()),
+            id: None,
+            type_: String::from("view"),
         };
 
         post_datasets_req_body.push(dataset);
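Note that `upload_model_files` hard-codes `type_` to `"view"`, so every model deployed through this upload path will render as `CREATE view … AS …` in the handler's simple-deployment branch above.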