stored values sync as it goes

This commit is contained in:
dal 2025-04-24 12:16:49 -06:00
parent c1f35d6144
commit 2e84d64e6e
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed file with 52 additions and 35 deletions

View File

@ -170,6 +170,7 @@ pub async fn sync_distinct_values_chunk(
let mut total_inserted_count: usize = 0; let mut total_inserted_count: usize = 0;
loop { loop {
// 1. Fetch Chunk
let distinct_sql = format!( let distinct_sql = format!(
"SELECT DISTINCT {q}{col}{q} FROM {q}{db}{q}.{q}{schema}{q}.{q}{table}{q} ORDER BY 1 NULLS LAST LIMIT {limit} OFFSET {offset}", "SELECT DISTINCT {q}{col}{q} FROM {q}{db}{q}.{q}{schema}{q}.{q}{table}{q} ORDER BY 1 NULLS LAST LIMIT {limit} OFFSET {offset}",
q = quote, q = quote,
@ -187,7 +188,10 @@ pub async fn sync_distinct_values_chunk(
.with_context(|| { .with_context(|| {
format!( format!(
"query_engine failed for distinct query chunk on {}.{}.{} at offset {}", "query_engine failed for distinct query chunk on {}.{}.{} at offset {}",
schema_name, table_name, column_name, offset schema_name,
table_name,
column_name,
offset
) )
})?; })?;
@ -197,9 +201,12 @@ pub async fn sync_distinct_values_chunk(
break; // No more data break; // No more data
} }
let mut values_to_process: Vec<String> = Vec::with_capacity(fetched_count); // 2. Extract Values for this Chunk
let mut chunk_values_to_process: Vec<String> = Vec::with_capacity(fetched_count);
// Use original column name for extraction key as query_engine likely returns keys based on original query, not uppercased
let result_column_name = q(&column_name); let result_column_name = q(&column_name);
for row_map in query_result.data { for row_map in query_result.data {
// Existing logic to extract string value based on DataType
if let Some(value_opt) = row_map.get(&result_column_name).and_then(|dt| match dt { if let Some(value_opt) = row_map.get(&result_column_name).and_then(|dt| match dt {
DataType::Text(Some(v)) => Some(v.clone()), DataType::Text(Some(v)) => Some(v.clone()),
DataType::Int2(Some(v)) => Some(v.to_string()), DataType::Int2(Some(v)) => Some(v.to_string()),
@ -218,55 +225,61 @@ pub async fn sync_distinct_values_chunk(
_ => None, _ => None,
}) { }) {
if !value_opt.trim().is_empty() { if !value_opt.trim().is_empty() {
values_to_process.push(value_opt); chunk_values_to_process.push(value_opt);
} else { } else {
warn!(%job_id, "Skipping empty or whitespace-only string value."); warn!(%job_id, "Skipping empty or whitespace-only string value in chunk.");
} }
} else { } else {
warn!(%job_id, ?row_map, "Could not extract valid string value from row, skipping."); warn!(%job_id, ?row_map, "Could not extract valid string value from row in chunk, skipping.");
} }
} }
if values_to_process.is_empty() { // 3. Check if Empty Chunk (after extraction)
info!(%job_id, fetched = fetched_count, "No non-null, non-empty string values extracted in this chunk at offset {}. Moving to next chunk.", offset); if chunk_values_to_process.is_empty() {
info!(%job_id, fetched_in_chunk = fetched_count, "No non-null, non-empty string values extracted in this chunk at offset {}. Moving to next chunk.", offset);
// Still need to check if the *fetched* count means end of data
if (fetched_count as i64) < SYNC_CHUNK_LIMIT { if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset); info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data.", fetched_count, offset);
break; break;
} }
offset += SYNC_CHUNK_LIMIT; offset += SYNC_CHUNK_LIMIT;
continue; continue; // Move to the next chunk
} }
info!(%job_id, count = values_to_process.len(), "Generating embeddings for chunk..."); // 4. Embed Chunk
info!(%job_id, count = chunk_values_to_process.len(), "Generating embeddings for chunk...");
let embedding_request = EmbeddingRequest { let embedding_request = EmbeddingRequest {
model: "text-embedding-3-small".to_string(), model: "text-embedding-3-small".to_string(),
input: values_to_process.clone(), // Clone the values for the request input: chunk_values_to_process.clone(), // Clone values needed for embedding request
dimensions: Some(1536), dimensions: Some(1536),
encoding_format: Some("float".to_string()), encoding_format: Some("float".to_string()),
user: None, // Optional: Add user identifier if needed user: None,
}; };
let embedding_response = litellm_client let embedding_response = litellm_client
.generate_embeddings(embedding_request) .generate_embeddings(embedding_request)
.await .await
.context("Failed to generate embeddings via LiteLLMClient")?; .context("Failed to generate embeddings via LiteLLMClient for chunk")?;
if values_to_process.len() != embedding_response.data.len() { // 5. Check Embedding Count for Chunk
if chunk_values_to_process.len() != embedding_response.data.len() {
warn!( warn!(
%job_id, %job_id,
input_count = values_to_process.len(), input_count = chunk_values_to_process.len(),
output_count = embedding_response.data.len(), output_count = embedding_response.data.len(),
"Mismatch between input count and embedding count for chunk. Skipping insertion for this chunk." "Mismatch between input count and embedding count for chunk. Skipping insertion for this chunk."
); );
// Still check if fetched_count indicates end of data
if (fetched_count as i64) < SYNC_CHUNK_LIMIT { if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset); info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data.", fetched_count, offset);
break; break;
} }
offset += SYNC_CHUNK_LIMIT; offset += SYNC_CHUNK_LIMIT;
continue; continue; // Skip insertion for this chunk and move to the next
} }
let values_with_formatted_embeddings: Vec<(String, String)> = values_to_process // 6. Prepare Insert Data for Chunk
let values_with_formatted_embeddings: Vec<(String, String)> = chunk_values_to_process // Use original chunk values
.into_iter() .into_iter()
.zip(embedding_response.data.into_iter()) .zip(embedding_response.data.into_iter())
.map(|(value, embedding_data)| { .map(|(value, embedding_data)| {
@ -283,6 +296,8 @@ pub async fn sync_distinct_values_chunk(
}) })
.collect(); .collect();
// 7. Build & Execute Insert for Chunk
// Initialize QueryBuilder for each chunk to avoid growing indefinitely
let mut query_builder: QueryBuilder<sqlx::Postgres> = QueryBuilder::new(format!( let mut query_builder: QueryBuilder<sqlx::Postgres> = QueryBuilder::new(format!(
r#"INSERT INTO "{}"."{}" (value, database_name, schema_name, table_name, column_name, synced_at, embedding) VALUES "#, r#"INSERT INTO "{}"."{}" (value, database_name, schema_name, table_name, column_name, synced_at, embedding) VALUES "#,
target_schema_name, target_table_name target_schema_name, target_table_name
@ -291,12 +306,12 @@ pub async fn sync_distinct_values_chunk(
let mut first_row = true; let mut first_row = true;
for (value, embedding_str) in values_with_formatted_embeddings.iter() { for (value, embedding_str) in values_with_formatted_embeddings.iter() {
if !first_row { if !first_row {
query_builder.push(", "); // Add comma between value rows query_builder.push(", ");
} }
query_builder.push("("); // Start row values query_builder.push("(");
query_builder.push_bind(value); query_builder.push_bind(value);
query_builder.push(", "); query_builder.push(", ");
query_builder.push_bind(&database_name); query_builder.push_bind(&database_name); // Use original names for insertion metadata
query_builder.push(", "); query_builder.push(", ");
query_builder.push_bind(&schema_name); query_builder.push_bind(&schema_name);
query_builder.push(", "); query_builder.push(", ");
@ -306,35 +321,36 @@ pub async fn sync_distinct_values_chunk(
query_builder.push(", "); query_builder.push(", ");
query_builder.push_bind(Utc::now()); query_builder.push_bind(Utc::now());
query_builder.push(", "); query_builder.push(", ");
// Explicitly cast the bound text parameter to halfvec
query_builder.push("CAST("); query_builder.push("CAST(");
query_builder.push_bind(embedding_str); // Bind the string query_builder.push_bind(embedding_str);
query_builder.push(" AS halfvec)"); // Cast it to halfvec query_builder.push(" AS halfvec)");
query_builder.push(")"); // End row values query_builder.push(")");
first_row = false; first_row = false;
} }
query_builder.push(" ON CONFLICT DO NOTHING"); // Keep the conflict handling query_builder.push(" ON CONFLICT DO NOTHING");
let query = query_builder.build(); let query = query_builder.build();
info!(%job_id, "Executing batch insert with embeddings..."); info!(%job_id, row_count = values_with_formatted_embeddings.len(), "Executing batch insert for chunk...");
match query.execute(app_db_pool).await { match query.execute(app_db_pool).await {
Ok(rows_affected) => { Ok(rows_affected) => {
let current_chunk_inserted = rows_affected.rows_affected() as usize; let current_chunk_inserted = rows_affected.rows_affected() as usize;
total_inserted_count += current_chunk_inserted; total_inserted_count += current_chunk_inserted; // Accumulate total count
info!( info!(
%job_id, %job_id,
inserted_in_chunk = current_chunk_inserted, inserted_in_chunk = current_chunk_inserted,
total_inserted = total_inserted_count, total_inserted = total_inserted_count,
fetched_in_chunk = fetched_count, // Note: fetched_count includes rows that might have been skipped fetched_in_chunk = fetched_count,
values_processed_in_chunk = values_with_formatted_embeddings.len(),
current_offset = offset, current_offset = offset,
target_table = format!("{}.{}", target_schema_name, target_table_name), target_table = format!("{}.{}", target_schema_name, target_table_name),
"Processed distinct values chunk with embeddings" "Processed distinct values chunk with embeddings"
); );
} }
Err(e) => { Err(e) => {
error!(%job_id, "Database insert failed: {:?}", e); error!(%job_id, "Database insert failed for chunk: {:?}", e);
// Propagate error to mark the job as failed
return Err(anyhow::Error::new(e).context(format!( return Err(anyhow::Error::new(e).context(format!(
"Failed to insert distinct values chunk with embeddings into {}.{}", "Failed to insert distinct values chunk with embeddings into {}.{}",
target_schema_name, target_table_name target_schema_name, target_table_name
@ -342,13 +358,14 @@ pub async fn sync_distinct_values_chunk(
} }
} }
// 8. Update Count & Offset (or break)
if (fetched_count as i64) < SYNC_CHUNK_LIMIT { if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset); info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data after processing chunk.", fetched_count, offset);
break; break; // End of data
} }
offset += SYNC_CHUNK_LIMIT; offset += SYNC_CHUNK_LIMIT; // Move to the next chunk offset
} } // End loop
Ok(total_inserted_count) Ok(total_inserted_count)
}.await; }.await;