mirror of https://github.com/buster-so/buster.git
Merge pull request #86 from buster-so/staging
refactor(stored_values): improve background processing and error hand…
This commit is contained in:
commit
d2f377dd0f
|
@ -23,6 +23,7 @@ use crate::{
|
||||||
write_query_engine::write_query_engine,
|
write_query_engine::write_query_engine,
|
||||||
}, security::checks::is_user_workspace_admin_or_data_admin, user::user_info::get_user_organization_id
|
}, security::checks::is_user_workspace_admin_or_data_admin, user::user_info::get_user_organization_id
|
||||||
},
|
},
|
||||||
|
utils::stored_values::{store_column_values, process_stored_values_background, StoredValueColumn},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
|
@ -558,17 +559,23 @@ async fn deploy_datasets_handler(
|
||||||
}
|
}
|
||||||
|
|
||||||
// Find the foreign dataset by the relationship name
|
// Find the foreign dataset by the relationship name
|
||||||
let foreign_dataset = inserted_datasets
|
if let Some(foreign_dataset) = inserted_datasets
|
||||||
.iter()
|
.iter()
|
||||||
.find(|d| d.name == rel.name)
|
.find(|d| d.name == rel.name)
|
||||||
.ok_or(anyhow!("Foreign dataset not found for relationship"))?;
|
{
|
||||||
|
entity_relationships_to_upsert.push(EntityRelationship {
|
||||||
entity_relationships_to_upsert.push(EntityRelationship {
|
primary_dataset_id: current_dataset.id,
|
||||||
primary_dataset_id: current_dataset.id,
|
foreign_dataset_id: foreign_dataset.id,
|
||||||
foreign_dataset_id: foreign_dataset.id,
|
relationship_type: rel.type_.clone(),
|
||||||
relationship_type: rel.type_.clone(),
|
created_at: Utc::now(),
|
||||||
created_at: Utc::now(),
|
});
|
||||||
});
|
} else {
|
||||||
|
tracing::warn!(
|
||||||
|
"Skipping relationship: foreign dataset '{}' not found for dataset '{}'",
|
||||||
|
rel.name,
|
||||||
|
current_dataset.name
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -616,6 +623,7 @@ async fn deploy_datasets_handler(
|
||||||
.ok_or(anyhow!("Dataset not found"))?;
|
.ok_or(anyhow!("Dataset not found"))?;
|
||||||
|
|
||||||
// Process columns that have stored_values enabled
|
// Process columns that have stored_values enabled
|
||||||
|
let mut stored_value_columns = Vec::new();
|
||||||
for col in &req.columns {
|
for col in &req.columns {
|
||||||
if col.stored_values {
|
if col.stored_values {
|
||||||
// Find the column ID from inserted_columns
|
// Find the column ID from inserted_columns
|
||||||
|
@ -624,28 +632,24 @@ async fn deploy_datasets_handler(
|
||||||
.find(|c| c.dataset_id == dataset.id && c.name == col.name)
|
.find(|c| c.dataset_id == dataset.id && c.name == col.name)
|
||||||
.ok_or(anyhow!("Column not found"))?;
|
.ok_or(anyhow!("Column not found"))?;
|
||||||
|
|
||||||
match crate::utils::stored_values::store_column_values(
|
stored_value_columns.push(StoredValueColumn {
|
||||||
&organization_id,
|
organization_id: organization_id.clone(),
|
||||||
&dataset.id,
|
dataset_id: dataset.id,
|
||||||
&col.name,
|
column_name: col.name.clone(),
|
||||||
&column.id, // Pass the column ID
|
column_id: column.id,
|
||||||
&data_source.id,
|
data_source_id: data_source.id,
|
||||||
&dataset.schema,
|
schema: dataset.schema.clone(),
|
||||||
&dataset.database_name,
|
table_name: dataset.database_name.clone(),
|
||||||
).await {
|
});
|
||||||
Ok(_) => (),
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(
|
|
||||||
"Error storing values for column {}: {:?}",
|
|
||||||
col.name,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
// Continue with other columns even if one fails
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Spawn background task to process stored values
|
||||||
|
if !stored_value_columns.is_empty() {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
process_stored_values_background(stored_value_columns).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if is_simple {
|
if is_simple {
|
||||||
|
|
|
@ -194,4 +194,44 @@ pub async fn search_stored_values(
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(results.into_iter().map(|r| (r.value, r.column_name, r.column_id)).collect())
|
Ok(results.into_iter().map(|r| (r.value, r.column_name, r.column_id)).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct StoredValueColumn {
|
||||||
|
pub organization_id: Uuid,
|
||||||
|
pub dataset_id: Uuid,
|
||||||
|
pub column_name: String,
|
||||||
|
pub column_id: Uuid,
|
||||||
|
pub data_source_id: Uuid,
|
||||||
|
pub schema: String,
|
||||||
|
pub table_name: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process_stored_values_background(columns: Vec<StoredValueColumn>) {
|
||||||
|
for column in columns {
|
||||||
|
match store_column_values(
|
||||||
|
&column.organization_id,
|
||||||
|
&column.dataset_id,
|
||||||
|
&column.column_name,
|
||||||
|
&column.column_id,
|
||||||
|
&column.data_source_id,
|
||||||
|
&column.schema,
|
||||||
|
&column.table_name,
|
||||||
|
).await {
|
||||||
|
Ok(_) => {
|
||||||
|
tracing::info!(
|
||||||
|
"Successfully processed stored values for column '{}' in dataset '{}'",
|
||||||
|
column.column_name,
|
||||||
|
column.table_name
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to process stored values for column '{}' in dataset '{}': {:?}",
|
||||||
|
column.column_name,
|
||||||
|
column.table_name,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -14,4 +14,6 @@ Cargo.lock
|
||||||
*.pdb
|
*.pdb
|
||||||
|
|
||||||
Makefile
|
Makefile
|
||||||
.vscode/
|
.vscode/
|
||||||
|
|
||||||
|
/prd
|
|
@ -93,7 +93,7 @@ pub async fn deploy(
|
||||||
processed_files.insert(
|
processed_files.insert(
|
||||||
model.name.clone(),
|
model.name.clone(),
|
||||||
format!("SQL: {} bytes, YML: {} bytes",
|
format!("SQL: {} bytes, YML: {} bytes",
|
||||||
obj.sql_definition.len(),
|
obj.sql_definition.as_ref().map_or(0, |s| s.len()),
|
||||||
obj.yml_content.len()
|
obj.yml_content.len()
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
|
@ -15,7 +15,7 @@ use super::{
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub struct BusterModelObject {
|
pub struct BusterModelObject {
|
||||||
pub sql_definition: String,
|
pub sql_definition: Option<String>,
|
||||||
pub model_file: BusterModel,
|
pub model_file: BusterModel,
|
||||||
pub yml_content: String,
|
pub yml_content: String,
|
||||||
}
|
}
|
||||||
|
@ -32,8 +32,11 @@ pub struct Model {
|
||||||
pub description: String,
|
pub description: String,
|
||||||
pub model: Option<String>,
|
pub model: Option<String>,
|
||||||
pub schema: Option<String>,
|
pub schema: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
pub entities: Vec<Entity>,
|
pub entities: Vec<Entity>,
|
||||||
|
#[serde(default)]
|
||||||
pub dimensions: Vec<Dimension>,
|
pub dimensions: Vec<Dimension>,
|
||||||
|
#[serde(default)]
|
||||||
pub measures: Vec<Measure>,
|
pub measures: Vec<Measure>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,6 +55,8 @@ pub struct Dimension {
|
||||||
#[serde(rename = "type")]
|
#[serde(rename = "type")]
|
||||||
pub dimension_type: String,
|
pub dimension_type: String,
|
||||||
pub description: String,
|
pub description: String,
|
||||||
|
#[serde(default = "bool::default")]
|
||||||
|
pub stored_values: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
@ -65,21 +70,93 @@ pub struct Measure {
|
||||||
pub async fn get_model_files(dir_path: Option<&str>) -> Result<Vec<BusterModelObject>> {
|
pub async fn get_model_files(dir_path: Option<&str>) -> Result<Vec<BusterModelObject>> {
|
||||||
let mut model_objects = Vec::new();
|
let mut model_objects = Vec::new();
|
||||||
let path = match dir_path {
|
let path = match dir_path {
|
||||||
Some(p) => std::path::Path::new("models").join(p),
|
Some(p) => std::path::PathBuf::from(p),
|
||||||
None => std::path::Path::new("models").to_path_buf(),
|
None => std::path::Path::new("models").to_path_buf(),
|
||||||
};
|
};
|
||||||
|
|
||||||
if !path.exists() {
|
if !path.exists() {
|
||||||
return Err(anyhow::anyhow!("Directory not found: {}", path.display()));
|
return Err(anyhow::anyhow!("Path not found: {}", path.display()));
|
||||||
|
}
|
||||||
|
|
||||||
|
if path.is_file() {
|
||||||
|
if let Some(ext) = path.extension() {
|
||||||
|
if ext == "yml" {
|
||||||
|
process_yml_file(&path, &mut model_objects, dir_path.is_some()).await?;
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::anyhow!("File must be a YML file: {}", path.display()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return Err(anyhow::anyhow!("File must be a YML file: {}", path.display()));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
process_directory(&path, &mut model_objects, dir_path.is_some()).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
process_directory(&path, &mut model_objects).await?;
|
|
||||||
Ok(model_objects)
|
Ok(model_objects)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn process_yml_file(
|
||||||
|
path: &std::path::Path,
|
||||||
|
model_objects: &mut Vec<BusterModelObject>,
|
||||||
|
is_custom_path: bool,
|
||||||
|
) -> Result<()> {
|
||||||
|
println!("📄 Processing YAML file: {}", path.display());
|
||||||
|
|
||||||
|
let yaml_content = match fs::read_to_string(path).await {
|
||||||
|
Ok(content) => content,
|
||||||
|
Err(e) => {
|
||||||
|
println!("❌ Failed to read YAML file {}: {}", path.display(), e);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let model: BusterModel = match serde_yaml::from_str(&yaml_content) {
|
||||||
|
Ok(model) => model,
|
||||||
|
Err(e) => {
|
||||||
|
println!("⚠️ Skipping invalid YAML file {}: {}", path.display(), e);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_custom_path {
|
||||||
|
// In custom path mode, we don't need SQL files
|
||||||
|
model_objects.push(BusterModelObject {
|
||||||
|
sql_definition: None,
|
||||||
|
model_file: model,
|
||||||
|
yml_content: yaml_content,
|
||||||
|
});
|
||||||
|
println!("✅ Successfully processed YML file: {}", path.display());
|
||||||
|
} else {
|
||||||
|
// In default mode, we require SQL files
|
||||||
|
let sql_path = path.with_extension("sql");
|
||||||
|
if !sql_path.exists() {
|
||||||
|
println!("⚠️ Skipping {} - No matching SQL file found at {}", path.display(), sql_path.display());
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let sql_definition = match tokio::fs::read_to_string(&sql_path).await {
|
||||||
|
Ok(content) => content,
|
||||||
|
Err(e) => {
|
||||||
|
println!("❌ Failed to read SQL file {}: {}", sql_path.display(), e);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
model_objects.push(BusterModelObject {
|
||||||
|
sql_definition: Some(sql_definition),
|
||||||
|
model_file: model,
|
||||||
|
yml_content: yaml_content,
|
||||||
|
});
|
||||||
|
println!("✅ Successfully processed model file: {}", path.display());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn process_directory(
|
async fn process_directory(
|
||||||
dir_path: &std::path::Path,
|
dir_path: &std::path::Path,
|
||||||
model_objects: &mut Vec<BusterModelObject>,
|
model_objects: &mut Vec<BusterModelObject>,
|
||||||
|
is_custom_path: bool,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
println!("📂 Processing directory: {}", dir_path.display());
|
println!("📂 Processing directory: {}", dir_path.display());
|
||||||
let mut dir = tokio::fs::read_dir(dir_path).await?;
|
let mut dir = tokio::fs::read_dir(dir_path).await?;
|
||||||
|
@ -88,50 +165,13 @@ async fn process_directory(
|
||||||
let path = entry.path();
|
let path = entry.path();
|
||||||
|
|
||||||
if path.is_dir() {
|
if path.is_dir() {
|
||||||
Box::pin(process_directory(&path, model_objects)).await?;
|
Box::pin(process_directory(&path, model_objects, is_custom_path)).await?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ext) = path.extension() {
|
if let Some(ext) = path.extension() {
|
||||||
if ext == "yml" {
|
if ext == "yml" {
|
||||||
println!("📄 Found YAML file: {}", path.display());
|
process_yml_file(&path, model_objects, is_custom_path).await?;
|
||||||
let sql_path = path.with_extension("sql");
|
|
||||||
|
|
||||||
if !sql_path.exists() {
|
|
||||||
println!("⚠️ Skipping {} - No matching SQL file found at {}", path.display(), sql_path.display());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let sql_definition = match tokio::fs::read_to_string(&sql_path).await {
|
|
||||||
Ok(content) => content,
|
|
||||||
Err(e) => {
|
|
||||||
println!("❌ Failed to read SQL file {}: {}", sql_path.display(), e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let yaml_content = match fs::read_to_string(&path).await {
|
|
||||||
Ok(content) => content,
|
|
||||||
Err(e) => {
|
|
||||||
println!("❌ Failed to read YAML file {}: {}", path.display(), e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let model: BusterModel = match serde_yaml::from_str(&yaml_content) {
|
|
||||||
Ok(model) => model,
|
|
||||||
Err(e) => {
|
|
||||||
println!("❌ Failed to parse YAML file {}: {}", path.display(), e);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
println!("✅ Successfully processed model file: {}", path.display());
|
|
||||||
model_objects.push(BusterModelObject {
|
|
||||||
sql_definition,
|
|
||||||
model_file: model,
|
|
||||||
yml_content: yaml_content,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -206,7 +246,7 @@ pub async fn upload_model_files(
|
||||||
model: semantic_model.model,
|
model: semantic_model.model,
|
||||||
schema: schema_name.clone(),
|
schema: schema_name.clone(),
|
||||||
description: semantic_model.description,
|
description: semantic_model.description,
|
||||||
sql_definition: Some(model.sql_definition.clone()),
|
sql_definition: model.sql_definition.clone(),
|
||||||
entity_relationships: Some(entity_relationships),
|
entity_relationships: Some(entity_relationships),
|
||||||
columns,
|
columns,
|
||||||
yml_file: Some(model.yml_content.clone()),
|
yml_file: Some(model.yml_content.clone()),
|
||||||
|
|
Loading…
Reference in New Issue