mirror of https://github.com/buster-so/buster.git
apply optional limit to all query routes in query engine
This commit is contained in:
parent
8899fb8549
commit
bf008d70b1
1496
api/CLAUDE.md
1496
api/CLAUDE.md
File diff suppressed because it is too large
Load Diff
|
@ -4,6 +4,10 @@
|
||||||
|
|
||||||
The Query Engine library provides connectivity and query execution functionality for various data sources in Buster. It abstracts away the details of connecting to different database systems, allows secure credential management, and provides a unified interface for executing queries across multiple database technologies.
|
The Query Engine library provides connectivity and query execution functionality for various data sources in Buster. It abstracts away the details of connecting to different database systems, allows secure credential management, and provides a unified interface for executing queries across multiple database technologies.
|
||||||
|
|
||||||
|
## Result Limitations
|
||||||
|
|
||||||
|
All database queries are capped at a maximum of 5000 rows by default to ensure performance and prevent excessive resource usage. This limit can be overridden by passing a specific limit parameter when calling the query functions.
|
||||||
|
|
||||||
## Key Functionality
|
## Key Functionality
|
||||||
|
|
||||||
- Data source connection management for multiple database types
|
- Data source connection management for multiple database types
|
||||||
|
@ -58,13 +62,18 @@ src/
|
||||||
## Usage Patterns
|
## Usage Patterns
|
||||||
|
|
||||||
```rust
|
```rust
|
||||||
use query_engine::data_source_query_routes::query_engine::{execute_query, QueryResult};
|
use query_engine::data_source_query_routes::query_engine::query_engine;
|
||||||
use query_engine::data_types::{DataSource, DataSourceType};
|
use query_engine::data_types::DataType;
|
||||||
|
use uuid::Uuid;
|
||||||
|
use indexmap::IndexMap;
|
||||||
|
|
||||||
async fn example_query(data_source: DataSource) -> Result<QueryResult, anyhow::Error> {
|
async fn example_query(data_source_id: &Uuid, sql: &str) -> Result<Vec<IndexMap<String, DataType>>, anyhow::Error> {
|
||||||
// Execute a query against a data source
|
// Execute a query against a data source using the default 5000-row limit
|
||||||
let query = "SELECT * FROM users LIMIT 10";
|
let result = query_engine(data_source_id, sql, None).await?;
|
||||||
let result = execute_query(&data_source, query).await?;
|
|
||||||
|
// Or specify a custom limit
|
||||||
|
let custom_limit = Some(1000);
|
||||||
|
let limited_result = query_engine(data_source_id, sql, custom_limit).await?;
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ pub async fn bigquery_query(
|
||||||
client: Client,
|
client: Client,
|
||||||
project_id: String,
|
project_id: String,
|
||||||
query: String,
|
query: String,
|
||||||
|
limit: Option<i64>,
|
||||||
) -> Result<Vec<IndexMap<String, DataType>>> {
|
) -> Result<Vec<IndexMap<String, DataType>>> {
|
||||||
let query_request = QueryRequest {
|
let query_request = QueryRequest {
|
||||||
connection_properties: None,
|
connection_properties: None,
|
||||||
|
@ -19,7 +20,7 @@ pub async fn bigquery_query(
|
||||||
kind: None,
|
kind: None,
|
||||||
labels: None,
|
labels: None,
|
||||||
location: None,
|
location: None,
|
||||||
max_results: Some(500),
|
max_results: Some(limit.unwrap_or(5000).min(i32::MAX as i64) as i32),
|
||||||
maximum_bytes_billed: None,
|
maximum_bytes_billed: None,
|
||||||
parameter_mode: None,
|
parameter_mode: None,
|
||||||
preserve_nulls: None,
|
preserve_nulls: None,
|
||||||
|
|
|
@ -10,6 +10,7 @@ use crate::{
|
||||||
pub async fn databricks_query(
|
pub async fn databricks_query(
|
||||||
databricks_client: Databricks,
|
databricks_client: Databricks,
|
||||||
query: String,
|
query: String,
|
||||||
|
limit: Option<i64>,
|
||||||
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
||||||
let results = match databricks_client.query(query).await {
|
let results = match databricks_client.query(query).await {
|
||||||
Ok(results) => results,
|
Ok(results) => results,
|
||||||
|
@ -20,9 +21,10 @@ pub async fn databricks_query(
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut result: Vec<IndexMap<String, DataType>> = Vec::new();
|
let mut result: Vec<IndexMap<String, DataType>> = Vec::new();
|
||||||
|
let max_rows = limit.unwrap_or(5000) as usize;
|
||||||
|
|
||||||
let rows = match results.result.data_array {
|
let rows = match results.result.data_array {
|
||||||
Some(rows) => rows,
|
Some(rows) => rows.into_iter().take(max_rows).collect::<Vec<_>>(),
|
||||||
None => return Ok(Vec::new()),
|
None => return Ok(Vec::new()),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ use crate::data_types::DataType;
|
||||||
pub async fn mysql_query(
|
pub async fn mysql_query(
|
||||||
pg_pool: Pool<MySql>,
|
pg_pool: Pool<MySql>,
|
||||||
query: String,
|
query: String,
|
||||||
|
limit: Option<i64>,
|
||||||
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
||||||
let mut stream = sqlx::query(&query).fetch(&pg_pool);
|
let mut stream = sqlx::query(&query).fetch(&pg_pool);
|
||||||
|
|
||||||
|
@ -61,7 +62,11 @@ pub async fn mysql_query(
|
||||||
}
|
}
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
if count >= 5000 {
|
if let Some(row_limit) = limit {
|
||||||
|
if count >= row_limit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else if count >= 5000 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,7 +95,7 @@ async fn route_to_query(
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
match redshift_query(redshift_client, sql.to_owned()).await {
|
match redshift_query(redshift_client, sql.to_owned(), limit).await {
|
||||||
Ok(results) => results,
|
Ok(results) => results,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("There was an issue while fetching the tables: {}", e);
|
tracing::error!("There was an issue while fetching the tables: {}", e);
|
||||||
|
@ -114,7 +114,7 @@ async fn route_to_query(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let results = match mysql_query(mysql_pool, sql.to_owned()).await {
|
let results = match mysql_query(mysql_pool, sql.to_owned(), limit).await {
|
||||||
Ok(results) => results,
|
Ok(results) => results,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("There was an issue while fetching the tables: {}", e);
|
tracing::error!("There was an issue while fetching the tables: {}", e);
|
||||||
|
@ -139,7 +139,7 @@ async fn route_to_query(
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
match bigquery_query(bq_client, project_id, sql.to_owned()).await {
|
match bigquery_query(bq_client, project_id, sql.to_owned(), limit).await {
|
||||||
Ok(results) => results,
|
Ok(results) => results,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("There was an issue while fetching the tables: {}", e);
|
tracing::error!("There was an issue while fetching the tables: {}", e);
|
||||||
|
@ -160,7 +160,7 @@ async fn route_to_query(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let results = match sql_server_query(sql_server_pool, sql.to_owned()).await {
|
let results = match sql_server_query(sql_server_pool, sql.to_owned(), limit).await {
|
||||||
Ok(results) => results,
|
Ok(results) => results,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("There was an issue while fetching the tables: {}", e);
|
tracing::error!("There was an issue while fetching the tables: {}", e);
|
||||||
|
@ -185,7 +185,7 @@ async fn route_to_query(
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
match databricks_query(databricks_client, sql.to_owned()).await {
|
match databricks_query(databricks_client, sql.to_owned(), limit).await {
|
||||||
Ok(results) => results,
|
Ok(results) => results,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("There was an issue while fetching the tables: {}", e);
|
tracing::error!("There was an issue while fetching the tables: {}", e);
|
||||||
|
@ -204,7 +204,7 @@ async fn route_to_query(
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
match snowflake_query(snowflake_client, sql.to_owned()).await {
|
match snowflake_query(snowflake_client, sql.to_owned(), limit).await {
|
||||||
Ok(results) => results,
|
Ok(results) => results,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("There was an issue while fetching the tables: {}", e);
|
tracing::error!("There was an issue while fetching the tables: {}", e);
|
||||||
|
|
|
@ -11,6 +11,7 @@ use crate::data_types::DataType;
|
||||||
pub async fn redshift_query(
|
pub async fn redshift_query(
|
||||||
pg_pool: Pool<Postgres>,
|
pg_pool: Pool<Postgres>,
|
||||||
query: String,
|
query: String,
|
||||||
|
limit: Option<i64>,
|
||||||
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
||||||
let mut stream = sqlx::query(&query).fetch(&pg_pool);
|
let mut stream = sqlx::query(&query).fetch(&pg_pool);
|
||||||
|
|
||||||
|
@ -56,7 +57,11 @@ pub async fn redshift_query(
|
||||||
result.push(row_map);
|
result.push(row_map);
|
||||||
|
|
||||||
count += 1;
|
count += 1;
|
||||||
if count >= 1000 {
|
if let Some(row_limit) = limit {
|
||||||
|
if count >= row_limit {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else if count >= 5000 {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -115,12 +115,14 @@ fn handle_snowflake_timestamp_struct(
|
||||||
pub async fn snowflake_query(
|
pub async fn snowflake_query(
|
||||||
mut snowflake_client: SnowflakeApi,
|
mut snowflake_client: SnowflakeApi,
|
||||||
query: String,
|
query: String,
|
||||||
|
limit: Option<i64>,
|
||||||
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
||||||
const MAX_ROWS: usize = 1_000;
|
const DEFAULT_MAX_ROWS: usize = 5000;
|
||||||
|
|
||||||
let query_no_semicolon = query.trim_end_matches(';');
|
let query_no_semicolon = query.trim_end_matches(';');
|
||||||
|
let max_rows = limit.map(|l| l as usize).unwrap_or(DEFAULT_MAX_ROWS);
|
||||||
let limited_query = if !query_no_semicolon.to_lowercase().contains("limit") {
|
let limited_query = if !query_no_semicolon.to_lowercase().contains("limit") {
|
||||||
format!("{} FETCH FIRST {} ROWS ONLY", query_no_semicolon, MAX_ROWS)
|
format!("{} FETCH FIRST {} ROWS ONLY", query_no_semicolon, max_rows)
|
||||||
} else {
|
} else {
|
||||||
query_no_semicolon.to_string()
|
query_no_semicolon.to_string()
|
||||||
};
|
};
|
||||||
|
|
|
@ -10,6 +10,7 @@ use tokio_util::compat::Compat;
|
||||||
pub async fn sql_server_query(
|
pub async fn sql_server_query(
|
||||||
mut client: Client<Compat<TcpStream>>,
|
mut client: Client<Compat<TcpStream>>,
|
||||||
query: String,
|
query: String,
|
||||||
|
limit: Option<i64>,
|
||||||
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
) -> Result<Vec<IndexMap<std::string::String, DataType>>, Error> {
|
||||||
let rows = match client.query(query, &[]).await {
|
let rows = match client.query(query, &[]).await {
|
||||||
Ok(rows) => rows,
|
Ok(rows) => rows,
|
||||||
|
@ -22,7 +23,7 @@ pub async fn sql_server_query(
|
||||||
|
|
||||||
let mut result: Vec<IndexMap<String, DataType>> = Vec::new();
|
let mut result: Vec<IndexMap<String, DataType>> = Vec::new();
|
||||||
let query_result = match rows.into_first_result().await {
|
let query_result = match rows.into_first_result().await {
|
||||||
Ok(query_result) => query_result.into_iter().take(1000),
|
Ok(query_result) => query_result.into_iter().take(limit.unwrap_or(5000) as usize),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("Unable to fetch query result: {:?}", e);
|
tracing::error!("Unable to fetch query result: {:?}", e);
|
||||||
let err = anyhow!("Unable to fetch query result: {}", e);
|
let err = anyhow!("Unable to fetch query result: {}", e);
|
||||||
|
|
Loading…
Reference in New Issue