1k limit for processing records.

dal 2025-04-28 14:14:13 -06:00
parent 89ddef316d
commit 6f8793b674
1 changed file with 27 additions and 6 deletions


@@ -1490,9 +1490,12 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row
 // Query Execution & Processing
 // -------------------------
-fn prepare_query(query: &str) -> String {
-    const MAX_ROWS: usize = 1_000;
+// Define the row limit constant here or retrieve from config
+const PROCESSING_ROW_LIMIT: usize = 1000;
+fn prepare_query(query: &str) -> String {
+    // Note: This function currently doesn't apply a LIMIT to the query.
+    // The limit is applied during processing below as a safeguard.
     query.to_string()
 }
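
The note in prepare_query points at a possible follow-up: pushing the cap into the SQL itself so Snowflake never returns more rows than the processing limit. A minimal sketch of that idea (the subquery wrapping, and the assumption that the input is a single SELECT statement, are illustrative and not part of this commit):

// Hypothetical variant: apply the row limit in SQL instead of during
// processing. Assumes `query` is a single SELECT statement; wrapping it
// as a subquery would not be valid for multi-statement or DDL input.
fn prepare_query(query: &str) -> String {
    const PROCESSING_ROW_LIMIT: usize = 1000;
    format!(
        "SELECT * FROM ({}) LIMIT {}",
        query.trim().trim_end_matches(';'),
        PROCESSING_ROW_LIMIT
    )
}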
@@ -1527,13 +1530,31 @@ pub async fn snowflake_query(
     let rows = match snowflake_client.exec(&limited_query).await {
         Ok(result) => match result {
             QueryResult::Arrow(result) => {
-                let mut all_rows = Vec::new();
-                // Process each batch in order
+                // Initialize with capacity up to the limit; the take() logic
+                // below ensures all_rows never grows beyond it.
+                let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
+
+                // Process each batch in order, stopping if the limit is reached
                 for batch in result.iter() {
-                    println!("Processing batch: {:?}", batch);
+                    // Check if we've already reached the limit before processing the next batch
+                    if all_rows.len() >= PROCESSING_ROW_LIMIT {
+                        tracing::warn!("Processing row limit ({}) reached. Stopping data processing.", PROCESSING_ROW_LIMIT);
+                        break; // Stop processing more batches
+                    }
+
+                    // Consider removing or reducing verbosity of this log line
+                    tracing::debug!("Processing batch with {} rows.", batch.num_rows());
+
                     let batch_rows = process_record_batch(batch);
-                    all_rows.extend(batch_rows);
+
+                    // Determine how many rows from this batch we can add without exceeding the limit
+                    let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
+                    let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
+                    if rows_to_take > 0 {
+                        // Extend with only the rows needed to reach the limit
+                        all_rows.extend(batch_rows.into_iter().take(rows_to_take));
+                    }
                 }
                 all_rows
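
The capping logic inside the loop has no dependency on Snowflake or Arrow, so it can be checked in isolation. A small self-contained sketch (the Vec<i64> stand-in for processed rows and the batch sizes are arbitrary):

fn main() {
    const PROCESSING_ROW_LIMIT: usize = 1_000;

    // Four simulated batches of 400 "rows" each: the third should be
    // truncated to 200 rows and the fourth skipped entirely.
    let batches: Vec<Vec<i64>> = (0i64..4).map(|i| vec![i; 400]).collect();

    let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
    for batch_rows in batches {
        // Same guard as in the diff: stop before touching another batch.
        if all_rows.len() >= PROCESSING_ROW_LIMIT {
            break;
        }
        let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
        let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
        all_rows.extend(batch_rows.into_iter().take(rows_to_take));
    }

    assert_eq!(all_rows.len(), PROCESSING_ROW_LIMIT);
    println!("kept {} rows", all_rows.len());
}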