mirror of https://github.com/buster-so/buster.git
Refactor metric data handler and Snowflake query to improve limit handling. The query-limit logic now derives from a capped display limit, and snowflake_query takes an optional limit parameter for greater flexibility.
This commit is contained in:
parent 73746ca41d
commit c8d2c0ff0f
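The handler-side change implements the classic fetch-one-extra pagination check: request display_limit + 1 rows, and if the extra row comes back, the result set was truncated. A minimal standalone sketch of the pattern, with illustrative names (truncate_with_flag is not from the repository):

    // Sketch of the fetch-one-extra pattern used in this commit
    // (illustrative names, not the repository's actual code).
    fn truncate_with_flag(mut rows: Vec<u32>, display_limit: usize) -> (Vec<u32>, bool) {
        let has_more = rows.len() > display_limit;
        if has_more {
            rows.truncate(display_limit);
        }
        (rows, has_more)
    }

    fn main() {
        let display_limit = 5000;
        let query_limit = display_limit + 1; // ask the database for one extra row
        let fetched: Vec<u32> = (0..query_limit as u32).collect(); // simulate a full page plus one
        let (rows, has_more) = truncate_with_flag(fetched, display_limit);
        assert_eq!(rows.len(), display_limit);
        assert!(has_more); // the extra row signals truncation
    }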
@@ -198,11 +198,9 @@ pub async fn get_metric_data_handler(
         request.limit
     );
 
-    // Determine the actual query limit - we query for 5001 to check if there are more records
-    let query_limit = match request.limit {
-        Some(limit) => std::cmp::min(limit, 5001),
-        None => 5001,
-    };
+    // Determine the actual query limit - we query for one extra record to check if there are more
+    let display_limit = request.limit.unwrap_or(5000).min(5000);
+    let query_limit = display_limit + 1;
 
     // Try to get cached metadata first
     let mut conn_meta = get_pg_pool().get().await?;
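The new display_limit both defaults a missing limit to 5000 and caps larger requests at 5000, so query_limit can never exceed 5001. A quick standalone check of the clamp (sketch, not repository code):

    // How the clamp behaves for different request limits (illustrative).
    fn display_limit(request_limit: Option<i64>) -> i64 {
        request_limit.unwrap_or(5000).min(5000)
    }

    fn main() {
        assert_eq!(display_limit(None), 5000);       // default when no limit is given
        assert_eq!(display_limit(Some(100)), 100);   // smaller limits pass through
        assert_eq!(display_limit(Some(9999)), 5000); // larger limits are capped
    }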
@@ -239,13 +237,13 @@ pub async fn get_metric_data_handler(
         }
     };
 
-    // Check if we have more than 5000 records
-    let has_more_records = query_result.data.len() > 5000;
+    // Check if we have more records than the display limit
+    let has_more_records = query_result.data.len() > display_limit as usize;
 
-    // Truncate to 5000 records if we got more
+    // Truncate to display limit if we got more
     let mut data = query_result.data;
     if has_more_records {
-        data.truncate(5000);
+        data.truncate(display_limit as usize);
     }
 
     // Determine which metadata to use
@@ -537,7 +537,7 @@ async fn route_to_query(
 
 
 
-            match snowflake_query(snowflake_client, sql.to_owned()).await {
+            match snowflake_query(snowflake_client, sql.to_owned(), limit).await {
                 Ok(processing_result) => {
                     match processing_result {
                         ProcessingResult::Processed(results) => results,
@@ -1685,9 +1685,6 @@ fn convert_array_to_datatype(
 // Query Execution & Processing
 // -------------------------
 
-// Define the row limit constant here or retrieve from config
-const PROCESSING_ROW_LIMIT: usize = 5000;
-
 fn prepare_query(query: &str) -> String {
     // Note: This function currently doesn't apply a LIMIT to the query.
     // The limit is applied during processing below as a safeguard.
@@ -1767,22 +1764,27 @@ fn safe_process_record_batch(batch: &RecordBatch) -> ProcessingResult {
 pub async fn snowflake_query(
     mut snowflake_client: SnowflakeApi,
     query: String,
+    limit: Option<i64>,
 ) -> Result<ProcessingResult, Error> {
+    // Get the limit value, defaulting to 5000 if not specified
+    let default_limit = 5000;
+    let limit_value = limit.unwrap_or(default_limit) as usize;
+
     let limited_query = prepare_query(&query);
 
     let result = match snowflake_client.exec(&limited_query).await {
         Ok(result) => match result {
            QueryResult::Arrow(result) => {
-                let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
+                let mut all_rows = Vec::with_capacity(limit_value);
                 let mut has_processing_errors = false;
                 let mut error_info = String::new();
 
                 // Process each batch with error handling
                 for batch in result.iter() {
-                    if all_rows.len() >= PROCESSING_ROW_LIMIT && !has_processing_errors {
+                    if all_rows.len() >= limit_value && !has_processing_errors {
                         tracing::warn!(
                             "Processing row limit ({}) reached. Stopping data processing.",
-                            PROCESSING_ROW_LIMIT
+                            limit_value
                         );
                         break;
                     }
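Because the new parameter is an Option<i64>, callers that pass None keep the previous 5000-row cap, while the handler's query_limit of display_limit + 1 flows through when provided. A sketch of the defaulting, assuming the same unwrap_or logic as in the hunk above:

    // Effective row limit inside snowflake_query (illustrative sketch).
    fn effective_limit(limit: Option<i64>) -> usize {
        let default_limit = 5000;
        limit.unwrap_or(default_limit) as usize
    }

    fn main() {
        assert_eq!(effective_limit(None), 5000);       // old behaviour preserved
        assert_eq!(effective_limit(Some(5001)), 5001); // handler's query_limit flows through
    }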
@@ -1792,7 +1794,7 @@ pub async fn snowflake_query(
                     match safe_process_record_batch(&batch) {
                         ProcessingResult::Processed(batch_rows) => {
                             if !has_processing_errors {
-                                let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
+                                let remaining_capacity = limit_value.saturating_sub(all_rows.len());
                                 let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
 
                                 if rows_to_take > 0 {
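The batch loop caps accumulation with saturating_sub, so the remaining capacity never underflows once the limit is reached and at most limit_value rows are kept. A standalone sketch of the same draining logic (illustrative, with Vec<i32> standing in for record batches):

    // Sketch of the capped batch accumulation shown above (illustrative).
    fn accumulate(batches: &[Vec<i32>], limit: usize) -> Vec<i32> {
        let mut all_rows = Vec::with_capacity(limit);
        for batch in batches {
            if all_rows.len() >= limit {
                break; // limit reached: stop draining further batches
            }
            let remaining = limit.saturating_sub(all_rows.len());
            let take = batch.len().min(remaining);
            all_rows.extend_from_slice(&batch[..take]);
        }
        all_rows
    }

    fn main() {
        let batches = vec![vec![1; 3], vec![2; 3], vec![3; 3]];
        assert_eq!(accumulate(&batches, 5).len(), 5); // 3 from the first batch, 2 from the second
    }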