fix on stored v alues functions

This commit is contained in:
dal 2025-05-05 13:50:15 -06:00
parent 016ea352cd
commit 82c415e80f
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 107 additions and 57 deletions

View File

@ -90,65 +90,65 @@ pub async fn create_search_schema(data_source_id: Uuid) -> Result<()> {
info!(%schema_name, "Unique index on (value, db, schema, table, column) created successfully");
// 6. Create schema-specific embedding input function
let input_fn_name = format!("embedding_input_{}", schema_name);
let create_input_fn_query = format!(r#"
CREATE OR REPLACE FUNCTION "{}"."{}"(rec "{}"."searchable_column_values")
RETURNS text
LANGUAGE plpgsql
IMMUTABLE
AS $body$
BEGIN
RETURN rec.value;
END;
$body$;
"#,
schema_name, // Create function within the target schema
input_fn_name,
schema_name // Reference table type within the schema
);
conn.execute(create_input_fn_query.as_str())
.await
.with_context(|| format!("Failed to create embedding input function in schema {}", schema_name))?;
info!(%schema_name, function_name=%input_fn_name, "Embedding input function created");
// let input_fn_name = format!("embedding_input_{}", schema_name);
// let create_input_fn_query = format!(r#"
// CREATE OR REPLACE FUNCTION "{}"."{}"(rec "{}"."searchable_column_values")
// RETURNS text
// LANGUAGE plpgsql
// IMMUTABLE
// AS $body$
// BEGIN
// RETURN rec.value;
// END;
// $body$;
// "#,
// schema_name, // Create function within the target schema
// input_fn_name,
// schema_name // Reference table type within the schema
// );
// conn.execute(create_input_fn_query.as_str())
// .await
// .with_context(|| format!("Failed to create embedding input function in schema {}", schema_name))?;
// info!(%schema_name, function_name=%input_fn_name, "Embedding input function created");
// 7. Create INSERT trigger for embeddings
let insert_trigger_name = format!("embed_values_on_insert_{}", schema_name);
let create_insert_trigger_query = format!(r#"
CREATE OR REPLACE TRIGGER "{}"
AFTER INSERT
ON "{}"."searchable_column_values"
FOR EACH ROW
EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding');
"#,
insert_trigger_name,
schema_name, // Table schema
schema_name, // Function schema
input_fn_name // Function name
);
conn.execute(create_insert_trigger_query.as_str())
.await
.with_context(|| format!("Failed to create insert trigger for embeddings in schema {}", schema_name))?;
info!(%schema_name, trigger_name=%insert_trigger_name, "Insert trigger for embeddings created");
// let insert_trigger_name = format!("embed_values_on_insert_{}", schema_name);
// let create_insert_trigger_query = format!(r#"
// CREATE OR REPLACE TRIGGER "{}"
// AFTER INSERT
// ON "{}"."searchable_column_values"
// FOR EACH ROW
// EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding');
// "#,
// insert_trigger_name,
// schema_name, // Table schema
// schema_name, // Function schema
// input_fn_name // Function name
// );
// conn.execute(create_insert_trigger_query.as_str())
// .await
// .with_context(|| format!("Failed to create insert trigger for embeddings in schema {}", schema_name))?;
// info!(%schema_name, trigger_name=%insert_trigger_name, "Insert trigger for embeddings created");
// 8. Create UPDATE trigger for embeddings
let update_trigger_name = format!("embed_values_on_update_{}", schema_name);
let create_update_trigger_query = format!(r#"
CREATE OR REPLACE TRIGGER "{}"
AFTER UPDATE OF value -- Only trigger if 'value' changes
ON "{}"."searchable_column_values"
FOR EACH ROW
WHEN (OLD.value IS DISTINCT FROM NEW.value) -- Ensure value actually changed
EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding');
"#,
update_trigger_name,
schema_name, // Table schema
schema_name, // Function schema
input_fn_name // Function name
);
conn.execute(create_update_trigger_query.as_str())
.await
.with_context(|| format!("Failed to create update trigger for embeddings in schema {}", schema_name))?;
info!(%schema_name, trigger_name=%update_trigger_name, "Update trigger for embeddings created");
// let update_trigger_name = format!("embed_values_on_update_{}", schema_name);
// let create_update_trigger_query = format!(r#"
// CREATE OR REPLACE TRIGGER "{}"
// AFTER UPDATE OF value -- Only trigger if 'value' changes
// ON "{}"."searchable_column_values"
// FOR EACH ROW
// WHEN (OLD.value IS DISTINCT FROM NEW.value) -- Ensure value actually changed
// EXECUTE FUNCTION util.queue_embeddings('{}.{}', 'embedding');
// "#,
// update_trigger_name,
// schema_name, // Table schema
// schema_name, // Function schema
// input_fn_name // Function name
// );
// conn.execute(create_update_trigger_query.as_str())
// .await
// .with_context(|| format!("Failed to create update trigger for embeddings in schema {}", schema_name))?;
// info!(%schema_name, trigger_name=%update_trigger_name, "Update trigger for embeddings created");
Ok(())
}

View File

@ -47,6 +47,13 @@ impl std::fmt::Display for DatabaseType {
// Using shared RedshiftCredentials from query_engine now, no need for local definition
// Helper struct to parse dbt_project.yml
#[derive(Debug, Deserialize)]
struct DbtProject {
#[serde(rename = "model-paths")]
model_paths: Option<Vec<String>>,
}
pub async fn init(destination_path: Option<&str>) -> Result<()> {
println!("{}", "Initializing Buster...".bold().green());
@ -1152,12 +1159,55 @@ fn create_buster_config_file(
database: &str,
schema: Option<&str>,
) -> Result<()> {
// Prompt for model paths (optional)
// --- BEGIN DBT PROJECT DETECTION ---
let mut suggested_model_paths = "".to_string(); // Default to empty string
// Construct path to dbt_project.yml relative to the buster.yml path
if let Some(parent_dir) = path.parent() {
let dbt_project_path = parent_dir.join("dbt_project.yml");
if dbt_project_path.exists() && dbt_project_path.is_file() {
match fs::read_to_string(&dbt_project_path) {
Ok(content) => {
match serde_yaml::from_str::<DbtProject>(&content) {
Ok(dbt_config) => {
// Use specified model-paths or default ["models"] if present in file
let paths_to_suggest = dbt_config.model_paths.unwrap_or_else(|| vec!["models".to_string()]);
if !paths_to_suggest.is_empty() {
suggested_model_paths = paths_to_suggest.join(",");
println!(
"{}",
format!("Found dbt_project.yml, suggesting model paths: {}", suggested_model_paths.cyan()).dimmed()
);
}
},
Err(e) => {
// Log error but don't fail the init process
eprintln!(
"{}",
format!("Warning: Failed to parse {}: {}. Proceeding without suggested model paths.", dbt_project_path.display(), e).yellow()
);
}
}
},
Err(e) => {
eprintln!(
"{}",
format!("Warning: Failed to read {}: {}. Proceeding without suggested model paths.", dbt_project_path.display(), e).yellow()
);
}
}
}
}
// --- END DBT PROJECT DETECTION ---
// Prompt for model paths (optional), now with potential initial input
let model_paths_input = Text::new(
"Enter paths to your SQL models (optional, comma-separated):",
)
.with_default(&suggested_model_paths) // Use with_default instead
.with_help_message(
"Leave blank to use current directory, or specify paths like './models,./analytics/models'",
"Leave blank if none, or specify paths like './models,./analytics/models'",
)
.prompt()?;