From 2e84d64e6e4d0cf09d29fef182f4923225841af8 Mon Sep 17 00:00:00 2001 From: dal Date: Thu, 24 Apr 2025 12:16:49 -0600 Subject: [PATCH] stored values sync as it goes --- api/libs/stored_values/src/jobs.rs | 87 ++++++++++++++++++------------ 1 file changed, 52 insertions(+), 35 deletions(-) diff --git a/api/libs/stored_values/src/jobs.rs b/api/libs/stored_values/src/jobs.rs index 6c4325d3d..d465038f8 100644 --- a/api/libs/stored_values/src/jobs.rs +++ b/api/libs/stored_values/src/jobs.rs @@ -170,6 +170,7 @@ pub async fn sync_distinct_values_chunk( let mut total_inserted_count: usize = 0; loop { + // 1. Fetch Chunk let distinct_sql = format!( "SELECT DISTINCT {q}{col}{q} FROM {q}{db}{q}.{q}{schema}{q}.{q}{table}{q} ORDER BY 1 NULLS LAST LIMIT {limit} OFFSET {offset}", q = quote, @@ -187,7 +188,10 @@ pub async fn sync_distinct_values_chunk( .with_context(|| { format!( "query_engine failed for distinct query chunk on {}.{}.{} at offset {}", - schema_name, table_name, column_name, offset + schema_name, + table_name, + column_name, + offset ) })?; @@ -197,11 +201,14 @@ pub async fn sync_distinct_values_chunk( break; // No more data } - let mut values_to_process: Vec = Vec::with_capacity(fetched_count); + // 2. 
Extract Values for this Chunk + let mut chunk_values_to_process: Vec = Vec::with_capacity(fetched_count); + // Use original column name for extraction key as query_engine likely returns keys based on original query, not uppercased let result_column_name = q(&column_name); for row_map in query_result.data { + // Existing logic to extract string value based on DataType if let Some(value_opt) = row_map.get(&result_column_name).and_then(|dt| match dt { - DataType::Text(Some(v)) => Some(v.clone()), + DataType::Text(Some(v)) => Some(v.clone()), DataType::Int2(Some(v)) => Some(v.to_string()), DataType::Int4(Some(v)) => Some(v.to_string()), DataType::Int8(Some(v)) => Some(v.to_string()), @@ -218,55 +225,61 @@ pub async fn sync_distinct_values_chunk( _ => None, }) { if !value_opt.trim().is_empty() { - values_to_process.push(value_opt); + chunk_values_to_process.push(value_opt); } else { - warn!(%job_id, "Skipping empty or whitespace-only string value."); + warn!(%job_id, "Skipping empty or whitespace-only string value in chunk."); } } else { - warn!(%job_id, ?row_map, "Could not extract valid string value from row, skipping."); + warn!(%job_id, ?row_map, "Could not extract valid string value from row in chunk, skipping."); } } - if values_to_process.is_empty() { - info!(%job_id, fetched = fetched_count, "No non-null, non-empty string values extracted in this chunk at offset {}. Moving to next chunk.", offset); + // 3. Check if Empty Chunk (after extraction) + if chunk_values_to_process.is_empty() { + info!(%job_id, fetched_in_chunk = fetched_count, "No non-null, non-empty string values extracted in this chunk at offset {}. 
Moving to next chunk.", offset); + // Still need to check if the *fetched* count means end of data if (fetched_count as i64) < SYNC_CHUNK_LIMIT { - info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset); + info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data.", fetched_count, offset); break; } offset += SYNC_CHUNK_LIMIT; - continue; + continue; // Move to the next chunk } - info!(%job_id, count = values_to_process.len(), "Generating embeddings for chunk..."); + // 4. Embed Chunk + info!(%job_id, count = chunk_values_to_process.len(), "Generating embeddings for chunk..."); let embedding_request = EmbeddingRequest { model: "text-embedding-3-small".to_string(), - input: values_to_process.clone(), // Clone the values for the request + input: chunk_values_to_process.clone(), // Clone values needed for embedding request dimensions: Some(1536), encoding_format: Some("float".to_string()), - user: None, // Optional: Add user identifier if needed + user: None, }; let embedding_response = litellm_client .generate_embeddings(embedding_request) .await - .context("Failed to generate embeddings via LiteLLMClient")?; + .context("Failed to generate embeddings via LiteLLMClient for chunk")?; - if values_to_process.len() != embedding_response.data.len() { + // 5. Check Embedding Count for Chunk + if chunk_values_to_process.len() != embedding_response.data.len() { warn!( %job_id, - input_count = values_to_process.len(), + input_count = chunk_values_to_process.len(), output_count = embedding_response.data.len(), "Mismatch between input count and embedding count for chunk. Skipping insertion for this chunk." 
); + // Still check if fetched_count indicates end of data if (fetched_count as i64) < SYNC_CHUNK_LIMIT { - info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset); + info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data.", fetched_count, offset); break; } offset += SYNC_CHUNK_LIMIT; - continue; + continue; // Skip insertion for this chunk and move to the next } - let values_with_formatted_embeddings: Vec<(String, String)> = values_to_process + // 6. Prepare Insert Data for Chunk + let values_with_formatted_embeddings: Vec<(String, String)> = chunk_values_to_process // Use original chunk values .into_iter() .zip(embedding_response.data.into_iter()) .map(|(value, embedding_data)| { @@ -283,6 +296,8 @@ pub async fn sync_distinct_values_chunk( }) .collect(); + // 7. Build & Execute Insert for Chunk + // Initialize QueryBuilder for each chunk to avoid growing indefinitely let mut query_builder: QueryBuilder = QueryBuilder::new(format!( r#"INSERT INTO "{}"."{}" (value, database_name, schema_name, table_name, column_name, synced_at, embedding) VALUES "#, target_schema_name, target_table_name @@ -291,12 +306,12 @@ pub async fn sync_distinct_values_chunk( let mut first_row = true; for (value, embedding_str) in values_with_formatted_embeddings.iter() { if !first_row { - query_builder.push(", "); // Add comma between value rows + query_builder.push(", "); } - query_builder.push("("); // Start row values + query_builder.push("("); query_builder.push_bind(value); query_builder.push(", "); - query_builder.push_bind(&database_name); + query_builder.push_bind(&database_name); // Use original names for insertion metadata query_builder.push(", "); query_builder.push_bind(&schema_name); query_builder.push(", "); @@ -306,35 +321,36 @@ pub async fn sync_distinct_values_chunk( query_builder.push(", "); query_builder.push_bind(Utc::now()); query_builder.push(", "); - // Explicitly cast the bound text 
parameter to halfvec query_builder.push("CAST("); - query_builder.push_bind(embedding_str); // Bind the string - query_builder.push(" AS halfvec)"); // Cast it to halfvec - query_builder.push(")"); // End row values + query_builder.push_bind(embedding_str); + query_builder.push(" AS halfvec)"); + query_builder.push(")"); first_row = false; } - query_builder.push(" ON CONFLICT DO NOTHING"); // Keep the conflict handling + query_builder.push(" ON CONFLICT DO NOTHING"); let query = query_builder.build(); - info!(%job_id, "Executing batch insert with embeddings..."); + info!(%job_id, row_count = values_with_formatted_embeddings.len(), "Executing batch insert for chunk..."); match query.execute(app_db_pool).await { Ok(rows_affected) => { let current_chunk_inserted = rows_affected.rows_affected() as usize; - total_inserted_count += current_chunk_inserted; + total_inserted_count += current_chunk_inserted; // Accumulate total count info!( %job_id, inserted_in_chunk = current_chunk_inserted, total_inserted = total_inserted_count, - fetched_in_chunk = fetched_count, // Note: fetched_count includes rows that might have been skipped + fetched_in_chunk = fetched_count, + values_processed_in_chunk = values_with_formatted_embeddings.len(), current_offset = offset, target_table = format!("{}.{}", target_schema_name, target_table_name), "Processed distinct values chunk with embeddings" ); } Err(e) => { - error!(%job_id, "Database insert failed: {:?}", e); + error!(%job_id, "Database insert failed for chunk: {:?}", e); + // Propagate error to mark the job as failed return Err(anyhow::Error::new(e).context(format!( "Failed to insert distinct values chunk with embeddings into {}.{}", target_schema_name, target_table_name @@ -342,13 +358,14 @@ pub async fn sync_distinct_values_chunk( } } + // 8. 
Update Count & Offset (or break) if (fetched_count as i64) < SYNC_CHUNK_LIMIT { - info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset); - break; + info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data after processing chunk.", fetched_count, offset); + break; // End of data } - offset += SYNC_CHUNK_LIMIT; - } + offset += SYNC_CHUNK_LIMIT; // Move to the next chunk offset + } // End loop Ok(total_inserted_count) }.await;