improvement: sync jobs

This commit is contained in:
dal 2025-05-09 15:54:36 -06:00
parent 6a60055160
commit f01755d41c
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
2 changed files with 86 additions and 33 deletions

View File

@ -113,7 +113,8 @@ pub async fn sync_distinct_values_chunk(
.get()
.await
.context("Failed to get DB connection for finding job ID")?;
stored_values_sync_jobs::table
let find_job_result = stored_values_sync_jobs::table
.filter(stored_values_sync_jobs::data_source_id.eq(data_source_id))
.filter(stored_values_sync_jobs::database_name.eq(&database_name.to_lowercase()))
.filter(stored_values_sync_jobs::schema_name.eq(&schema_name.to_lowercase()))
@ -123,17 +124,28 @@ pub async fn sync_distinct_values_chunk(
.select(stored_values_sync_jobs::id)
.order(stored_values_sync_jobs::created_at.desc()) // Get the most recent pending one
.get_result::<Uuid>(&mut conn)
.await
.with_context(|| {
format!(
"No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}",
data_source_id,
database_name,
schema_name,
table_name,
column_name
)
})?
.await;
match find_job_result {
Ok(id) => id,
Err(diesel::NotFound) => {
info!(
"No pending sync job found for data_source_id={}, database={}, schema={}, table={}, column={}. Nothing to sync.",
data_source_id, database_name, schema_name, table_name, column_name
);
return Ok(0); // No job found, so 0 items processed.
}
Err(e) => {
error!(
"Failed to get pending sync job for data_source_id={}, database={}, schema={}, table={}, column={}: {}",
data_source_id, database_name, schema_name, table_name, column_name, e
);
return Err(anyhow::Error::new(e).context(format!(
"Failed to retrieve sync job details for data_source_id={}, database={}, schema={}, table={}, column={}",
data_source_id, database_name, schema_name, table_name, column_name
)));
}
}
};
info!(
@ -453,46 +465,87 @@ pub async fn trigger_stale_sync_jobs() -> Result<()> {
let twenty_four_hours_ago = Utc::now() - chrono::Duration::hours(24);
// Find jobs that were last synced > 24h ago and are not 'in_progress'
// Find jobs that were last synced > 24h ago and are not 'in_progress' or 'error'
// Also include jobs that are 'pending' and have never been synced (last_synced_at is NULL),
// or are 'pending' and their creation time implies they might have been missed.
// For simplicity with Diesel, we can query broadly and then filter in code, or make a more complex query.
// The current query correctly gets jobs that have a last_synced_at > 24h ago and are not in_progress/error.
// To include 'pending' jobs regardless of last_synced_at (as they should be picked up),
// the logic inside the loop will handle ensuring they are 'pending'.
let jobs_to_sync = stored_values_sync_jobs::table
.filter(
stored_values_sync_jobs::last_synced_at
.lt(twenty_four_hours_ago)
.and(stored_values_sync_jobs::status.ne("in_progress")), // Avoid re-triggering already running jobs
.or(stored_values_sync_jobs::last_synced_at.is_null()) // Also consider jobs never synced
.and(stored_values_sync_jobs::status.ne("in_progress"))
.and(stored_values_sync_jobs::status.ne("error")),
)
.load::<StoredValuesSyncJob>(&mut conn)
.await
.context("Failed to load sync jobs from database")?;
let count = jobs_to_sync.len();
info!("Found {} jobs to potentially sync.", count);
info!("Found {} jobs to potentially sync based on age or pending status.", count);
for job in jobs_to_sync {
info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Spawning sync task for job");
info!(job_id = %job.id, status = %job.status, last_synced_at = ?job.last_synced_at, "Considering job: {}", job.column_name);
// Clone the necessary data for the async task
let ds_id = job.data_source_id;
let db_name = job.database_name.clone();
let sch_name = job.schema_name.clone();
let tbl_name = job.table_name.clone();
let col_name = job.column_name.clone();
let job_id_for_task = job.id; // Capture job_id for logging inside task
let mut job_is_ready_to_sync = job.status == "pending";
tokio::spawn(async move {
info!(%job_id_for_task, "Background sync task started.");
match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await {
Ok(inserted_count) => {
info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully.");
// If the job is eligible (old or never synced, and not in_progress/error)
// and not already pending, mark it as 'pending'.
if !job_is_ready_to_sync &&
(job.last_synced_at.map_or(true, |lsa| lsa < twenty_four_hours_ago) && // Explicitly check age here too
job.status != "in_progress" && job.status != "error") {
info!(job_id = %job.id, old_status = %job.status, "Marking stale/eligible job as pending for re-sync.");
match diesel::update(stored_values_sync_jobs::table.find(job.id))
.set(stored_values_sync_jobs::status.eq("pending".to_string()))
// Do not update last_synced_at here; sync_distinct_values_chunk handles it.
.execute(&mut conn)
.await
{
Ok(num_updated) => {
if num_updated > 0 {
info!(job_id = %job.id, "Successfully marked stale/eligible job as pending.");
job_is_ready_to_sync = true;
} else {
warn!(job_id = %job.id, "Failed to mark stale/eligible job as pending (0 rows updated). It might have been deleted or changed concurrently.");
}
}
Err(e) => {
error!(%job_id_for_task, "Background sync task failed: {}", e);
// Note: sync_distinct_values_chunk already updates the job status to 'error'
error!(job_id = %job.id, "DB error while marking stale/eligible job as pending: {}. Skipping this job.", e);
}
}
});
}
// Now, if the job is 'pending' (either originally or just updated), spawn the sync task.
if job_is_ready_to_sync {
info!(job_id = %job.id, status = "pending", "Spawning sync task for job: {}", job.column_name);
// Clone the necessary data for the async task
let ds_id = job.data_source_id;
let db_name = job.database_name.clone();
let sch_name = job.schema_name.clone();
let tbl_name = job.table_name.clone();
let col_name = job.column_name.clone();
let job_id_for_task = job.id; // Capture job_id for logging inside task
tokio::spawn(async move {
info!(%job_id_for_task, "Background sync task started.");
match sync_distinct_values_chunk(ds_id, db_name, sch_name, tbl_name, col_name).await {
Ok(inserted_count) => {
info!(%job_id_for_task, %inserted_count, "Background sync task completed successfully.");
}
Err(e) => {
error!(%job_id_for_task, "Background sync task failed: {}", e);
// Note: sync_distinct_values_chunk already updates the job status to 'error'
}
}
});
}
}
info!("Finished spawning {} sync tasks.", count);
info!("Finished considering {} jobs for sync tasks.", count);
Ok(())
}

View File

@ -69,7 +69,7 @@ async fn main() -> Result<(), anyhow::Error> {
info!("Starting stored values sync job scheduler...");
// Schedule to run every hour
let job = Job::new_async("0 0 * * * *", move |uuid, mut l| {
let job = Job::new_async("0 */5 * * * *", move |uuid, mut l| {
Box::pin(async move {
info!(job_uuid = %uuid, "Running hourly stored values sync job check.");
if let Err(e) = trigger_stale_sync_jobs().await {