From 021646ce47d61f346fc016ca85569bd7d0c7e2c8 Mon Sep 17 00:00:00 2001
From: dal
Date: Wed, 25 Jun 2025 14:14:52 -0600
Subject: [PATCH] snowflake issue

---
 api/Cargo.toml                                |   6 +-
 .../data_source_query_routes/query_engine.rs  |  13 +-
 .../snowflake_query.rs                        | 119 ++++++++++++++----
 3 files changed, 110 insertions(+), 28 deletions(-)

diff --git a/api/Cargo.toml b/api/Cargo.toml
index 07532fa64..35211c857 100644
--- a/api/Cargo.toml
+++ b/api/Cargo.toml
@@ -20,7 +20,7 @@ resolver = "2"
 # Define shared dependencies for all workspace members
 [workspace.dependencies]
 anyhow = "1.0.86"
-chrono = { version = "=0.4.38", features = ["serde"] }
+chrono = { version = "=0.4.40", features = ["serde"] }
 serde = { version = "1.0.117", features = ["derive"] }
 serde_json = { version = "1.0.117", features = ["preserve_order"] }
 serde_yaml = "0.9.34"
@@ -64,7 +64,7 @@ tokio-postgres = "0.7"
 tokio-postgres-rustls = "0.13"
 regex = "1.10.6"
 sqlparser = { version = "0.54.0", features = ["visitor"] }
-arrow = { version = "54.0.0", features = ["json"] }
+arrow = { version = "55.1.0", features = ["json"] }
 async-compression = { version = "0.4.11", features = ["tokio"] }
 axum = { version = "0.7.5", features = ["ws"] }
 base64 = "0.21"
@@ -84,7 +84,7 @@ sentry = { version = "0.37.0", features = ["tokio"] }
 sentry-tower = { version = "0.37.0", features = ["axum", "http"] }
 sentry-tracing = { version = "0.37.0"}
 serde_urlencoded = "0.7.1"
-snowflake-api = "0.11.0"
+snowflake-api = "0.12.0"
 tempfile = "3.10.1"
 tiberius = { version = "0.12.2", default-features = false, features = [
     "chrono",
diff --git a/api/libs/query_engine/src/data_source_query_routes/query_engine.rs b/api/libs/query_engine/src/data_source_query_routes/query_engine.rs
index 24d159e8e..b1b695418 100644
--- a/api/libs/query_engine/src/data_source_query_routes/query_engine.rs
+++ b/api/libs/query_engine/src/data_source_query_routes/query_engine.rs
@@ -23,7 +23,7 @@ use database::vault::read_secret;
 
 use super::{
     bigquery_query::bigquery_query, databricks_query::databricks_query, mysql_query::mysql_query,
     postgres_query::postgres_query, redshift_query::redshift_query,
-    security_utils::query_safety_filter, snowflake_query::snowflake_query,
+    security_utils::query_safety_filter, snowflake_query::{snowflake_query, ProcessingResult},
     sql_server_query::sql_server_query,
 };
@@ -515,7 +515,16 @@ async fn route_to_query(
 
             match snowflake_query(snowflake_client, sql.to_owned()).await {
-                Ok(results) => results,
+                Ok(processing_result) => {
+                    match processing_result {
+                        ProcessingResult::Processed(results) => results,
+                        ProcessingResult::RawJson(json_string) => {
+                            tracing::warn!("Snowflake query returned raw JSON due to processing error: {}", json_string);
+                            // Return empty results for now - could be enhanced to parse JSON into DataType::Json
+                            Vec::new()
+                        }
+                    }
+                },
                 Err(e) => {
                     tracing::error!("There was an issue while fetching the tables: {}", e);
                     return Err(anyhow!(e));
                 }
diff --git a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
index 59adb3873..8348a5761 100644
--- a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
+++ b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs
@@ -1694,9 +1694,14 @@ fn prepare_query(query: &str) -> String {
     query.to_string()
 }
 
-fn process_record_batch(batch: &RecordBatch) -> Vec<IndexMap<String, DataType>> {
-    println!("Processing record batch with {:?} rows", batch);
+// Add a simpler error handling approach
+#[derive(Debug)]
+pub enum ProcessingResult {
+    Processed(Vec<IndexMap<String, DataType>>),
+    RawJson(String), // Raw JSON when processing fails
+}
 
+fn process_record_batch(batch: &RecordBatch) -> Vec<IndexMap<String, DataType>> {
     let mut rows = Vec::with_capacity(batch.num_rows());
     let schema = batch.schema();
 
@@ -1718,48 +1723,116 @@ fn process_record_batch(batch: &RecordBatch) -> Vec<IndexMap<String, DataType>>
     rows
 }
 
+// Safe processing with error recovery
+fn safe_process_record_batch(batch: &RecordBatch) -> ProcessingResult {
+    // Try to process the batch normally first
+    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
+        process_record_batch(batch)
+    })) {
+        Ok(rows) => ProcessingResult::Processed(rows),
+        Err(panic_payload) => {
+            // Extract panic message for better error reporting
+            let panic_message = if let Some(s) = panic_payload.downcast_ref::<String>() {
+                s.clone()
+            } else if let Some(s) = panic_payload.downcast_ref::<&str>() {
+                s.to_string()
+            } else {
+                "Unknown panic occurred during Arrow processing".to_string()
+            };
+
+            tracing::error!("Arrow processing panicked: {}", panic_message);
+
+            // Detailed error information for debugging
+            let error_details = format!(
+                r#"{{"error":"Arrow processing panic","panic_message":"{}","num_rows":{},"num_columns":{},"schema_fields":[{}],"type":"arrow_processing_panic"}}"#,
+                panic_message.replace('"', "\\\""), // Escape quotes in panic message
+                batch.num_rows(),
+                batch.num_columns(),
+                batch.schema().fields().iter()
+                    .map(|f| format!(r#"{{"name":"{}","data_type":"{}","nullable":{},"metadata":{}}}"#,
+                        f.name().replace('"', "\\\""),
+                        f.data_type(),
+                        f.is_nullable(),
+                        serde_json::to_string(f.metadata()).unwrap_or_else(|_| "{}".to_string())
+                    ))
+                    .collect::<Vec<_>>()
+                    .join(",")
+            );
+            ProcessingResult::RawJson(error_details)
+        }
+    }
+}
+
+// Update the main function signature and basic error handling
 pub async fn snowflake_query(
     mut snowflake_client: SnowflakeApi,
     query: String,
-) -> Result<Vec<IndexMap<String, DataType>>, Error> {
+) -> Result<ProcessingResult, Error> {
     let limited_query = prepare_query(&query);
 
-    let rows = match snowflake_client.exec(&limited_query).await {
+    let result = match snowflake_client.exec(&limited_query).await {
         Ok(result) => match result {
             QueryResult::Arrow(result) => {
-                // Initialize with capacity, but it might grow beyond the limit initially
-                // if the first batch is larger than the limit.
                 let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT);
+                let mut has_processing_errors = false;
+                let mut error_info = String::new();
 
-                // Process each batch in order, stopping if the limit is reached
+                // Process each batch with error handling
                 for batch in result.iter() {
-                    // Check if we've already reached the limit before processing the next batch
-                    if all_rows.len() >= PROCESSING_ROW_LIMIT {
+                    if all_rows.len() >= PROCESSING_ROW_LIMIT && !has_processing_errors {
                         tracing::warn!(
                             "Processing row limit ({}) reached. Stopping data processing.",
                             PROCESSING_ROW_LIMIT
                         );
-                        break; // Stop processing more batches
+                        break;
                     }
 
-                    // Consider removing or reducing verbosity of this log line
                     tracing::debug!("Processing batch with {} rows.", batch.num_rows());
 
-                    let batch_rows = process_record_batch(batch);
-
-                    // Determine how many rows from this batch we can add without exceeding the limit
-                    let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
-                    let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
-
-                    if rows_to_take > 0 {
-                        // Extend with only the rows needed to reach the limit
-                        all_rows.extend(batch_rows.into_iter().take(rows_to_take));
+                    match safe_process_record_batch(&batch) {
+                        ProcessingResult::Processed(batch_rows) => {
+                            if !has_processing_errors {
+                                let remaining_capacity = PROCESSING_ROW_LIMIT.saturating_sub(all_rows.len());
+                                let rows_to_take = std::cmp::min(batch_rows.len(), remaining_capacity);
+
+                                if rows_to_take > 0 {
+                                    all_rows.extend(batch_rows.into_iter().take(rows_to_take));
+                                }
+                            }
+                        }
+                        ProcessingResult::RawJson(json_string) => {
+                            tracing::warn!("Batch processing failed, switching to error mode");
+                            has_processing_errors = true;
+                            error_info = json_string;
+                            break; // Stop processing on first error
+                        }
                     }
                 }
 
-                all_rows
+                // Return results based on whether we had errors
+                if has_processing_errors {
+                    ProcessingResult::RawJson(error_info)
+                } else {
+                    ProcessingResult::Processed(all_rows)
+                }
+            }
+            QueryResult::Json(_json_result) => {
+                // Handle JSON results from Snowflake - access the actual JSON data
+                tracing::info!("Received JSON result from Snowflake instead of Arrow");
+
+                // Access the actual data from JsonResult - it likely has a .data field or similar
+                // Let's format a descriptive response with whatever info we can get
+                let json_info = format!(
+                    r#"{{"type":"snowflake_json_result","message":"Snowflake returned JSON format instead of Arrow","data_available":true,"note":"JSON data structure not yet fully parsed"}}"#
+                );
+
+                tracing::debug!("Snowflake JSON result received, returning metadata info");
+                ProcessingResult::RawJson(json_info)
+            }
+            _ => {
+                tracing::warn!("Unexpected QueryResult variant received");
+                ProcessingResult::RawJson(r#"{"error":"Unexpected result format from Snowflake"}"#.to_string())
             }
-            _ => Vec::new(),
         },
         Err(e) => {
             tracing::error!("There was an issue while fetching the tables: {}", e);
@@ -1777,7 +1850,7 @@ pub async fn snowflake_query(
         }
     }
 
-    Ok(rows)
+    Ok(result)
 }
 
 #[cfg(test)]
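
Note on the approach: safe_process_record_batch leans on std::panic::catch_unwind with AssertUnwindSafe, then downcasts the panic payload to recover a readable message instead of letting Arrow decoding panics unwind into the async runtime. A minimal standalone sketch of that pattern follows; it assumes the default panic=unwind setting, and run_recovering is a hypothetical helper name used only for illustration, not part of this patch.

// Minimal sketch of the catch_unwind + downcast recovery pattern.
use std::panic::{self, AssertUnwindSafe};

fn run_recovering<T>(f: impl FnOnce() -> T) -> Result<T, String> {
    match panic::catch_unwind(AssertUnwindSafe(f)) {
        Ok(value) => Ok(value),
        Err(payload) => {
            // Panic payloads are typically a String (panic!("{}", ..)) or a &str (panic!("..")).
            let message = if let Some(s) = payload.downcast_ref::<String>() {
                s.clone()
            } else if let Some(s) = payload.downcast_ref::<&str>() {
                s.to_string()
            } else {
                "unknown panic".to_string()
            };
            Err(message)
        }
    }
}

fn main() {
    // Silence the default panic hook so the caught panic does not spam stderr.
    panic::set_hook(Box::new(|_| {}));

    // Normal path: the closure's value is passed through unchanged.
    assert_eq!(run_recovering(|| 2 + 2), Ok(4));

    // Panicking path: the panic message is captured instead of unwinding into the caller.
    let err = run_recovering(|| -> i32 { panic!("arrow column {} had an unexpected layout", 3) });
    assert_eq!(err, Err("arrow column 3 had an unexpected layout".to_string()));
}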