query engine fix

This commit is contained in:
dal 2025-04-01 13:39:27 -06:00
parent a24ba84fd3
commit 72fb9089e5
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
3 changed files with 31 additions and 39 deletions

View File

@ -12,24 +12,15 @@ pub async fn mysql_query(
query: String,
limit: Option<i64>,
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
// Apply the limit directly at the database level
// Get the limit value, defaulting to 5000 if not specified
let default_limit = 5000;
let limit_value = limit.unwrap_or(default_limit);
let limit_value = limit.unwrap_or(default_limit) as usize;
// Append LIMIT to the query if it doesn't already contain a LIMIT clause
let sql_with_limit = if !query.to_lowercase().contains("limit") {
format!("{} LIMIT ?", query)
} else {
query
};
// Create query with the limit parameter
let mut stream = sqlx::query(&sql_with_limit)
.bind(limit_value)
.fetch(&pool);
// Create query stream without appending LIMIT
let mut stream = sqlx::query(&query).fetch(&pool);
// Pre-allocate result vector with estimated capacity to reduce allocations
let mut result: Vec<IndexMap<String, DataType>> = Vec::with_capacity(limit_value as usize);
let mut result: Vec<IndexMap<String, DataType>> = Vec::with_capacity(limit_value);
// Process all rows without spawning tasks per row
while let Some(row) = stream.try_next().await? {
@ -63,6 +54,11 @@ pub async fn mysql_query(
}
result.push(row_map);
// Stop processing if we've reached the limit
if result.len() >= limit_value {
break;
}
}
Ok(result)

View File

@ -69,20 +69,15 @@ pub async fn postgres_query(
let formatted_sql = ast[0].to_string();
// Apply the limit directly at the database level
// Get the limit value, defaulting to 5000 if not specified
let default_limit = 5000;
let limit_value = limit.unwrap_or(default_limit);
let limit_value = limit.unwrap_or(default_limit) as usize;
// Append LIMIT to the query with parameter
let sql_with_limit = format!("{} LIMIT $1", formatted_sql);
// Create query with the limit parameter
let mut stream = sqlx::query(&sql_with_limit)
.bind(limit_value)
.fetch(&pg_pool);
// Create query stream without appending LIMIT
let mut stream = sqlx::query(&formatted_sql).fetch(&pg_pool);
// Pre-allocate result vector with estimated capacity to reduce allocations
let mut result: Vec<IndexMap<String, DataType>> = Vec::with_capacity(limit_value as usize);
let mut result: Vec<IndexMap<String, DataType>> = Vec::with_capacity(limit_value);
// Process all rows without spawning tasks per row
while let Some(row) = stream.try_next().await? {
@ -123,6 +118,11 @@ pub async fn postgres_query(
}
result.push(row_map);
// Stop processing if we've reached the limit
if result.len() >= limit_value {
break;
}
}
Ok(result)

View File

@ -13,26 +13,17 @@ pub async fn redshift_query(
query: String,
limit: Option<i64>,
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
// Apply the limit directly at the database level
// Get the limit value, defaulting to 5000 if not specified
let default_limit = 5000;
let limit_value = limit.unwrap_or(default_limit);
let limit_value = limit.unwrap_or(default_limit) as usize;
// Append LIMIT to the query if it doesn't already contain a LIMIT clause
let sql_with_limit = if !query.to_lowercase().contains("limit") {
format!("{} LIMIT $1", query)
} else {
query
};
// Create query with the limit parameter
let mut stream = sqlx::query(&sql_with_limit)
.bind(limit_value)
.fetch(&pg_pool);
// Create query stream without appending LIMIT
let mut stream = sqlx::query(&query).fetch(&pg_pool);
// Pre-allocate result vector with estimated capacity
let mut result: Vec<IndexMap<String, DataType>> = Vec::with_capacity(limit_value as usize);
let mut result: Vec<IndexMap<String, DataType>> = Vec::with_capacity(limit_value);
// Process rows sequentially
// Process rows sequentially until we reach the limit
while let Some(row) = stream.try_next().await? {
let mut row_map: IndexMap<String, DataType> = IndexMap::with_capacity(row.len());
@ -69,6 +60,11 @@ pub async fn redshift_query(
}
result.push(row_map);
// Stop processing if we've reached the limit
if result.len() >= limit_value {
break;
}
}
Ok(result)