Merge branch 'main' into staging

This commit is contained in:
dal 2025-05-09 17:18:00 -06:00
commit 0ff76e4890
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
5 changed files with 235 additions and 45 deletions

136
.github/workflows/update-brew-tap.yml vendored Normal file
View File

@ -0,0 +1,136 @@
name: Release Homebrew Tap for Buster CLI
on:
workflow_call:
inputs:
tag:
required: true
type: string
secrets:
homebrew_tap_rw: # Secret for pushing to buster-so/buster-homebrew
required: true
workflow_dispatch:
inputs:
tag:
description: 'The release tag to use (e.g., v0.1.0)'
required: true
type: string
permissions:
contents: write # Allows the workflow to commit to the tap repository if using GITHUB_TOKEN, though homebrew_tap_rw is primary for cross-repo
jobs:
release:
runs-on: blacksmith
steps:
- name: Checkout Homebrew tap repository
uses: actions/checkout@v4
with:
repository: buster-so/buster-homebrew # Target tap repository
ref: 'main' # Or your default branch for the tap
token: ${{ secrets.homebrew_tap_rw }}
- name: Download macOS ARM64 package
uses: robinraju/release-downloader@v1.12
with:
repository: "buster-so/buster" # Source of the CLI releases
tag: ${{ inputs.tag }} # e.g., v0.1.0
fileName: "buster-cli-darwin-arm64.tar.gz" # Exact asset name
- name: Download macOS Intel package
uses: robinraju/release-downloader@v1.12
with:
repository: "buster-so/buster"
tag: ${{ inputs.tag }}
fileName: "buster-cli-darwin-x86_64.tar.gz"
- name: Download Linux Intel package
uses: robinraju/release-downloader@v1.12
with:
repository: "buster-so/buster"
tag: ${{ inputs.tag }}
fileName: "buster-cli-linux-x86_64.tar.gz"
- name: Generate and Push Formula File
run: |
set -e # Exit immediately if a command exits with a non-zero status.
echo "Calculating SHA256 checksums..."
darwin_arm64_hash=$(shasum -a 256 buster-cli-darwin-arm64.tar.gz | awk '{ print $1 }')
darwin_x86_64_hash=$(shasum -a 256 buster-cli-darwin-x86_64.tar.gz | awk '{ print $1 }')
linux_x86_64_hash=$(shasum -a 256 buster-cli-linux-x86_64.tar.gz | awk '{ print $1 }')
echo "Darwin ARM64 SHA: ${darwin_arm64_hash}"
echo "Darwin x86_64 SHA: ${darwin_x86_64_hash}"
echo "Linux x86_64 SHA: ${linux_x86_64_hash}"
input_tag="${{ inputs.tag }}"
# Strip the leading 'v' if present, to get the version number like "0.1.0"
version="${input_tag#v}"
echo "Input tag: ${input_tag}, Version: ${version}"
formula_file="Formula/buster.rb" # Name of the formula file in the tap repository
echo "Generating ${formula_file}..."
echo '# typed: false' > "${formula_file}"
echo '# frozen_string_literal: true' >> "${formula_file}"
echo '' >> "${formula_file}"
echo 'class Buster < Formula' >> "${formula_file}"
echo ' desc "Command-line interface for using buster Buster"' >> "${formula_file}"
echo ' homepage "https://github.com/buster-so/buster"' >> "${formula_file}"
echo " version \"${version}\"" >> "${formula_file}"
echo ' license "MIT"' >> "${formula_file}"
echo '' >> "${formula_file}"
echo ' livecheck do' >> "${formula_file}"
echo ' url :stable' >> "${formula_file}"
echo ' strategy :github_latest' >> "${formula_file}"
echo ' regex(/^v?(\d+(?:\.\d+)*)$/i)' >> "${formula_file}"
echo ' end' >> "${formula_file}"
echo '' >> "${formula_file}"
echo ' on_macos do' >> "${formula_file}"
echo ' on_arm do' >> "${formula_file}"
echo " url \"https://github.com/buster-so/buster/releases/download/${input_tag}/buster-cli-darwin-arm64.tar.gz\"" >> "${formula_file}"
echo " sha256 \"${darwin_arm64_hash}\"" >> "${formula_file}"
echo ' end' >> "${formula_file}"
echo ' on_intel do' >> "${formula_file}"
echo " url \"https://github.com/buster-so/buster/releases/download/${input_tag}/buster-cli-darwin-x86_64.tar.gz\"" >> "${formula_file}"
echo " sha256 \"${darwin_x86_64_hash}\"" >> "${formula_file}"
echo ' end' >> "${formula_file}"
echo ' end' >> "${formula_file}"
echo '' >> "${formula_file}"
echo ' on_linux do' >> "${formula_file}"
echo " url \"https://github.com/buster-so/buster/releases/download/${input_tag}/buster-cli-linux-x86_64.tar.gz\"" >> "${formula_file}"
echo " sha256 \"${linux_x86_64_hash}\"" >> "${formula_file}"
echo ' end' >> "${formula_file}"
echo '' >> "${formula_file}"
echo ' def install' >> "${formula_file}"
echo ' bin.install "buster-cli" => "buster"' >> "${formula_file}"
echo ' end' >> "${formula_file}"
echo '' >> "${formula_file}"
echo ' test do' >> "${formula_file}"
echo ' assert_match "buster", shell_output("#{bin}/buster --help")' >> "${formula_file}"
echo ' end' >> "${formula_file}"
echo 'end' >> "${formula_file}"
# Ensure a final newline, some linters prefer it
echo '' >> "${formula_file}"
echo "--- Generated ${formula_file} ---"
cat "${formula_file}"
echo "---------------------------"
echo "Configuring git..."
git config user.name "Buster CLI Release Workflow"
git config user.email "actions@github.com"
echo "Adding and committing ${formula_file}..."
git add "${formula_file}"
# Check if there are changes to commit
if git diff --staged --quiet; then
echo "No changes to commit to ${formula_file}."
else
git commit -m "Update ${formula_file} for version ${input_tag}"
echo "Pushing changes..."
git push
echo "Pushed updated ${formula_file} for version ${input_tag} to buster-so/buster-homebrew"
fi

View File

@ -1390,10 +1390,11 @@ async fn inject_prefound_values_into_yml(
.iter()
.filter(|found_val| {
// Match based on db, schema, table (model name), and column (dimension name)
found_val.database_name == *model_database_name
&& found_val.schema_name == *model_schema_name
&& found_val.table_name == model_name
&& found_val.column_name == dim_name
// Case-insensitive comparison
found_val.database_name.to_lowercase() == model_database_name.to_lowercase()
&& found_val.schema_name.to_lowercase() == model_schema_name.to_lowercase()
&& found_val.table_name.to_lowercase() == model_name.to_lowercase()
&& found_val.column_name.to_lowercase() == dim_name.to_lowercase()
})
.map(|found_val| found_val.value.clone())
.collect::<std::collections::HashSet<_>>() // Deduplicate

View File

@ -310,10 +310,10 @@ pub async fn deploy_datasets_handler_core(
if let Err(e) = stored_values_jobs::setup_sync_job(
job_data_source_id,
current_job_database_name.clone(),
current_job_schema_name.clone(),
job_table_name.clone(),
job_column_name.clone(),
current_job_database_name.clone().to_lowercase(),
current_job_schema_name.clone().to_lowercase(),
job_table_name.clone().to_lowercase(),
job_column_name.clone().to_lowercase(),
)
.await
{
@ -326,10 +326,10 @@ pub async fn deploy_datasets_handler_core(
match stored_values_jobs::sync_distinct_values_chunk(
job_data_source_id,
current_job_database_name,
current_job_schema_name,
job_table_name,
job_column_name,
current_job_database_name.clone().to_lowercase(),
current_job_schema_name.clone().to_lowercase(),
job_table_name.clone().to_lowercase(),
job_column_name.clone().to_lowercase(),
).await {
Ok(count) => info!(
"Successfully synced {} distinct values for searchable dimension '{}' (data_source_id: {}).",

View File

@ -113,7 +113,8 @@ pub async fn sync_distinct_values_chunk(
.get()
.await
.context("Failed to get DB connection for finding job ID")?;
stored_values_sync_jobs::table
let find_job_result = stored_values_sync_jobs::table
.filter(stored_values_sync_jobs::data_source_id.eq(data_source_id))
.filter(stored_values_sync_jobs::database_name.eq(&database_name.to_lowercase()))
.filter(stored_values_sync_jobs::schema_name.eq(&schema_name.to_lowercase()))
@ -123,17 +124,28 @@ pub async fn sync_distinct_values_chunk(
.select(stored_values_sync_jobs::id)
.order(stored_values_sync_jobs::created_at.desc()) // Get the most recent pending one
.get_result::<Uuid>(&mut conn)
.await
.with_context(|| {
format!(
"No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}",
data_source_id,
database_name,
schema_name,
table_name,
column_name
)
})?
.await;
match find_job_result {
Ok(id) => id,
Err(diesel::NotFound) => {
info!(
"No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}. Nothing to sync.",
data_source_id, database_name, schema_name, table_name, column_name
);
return Ok(0); // No job found, so 0 items processed.
}
Err(e) => {
error!(
"Failed to get pending sync job for data_source_id={}, database={}, schema={}, table={}, column={}: {}",
data_source_id, database_name, schema_name, table_name, column_name, e
);
return Err(anyhow::Error::new(e).context(format!(
"Failed to retrieve sync job details for data_source_id={}, database={}, schema={}, table={}, column={}",
data_source_id, database_name, schema_name, table_name, column_name
)));
}
}
};
info!(
@ -453,46 +465,87 @@ pub async fn trigger_stale_sync_jobs() -> Result<()> {
let twenty_four_hours_ago = Utc::now() - chrono::Duration::hours(24);
// Find jobs that were last synced > 24h ago and are not 'in_progress'
// Find jobs that were last synced > 24h ago and are not 'in_progress' or 'error'
// Also include jobs that are 'pending' and have never been synced (last_synced_at is NULL),
// or are 'pending' and their creation time implies they might have been missed.
// For simplicity with Diesel, we can query broadly and then filter in code, or make a more complex query.
// The current query correctly gets jobs that have a last_synced_at > 24h ago and are not in_progress/error.
// To include 'pending' jobs regardless of last_synced_at (as they should be picked up),
// the logic inside the loop will handle ensuring they are 'pending'.
let jobs_to_sync = stored_values_sync_jobs::table
.filter(
stored_values_sync_jobs::last_synced_at
.lt(twenty_four_hours_ago)
.and(stored_values_sync_jobs::status.ne("in_progress")), // Avoid re-triggering already running jobs
.or(stored_values_sync_jobs::last_synced_at.is_null()) // Also consider jobs never synced
.and(stored_values_sync_jobs::status.ne("in_progress"))
.and(stored_values_sync_jobs::status.ne("error")),
)
.load::<StoredValuesSyncJob>(&mut conn)
.await
.context("Failed to load sync jobs from database")?;
let count = jobs_to_sync.len();
info!("Found {} jobs to potentially sync.", count);
info!("Found {} jobs to potentially sync based on age or pending status.", count);
for job in jobs_to_sync {
info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Spawning sync task for job");
info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Considering job: {}", job.column_name);
// Clone the necessary data for the async task
let ds_id = job.data_source_id;
let db_name = job.database_name.clone();
let sch_name = job.schema_name.clone();
let tbl_name = job.table_name.clone();
let col_name = job.column_name.clone();
let job_id_for_task = job.id; // Capture job_id for logging inside task
let mut job_is_ready_to_sync = job.status == "pending";
tokio::spawn(async move {
info!(%job_id_for_task, "Background sync task started.");
match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await {
Ok(inserted_count) => {
info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully.");
// If the job is eligible (old or never synced, and not in_progress/error)
// and not already pending, mark it as 'pending'.
if !job_is_ready_to_sync &&
(job.last_synced_at.map_or(true, |lsa| lsa < twenty_four_hours_ago) && // Explicitly check age here too
job.status != "in_progress" && job.status != "error") {
info!(job_id = %job.id, old_status = %job.status, "Marking stale/eligible job as pending for re-sync.");
match diesel::update(stored_values_sync_jobs::table.find(job.id))
.set(stored_values_sync_jobs::status.eq("pending".to_string()))
// Do not update last_synced_at here; sync_distinct_values_chunk handles it.
.execute(&mut conn)
.await
{
Ok(num_updated) => {
if num_updated > 0 {
info!(job_id = %job.id, "Successfully marked stale/eligible job as pending.");
job_is_ready_to_sync = true;
} else {
warn!(job_id = %job.id, "Failed to mark stale/eligible job as pending (0 rows updated). It might have been deleted or changed concurrently.");
}
}
Err(e) => {
error!(%job_id_for_task, "Background sync task failed: {}", e);
// Note: sync_distinct_values_chunk already updates the job status to 'error'
error!(job_id = %job.id, "DB error while marking stale/eligible job as pending: {}. Skipping this job.", e);
}
}
});
}
// Now, if the job is 'pending' (either originally or just updated), spawn the sync task.
if job_is_ready_to_sync {
info!(job_id = %job.id, status = "pending", "Spawning sync task for job: {}", job.column_name);
// Clone the necessary data for the async task
let ds_id = job.data_source_id;
let db_name = job.database_name.clone();
let sch_name = job.schema_name.clone();
let tbl_name = job.table_name.clone();
let col_name = job.column_name.clone();
let job_id_for_task = job.id; // Capture job_id for logging inside task
tokio::spawn(async move {
info!(%job_id_for_task, "Background sync task started.");
match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await {
Ok(inserted_count) => {
info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully.");
}
Err(e) => {
error!(%job_id_for_task, "Background sync task failed: {}", e);
// Note: sync_distinct_values_chunk already updates the job status to 'error'
}
}
});
}
}
info!("Finished spawning {} sync tasks.", count);
info!("Finished considering {} jobs for sync tasks.", count);
Ok(())
}

View File

@ -69,7 +69,7 @@ async fn main() -> Result<(), anyhow::Error> {
info!("Starting stored values sync job scheduler...");
// Schedule to run every hour
let job = Job::new_async("0 0 * * * *", move |uuid, mut l| {
let job = Job::new_async("*/5 * * * * *", move |uuid, mut l| {
Box::pin(async move {
info!(job_uuid = %uuid, "Running hourly stored values sync job check.");
if let Err(e) = trigger_stale_sync_jobs().await {