diff --git a/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/up.sql b/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/up.sql
index f7563aea0..ecf554e67 100644
--- a/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/up.sql
+++ b/api/migrations/2025-01-09-183703_add_unique_constraint_on_datasets/up.sql
@@ -6,4 +6,4 @@
 AND a.data_source_id = b.data_source_id;
 
 ALTER TABLE datasets
-    ADD CONSTRAINT datasets_database_name_data_source_id_key UNIQUE (id, database_name, data_source_id);
\ No newline at end of file
+    ADD CONSTRAINT datasets_database_name_data_source_id_key UNIQUE (database_name, data_source_id);
\ No newline at end of file
diff --git a/api/src/routes/rest/routes/datasets/deploy_datasets.rs b/api/src/routes/rest/routes/datasets/deploy_datasets.rs
index 804c2362c..2cf0b261f 100644
--- a/api/src/routes/rest/routes/datasets/deploy_datasets.rs
+++ b/api/src/routes/rest/routes/datasets/deploy_datasets.rs
@@ -252,7 +252,8 @@ async fn deploy_datasets_handler(
         .map_err(|_| anyhow!("Data sources not found"))?;
 
     // Need to create the datasets, multiple datasets can be created at once.
-    let mut datasets = Vec::new();
+    let mut datasets_with_ids = Vec::new();
+    let mut datasets_without_ids = Vec::new();
 
     for req in &requests {
         let data_source = data_sources
@@ -289,33 +290,58 @@ async fn deploy_datasets_handler(
             yml_file: req.yml_file.clone(),
         };
 
-        datasets.push(dataset);
+        match req.id {
+            Some(_) => datasets_with_ids.push(dataset),
+            None => datasets_without_ids.push(dataset),
+        }
     }
 
-    // Upsert the datasets into the database.
-    let inserted_datasets = diesel::insert_into(datasets::table)
-        .values(&datasets)
-        .on_conflict((
-            datasets::id,
-            datasets::database_name,
-            datasets::data_source_id,
-        ))
-        .do_update()
-        .set((
-            datasets::name.eq(excluded(datasets::name)),
-            datasets::data_source_id.eq(excluded(datasets::data_source_id)),
-            datasets::when_to_use.eq(excluded(datasets::when_to_use)),
-            datasets::definition.eq(excluded(datasets::definition)),
-            datasets::type_.eq(excluded(datasets::type_)),
-            datasets::schema.eq(excluded(datasets::schema)),
-            datasets::updated_at.eq(Utc::now()),
-            datasets::deleted_at.eq(None::<DateTime<Utc>>),
-        ))
-        .returning(Dataset::as_select())
-        .get_results::<Dataset>(&mut conn)
-        .await
-        .map_err(|e| anyhow!("Failed to create dataset: {}", e))?;
+    // Upsert the datasets into the database concurrently
+    let (inserted_id_datasets, inserted_no_id_datasets) = tokio::try_join!(
+        async {
+            let mut conn = pg_pool.get().await?;
+            diesel::insert_into(datasets::table)
+                .values(&datasets_with_ids)
+                .on_conflict(datasets::id)
+                .do_update()
+                .set((
+                    datasets::name.eq(excluded(datasets::name)),
+                    datasets::when_to_use.eq(excluded(datasets::when_to_use)),
+                    datasets::definition.eq(excluded(datasets::definition)),
+                    datasets::type_.eq(excluded(datasets::type_)),
+                    datasets::schema.eq(excluded(datasets::schema)),
+                    datasets::updated_at.eq(Utc::now()),
+                    datasets::deleted_at.eq(None::<DateTime<Utc>>),
+                ))
+                .returning(Dataset::as_select())
+                .get_results::<Dataset>(&mut conn)
+                .await
+                .map_err(|e| anyhow!("Failed to create dataset: {}", e))
+        },
+        async {
+            let mut conn = pg_pool.get().await?;
+            diesel::insert_into(datasets::table)
+                .values(&datasets_without_ids)
+                .on_conflict((datasets::database_name, datasets::data_source_id))
+                .do_update()
+                .set((
+                    datasets::name.eq(excluded(datasets::name)),
+                    datasets::when_to_use.eq(excluded(datasets::when_to_use)),
+                    datasets::definition.eq(excluded(datasets::definition)),
+                    datasets::type_.eq(excluded(datasets::type_)),
+                    datasets::schema.eq(excluded(datasets::schema)),
+                    datasets::updated_at.eq(Utc::now()),
+                    datasets::deleted_at.eq(None::<DateTime<Utc>>),
+                ))
+                .returning(Dataset::as_select())
+                .get_results::<Dataset>(&mut conn)
+                .await
+                .map_err(|e| anyhow!("Failed to create dataset: {}", e))
+        }
+    )?;
+
+    let mut inserted_datasets = inserted_id_datasets;
+    inserted_datasets.extend(inserted_no_id_datasets);
 
     // Upsert the dataset columns into the database. We will pull these from the data source for their types.
     let mut columns_to_upsert: Vec<DatasetColumn> = Vec::new();
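For readers skimming the diff: Postgres `ON CONFLICT` requires a conflict target backed by a unique constraint, which is why the migration narrows the constraint to `(database_name, data_source_id)` and the handler splits incoming rows into two upserts, one per conflict target, run concurrently and then merged. Below is a minimal sketch of that split-and-join control flow, assuming only `tokio` and `anyhow`; the helpers `upsert_by_id` and `upsert_by_natural_key` are hypothetical stand-ins for the two diesel queries, not part of this PR.

```rust
use anyhow::Result;

#[derive(Debug)]
struct DatasetRow {
    id: Option<i64>,
    database_name: String,
}

// Hypothetical stand-in for the ON CONFLICT (id) DO UPDATE upsert.
async fn upsert_by_id(rows: Vec<DatasetRow>) -> Result<Vec<DatasetRow>> {
    Ok(rows)
}

// Hypothetical stand-in for the ON CONFLICT (database_name, data_source_id)
// DO UPDATE upsert.
async fn upsert_by_natural_key(rows: Vec<DatasetRow>) -> Result<Vec<DatasetRow>> {
    Ok(rows)
}

#[tokio::main]
async fn main() -> Result<()> {
    let requests = vec![
        DatasetRow { id: Some(1), database_name: "orders".into() },
        DatasetRow { id: None, database_name: "users".into() },
    ];

    // Partition by id presence, mirroring the handler's
    // datasets_with_ids / datasets_without_ids split.
    let (with_ids, without_ids): (Vec<_>, Vec<_>) =
        requests.into_iter().partition(|r| r.id.is_some());

    // Run both upserts concurrently; try_join! short-circuits on the first error.
    let (inserted_id, inserted_no_id) =
        tokio::try_join!(upsert_by_id(with_ids), upsert_by_natural_key(without_ids))?;

    // Merge the two result sets, as the handler does into inserted_datasets.
    let mut inserted = inserted_id;
    inserted.extend(inserted_no_id);
    println!("upserted {} datasets", inserted.len());
    Ok(())
}
```

Note that each async block in the real handler checks out its own connection from `pg_pool`: the two statements run concurrently, so they cannot share a single `&mut conn`.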