From c8d2c0ff0feab4fc423dd91bf04de0006fbf9d5c Mon Sep 17 00:00:00 2001
From: dal
Date: Wed, 13 Aug 2025 11:38:44 -0600
Subject: [PATCH] Refactor metric data handler and snowflake query to improve
 limit handling. Adjusted query limit logic to use display limit and added
 optional limit parameter to snowflake query function for better flexibility.

---
 .../src/metrics/get_metric_data_handler.rs        | 16 +++++++---------
 .../src/data_source_query_routes/query_engine.rs  |  2 +-
 .../data_source_query_routes/snowflake_query.rs   | 16 +++++++++-------
 3 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/apps/api/libs/handlers/src/metrics/get_metric_data_handler.rs b/apps/api/libs/handlers/src/metrics/get_metric_data_handler.rs
index c5789494e..458d9fc7e 100644
--- a/apps/api/libs/handlers/src/metrics/get_metric_data_handler.rs
+++ b/apps/api/libs/handlers/src/metrics/get_metric_data_handler.rs
@@ -198,11 +198,9 @@ pub async fn get_metric_data_handler(
         request.limit
     );
 
-    // Determine the actual query limit - we query for 5001 to check if there are more records
-    let query_limit = match request.limit {
-        Some(limit) => std::cmp::min(limit, 5001),
-        None => 5001,
-    };
+    // Determine the actual query limit - we query for one extra record to check if there are more
+    let display_limit = request.limit.unwrap_or(5000).min(5000);
+    let query_limit = display_limit + 1;
 
     // Try to get cached metadata first
     let mut conn_meta = get_pg_pool().get().await?;
@@ -239,13 +237,13 @@ pub async fn get_metric_data_handler(
         }
     };
 
-    // Check if we have more than 5000 records
-    let has_more_records = query_result.data.len() > 5000;
+    // Check if we have more records than the display limit
+    let has_more_records = query_result.data.len() > display_limit as usize;
 
-    // Truncate to 5000 records if we got more
+    // Truncate to display limit if we got more
     let mut data = query_result.data;
     if has_more_records {
-        data.truncate(5000);
+        data.truncate(display_limit as usize);
     }
 
     // Determine which metadata to use
diff --git a/apps/api/libs/query_engine/src/data_source_query_routes/query_engine.rs b/apps/api/libs/query_engine/src/data_source_query_routes/query_engine.rs
index 3917c5bc6..a297eb835 100644
--- a/apps/api/libs/query_engine/src/data_source_query_routes/query_engine.rs
+++ b/apps/api/libs/query_engine/src/data_source_query_routes/query_engine.rs
@@ -537,7 +537,7 @@
 
 
 
-            match snowflake_query(snowflake_client, sql.to_owned()).await {
+            match snowflake_query(snowflake_client, sql.to_owned(), limit).await {
                 Ok(processing_result) => {
                     match processing_result {
                         ProcessingResult::Processed(results) => results,
diff --git a/apps/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs b/apps/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
index d6f10a030..16e336fa8 100644
--- a/apps/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
+++ b/apps/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
@@ -1685,9 +1685,6 @@ fn convert_array_to_datatype(
 // Query Execution & Processing
 // -------------------------
 
-// Define the row limit constant here or retrieve from config
-const PROCESSING_ROW_LIMIT: usize = 5000;
-
 fn prepare_query(query: &str) -> String {
     // Note: This function currently doesn't apply a LIMIT to the query.
     // The limit is applied during processing below as a safeguard.
@@ -1767,22 +1764,27 @@ fn safe_process_record_batch(batch: &RecordBatch) -> ProcessingResult {
 pub async fn snowflake_query(
     mut snowflake_client: SnowflakeApi,
     query: String,
+    limit: Option<i64>,
 ) -> Result<ProcessingResult> {
+    // Get the limit value, defaulting to 5000 if not specified
+    let default_limit = 5000;
+    let limit_value = limit.unwrap_or(default_limit) as usize;
+
     let limited_query = prepare_query(&query);
 
     let result = match snowflake_client.exec(&limited_query).await {
         Ok(result) => match result {
             QueryResult::Arrow(result) => {
-                let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
+                let mut all_rows = Vec::with_capacity(limit_value);
                 let mut has_processing_errors = false;
                 let mut error_info = String::new();
 
                 // Process each batch with error handling
                 for batch in result.iter() {
-                    if all_rows.len() >= PROCESSING_ROW_LIMIT && !has_processing_errors {
+                    if all_rows.len() >= limit_value && !has_processing_errors {
                         tracing::warn!(
                             "Processing row limit ({}) reached. Stopping data processing.",
-                            PROCESSING_ROW_LIMIT
+                            limit_value
                         );
                         break;
                     }
@@ -1792,7 +1794,7 @@ pub async fn snowflake_query(
                     match safe_process_record_batch(&batch) {
                         ProcessingResult::Processed(batch_rows) => {
                             if !has_processing_errors {
-                                let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
+                                let remaining_capacity = limit_value.saturating_sub(all_rows.len());
                                 let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
 
                                 if rows_to_take > 0 {
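Review note: this patch uses the standard fetch-(N+1) truncation check. The handler queries for display_limit + 1 rows; if more than display_limit rows come back, it truncates and sets has_more_records, which avoids issuing a second COUNT query. Below is a minimal standalone sketch of that idea, not code from the patch; apply_display_limit is a hypothetical helper introduced here for illustration.

// Minimal sketch of the limit+1 truncation check, assuming the caller
// queried for display_limit + 1 rows. `apply_display_limit` is hypothetical.
fn apply_display_limit<T>(mut rows: Vec<T>, display_limit: usize) -> (Vec<T>, bool) {
    // The extra sentinel row (if present) proves more data exists.
    let has_more_records = rows.len() > display_limit;
    if has_more_records {
        rows.truncate(display_limit);
    }
    (rows, has_more_records)
}

fn main() {
    let display_limit = 5000;
    // Simulate a query that returned one extra row beyond the display limit.
    let fetched: Vec<u32> = (0..=display_limit as u32).collect(); // 5001 rows
    let (data, has_more) = apply_display_limit(fetched, display_limit);
    assert_eq!(data.len(), display_limit);
    assert!(has_more);
    println!("rows: {}, has_more_records: {}", data.len(), has_more);
}

The same check appears in get_metric_data_handler above (query_limit = display_limit + 1, then truncate); pushing the limit down into snowflake_query keeps the row cap consistent between the query layer and the handler.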