ITS WORKING

dal 2025-02-11 16:57:44 -07:00
parent 29900149dd
commit 89912fed59
1 changed file with 88 additions and 21 deletions


@@ -344,10 +344,32 @@ async fn deploy_datasets_handler(
.map(|req| (req.name.clone(), req.schema.clone()))
.collect();
tracing::info!(
"Validating tables for data source '{}': {:?}",
data_source_name,
tables_to_validate
);
// Get all columns in one batch - this acts as our validation
let ds_columns = match retrieve_dataset_columns_batch(&tables_to_validate, &credentials).await {
Ok(cols) => cols,
Ok(cols) => {
// Add debug logging
tracing::info!(
"Retrieved {} columns for data source '{}'. Tables found: {:?}",
cols.len(),
data_source_name,
cols.iter()
.map(|c| format!("{}.{}", c.schema_name, c.dataset_name))
.collect::<HashSet<_>>()
);
cols
},
Err(e) => {
tracing::error!(
"Error retrieving columns for data source '{}': {:?}",
data_source_name,
e
);
for req in group {
let mut validation = ValidationResult::new(
req.name.clone(),
@@ -378,13 +400,49 @@ async fn deploy_datasets_handler(
// Get columns for this dataset
let columns: Vec<_> = ds_columns
.iter()
.filter(|col| col.dataset_name == req.name && col.schema_name == req.schema)
.filter(|col| {
let name_match = col.dataset_name.to_lowercase() == req.name.to_lowercase();
let schema_match = col.schema_name.to_lowercase() == req.schema.to_lowercase();
// Add detailed debug logging for column matching
tracing::info!(
"Matching table '{}.{}': name_match={}, schema_match={} (comparing against {}.{})",
col.schema_name,
col.dataset_name,
name_match,
schema_match,
req.schema,
req.name
);
name_match && schema_match
})
.collect();
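// Illustrative sketch (not part of this commit): the lowercase comparison above can
// also be written without allocating, via str::eq_ignore_ascii_case, assuming the
// warehouse identifiers are ASCII. The helper and test below are hypothetical, not
// code from this repository.
fn table_matches(col_schema: &str, col_table: &str, req_schema: &str, req_table: &str) -> bool {
    col_table.eq_ignore_ascii_case(req_table) && col_schema.eq_ignore_ascii_case(req_schema)
}

#[cfg(test)]
mod case_matching_tests {
    use super::table_matches;

    #[test]
    fn matches_identifiers_regardless_of_case() {
        // Warehouses often report upper-case identifiers while deploy requests use lower case.
        assert!(table_matches("PUBLIC", "ORDERS", "public", "orders"));
        assert!(!table_matches("public", "orders", "public", "customers"));
    }
}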
if columns.is_empty() {
validation.add_error(ValidationError::table_not_found(&req.name));
tracing::warn!(
"No columns found for dataset '{}' in schema '{}'. Available tables:\n{}",
req.name,
req.schema,
ds_columns
.iter()
.map(|c| format!(" - {}.{}", c.schema_name, c.dataset_name))
.collect::<Vec<_>>()
.join("\n")
);
validation.add_error(ValidationError::table_not_found(&format!(
"{}.{}",
req.schema,
req.name
)));
validation.success = false;
} else {
tracing::info!(
"✅ Found {} columns for dataset '{}.{}'",
columns.len(),
req.schema,
req.name
);
validation.success = true;
valid_datasets.push(req);
dataset_columns_map.insert(req.name.clone(), columns);
@@ -436,7 +494,7 @@ async fn deploy_datasets_handler(
// Bulk upsert datasets
diesel::insert_into(datasets::table)
.values(&datasets_to_upsert)
.on_conflict(datasets::id)
.on_conflict((datasets::database_name, datasets::data_source_id))
.do_update()
.set((
datasets::updated_at.eq(excluded(datasets::updated_at)),
@@ -445,35 +503,44 @@ async fn deploy_datasets_handler(
datasets::when_to_use.eq(excluded(datasets::when_to_use)),
datasets::model.eq(excluded(datasets::model)),
datasets::yml_file.eq(excluded(datasets::yml_file)),
datasets::schema.eq(excluded(datasets::schema)),
datasets::name.eq(excluded(datasets::name)),
datasets::deleted_at.eq(None::<DateTime<Utc>>),
))
.execute(&mut conn)
.await?;
// Get the dataset IDs after upsert for column operations
let dataset_ids: HashMap<String, Uuid> = datasets::table
.filter(datasets::data_source_id.eq(&data_source.id))
.filter(datasets::database_name.eq_any(valid_datasets.iter().map(|req| &req.name)))
.filter(datasets::deleted_at.is_null())
.select((datasets::database_name, datasets::id))
.load::<(String, Uuid)>(&mut conn)
.await?
.into_iter()
.collect();
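// Illustrative sketch (not part of this commit): on Postgres, the id lookup above could
// in principle be folded into the upsert itself with RETURNING, assuming diesel-async as
// used elsewhere in this handler; shown only as a design note.
let dataset_ids: HashMap<String, Uuid> = diesel::insert_into(datasets::table)
    .values(&datasets_to_upsert)
    .on_conflict((datasets::database_name, datasets::data_source_id))
    .do_update()
    .set((
        // same column updates as in the upsert above
        datasets::updated_at.eq(excluded(datasets::updated_at)),
    ))
    .returning((datasets::database_name, datasets::id))
    .get_results::<(String, Uuid)>(&mut conn)
    .await?
    .into_iter()
    .collect();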
// Get the new dataset names
let new_dataset_names: HashSet<String> = valid_datasets
.iter()
.map(|req| req.name.clone())
.collect();
// Soft delete datasets that are no longer present
let datasets_to_delete: Vec<String> = existing_datasets
.difference(&new_dataset_names)
.cloned()
.collect();
if !datasets_to_delete.is_empty() {
diesel::update(datasets::table)
.filter(datasets::data_source_id.eq(&data_source.id))
.filter(datasets::name.eq_any(&datasets_to_delete))
.filter(datasets::deleted_at.is_null())
.set(datasets::deleted_at.eq(now))
.execute(&mut conn)
.await?;
}
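// Illustrative sketch (not part of this commit): the soft-delete target list above is the
// set difference "previously deployed" minus "currently requested". A self-contained
// example of that computation:
use std::collections::HashSet;

fn stale_datasets(existing: &HashSet<String>, incoming: &HashSet<String>) -> Vec<String> {
    // Everything that existed before but is absent from the new deploy request.
    existing.difference(incoming).cloned().collect()
}

// e.g. existing = {"orders", "customers"}, incoming = {"orders"} yields ["customers"].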
// Bulk upsert columns for each dataset
for req in valid_datasets {
let dataset_id = req.id.unwrap_or_else(Uuid::new_v4);
let dataset_id = match dataset_ids.get(&req.name) {
Some(id) => *id,
None => {
tracing::error!(
"Dataset ID not found after upsert for {}.{}",
req.schema,
req.name
);
continue;
}
};
let columns: Vec<DatasetColumn> = req
.columns
.iter()