mirror of https://github.com/buster-so/buster.git
Merge pull request #231 from buster-so/evals
stored valus sync as it goes
This commit is contained in:
commit
e4a61b38df
|
@ -170,6 +170,7 @@ pub async fn sync_distinct_values_chunk(
|
||||||
let mut total_inserted_count: usize = 0;
|
let mut total_inserted_count: usize = 0;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
// 1. Fetch Chunk
|
||||||
let distinct_sql = format!(
|
let distinct_sql = format!(
|
||||||
"SELECT DISTINCT {q}{col}{q} FROM {q}{db}{q}.{q}{schema}{q}.{q}{table}{q} ORDER BY 1 NULLS LAST LIMIT {limit} OFFSET {offset}",
|
"SELECT DISTINCT {q}{col}{q} FROM {q}{db}{q}.{q}{schema}{q}.{q}{table}{q} ORDER BY 1 NULLS LAST LIMIT {limit} OFFSET {offset}",
|
||||||
q = quote,
|
q = quote,
|
||||||
|
@ -187,7 +188,10 @@ pub async fn sync_distinct_values_chunk(
|
||||||
.with_context(|| {
|
.with_context(|| {
|
||||||
format!(
|
format!(
|
||||||
"query_engine failed for distinct query chunk on {}.{}.{} at offset {}",
|
"query_engine failed for distinct query chunk on {}.{}.{} at offset {}",
|
||||||
schema_name, table_name, column_name, offset
|
schema_name,
|
||||||
|
table_name,
|
||||||
|
column_name,
|
||||||
|
offset
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
|
@ -197,11 +201,14 @@ pub async fn sync_distinct_values_chunk(
|
||||||
break; // No more data
|
break; // No more data
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut values_to_process: Vec<String> = Vec::with_capacity(fetched_count);
|
// 2. Extract Values for this Chunk
|
||||||
|
let mut chunk_values_to_process: Vec<String> = Vec::with_capacity(fetched_count);
|
||||||
|
// Use original column name for extraction key as query_engine likely returns keys based on original query, not uppercased
|
||||||
let result_column_name = q(&column_name);
|
let result_column_name = q(&column_name);
|
||||||
for row_map in query_result.data {
|
for row_map in query_result.data {
|
||||||
|
// Existing logic to extract string value based on DataType
|
||||||
if let Some(value_opt) = row_map.get(&result_column_name).and_then(|dt| match dt {
|
if let Some(value_opt) = row_map.get(&result_column_name).and_then(|dt| match dt {
|
||||||
DataType::Text(Some(v)) => Some(v.clone()),
|
DataType::Text(Some(v)) => Some(v.clone()),
|
||||||
DataType::Int2(Some(v)) => Some(v.to_string()),
|
DataType::Int2(Some(v)) => Some(v.to_string()),
|
||||||
DataType::Int4(Some(v)) => Some(v.to_string()),
|
DataType::Int4(Some(v)) => Some(v.to_string()),
|
||||||
DataType::Int8(Some(v)) => Some(v.to_string()),
|
DataType::Int8(Some(v)) => Some(v.to_string()),
|
||||||
|
@ -218,55 +225,61 @@ pub async fn sync_distinct_values_chunk(
|
||||||
_ => None,
|
_ => None,
|
||||||
}) {
|
}) {
|
||||||
if !value_opt.trim().is_empty() {
|
if !value_opt.trim().is_empty() {
|
||||||
values_to_process.push(value_opt);
|
chunk_values_to_process.push(value_opt);
|
||||||
} else {
|
} else {
|
||||||
warn!(%job_id, "Skipping empty or whitespace-only string value.");
|
warn!(%job_id, "Skipping empty or whitespace-only string value in chunk.");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
warn!(%job_id, ?row_map, "Could not extract valid string value from row, skipping.");
|
warn!(%job_id, ?row_map, "Could not extract valid string value from row in chunk, skipping.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if values_to_process.is_empty() {
|
// 3. Check if Empty Chunk (after extraction)
|
||||||
info!(%job_id, fetched = fetched_count, "No non-null, non-empty string values extracted in this chunk at offset {}. Moving to next chunk.", offset);
|
if chunk_values_to_process.is_empty() {
|
||||||
|
info!(%job_id, fetched_in_chunk = fetched_count, "No non-null, non-empty string values extracted in this chunk at offset {}. Moving to next chunk.", offset);
|
||||||
|
// Still need to check if the *fetched* count means end of data
|
||||||
if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
|
if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
|
||||||
info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset);
|
info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data.", fetched_count, offset);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
offset += SYNC_CHUNK_LIMIT;
|
offset += SYNC_CHUNK_LIMIT;
|
||||||
continue;
|
continue; // Move to the next chunk
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(%job_id, count = values_to_process.len(), "Generating embeddings for chunk...");
|
// 4. Embed Chunk
|
||||||
|
info!(%job_id, count = chunk_values_to_process.len(), "Generating embeddings for chunk...");
|
||||||
let embedding_request = EmbeddingRequest {
|
let embedding_request = EmbeddingRequest {
|
||||||
model: "text-embedding-3-small".to_string(),
|
model: "text-embedding-3-small".to_string(),
|
||||||
input: values_to_process.clone(), // Clone the values for the request
|
input: chunk_values_to_process.clone(), // Clone values needed for embedding request
|
||||||
dimensions: Some(1536),
|
dimensions: Some(1536),
|
||||||
encoding_format: Some("float".to_string()),
|
encoding_format: Some("float".to_string()),
|
||||||
user: None, // Optional: Add user identifier if needed
|
user: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let embedding_response = litellm_client
|
let embedding_response = litellm_client
|
||||||
.generate_embeddings(embedding_request)
|
.generate_embeddings(embedding_request)
|
||||||
.await
|
.await
|
||||||
.context("Failed to generate embeddings via LiteLLMClient")?;
|
.context("Failed to generate embeddings via LiteLLMClient for chunk")?;
|
||||||
|
|
||||||
if values_to_process.len() != embedding_response.data.len() {
|
// 5. Check Embedding Count for Chunk
|
||||||
|
if chunk_values_to_process.len() != embedding_response.data.len() {
|
||||||
warn!(
|
warn!(
|
||||||
%job_id,
|
%job_id,
|
||||||
input_count = values_to_process.len(),
|
input_count = chunk_values_to_process.len(),
|
||||||
output_count = embedding_response.data.len(),
|
output_count = embedding_response.data.len(),
|
||||||
"Mismatch between input count and embedding count for chunk. Skipping insertion for this chunk."
|
"Mismatch between input count and embedding count for chunk. Skipping insertion for this chunk."
|
||||||
);
|
);
|
||||||
|
// Still check if fetched_count indicates end of data
|
||||||
if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
|
if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
|
||||||
info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset);
|
info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data.", fetched_count, offset);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
offset += SYNC_CHUNK_LIMIT;
|
offset += SYNC_CHUNK_LIMIT;
|
||||||
continue;
|
continue; // Skip insertion for this chunk and move to the next
|
||||||
}
|
}
|
||||||
|
|
||||||
let values_with_formatted_embeddings: Vec<(String, String)> = values_to_process
|
// 6. Prepare Insert Data for Chunk
|
||||||
|
let values_with_formatted_embeddings: Vec<(String, String)> = chunk_values_to_process // Use original chunk values
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(embedding_response.data.into_iter())
|
.zip(embedding_response.data.into_iter())
|
||||||
.map(|(value, embedding_data)| {
|
.map(|(value, embedding_data)| {
|
||||||
|
@ -283,6 +296,8 @@ pub async fn sync_distinct_values_chunk(
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
// 7. Build & Execute Insert for Chunk
|
||||||
|
// Initialize QueryBuilder for each chunk to avoid growing indefinitely
|
||||||
let mut query_builder: QueryBuilder<sqlx::Postgres> = QueryBuilder::new(format!(
|
let mut query_builder: QueryBuilder<sqlx::Postgres> = QueryBuilder::new(format!(
|
||||||
r#"INSERT INTO "{}"."{}" (value, database_name, schema_name, table_name, column_name, synced_at, embedding) VALUES "#,
|
r#"INSERT INTO "{}"."{}" (value, database_name, schema_name, table_name, column_name, synced_at, embedding) VALUES "#,
|
||||||
target_schema_name, target_table_name
|
target_schema_name, target_table_name
|
||||||
|
@ -291,12 +306,12 @@ pub async fn sync_distinct_values_chunk(
|
||||||
let mut first_row = true;
|
let mut first_row = true;
|
||||||
for (value, embedding_str) in values_with_formatted_embeddings.iter() {
|
for (value, embedding_str) in values_with_formatted_embeddings.iter() {
|
||||||
if !first_row {
|
if !first_row {
|
||||||
query_builder.push(", "); // Add comma between value rows
|
query_builder.push(", ");
|
||||||
}
|
}
|
||||||
query_builder.push("("); // Start row values
|
query_builder.push("(");
|
||||||
query_builder.push_bind(value);
|
query_builder.push_bind(value);
|
||||||
query_builder.push(", ");
|
query_builder.push(", ");
|
||||||
query_builder.push_bind(&database_name);
|
query_builder.push_bind(&database_name); // Use original names for insertion metadata
|
||||||
query_builder.push(", ");
|
query_builder.push(", ");
|
||||||
query_builder.push_bind(&schema_name);
|
query_builder.push_bind(&schema_name);
|
||||||
query_builder.push(", ");
|
query_builder.push(", ");
|
||||||
|
@ -306,35 +321,36 @@ pub async fn sync_distinct_values_chunk(
|
||||||
query_builder.push(", ");
|
query_builder.push(", ");
|
||||||
query_builder.push_bind(Utc::now());
|
query_builder.push_bind(Utc::now());
|
||||||
query_builder.push(", ");
|
query_builder.push(", ");
|
||||||
// Explicitly cast the bound text parameter to halfvec
|
|
||||||
query_builder.push("CAST(");
|
query_builder.push("CAST(");
|
||||||
query_builder.push_bind(embedding_str); // Bind the string
|
query_builder.push_bind(embedding_str);
|
||||||
query_builder.push(" AS halfvec)"); // Cast it to halfvec
|
query_builder.push(" AS halfvec)");
|
||||||
query_builder.push(")"); // End row values
|
query_builder.push(")");
|
||||||
first_row = false;
|
first_row = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
query_builder.push(" ON CONFLICT DO NOTHING"); // Keep the conflict handling
|
query_builder.push(" ON CONFLICT DO NOTHING");
|
||||||
|
|
||||||
let query = query_builder.build();
|
let query = query_builder.build();
|
||||||
|
|
||||||
info!(%job_id, "Executing batch insert with embeddings...");
|
info!(%job_id, row_count = values_with_formatted_embeddings.len(), "Executing batch insert for chunk...");
|
||||||
match query.execute(app_db_pool).await {
|
match query.execute(app_db_pool).await {
|
||||||
Ok(rows_affected) => {
|
Ok(rows_affected) => {
|
||||||
let current_chunk_inserted = rows_affected.rows_affected() as usize;
|
let current_chunk_inserted = rows_affected.rows_affected() as usize;
|
||||||
total_inserted_count += current_chunk_inserted;
|
total_inserted_count += current_chunk_inserted; // Accumulate total count
|
||||||
info!(
|
info!(
|
||||||
%job_id,
|
%job_id,
|
||||||
inserted_in_chunk = current_chunk_inserted,
|
inserted_in_chunk = current_chunk_inserted,
|
||||||
total_inserted = total_inserted_count,
|
total_inserted = total_inserted_count,
|
||||||
fetched_in_chunk = fetched_count, // Note: fetched_count includes rows that might have been skipped
|
fetched_in_chunk = fetched_count,
|
||||||
|
values_processed_in_chunk = values_with_formatted_embeddings.len(),
|
||||||
current_offset = offset,
|
current_offset = offset,
|
||||||
target_table = format!("{}.{}", target_schema_name, target_table_name),
|
target_table = format!("{}.{}", target_schema_name, target_table_name),
|
||||||
"Processed distinct values chunk with embeddings"
|
"Processed distinct values chunk with embeddings"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!(%job_id, "Database insert failed: {:?}", e);
|
error!(%job_id, "Database insert failed for chunk: {:?}", e);
|
||||||
|
// Propagate error to mark the job as failed
|
||||||
return Err(anyhow::Error::new(e).context(format!(
|
return Err(anyhow::Error::new(e).context(format!(
|
||||||
"Failed to insert distinct values chunk with embeddings into {}.{}",
|
"Failed to insert distinct values chunk with embeddings into {}.{}",
|
||||||
target_schema_name, target_table_name
|
target_schema_name, target_table_name
|
||||||
|
@ -342,13 +358,14 @@ pub async fn sync_distinct_values_chunk(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 8. Update Count & Offset (or break)
|
||||||
if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
|
if (fetched_count as i64) < SYNC_CHUNK_LIMIT {
|
||||||
info!(%job_id, "Fetched less than limit ({}) at offset {}, assuming end of data.", fetched_count, offset);
|
info!(%job_id, "Fetched less than limit ({}) rows at offset {}, assuming end of data after processing chunk.", fetched_count, offset);
|
||||||
break;
|
break; // End of data
|
||||||
}
|
}
|
||||||
|
|
||||||
offset += SYNC_CHUNK_LIMIT;
|
offset += SYNC_CHUNK_LIMIT; // Move to the next chunk offset
|
||||||
}
|
} // End loop
|
||||||
|
|
||||||
Ok(total_inserted_count)
|
Ok(total_inserted_count)
|
||||||
}.await;
|
}.await;
|
||||||
|
|
Loading…
Reference in New Issue