mirror of https://github.com/buster-so/buster.git
snowflake issue
This commit is contained in:
parent
f7df68880e
commit
021646ce47
@@ -20,7 +20,7 @@ resolver = "2"
 # Define shared dependencies for all workspace members
 [workspace.dependencies]
 anyhow = "1.0.86"
-chrono = { version = "=0.4.38", features = ["serde"] }
+chrono = { version = "=0.4.40", features = ["serde"] }
 serde = { version = "1.0.117", features = ["derive"] }
 serde_json = { version = "1.0.117", features = ["preserve_order"] }
 serde_yaml = "0.9.34"
@@ -64,7 +64,7 @@ tokio-postgres = "0.7"
 tokio-postgres-rustls = "0.13"
 regex = "1.10.6"
 sqlparser = { version = "0.54.0", features = ["visitor"] }
-arrow = { version = "54.0.0", features = ["json"] }
+arrow = { version = "55.1.0", features = ["json"] }
 async-compression = { version = "0.4.11", features = ["tokio"] }
 axum = { version = "0.7.5", features = ["ws"] }
 base64 = "0.21"
@@ -84,7 +84,7 @@ sentry = { version = "0.37.0", features = ["tokio"] }
 sentry-tower = { version = "0.37.0", features = ["axum", "http"] }
 sentry-tracing = { version = "0.37.0" }
 serde_urlencoded = "0.7.1"
-snowflake-api = "0.11.0"
+snowflake-api = "0.12.0"
 tempfile = "3.10.1"
 tiberius = { version = "0.12.2", default-features = false, features = [
     "chrono",
@@ -23,7 +23,7 @@ use database::vault::read_secret;
 use super::{
     bigquery_query::bigquery_query, databricks_query::databricks_query, mysql_query::mysql_query,
     postgres_query::postgres_query, redshift_query::redshift_query,
-    security_utils::query_safety_filter, snowflake_query::snowflake_query,
+    security_utils::query_safety_filter, snowflake_query::{snowflake_query, ProcessingResult},
     sql_server_query::sql_server_query,
 };
@@ -515,7 +515,16 @@ async fn route_to_query(
 
             match snowflake_query(snowflake_client, sql.to_owned()).await {
-                Ok(results) => results,
+                Ok(processing_result) => {
+                    match processing_result {
+                        ProcessingResult::Processed(results) => results,
+                        ProcessingResult::RawJson(json_string) => {
+                            tracing::warn!("Snowflake query returned raw JSON due to processing error: {}", json_string);
+                            // Return empty results for now - could be enhanced to parse JSON into DataType::Json
+                            Vec::new()
+                        }
+                    }
+                },
                 Err(e) => {
                     tracing::error!("There was an issue while fetching the tables: {}", e);
                     return Err(anyhow!(e));
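The Vec::new() fallback above discards the raw payload. A minimal sketch of the enhancement the inline comment hints at, assuming the engine's DataType enum has a Json(serde_json::Value) variant; that variant is an assumption, since this commit does not define one:

use indexmap::IndexMap;

// Hypothetical helper for route_to_query; DataType::Json is assumed, not
// something this commit adds. Surfaces the raw payload as a single row
// instead of dropping it.
fn raw_json_to_rows(json_string: String) -> Vec<IndexMap<String, DataType>> {
    let value: serde_json::Value = serde_json::from_str(&json_string)
        .unwrap_or(serde_json::Value::String(json_string));
    let mut row = IndexMap::new();
    row.insert("raw_json".to_string(), DataType::Json(value));
    vec![row]
}

The RawJson arm would then return raw_json_to_rows(json_string) instead of Vec::new().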
@@ -1694,9 +1694,14 @@ fn prepare_query(query: &str) -> String {
     query.to_string()
 }
 
+// Add a simpler error-handling approach
+#[derive(Debug)]
+pub enum ProcessingResult {
+    Processed(Vec<IndexMap<String, DataType>>),
+    RawJson(String), // Raw JSON when processing fails
+}
+
 fn process_record_batch(batch: &RecordBatch) -> Vec<IndexMap<String, DataType>> {
-    println!("Processing record batch with {:?} rows", batch);
     let mut rows = Vec::with_capacity(batch.num_rows());
     let schema = batch.schema();
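ProcessingResult deliberately models the degraded case as data rather than an error, so a query whose Arrow decoding fails still returns Ok at the transport level. For call sites that prefer question-mark handling, a conversion along these lines would work (a sketch, not part of the commit):

// Hypothetical convenience conversion; not part of this commit.
impl ProcessingResult {
    pub fn into_rows(self) -> Result<Vec<IndexMap<String, DataType>>, String> {
        match self {
            ProcessingResult::Processed(rows) => Ok(rows),
            ProcessingResult::RawJson(json) => Err(json),
        }
    }
}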
@@ -1718,48 +1723,116 @@ fn process_record_batch(batch: &RecordBatch) -> Vec<IndexMap<String, DataType>>
     rows
 }
 
+// Safe processing with error recovery
+fn safe_process_record_batch(batch: &RecordBatch) -> ProcessingResult {
+    // Try to process the batch normally first
+    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+        process_record_batch(batch)
+    })) {
+        Ok(rows) => ProcessingResult::Processed(rows),
+        Err(panic_payload) => {
+            // Extract the panic message for better error reporting
+            let panic_message = if let Some(s) = panic_payload.downcast_ref::<String>() {
+                s.clone()
+            } else if let Some(s) = panic_payload.downcast_ref::<&str>() {
+                s.to_string()
+            } else {
+                "Unknown panic occurred during Arrow processing".to_string()
+            };
+
+            tracing::error!("Arrow processing panicked: {}", panic_message);
+
+            // Detailed error information for debugging
+            let error_details = format!(
+                r#"{{"error":"Arrow processing panic","panic_message":"{}","num_rows":{},"num_columns":{},"schema_fields":[{}],"type":"arrow_processing_panic"}}"#,
+                panic_message.replace('"', "\\\""), // Escape quotes in the panic message
+                batch.num_rows(),
+                batch.num_columns(),
+                batch
+                    .schema()
+                    .fields()
+                    .iter()
+                    .map(|f| format!(
+                        r#"{{"name":"{}","data_type":"{}","nullable":{},"metadata":{}}}"#,
+                        f.name().replace('"', "\\\""),
+                        f.data_type(),
+                        f.is_nullable(),
+                        serde_json::to_string(f.metadata()).unwrap_or_else(|_| "{}".to_string())
+                    ))
+                    .collect::<Vec<_>>()
+                    .join(",")
+            );
+            ProcessingResult::RawJson(error_details)
+        }
+    }
+}
+
+// Update the main function signature and basic error handling
 pub async fn snowflake_query(
     mut snowflake_client: SnowflakeApi,
     query: String,
-) -> Result<Vec<IndexMap<String, DataType>>, Error> {
+) -> Result<ProcessingResult, Error> {
     let limited_query = prepare_query(&query);
 
-    let rows = match snowflake_client.exec(&limited_query).await {
+    let result = match snowflake_client.exec(&limited_query).await {
         Ok(result) => match result {
             QueryResult::Arrow(result) => {
-                // Initialize with capacity, but it might grow beyond the limit initially
-                // if the first batch is larger than the limit.
                 let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
+                let mut has_processing_errors = false;
+                let mut error_info = String::new();
 
-                // Process each batch in order, stopping if the limit is reached
+                // Process each batch with error handling
                 for batch in result.iter() {
-                    // Check if we've already reached the limit before processing the next batch
-                    if all_rows.len() >= PROCESSING_ROW_LIMIT {
+                    if all_rows.len() >= PROCESSING_ROW_LIMIT && !has_processing_errors {
                         tracing::warn!(
                             "Processing row limit ({}) reached. Stopping data processing.",
                             PROCESSING_ROW_LIMIT
                         );
-                        break; // Stop processing more batches
+                        break;
                     }
 
-                    // Consider removing or reducing verbosity of this log line
-                    tracing::debug!("Processing batch with {} rows.", batch.num_rows());
-
-                    let batch_rows = process_record_batch(batch);
-
-                    // Determine how many rows from this batch we can add without exceeding the limit
-                    let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
-                    let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
-
-                    if rows_to_take > 0 {
-                        // Extend with only the rows needed to reach the limit
-                        all_rows.extend(batch_rows.into_iter().take(rows_to_take));
+                    match safe_process_record_batch(&batch) {
+                        ProcessingResult::Processed(batch_rows) => {
+                            if !has_processing_errors {
+                                let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
+                                let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
+
+                                if rows_to_take > 0 {
+                                    all_rows.extend(batch_rows.into_iter().take(rows_to_take));
+                                }
+                            }
+                        }
+                        ProcessingResult::RawJson(json_string) => {
+                            tracing::warn!("Batch processing failed, switching to error mode");
+                            has_processing_errors = true;
+                            error_info = json_string;
+                            break; // Stop processing on the first error
+                        }
                     }
                 }
 
-                all_rows
+                // Return results based on whether we hit processing errors
+                if has_processing_errors {
+                    ProcessingResult::RawJson(error_info)
+                } else {
+                    ProcessingResult::Processed(all_rows)
+                }
             }
-            _ => Vec::new(),
+            QueryResult::Json(_json_result) => {
+                // Handle JSON results from Snowflake - the JsonResult payload is not
+                // parsed yet, so return a descriptive response with the info we have
+                tracing::info!("Received JSON result from Snowflake instead of Arrow");
+
+                let json_info = format!(
+                    r#"{{"type":"snowflake_json_result","message":"Snowflake returned JSON format instead of Arrow","data_available":true,"note":"JSON data structure not yet fully parsed"}}"#
+                );
+
+                tracing::debug!("Snowflake JSON result received, returning metadata info");
+                ProcessingResult::RawJson(json_info)
+            }
+            _ => {
+                tracing::warn!("Unexpected QueryResult variant received");
+                ProcessingResult::RawJson(r#"{"error":"Unexpected result format from Snowflake"}"#.to_string())
+            }
         },
         Err(e) => {
            tracing::error!("There was an issue while fetching the tables: {}", e);
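Two notes on the recovery path above. First, std::panic::catch_unwind only intercepts unwinding panics; if the binary is built with panic = "abort", nothing is caught. Second, the hand-rolled format! escapes only double quotes, so a panic message containing a backslash or newline would still produce invalid JSON. Since serde_json is already a workspace dependency, the payload could be built structurally instead; a sketch mirroring the commit's field names, not the committed code:

use serde_json::json;

// Sketch: build error_details with serde_json so every value is escaped
// correctly, instead of hand-rolled format! strings.
let error_details = json!({
    "error": "Arrow processing panic",
    "panic_message": panic_message,
    "num_rows": batch.num_rows(),
    "num_columns": batch.num_columns(),
    "schema_fields": batch
        .schema()
        .fields()
        .iter()
        .map(|f| json!({
            "name": f.name(),
            "data_type": f.data_type().to_string(),
            "nullable": f.is_nullable(),
            "metadata": f.metadata(),
        }))
        .collect::<Vec<_>>(),
    "type": "arrow_processing_panic",
})
.to_string();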
@@ -1777,7 +1850,7 @@ pub async fn snowflake_query(
         }
     }
 
-    Ok(rows)
+    Ok(result)
 }
 
 #[cfg(test)]
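The diff ends just above the test module, which this commit leaves untouched. A test along these lines could pin down the happy path of safe_process_record_batch, assuming process_record_batch emits one map per input row; the single-column layout is made up for illustration:

#[cfg(test)]
mod safe_processing_tests {
    use super::*;
    use arrow::array::Int32Array;
    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
    use arrow::record_batch::RecordBatch;
    use std::sync::Arc;

    #[test]
    fn processes_a_simple_batch_without_falling_back() {
        // Minimal one-column batch; a panic inside process_record_batch would
        // surface as ProcessingResult::RawJson rather than aborting the test.
        let schema = Arc::new(Schema::new(vec![Field::new(
            "id",
            ArrowDataType::Int32,
            false,
        )]));
        let batch =
            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
                .expect("valid batch");

        match safe_process_record_batch(&batch) {
            ProcessingResult::Processed(rows) => assert_eq!(rows.len(), 3),
            ProcessingResult::RawJson(json) => panic!("unexpected fallback: {json}"),
        }
    }
}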