diff --git a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
index a9ca067d9..229cb9be5 100644
--- a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
+++ b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
@@ -1490,9 +1490,12 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row
 // Query Execution & Processing
 // -------------------------
 
-fn prepare_query(query: &str) -> String {
-    const MAX_ROWS: usize = 1_000;
+// Define the row limit constant here or retrieve from config
+const PROCESSING_ROW_LIMIT: usize = 1000;
 
+fn prepare_query(query: &str) -> String {
+    // Note: This function currently doesn't apply a LIMIT to the query.
+    // The limit is applied during processing below as a safeguard.
     query.to_string()
 }
 
@@ -1527,13 +1530,31 @@ pub async fn snowflake_query(
     let rows = match snowflake_client.exec(&limited_query).await {
         Ok(result) => match result {
             QueryResult::Arrow(result) => {
-                let mut all_rows = Vec::new();
+                // Pre-allocate up to the limit; the capping logic below ensures
+                // all_rows never grows past PROCESSING_ROW_LIMIT.
+                let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
 
-                // Process each batch in order
+                // Process each batch in order, stopping if the limit is reached
                 for batch in result.iter() {
-                    println!("Processing batch: {:?}", batch);
+                    // Check if we've already reached the limit before processing the next batch
+                    if all_rows.len() >= PROCESSING_ROW_LIMIT {
+                        tracing::warn!("Processing row limit ({}) reached. Stopping data processing.", PROCESSING_ROW_LIMIT);
+                        break; // Stop processing more batches
+                    }
+
+                    // Consider removing or reducing verbosity of this log line
+                    tracing::debug!("Processing batch with {} rows.", batch.num_rows());
+
                     let batch_rows = process_record_batch(batch);
-                    all_rows.extend(batch_rows);
+
+                    // Determine how many rows from this batch we can add without exceeding the limit
+                    let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
+                    let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
+
+                    if rows_to_take > 0 {
+                        // Extend with only the rows needed to reach the limit
+                        all_rows.extend(batch_rows.into_iter().take(rows_to_take));
+                    }
                 }
 
                 all_rows
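
For reference, here is a minimal standalone sketch of the capping pattern the diff introduces. It uses Vec<Vec<String>> as a hypothetical stand-in for Arrow record batches; collect_capped and the sample data are illustrative names, not part of the codebase.

// Minimal sketch, assuming batches arrive as Vec<Vec<String>> rather than
// Arrow RecordBatches (a stand-in to keep the example self-contained).

const PROCESSING_ROW_LIMIT: usize = 1000;

/// Accumulate rows from `batches`, never exceeding PROCESSING_ROW_LIMIT.
fn collect_capped(batches: Vec<Vec<String>>) -> Vec<String> {
    let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
    for batch in batches {
        if all_rows.len() >= PROCESSING_ROW_LIMIT {
            break; // limit already reached; skip remaining batches
        }
        // saturating_sub guards against underflow if len ever exceeded the limit
        let remaining = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
        all_rows.extend(batch.into_iter().take(remaining));
    }
    all_rows
}

fn main() {
    // Three batches of 400 rows each: only 1000 of the 1200 rows are kept.
    let batches: Vec<Vec<String>> = (0..3)
        .map(|b| (0..400).map(|i| format!("row-{b}-{i}")).collect())
        .collect();
    let rows = collect_capped(batches);
    assert_eq!(rows.len(), PROCESSING_ROW_LIMIT);
}

One design note: Iterator::take(remaining) already yields at most `remaining` items regardless of batch size, so the explicit std::cmp::min and the rows_to_take > 0 guard in the diff are defensive rather than strictly required.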