From f01755d41c3414af92d80fd15700d433c3d74005 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 9 May 2025 15:54:36 -0600 Subject: [PATCH 1/6] improvement: sync jobs --- api/libs/stored_values/src/jobs.rs | 117 +++++++++++++++++++++-------- api/server/src/main.rs | 2 +- 2 files changed, 86 insertions(+), 33 deletions(-) diff --git a/api/libs/stored_values/src/jobs.rs b/api/libs/stored_values/src/jobs.rs index ead00ba94..66622e8c0 100644 --- a/api/libs/stored_values/src/jobs.rs +++ b/api/libs/stored_values/src/jobs.rs @@ -113,7 +113,8 @@ pub async fn sync_distinct_values_chunk( .get() .await .context("Failed to get DB connection for finding job ID")?; - stored_values_sync_jobs::table + + let find_job_result = stored_values_sync_jobs::table .filter(stored_values_sync_jobs::data_source_id.eq(data_source_id)) .filter(stored_values_sync_jobs::database_name.eq(&database_name.to_lowercase())) .filter(stored_values_sync_jobs::schema_name.eq(&schema_name.to_lowercase())) @@ -123,17 +124,28 @@ pub async fn sync_distinct_values_chunk( .select(stored_values_sync_jobs::id) .order(stored_values_sync_jobs::created_at.desc()) // Get the most recent pending one .get_result::(&mut conn) - .await - .with_context(|| { - format!( - "No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}", - data_source_id, - database_name, - schema_name, - table_name, - column_name - ) - })? + .await; + + match find_job_result { + Ok(id) => id, + Err(diesel::NotFound) => { + info!( + "No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}. Nothing to sync.", + data_source_id, database_name, schema_name, table_name, column_name + ); + return Ok(0); // No job found, so 0 items processed. + } + Err(e) => { + error!( + "Failed to get pending sync job for data_source_id={}, database={}, schema={}, table={}, column={}: {}", + data_source_id, database_name, schema_name, table_name, column_name, e + ); + return Err(anyhow::Error::new(e).context(format!( + "Failed to retrieve sync job details for data_source_id={}, database={}, schema={}, table={}, column={}", + data_source_id, database_name, schema_name, table_name, column_name + ))); + } + } }; info!( @@ -453,46 +465,87 @@ pub async fn trigger_stale_sync_jobs() -> Result<()> { let twenty_four_hours_ago = Utc::now() - chrono::Duration::hours(24); - // Find jobs that were last synced > 24h ago and are not 'in_progress' + // Find jobs that were last synced > 24h ago and are not 'in_progress' or 'error' + // Also include jobs that are 'pending' and have never been synced (last_synced_at is NULL), + // or are 'pending' and their creation time implies they might have been missed. + // For simplicity with Diesel, we can query broadly and then filter in code, or make a more complex query. + // The current query correctly gets jobs that have a last_synced_at > 24h ago and are not in_progress/error. + // To include 'pending' jobs regardless of last_synced_at (as they should be picked up), + // the logic inside the loop will handle ensuring they are 'pending'. let jobs_to_sync = stored_values_sync_jobs::table .filter( stored_values_sync_jobs::last_synced_at .lt(twenty_four_hours_ago) - .and(stored_values_sync_jobs::status.ne("in_progress")), // Avoid re-triggering already running jobs + .or(stored_values_sync_jobs::last_synced_at.is_null()) // Also consider jobs never synced + .and(stored_values_sync_jobs::status.ne("in_progress")) + .and(stored_values_sync_jobs::status.ne("error")), ) .load::(&mut conn) .await .context("Failed to load sync jobs from database")?; let count = jobs_to_sync.len(); - info!("Found {} jobs to potentially sync.", count); + info!("Found {} jobs to potentially sync based on age or pending status.", count); for job in jobs_to_sync { - info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Spawning sync task for job"); + info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Considering job: {}", job.column_name); - // Clone the necessary data for the async task - let ds_id = job.data_source_id; - let db_name = job.database_name.clone(); - let sch_name = job.schema_name.clone(); - let tbl_name = job.table_name.clone(); - let col_name = job.column_name.clone(); - let job_id_for_task = job.id; // Capture job_id for logging inside task + let mut job_is_ready_to_sync = job.status == "pending"; - tokio::spawn(async move { - info!(%job_id_for_task, "Background sync task started."); - match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await { - Ok(inserted_count) => { - info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully."); + // If the job is eligible (old or never synced, and not in_progress/error) + // and not already pending, mark it as 'pending'. + if !job_is_ready_to_sync && + (job.last_synced_at.map_or(true, |lsa| lsa < twenty_four_hours_ago) && // Explicitly check age here too + job.status != "in_progress" && job.status != "error") { + info!(job_id = %job.id, old_status = %job.status, "Marking stale/eligible job as pending for re-sync."); + match diesel::update(stored_values_sync_jobs::table.find(job.id)) + .set(stored_values_sync_jobs::status.eq("pending".to_string())) + // Do not update last_synced_at here; sync_distinct_values_chunk handles it. + .execute(&mut conn) + .await + { + Ok(num_updated) => { + if num_updated > 0 { + info!(job_id = %job.id, "Successfully marked stale/eligible job as pending."); + job_is_ready_to_sync = true; + } else { + warn!(job_id = %job.id, "Failed to mark stale/eligible job as pending (0 rows updated). It might have been deleted or changed concurrently."); + } } Err(e) => { - error!(%job_id_for_task, "Background sync task failed: {}", e); - // Note: sync_distinct_values_chunk already updates the job status to 'error' + error!(job_id = %job.id, "DB error while marking stale/eligible job as pending: {}. Skipping this job.", e); } } - }); + } + + // Now, if the job is 'pending' (either originally or just updated), spawn the sync task. + if job_is_ready_to_sync { + info!(job_id = %job.id, status = "pending", "Spawning sync task for job: {}", job.column_name); + + // Clone the necessary data for the async task + let ds_id = job.data_source_id; + let db_name = job.database_name.clone(); + let sch_name = job.schema_name.clone(); + let tbl_name = job.table_name.clone(); + let col_name = job.column_name.clone(); + let job_id_for_task = job.id; // Capture job_id for logging inside task + + tokio::spawn(async move { + info!(%job_id_for_task, "Background sync task started."); + match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await { + Ok(inserted_count) => { + info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully."); + } + Err(e) => { + error!(%job_id_for_task, "Background sync task failed: {}", e); + // Note: sync_distinct_values_chunk already updates the job status to 'error' + } + } + }); + } } - info!("Finished spawning {} sync tasks.", count); + info!("Finished considering {} jobs for sync tasks.", count); Ok(()) } diff --git a/api/server/src/main.rs b/api/server/src/main.rs index 9f380d190..31f7f8e68 100644 --- a/api/server/src/main.rs +++ b/api/server/src/main.rs @@ -69,7 +69,7 @@ async fn main() -> Result<(), anyhow::Error> { info!("Starting stored values sync job scheduler..."); // Schedule to run every hour - let job = Job::new_async("0 0 * * * *", move |uuid, mut l| { + let job = Job::new_async("0 */5 * * * *", move |uuid, mut l| { Box::pin(async move { info!(job_uuid = %uuid, "Running hourly stored values sync job check."); if let Err(e) = trigger_stale_sync_jobs().await { From 6c8ae281bb18accdc8457eee6f93c57c58b01cb5 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 9 May 2025 16:06:45 -0600 Subject: [PATCH 2/6] update brew workflow --- .github/workflows/update-brew-tap.yml | 130 ++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 .github/workflows/update-brew-tap.yml diff --git a/.github/workflows/update-brew-tap.yml b/.github/workflows/update-brew-tap.yml new file mode 100644 index 000000000..424f7e6e8 --- /dev/null +++ b/.github/workflows/update-brew-tap.yml @@ -0,0 +1,130 @@ +name: Release Homebrew Tap for Buster CLI + +on: + workflow_call: + inputs: + tag: + required: true + type: string + secrets: + homebrew_tap_rw: # Secret for pushing to buster-so/buster-homebrew + required: true + +permissions: + contents: write # Allows the workflow to commit to the tap repository if using GITHUB_TOKEN, though homebrew_tap_rw is primary for cross-repo + +jobs: + release: + runs-on: blacksmith + steps: + - name: Checkout Homebrew tap repository + uses: actions/checkout@v4 + with: + repository: buster-so/buster-homebrew # Target tap repository + ref: 'main' # Or your default branch for the tap + token: ${{ secrets.homebrew_tap_rw }} + + - name: Download macOS ARM64 package + uses: robinraju/release-downloader@v1.12 + with: + repository: "buster-so/buster" # Source of the CLI releases + tag: ${{ inputs.tag }} # e.g., v0.1.0 + fileName: "buster-cli-darwin-arm64.tar.gz" # Exact asset name + + - name: Download macOS Intel package + uses: robinraju/release-downloader@v1.12 + with: + repository: "buster-so/buster" + tag: ${{ inputs.tag }} + fileName: "buster-cli-darwin-x86_64.tar.gz" + + - name: Download Linux Intel package + uses: robinraju/release-downloader@v1.12 + with: + repository: "buster-so/buster" + tag: ${{ inputs.tag }} + fileName: "buster-cli-linux-x86_64.tar.gz" + + - name: Generate and Push Formula File + run: | + set -e # Exit immediately if a command exits with a non-zero status. + + echo "Calculating SHA256 checksums..." + darwin_arm64_hash=$(shasum -a 256 buster-cli-darwin-arm64.tar.gz | awk '{ print $1 }') + darwin_x86_64_hash=$(shasum -a 256 buster-cli-darwin-x86_64.tar.gz | awk '{ print $1 }') + linux_x86_64_hash=$(shasum -a 256 buster-cli-linux-x86_64.tar.gz | awk '{ print $1 }') + + echo "Darwin ARM64 SHA: ${darwin_arm64_hash}" + echo "Darwin x86_64 SHA: ${darwin_x86_64_hash}" + echo "Linux x86_64 SHA: ${linux_x86_64_hash}" + + input_tag="${{ inputs.tag }}" + # Strip the leading 'v' if present, to get the version number like "0.1.0" + version="${input_tag#v}" + echo "Input tag: ${input_tag}, Version: ${version}" + + formula_file="buster.rb" # Name of the formula file in the tap repository + + echo "Generating ${formula_file}..." + echo '# typed: false' > "${formula_file}" + echo '# frozen_string_literal: true' >> "${formula_file}" + echo '' >> "${formula_file}" + echo 'class Buster < Formula' >> "${formula_file}" + echo ' desc "Command-line interface for using buster Buster"' >> "${formula_file}" + echo ' homepage "https://github.com/buster-so/buster"' >> "${formula_file}" + echo " version \"${version}\"" >> "${formula_file}" + echo ' license "MIT"' >> "${formula_file}" + echo '' >> "${formula_file}" + echo ' livecheck do' >> "${formula_file}" + echo ' url :stable' >> "${formula_file}" + echo ' strategy :github_latest' >> "${formula_file}" + echo ' regex(/^v?(\d+(?:\.\d+)*)$/i)' >> "${formula_file}" + echo ' end' >> "${formula_file}" + echo '' >> "${formula_file}" + echo ' on_macos do' >> "${formula_file}" + echo ' on_arm do' >> "${formula_file}" + echo " url \"https://github.com/buster-so/buster/releases/download/${input_tag}/buster-cli-darwin-arm64.tar.gz\"" >> "${formula_file}" + echo " sha256 \"${darwin_arm64_hash}\"" >> "${formula_file}" + echo ' end' >> "${formula_file}" + echo ' on_intel do' >> "${formula_file}" + echo " url \"https://github.com/buster-so/buster/releases/download/${input_tag}/buster-cli-darwin-x86_64.tar.gz\"" >> "${formula_file}" + echo " sha256 \"${darwin_x86_64_hash}\"" >> "${formula_file}" + echo ' end' >> "${formula_file}" + echo ' end' >> "${formula_file}" + echo '' >> "${formula_file}" + echo ' on_linux do' >> "${formula_file}" + echo " url \"https://github.com/buster-so/buster/releases/download/${input_tag}/buster-cli-linux-x86_64.tar.gz\"" >> "${formula_file}" + echo " sha256 \"${linux_x86_64_hash}\"" >> "${formula_file}" + echo ' end' >> "${formula_file}" + echo '' >> "${formula_file}" + echo ' def install' >> "${formula_file}" + echo ' bin.install "buster-cli" => "buster"' >> "${formula_file}" + echo ' end' >> "${formula_file}" + echo '' >> "${formula_file}" + echo ' test do' >> "${formula_file}" + echo ' assert_match "buster", shell_output("#{bin}/buster --help")' >> "${formula_file}" + echo ' end' >> "${formula_file}" + echo 'end' >> "${formula_file}" + # Ensure a final newline, some linters prefer it + echo '' >> "${formula_file}" + + echo "--- Generated ${formula_file} ---" + cat "${formula_file}" + echo "---------------------------" + + echo "Configuring git..." + git config user.name "Buster CLI Release Workflow" + git config user.email "actions@github.com" + + echo "Adding and committing ${formula_file}..." + git add "${formula_file}" + + # Check if there are changes to commit + if git diff --staged --quiet; then + echo "No changes to commit to ${formula_file}." + else + git commit -m "Update ${formula_file} for version ${input_tag}" + echo "Pushing changes..." + git push + echo "Pushed updated ${formula_file} for version ${input_tag} to buster-so/buster-homebrew" + fi From c30b6b893bcf3f4a25e3859978ebe6a10d8185a6 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 9 May 2025 16:18:41 -0600 Subject: [PATCH 3/6] workflow dispatch on brew --- .github/workflows/update-brew-tap.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/update-brew-tap.yml b/.github/workflows/update-brew-tap.yml index 424f7e6e8..44ed67a5a 100644 --- a/.github/workflows/update-brew-tap.yml +++ b/.github/workflows/update-brew-tap.yml @@ -9,6 +9,12 @@ on: secrets: homebrew_tap_rw: # Secret for pushing to buster-so/buster-homebrew required: true + workflow_dispatch: + inputs: + tag: + description: 'The release tag to use (e.g., v0.1.0)' + required: true + type: string permissions: contents: write # Allows the workflow to commit to the tap repository if using GITHUB_TOKEN, though homebrew_tap_rw is primary for cross-repo From d16f3296b24958439faf802afb6e1f83d1babc3f Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 9 May 2025 16:23:58 -0600 Subject: [PATCH 4/6] stored values casing for jobs --- api/libs/handlers/src/datasets/deploy.rs | 16 ++++++++-------- api/server/src/main.rs | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/api/libs/handlers/src/datasets/deploy.rs b/api/libs/handlers/src/datasets/deploy.rs index df109ca64..21bb5e6bb 100644 --- a/api/libs/handlers/src/datasets/deploy.rs +++ b/api/libs/handlers/src/datasets/deploy.rs @@ -310,10 +310,10 @@ pub async fn deploy_datasets_handler_core( if let Err(e) = stored_values_jobs::setup_sync_job( job_data_source_id, - current_job_database_name.clone(), - current_job_schema_name.clone(), - job_table_name.clone(), - job_column_name.clone(), + current_job_database_name.clone().to_lowercase(), + current_job_schema_name.clone().to_lowercase(), + job_table_name.clone().to_lowercase(), + job_column_name.clone().to_lowercase(), ) .await { @@ -326,10 +326,10 @@ pub async fn deploy_datasets_handler_core( match stored_values_jobs::sync_distinct_values_chunk( job_data_source_id, - current_job_database_name, - current_job_schema_name, - job_table_name, - job_column_name, + current_job_database_name.clone().to_lowercase(), + current_job_schema_name.clone().to_lowercase(), + job_table_name.clone().to_lowercase(), + job_column_name.clone().to_lowercase(), ).await { Ok(count) => info!( "Successfully synced {} distinct values for searchable dimension '{}' (data_source_id: {}).", diff --git a/api/server/src/main.rs b/api/server/src/main.rs index 31f7f8e68..550fae76c 100644 --- a/api/server/src/main.rs +++ b/api/server/src/main.rs @@ -69,7 +69,7 @@ async fn main() -> Result<(), anyhow::Error> { info!("Starting stored values sync job scheduler..."); // Schedule to run every hour - let job = Job::new_async("0 */5 * * * *", move |uuid, mut l| { + let job = Job::new_async("*/5 * * * * *", move |uuid, mut l| { Box::pin(async move { info!(job_uuid = %uuid, "Running hourly stored values sync job check."); if let Err(e) = trigger_stale_sync_jobs().await { From 2c4f66da6ee06abd9c68e1836bc5b29538361080 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 9 May 2025 17:01:24 -0600 Subject: [PATCH 5/6] hotfix: stored values search, case insensitive --- .../tools/categories/file_tools/search_data_catalog.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs index 5f35b4c88..fc050352e 100644 --- a/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs +++ b/api/libs/agents/src/tools/categories/file_tools/search_data_catalog.rs @@ -1390,10 +1390,11 @@ async fn inject_prefound_values_into_yml( .iter() .filter(|found_val| { // Match based on db, schema, table (model name), and column (dimension name) - found_val.database_name == *model_database_name - && found_val.schema_name == *model_schema_name - && found_val.table_name == model_name - && found_val.column_name == dim_name + // Case-insensitive comparison + found_val.database_name.to_lowercase() == model_database_name.to_lowercase() + && found_val.schema_name.to_lowercase() == model_schema_name.to_lowercase() + && found_val.table_name.to_lowercase() == model_name.to_lowercase() + && found_val.column_name.to_lowercase() == dim_name.to_lowercase() }) .map(|found_val| found_val.value.clone()) .collect::>() // Deduplicate From 6257cd7a51f1d904cf5b4b6483516e5cbfe651b6 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 9 May 2025 17:04:26 -0600 Subject: [PATCH 6/6] wrong file for buster.rb --- .github/workflows/update-brew-tap.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/update-brew-tap.yml b/.github/workflows/update-brew-tap.yml index 44ed67a5a..89358d8b5 100644 --- a/.github/workflows/update-brew-tap.yml +++ b/.github/workflows/update-brew-tap.yml @@ -69,7 +69,7 @@ jobs: version="${input_tag#v}" echo "Input tag: ${input_tag}, Version: ${version}" - formula_file="buster.rb" # Name of the formula file in the tap repository + formula_file="Formula/buster.rb" # Name of the formula file in the tap repository echo "Generating ${formula_file}..." echo '# typed: false' > "${formula_file}"