From f01755d41c3414af92d80fd15700d433c3d74005 Mon Sep 17 00:00:00 2001 From: dal Date: Fri, 9 May 2025 15:54:36 -0600 Subject: [PATCH] improvement: sync jobs --- api/libs/stored_values/src/jobs.rs | 117 +++++++++++++++++++++-------- api/server/src/main.rs | 2 +- 2 files changed, 86 insertions(+), 33 deletions(-) diff --git a/api/libs/stored_values/src/jobs.rs b/api/libs/stored_values/src/jobs.rs index ead00ba94..66622e8c0 100644 --- a/api/libs/stored_values/src/jobs.rs +++ b/api/libs/stored_values/src/jobs.rs @@ -113,7 +113,8 @@ pub async fn sync_distinct_values_chunk( .get() .await .context("Failed to get DB connection for finding job ID")?; - stored_values_sync_jobs::table + + let find_job_result = stored_values_sync_jobs::table .filter(stored_values_sync_jobs::data_source_id.eq(data_source_id)) .filter(stored_values_sync_jobs::database_name.eq(&database_name.to_lowercase())) .filter(stored_values_sync_jobs::schema_name.eq(&schema_name.to_lowercase())) @@ -123,17 +124,28 @@ pub async fn sync_distinct_values_chunk( .select(stored_values_sync_jobs::id) .order(stored_values_sync_jobs::created_at.desc()) // Get the most recent pending one .get_result::(&mut conn) - .await - .with_context(|| { - format!( - "No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}", - data_source_id, - database_name, - schema_name, - table_name, - column_name - ) - })? + .await; + + match find_job_result { + Ok(id) => id, + Err(diesel::NotFound) => { + info!( + "No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}. Nothing to sync.", + data_source_id, database_name, schema_name, table_name, column_name + ); + return Ok(0); // No job found, so 0 items processed. + } + Err(e) => { + error!( + "Failed to get pending sync job for data_source_id={}, database={}, schema={}, table={}, column={}: {}", + data_source_id, database_name, schema_name, table_name, column_name, e + ); + return Err(anyhow::Error::new(e).context(format!( + "Failed to retrieve sync job details for data_source_id={}, database={}, schema={}, table={}, column={}", + data_source_id, database_name, schema_name, table_name, column_name + ))); + } + } }; info!( @@ -453,46 +465,87 @@ pub async fn trigger_stale_sync_jobs() -> Result<()> { let twenty_four_hours_ago = Utc::now() - chrono::Duration::hours(24); - // Find jobs that were last synced > 24h ago and are not 'in_progress' + // Find jobs that were last synced > 24h ago and are not 'in_progress' or 'error' + // Also include jobs that are 'pending' and have never been synced (last_synced_at is NULL), + // or are 'pending' and their creation time implies they might have been missed. + // For simplicity with Diesel, we can query broadly and then filter in code, or make a more complex query. + // The current query correctly gets jobs that have a last_synced_at > 24h ago and are not in_progress/error. + // To include 'pending' jobs regardless of last_synced_at (as they should be picked up), + // the logic inside the loop will handle ensuring they are 'pending'. let jobs_to_sync = stored_values_sync_jobs::table .filter( stored_values_sync_jobs::last_synced_at .lt(twenty_four_hours_ago) - .and(stored_values_sync_jobs::status.ne("in_progress")), // Avoid re-triggering already running jobs + .or(stored_values_sync_jobs::last_synced_at.is_null()) // Also consider jobs never synced + .and(stored_values_sync_jobs::status.ne("in_progress")) + .and(stored_values_sync_jobs::status.ne("error")), ) .load::(&mut conn) .await .context("Failed to load sync jobs from database")?; let count = jobs_to_sync.len(); - info!("Found {} jobs to potentially sync.", count); + info!("Found {} jobs to potentially sync based on age or pending status.", count); for job in jobs_to_sync { - info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Spawning sync task for job"); + info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Considering job: {}", job.column_name); - // Clone the necessary data for the async task - let ds_id = job.data_source_id; - let db_name = job.database_name.clone(); - let sch_name = job.schema_name.clone(); - let tbl_name = job.table_name.clone(); - let col_name = job.column_name.clone(); - let job_id_for_task = job.id; // Capture job_id for logging inside task + let mut job_is_ready_to_sync = job.status == "pending"; - tokio::spawn(async move { - info!(%job_id_for_task, "Background sync task started."); - match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await { - Ok(inserted_count) => { - info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully."); + // If the job is eligible (old or never synced, and not in_progress/error) + // and not already pending, mark it as 'pending'. + if !job_is_ready_to_sync && + (job.last_synced_at.map_or(true, |lsa| lsa < twenty_four_hours_ago) && // Explicitly check age here too + job.status != "in_progress" && job.status != "error") { + info!(job_id = %job.id, old_status = %job.status, "Marking stale/eligible job as pending for re-sync."); + match diesel::update(stored_values_sync_jobs::table.find(job.id)) + .set(stored_values_sync_jobs::status.eq("pending".to_string())) + // Do not update last_synced_at here; sync_distinct_values_chunk handles it. + .execute(&mut conn) + .await + { + Ok(num_updated) => { + if num_updated > 0 { + info!(job_id = %job.id, "Successfully marked stale/eligible job as pending."); + job_is_ready_to_sync = true; + } else { + warn!(job_id = %job.id, "Failed to mark stale/eligible job as pending (0 rows updated). It might have been deleted or changed concurrently."); + } } Err(e) => { - error!(%job_id_for_task, "Background sync task failed: {}", e); - // Note: sync_distinct_values_chunk already updates the job status to 'error' + error!(job_id = %job.id, "DB error while marking stale/eligible job as pending: {}. Skipping this job.", e); } } - }); + } + + // Now, if the job is 'pending' (either originally or just updated), spawn the sync task. + if job_is_ready_to_sync { + info!(job_id = %job.id, status = "pending", "Spawning sync task for job: {}", job.column_name); + + // Clone the necessary data for the async task + let ds_id = job.data_source_id; + let db_name = job.database_name.clone(); + let sch_name = job.schema_name.clone(); + let tbl_name = job.table_name.clone(); + let col_name = job.column_name.clone(); + let job_id_for_task = job.id; // Capture job_id for logging inside task + + tokio::spawn(async move { + info!(%job_id_for_task, "Background sync task started."); + match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await { + Ok(inserted_count) => { + info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully."); + } + Err(e) => { + error!(%job_id_for_task, "Background sync task failed: {}", e); + // Note: sync_distinct_values_chunk already updates the job status to 'error' + } + } + }); + } } - info!("Finished spawning {} sync tasks.", count); + info!("Finished considering {} jobs for sync tasks.", count); Ok(()) } diff --git a/api/server/src/main.rs b/api/server/src/main.rs index 9f380d190..31f7f8e68 100644 --- a/api/server/src/main.rs +++ b/api/server/src/main.rs @@ -69,7 +69,7 @@ async fn main() -> Result<(), anyhow::Error> { info!("Starting stored values sync job scheduler..."); // Schedule to run every hour - let job = Job::new_async("0 0 * * * *", move |uuid, mut l| { + let job = Job::new_async("0 */5 * * * *", move |uuid, mut l| { Box::pin(async move { info!(job_uuid = %uuid, "Running hourly stored values sync job check."); if let Err(e) = trigger_stale_sync_jobs().await {