diff --git a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs index 229cb9be5..cd9a14837 100644 --- a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs +++ b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs @@ -1,4 +1,6 @@ -use arrow::array::{Array, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampSecondArray}; +use arrow::array::{ + Array, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampSecondArray, +}; use arrow::array::{ BinaryArray, BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, @@ -20,7 +22,7 @@ use std::sync::Arc; use crate::data_types::DataType; // ------------------------- -// String & JSON Processing +// String & JSON Processing // ------------------------- fn process_string_value(value: String) -> String { @@ -105,7 +107,7 @@ fn handle_snowflake_timestamp_struct( } else { fraction.value(row_idx) }; - + // Important: Check if epoch might be in milliseconds instead of seconds // If the epoch value is larger than typical Unix timestamps (e.g., > 50 years worth of seconds) // it's likely in milliseconds or microseconds @@ -113,7 +115,10 @@ fn handle_snowflake_timestamp_struct( // Likely milliseconds or microseconds - determine which if epoch_value > 5_000_000_000_000 { // Microseconds - (epoch_value / 1_000_000, (epoch_value % 1_000_000 * 1000) as u32) + ( + epoch_value / 1_000_000, + (epoch_value % 1_000_000 * 1000) as u32, + ) } else { // Milliseconds (epoch_value / 1000, (epoch_value % 1000 * 1_000_000) as u32) @@ -123,7 +128,7 @@ fn handle_snowflake_timestamp_struct( // For scale 3 (milliseconds), multiply by 10^6 to get nanoseconds (epoch_value, (fraction_value as u32) * 1_000_000) }; - + match parse_snowflake_timestamp(adjusted_epoch, adjusted_nanos) { Ok(dt) => Some(dt), Err(e) => { @@ -140,7 +145,12 @@ fn handle_snowflake_timestamp_struct( // Decimal Handling // ------------------------- -fn format_decimal_as_string(abs_val_str: &str, scale: i8, is_negative: bool, val_str: &str) -> DataType { +fn format_decimal_as_string( + abs_val_str: &str, + scale: i8, + is_negative: bool, + val_str: &str, +) -> DataType { let decimal_str = if scale > 0 { if abs_val_str.len() <= scale as usize { // Need to pad with zeros @@ -150,7 +160,11 @@ fn format_decimal_as_string(abs_val_str: &str, scale: i8, is_negative: bool, val result.push('0'); } result.push_str(abs_val_str); - if is_negative { format!("-{}", result) } else { result } + if is_negative { + format!("-{}", result) + } else { + result + } } else { // Insert decimal point let decimal_pos = abs_val_str.len() - scale as usize; @@ -167,11 +181,15 @@ fn format_decimal_as_string(abs_val_str: &str, scale: i8, is_negative: bool, val for _ in 0..(-scale as usize) { result.push('0'); } - if is_negative { format!("-{}", result) } else { result } + if is_negative { + format!("-{}", result) + } else { + result + } } else { val_str.to_string() }; - + DataType::Text(Some(decimal_str)) } @@ -181,10 +199,10 @@ fn handle_decimal128_array(array: &Decimal128Array, row_idx: usize, scale: i8) - } let val = array.value(row_idx); - + // Convert to string first to avoid immediate precision loss let val_str = val.to_string(); - + // Special case for very small numbers with high precision if scale > 7 && val.abs() < 1000 { // Use text for very small decimals with many decimal places @@ -192,7 +210,7 @@ fn handle_decimal128_array(array: &Decimal128Array, row_idx: usize, scale: i8) - let abs_val_str = if is_negative { &val_str[1..] } else { &val_str }; return format_decimal_as_string(abs_val_str, scale, is_negative, &val_str); } - + // Try parsing as f64 only for values within safe range // 2^53 is approximately the largest integer precisely representable in f64 if val.abs() < 9_007_199_254_740_992_i128 { @@ -209,11 +227,11 @@ fn handle_decimal128_array(array: &Decimal128Array, row_idx: usize, scale: i8) - // For larger values, use string formatting let is_negative = val < 0; let abs_val_str = if is_negative { &val_str[1..] } else { &val_str }; - + // Debug the string formatting for large numbers with scale - // This test is failing because "9007199254740992" with scale 4 + // This test is failing because "9007199254740992" with scale 4 // should become "900719925474.0992" - + format_decimal_as_string(abs_val_str, scale, is_negative, &val_str) } } @@ -225,16 +243,17 @@ fn handle_decimal256_array(array: &Decimal256Array, row_idx: usize, scale: i8) - let val = array.value(row_idx); let val_str = val.to_string(); - + // Special case for very large values with negative scale - always use text if scale < -5 { let is_negative = val_str.starts_with('-'); let abs_val_str = if is_negative { &val_str[1..] } else { &val_str }; return format_decimal_as_string(abs_val_str, scale, is_negative, &val_str); } - + // Try to determine if value is within safe f64 range (< 2^53) - if val_str.len() < 16 { // Conservatively less than 16 digits + if val_str.len() < 16 { + // Conservatively less than 16 digits if let Ok(unscaled_val) = val_str.parse::() { // Only use f64 if it's within the safe integer range if unscaled_val.abs() < 9_007_199_254_740_992_f64 { @@ -249,7 +268,7 @@ fn handle_decimal256_array(array: &Decimal256Array, row_idx: usize, scale: i8) - } } } - + // For all other cases, use string formatting for precision let is_negative = val_str.starts_with('-'); let abs_val_str = if is_negative { &val_str[1..] } else { &val_str }; @@ -268,13 +287,18 @@ fn handle_boolean_array(array: &BooleanArray, row_idx: usize) -> DataType { } } -fn handle_int8_array(array: &Int8Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_int8_array( + array: &Int8Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -286,15 +310,15 @@ fn handle_int8_array(array: &Int8Array, row_idx: usize, scale_str: Option<&str>, } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -302,15 +326,19 @@ fn handle_int8_array(array: &Int8Array, row_idx: usize, scale_str: Option<&str>, } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -326,18 +354,23 @@ fn handle_int8_array(array: &Int8Array, row_idx: usize, scale_str: Option<&str>, } } } - + // Default case for regular integer DataType::Int2(Some(val as i16)) } -fn handle_int16_array(array: &Int16Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_int16_array( + array: &Int16Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -349,15 +382,15 @@ fn handle_int16_array(array: &Int16Array, row_idx: usize, scale_str: Option<&str } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -365,15 +398,19 @@ fn handle_int16_array(array: &Int16Array, row_idx: usize, scale_str: Option<&str } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -389,18 +426,23 @@ fn handle_int16_array(array: &Int16Array, row_idx: usize, scale_str: Option<&str } } } - + // Default case for regular integer DataType::Int2(Some(val)) } -fn handle_int32_array(array: &Int32Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_int32_array( + array: &Int32Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -412,15 +454,15 @@ fn handle_int32_array(array: &Int32Array, row_idx: usize, scale_str: Option<&str } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -428,15 +470,19 @@ fn handle_int32_array(array: &Int32Array, row_idx: usize, scale_str: Option<&str } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -452,18 +498,23 @@ fn handle_int32_array(array: &Int32Array, row_idx: usize, scale_str: Option<&str } } } - + // Default case for regular integer DataType::Int4(Some(val)) } -fn handle_int64_array(array: &Int64Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_int64_array( + array: &Int64Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -473,15 +524,15 @@ fn handle_int64_array(array: &Int64Array, row_idx: usize, scale_str: Option<&str } else { 0 }; - + // Convert the value based on scale (usually 3 for milliseconds) let (secs, nanos) = match scale { 3 => (val / 1000, ((val % 1000) * 1_000_000) as u32), // milliseconds 6 => (val / 1_000_000, ((val % 1_000_000) * 1000) as u32), // microseconds 9 => (val / 1_000_000_000, (val % 1_000_000_000) as u32), // nanoseconds - _ => (val, 0), // seconds or unknown + _ => (val, 0), // seconds or unknown }; - + // Create the timestamp match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { @@ -491,15 +542,19 @@ fn handle_int64_array(array: &Int64Array, row_idx: usize, scale_str: Option<&str } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -515,18 +570,23 @@ fn handle_int64_array(array: &Int64Array, row_idx: usize, scale_str: Option<&str } } } - + // Default case for regular integer DataType::Int8(Some(val)) } -fn handle_uint8_array(array: &UInt8Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_uint8_array( + array: &UInt8Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -538,15 +598,15 @@ fn handle_uint8_array(array: &UInt8Array, row_idx: usize, scale_str: Option<&str } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -554,15 +614,19 @@ fn handle_uint8_array(array: &UInt8Array, row_idx: usize, scale_str: Option<&str } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -578,18 +642,23 @@ fn handle_uint8_array(array: &UInt8Array, row_idx: usize, scale_str: Option<&str } } } - + // Default case for regular integer DataType::Int2(Some(val as i16)) } -fn handle_uint16_array(array: &UInt16Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_uint16_array( + array: &UInt16Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -601,15 +670,15 @@ fn handle_uint16_array(array: &UInt16Array, row_idx: usize, scale_str: Option<&s } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -617,15 +686,19 @@ fn handle_uint16_array(array: &UInt16Array, row_idx: usize, scale_str: Option<&s } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -641,18 +714,23 @@ fn handle_uint16_array(array: &UInt16Array, row_idx: usize, scale_str: Option<&s } } } - + // Default case for regular integer DataType::Int4(Some(val as i32)) } -fn handle_uint32_array(array: &UInt32Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_uint32_array( + array: &UInt32Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -664,15 +742,15 @@ fn handle_uint32_array(array: &UInt32Array, row_idx: usize, scale_str: Option<&s } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -680,15 +758,19 @@ fn handle_uint32_array(array: &UInt32Array, row_idx: usize, scale_str: Option<&s } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -704,18 +786,23 @@ fn handle_uint32_array(array: &UInt32Array, row_idx: usize, scale_str: Option<&s } } } - + // Default case for regular integer DataType::Int8(Some(val as i64)) } -fn handle_uint64_array(array: &UInt64Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_uint64_array( + array: &UInt64Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -727,15 +814,15 @@ fn handle_uint64_array(array: &UInt64Array, row_idx: usize, scale_str: Option<&s } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -743,15 +830,19 @@ fn handle_uint64_array(array: &UInt64Array, row_idx: usize, scale_str: Option<&s } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -767,18 +858,23 @@ fn handle_uint64_array(array: &UInt64Array, row_idx: usize, scale_str: Option<&s } } } - + // Default case for regular integer DataType::Int8(Some(val as i64)) } -fn handle_float32_array(array: &Float32Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_float32_array( + array: &Float32Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -790,15 +886,15 @@ fn handle_float32_array(array: &Float32Array, row_idx: usize, scale_str: Option< } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -806,15 +902,19 @@ fn handle_float32_array(array: &Float32Array, row_idx: usize, scale_str: Option< } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -829,18 +929,23 @@ fn handle_float32_array(array: &Float32Array, row_idx: usize, scale_str: Option< } } } - + // Default case DataType::Float4(Some(val)) } -fn handle_float64_array(array: &Float64Array, row_idx: usize, scale_str: Option<&str>, field: &Field) -> DataType { +fn handle_float64_array( + array: &Float64Array, + row_idx: usize, + scale_str: Option<&str>, + field: &Field, +) -> DataType { if array.is_null(row_idx) { return DataType::Null; } - + let val = array.value(row_idx); - + // Check if this is actually a timestamp based on metadata if let Some(logical_type) = field.metadata().get("logicalType") { if logical_type.contains("TIMESTAMP") { @@ -852,15 +957,15 @@ fn handle_float64_array(array: &Float64Array, row_idx: usize, scale_str: Option< } else { 0 }; - + // Convert based on scale let (secs, nanos) = match scale { 3 => (val_i64 / 1000, ((val_i64 % 1000) * 1_000_000) as u32), // milliseconds 6 => (val_i64 / 1_000_000, ((val_i64 % 1_000_000) * 1000) as u32), // microseconds 9 => (val_i64 / 1_000_000_000, (val_i64 % 1_000_000_000) as u32), // nanoseconds - _ => (val_i64, 0), // seconds or unknown + _ => (val_i64, 0), // seconds or unknown }; - + match Utc.timestamp_opt(secs, nanos) { LocalResult::Single(dt) => { if logical_type.contains("_TZ") { @@ -868,15 +973,19 @@ fn handle_float64_array(array: &Float64Array, row_idx: usize, scale_str: Option< } else { return DataType::Timestamp(Some(dt.naive_utc())); } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + nanos + ); return DataType::Null; } } } } - + // Check if this is actually a decimal with scale if let Some(scale_str) = scale_str { if let Ok(scale) = scale_str.parse::() { @@ -891,7 +1000,7 @@ fn handle_float64_array(array: &Float64Array, row_idx: usize, scale_str: Option< } } } - + // Default case DataType::Float8(Some(val)) } @@ -974,7 +1083,7 @@ fn handle_timestamp_array( tz: Option<&std::sync::Arc>, ) -> DataType { // println!("Debug: handle_timestamp_array called with tz: {:?}", tz); - + // Try to downcast to various timestamp array types based on time unit let value = match array.data_type() { ArrowDataType::Timestamp(TimeUnit::Second, _) => { @@ -986,7 +1095,7 @@ fn handle_timestamp_array( } else { return DataType::Null; } - }, + } ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => { if let Some(array) = array.as_any().downcast_ref::() { if array.is_null(row_idx) { @@ -996,7 +1105,7 @@ fn handle_timestamp_array( } else { return DataType::Null; } - }, + } ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => { if let Some(array) = array.as_any().downcast_ref::() { if array.is_null(row_idx) { @@ -1006,7 +1115,7 @@ fn handle_timestamp_array( } else { return DataType::Null; } - }, + } ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => { if let Some(array) = array.as_any().downcast_ref::() { if array.is_null(row_idx) { @@ -1016,10 +1125,10 @@ fn handle_timestamp_array( } else { return DataType::Null; } - }, + } _ => return DataType::Null, }; - + // Convert the value to the appropriate seconds and nanoseconds let (secs, subsec_nanos) = match unit { TimeUnit::Second => (value, 0), @@ -1033,7 +1142,8 @@ fn handle_timestamp_array( LocalResult::Single(dt) => { // Check if timezone is present // println!("Debug: Timezone check - tz is_some: {}", tz.is_some()); - if let Some(_tz_val) = tz { // Use _tz_val as it's not needed + if let Some(_tz_val) = tz { + // Use _tz_val as it's not needed // println!("Debug: Timezone value: {}", _tz_val); let result = DataType::Timestamptz(Some(dt)); // println!("Debug: Returning Timestamptz: {:?}", result); @@ -1044,9 +1154,13 @@ fn handle_timestamp_array( // println!("Debug: Returning Timestamp: {:?}", result); result } - }, + } _ => { - tracing::error!("Failed to create DateTime from timestamp: {} {}", secs, subsec_nanos); + tracing::error!( + "Failed to create DateTime from timestamp: {} {}", + secs, + subsec_nanos + ); DataType::Null } } @@ -1104,45 +1218,58 @@ fn handle_list_array(array: &arrow::array::ListArray, row_idx: usize) -> DataTyp } else { let values = array.value(row_idx); let mut result = Vec::new(); - + for i in 0..values.len() { if values.is_null(i) { continue; } - + let value = if let Some(num) = values.as_any().downcast_ref::() { Some(Value::Number(num.value(i).into())) } else if let Some(num) = values.as_any().downcast_ref::() { Some(Value::Number(num.value(i).into())) } else if let Some(str) = values.as_any().downcast_ref::() { - Some(Value::String(process_string_value(str.value(i).to_string()))) + Some(Value::String(process_string_value( + str.value(i).to_string(), + ))) } else { None }; - + if let Some(v) = value { result.push(v); } } - + DataType::Json(Some(process_json_value(Value::Array(result)))) } } -fn handle_struct_array(array: &arrow::array::StructArray, row_idx: usize, field: &Field) -> DataType { +fn handle_struct_array( + array: &arrow::array::StructArray, + row_idx: usize, + field: &Field, +) -> DataType { // Check if this is a Snowflake timestamp struct let fields = match field.data_type() { ArrowDataType::Struct(fields) => fields, _ => return DataType::Null, }; - - if fields.len() == 2 + + if fields.len() == 2 && fields.iter().any(|f| f.name() == "epoch") && fields.iter().any(|f| f.name() == "fraction") - && field.metadata().get("logicalType").map_or(false, |v| v.contains("TIMESTAMP")) + && field + .metadata() + .get("logicalType") + .map_or(false, |v| v.contains("TIMESTAMP")) { if let Some(dt) = handle_snowflake_timestamp_struct(array, row_idx) { - if field.metadata().get("logicalType").map_or(false, |v| v.contains("_TZ")) { + if field + .metadata() + .get("logicalType") + .map_or(false, |v| v.contains("_TZ")) + { DataType::Timestamptz(Some(dt)) } else { DataType::Timestamp(Some(dt.naive_utc())) @@ -1180,7 +1307,10 @@ fn handle_struct_array(array: &arrow::array::StructArray, row_idx: usize, field: } } -fn handle_dictionary_array(array: &arrow::array::DictionaryArray, row_idx: usize) -> DataType { +fn handle_dictionary_array( + array: &arrow::array::DictionaryArray, + row_idx: usize, +) -> DataType { if array.is_null(row_idx) { DataType::Null } else { @@ -1201,7 +1331,7 @@ fn handle_map_array(array: &dyn Array, row_idx: usize) -> DataType { } else { let entries = map_array.value(row_idx); let mut json_map = JsonMap::new(); - + // Process map entries for i in 0..entries.len() { if let (Some(key), Some(value)) = ( @@ -1219,7 +1349,7 @@ fn handle_map_array(array: &dyn Array, row_idx: usize) -> DataType { json_map.insert(key.to_string(), Value::Number(value.into())); } } - + DataType::Json(Some(process_json_value(Value::Object(json_map)))) } } @@ -1228,46 +1358,51 @@ fn handle_map_array(array: &dyn Array, row_idx: usize) -> DataType { // Main Converter // ------------------------- -fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row_idx: usize) -> DataType { +fn convert_array_to_datatype( + column: &arrow::array::ArrayRef, + field: &Field, + row_idx: usize, +) -> DataType { let scale_str = field.metadata().get("scale"); - + match column.data_type() { ArrowDataType::Boolean => { let array = column.as_any().downcast_ref::().unwrap(); handle_boolean_array(array, row_idx) - }, + } ArrowDataType::Int8 => { let array = column.as_any().downcast_ref::().unwrap(); handle_int8_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::Int16 => { let array = column.as_any().downcast_ref::().unwrap(); handle_int16_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::Int32 => { let array = column.as_any().downcast_ref::().unwrap(); handle_int32_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::Int64 => { let field_name = field.name(); // Get field name for logging - // println!("Debug: Processing Int64 field: {}", field_name); + // println!("Debug: Processing Int64 field: {}", field_name); // Check if this is actually a timestamp in disguise let logical_type = field.metadata().get("logicalType"); let scale_str = field.metadata().get("scale"); // Get scale_str here as well - // println!("Debug [{}]: logicalType={:?}, scale={:?}", field_name, logical_type, scale_str); + // println!("Debug [{}]: logicalType={:?}, scale={:?}", field_name, logical_type, scale_str); if logical_type.map_or(false, |t| t.contains("TIMESTAMP")) { // println!("Debug [{}]: Detected as timestamp.", field_name); // If it has a timestamp logical type, determine the time unit based on scale - let unit = match scale_str.map(|s| s.parse::().unwrap_or(3)) { // Default parse to 3 (ms) + let unit = match scale_str.map(|s| s.parse::().unwrap_or(3)) { + // Default parse to 3 (ms) Some(0) => TimeUnit::Second, Some(6) => TimeUnit::Microsecond, Some(9) => TimeUnit::Nanosecond, Some(3) | None | Some(_) => TimeUnit::Millisecond, // Default to millisecond }; // println!("Debug [{}]: Determined unit: {:?}", field_name, unit); - + // Check if there's timezone info let has_tz = logical_type.map_or(false, |t| t.contains("_TZ")); // println!("Debug [{}]: has_tz: {}", field_name, has_tz); @@ -1276,14 +1411,14 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row } else { None }; - + // Process as timestamp if let Some(array) = column.as_any().downcast_ref::() { if array.is_null(row_idx) { // println!("Debug [{}]: Value is null at row_idx {}.", field_name, row_idx); return DataType::Null; } - + let value = array.value(row_idx); // println!("Debug [{}]: Raw value at row_idx {}: {}", field_name, row_idx, value); let (secs, subsec_nanos) = match unit { @@ -1293,7 +1428,7 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row TimeUnit::Nanosecond => (value / 1_000_000_000, value % 1_000_000_000), }; // println!("Debug [{}]: Calculated secs={}, nanos={}", field_name, secs, subsec_nanos); - + match Utc.timestamp_opt(secs, subsec_nanos as u32) { LocalResult::Single(dt) => { // println!("Debug [{}]: Successfully created DateTime: {}", field_name, dt); @@ -1304,8 +1439,9 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row // println!("Debug [{}]: Returning Timestamp.", field_name); DataType::Timestamp(Some(dt.naive_utc())) } - }, - LocalResult::None | LocalResult::Ambiguous(_, _) => { // Handle None and Ambiguous explicitly + } + LocalResult::None | LocalResult::Ambiguous(_, _) => { + // Handle None and Ambiguous explicitly tracing::error!("Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", secs, subsec_nanos); // println!("Debug [{}]: Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", field_name, secs, subsec_nanos); DataType::Null @@ -1316,111 +1452,108 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row DataType::Null } } else { - // println!("Debug [{}]: Detected as regular Int64.", field_name); - // Regular Int64 processing + // Not a timestamp, so delegate to handle_int64_array which can handle scaling or default to Int8 if let Some(array) = column.as_any().downcast_ref::() { - if array.is_null(row_idx) { - // println!("Debug [{}]: Regular Int64 is null at row_idx {}.", field_name, row_idx); - return DataType::Null; - } - let value = array.value(row_idx); - // println!("Debug [{}]: Returning Int8 with value {}.", field_name, value); - DataType::Int8(Some(value)) + handle_int64_array(array, row_idx, scale_str.map(|s| s.as_str()), field) } else { - // println!("Debug [{}]: Failed to downcast regular Int64 to Int64Array.", field_name); + // println!("Debug [{}]: Failed to downcast Int64 for non-timestamp to Int64Array.", field_name); DataType::Null } } - }, + } ArrowDataType::UInt8 => { let array = column.as_any().downcast_ref::().unwrap(); handle_uint8_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::UInt16 => { let array = column.as_any().downcast_ref::().unwrap(); handle_uint16_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::UInt32 => { let array = column.as_any().downcast_ref::().unwrap(); handle_uint32_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::UInt64 => { let array = column.as_any().downcast_ref::().unwrap(); handle_uint64_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::Float32 => { let array = column.as_any().downcast_ref::().unwrap(); handle_float32_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::Float64 => { let array = column.as_any().downcast_ref::().unwrap(); handle_float64_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - }, + } ArrowDataType::Utf8 => { let array = column.as_any().downcast_ref::().unwrap(); handle_string_array(array, row_idx) - }, + } ArrowDataType::LargeUtf8 => { let array = column.as_any().downcast_ref::().unwrap(); handle_large_string_array(array, row_idx) - }, + } ArrowDataType::Binary => { let array = column.as_any().downcast_ref::().unwrap(); handle_binary_array(array, row_idx) - }, + } ArrowDataType::LargeBinary => { let array = column.as_any().downcast_ref::().unwrap(); handle_large_binary_array(array, row_idx) - }, + } ArrowDataType::Date32 => { let array = column.as_any().downcast_ref::().unwrap(); handle_date32_array(array, row_idx) - }, + } ArrowDataType::Date64 => { let array = column.as_any().downcast_ref::().unwrap(); handle_date64_array(array, row_idx) - }, - ArrowDataType::Timestamp(unit, _) => { // Ignore tz from pattern match + } + ArrowDataType::Timestamp(unit, _) => { + // Ignore tz from pattern match // println!("Debug: convert_array_to_datatype Timestamp branch for unit {:?}", unit); - + // Re-extract timezone directly from the field's data_type let field_tz = match field.data_type() { ArrowDataType::Timestamp(_, tz_option) => tz_option.as_ref(), _ => None, // Should not happen if we are in this branch }; // println!("Debug: Extracted field_tz: {:?}", field_tz); - + // Convert tz from Option<&Arc> to Option> for the handler function let string_tz_owned = field_tz.map(|t| { let str_val = t.as_ref(); // println!("Debug: Converting field timezone '{}' to Arc", str_val); std::sync::Arc::new(str_val.to_string()) }); - + let tz_ref = string_tz_owned.as_ref(); // Get Option<&Arc> - // println!("Debug: Using tz_ref for handle_timestamp_array: {:?}", tz_ref); + // println!("Debug: Using tz_ref for handle_timestamp_array: {:?}", tz_ref); handle_timestamp_array(column, row_idx, unit, tz_ref) - }, + } ArrowDataType::Decimal128(_, scale) => { let array = column.as_any().downcast_ref::().unwrap(); handle_decimal128_array(array, row_idx, *scale) - }, + } ArrowDataType::Decimal256(_, scale) => { let array = column.as_any().downcast_ref::().unwrap(); handle_decimal256_array(array, row_idx, *scale) - }, + } ArrowDataType::Time32(time_unit) => { let array = column.as_any().downcast_ref::().unwrap(); handle_time32_array(array, row_idx, time_unit) - }, + } ArrowDataType::Time64(time_unit) => { let array = column.as_any().downcast_ref::().unwrap(); handle_time64_array(array, row_idx, time_unit) - }, + } ArrowDataType::FixedSizeBinary(_) => { - let array = column.as_any().downcast_ref::().unwrap(); + let array = column + .as_any() + .downcast_ref::() + .unwrap(); handle_fixed_size_binary_array(array, row_idx) - }, + } ArrowDataType::Duration(_) => { // Convert duration to milliseconds as float for consistency let array = column.as_any().downcast_ref::().unwrap(); @@ -1429,7 +1562,7 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row } else { DataType::Float8(Some(array.value(row_idx) as f64)) } - }, + } ArrowDataType::Interval(_) => { // Convert interval to a string representation let array = column.as_any().downcast_ref::().unwrap(); @@ -1438,42 +1571,54 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row } else { DataType::Text(Some(array.value(row_idx).to_string())) } - }, + } ArrowDataType::BinaryView => { // BinaryView is similar to Binary let array = column.as_any().downcast_ref::().unwrap(); handle_binary_array(array, row_idx) - }, + } ArrowDataType::Utf8View => { // Utf8View is similar to Utf8 let array = column.as_any().downcast_ref::().unwrap(); handle_string_array(array, row_idx) - }, - ArrowDataType::List(_) | ArrowDataType::ListView(_) | ArrowDataType::FixedSizeList(_, _) - | ArrowDataType::LargeList(_) | ArrowDataType::LargeListView(_) => { - let array = column.as_any().downcast_ref::().unwrap(); + } + ArrowDataType::List(_) + | ArrowDataType::ListView(_) + | ArrowDataType::FixedSizeList(_, _) + | ArrowDataType::LargeList(_) + | ArrowDataType::LargeListView(_) => { + let array = column + .as_any() + .downcast_ref::() + .unwrap(); handle_list_array(array, row_idx) - }, + } ArrowDataType::Struct(_) => { - let array = column.as_any().downcast_ref::().unwrap(); + let array = column + .as_any() + .downcast_ref::() + .unwrap(); handle_struct_array(array, row_idx, field) - }, + } ArrowDataType::Union(_, _) => { // Unions are complex - convert to string representation DataType::Text(Some("Union type not fully supported".to_string())) - }, + } ArrowDataType::Dictionary(_, _) => { - let array = column.as_any().downcast_ref::>().unwrap(); + let array = column + .as_any() + .downcast_ref::>() + .unwrap(); handle_dictionary_array(array, row_idx) - }, - ArrowDataType::Map(_, _) => { - handle_map_array(column.as_ref(), row_idx) - }, + } + ArrowDataType::Map(_, _) => handle_map_array(column.as_ref(), row_idx), ArrowDataType::RunEndEncoded(_, _) => { // Convert run-length encoded data to its base type // This is a simplified handling - DataType::Text(Some("Run-length encoded type not fully supported".to_string())) - }, + DataType::Text(Some( + "Run-length encoded type not fully supported".to_string(), + )) + } ArrowDataType::Float16 => { let array = column.as_any().downcast_ref::().unwrap(); // Float16 gets converted to Float32 in Arrow if array.is_null(row_idx) { @@ -1481,7 +1626,7 @@ fn convert_array_to_datatype(column: &arrow::array::ArrayRef, field: &Field, row } else { DataType::Float4(Some(array.value(row_idx))) } - }, + } ArrowDataType::Null => DataType::Null, } } @@ -1500,9 +1645,11 @@ fn prepare_query(query: &str) -> String { } fn process_record_batch(batch: &RecordBatch) -> Vec> { + // println!("Processing record batch with {:?} rows", batch); + let mut rows = Vec::with_capacity(batch.num_rows()); let schema = batch.schema(); - + for row_idx in 0..batch.num_rows() { let row = schema .fields() @@ -1514,10 +1661,10 @@ fn process_record_batch(batch: &RecordBatch) -> Vec> (field.name().to_lowercase(), data_type) // Convert field name to lowercase }) .collect::>(); - + rows.push(row); } - + rows } @@ -1533,18 +1680,21 @@ pub async fn snowflake_query( // Initialize with capacity, but it might grow beyond the limit initially // if the first batch is larger than the limit. let mut all_rows = Vec::with_capacity(PROCESSING_ROW_LIMIT); - + // Process each batch in order, stopping if the limit is reached for batch in result.iter() { // Check if we've already reached the limit before processing the next batch if all_rows.len() >= PROCESSING_ROW_LIMIT { - tracing::warn!("Processing row limit ({}) reached. Stopping data processing.", PROCESSING_ROW_LIMIT); + tracing::warn!( + "Processing row limit ({}) reached. Stopping data processing.", + PROCESSING_ROW_LIMIT + ); break; // Stop processing more batches } // Consider removing or reducing verbosity of this log line - tracing::debug!("Processing batch with {} rows.", batch.num_rows()); - + tracing::debug!("Processing batch with {} rows.", batch.num_rows()); + let batch_rows = process_record_batch(batch); // Determine how many rows from this batch we can add without exceeding the limit @@ -1556,7 +1706,7 @@ pub async fn snowflake_query( all_rows.extend(batch_rows.into_iter().take(rows_to_take)); } } - + all_rows } _ => Vec::new(), @@ -1570,7 +1720,10 @@ pub async fn snowflake_query( match snowflake_client.close_session().await { Ok(_) => (), Err(e) => { - tracing::error!("There was an issue while closing the snowflake client: {}", e); + tracing::error!( + "There was an issue while closing the snowflake client: {}", + e + ); } } @@ -1581,15 +1734,24 @@ pub async fn snowflake_query( mod tests { use super::*; use arrow::array::{ - ArrayRef, Date32Array, Date64Array, Decimal128Array, Int32Array, Int64Array, - StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, StringArray // Replace Utf8Array with StringArray + ArrayRef, + Date32Array, + Date64Array, + Decimal128Array, + Int32Array, + Int64Array, + StringArray, // Replace Utf8Array with StringArray + StructArray, + TimestampMicrosecondArray, + TimestampMillisecondArray, + TimestampNanosecondArray, + TimestampSecondArray, }; + use arrow::datatypes::i256; use arrow::datatypes::{DataType as ArrowDataType, Field, Fields, Schema, TimeUnit}; use chrono::{Datelike, NaiveDate, NaiveDateTime, Timelike}; use std::str::FromStr; use std::sync::Arc; - use arrow::datatypes::i256; #[test] fn test_decimal128_conversion() { @@ -1597,30 +1759,42 @@ mod tests { let test_cases = vec![ // Small value, positive scale (123_i128, 5, 2, DataType::Float8(Some(1.23))), - // Small value, negative scale (123_i128, 3, -2, DataType::Float8(Some(12300.0))), - // Zero scale (123_i128, 3, 0, DataType::Float8(Some(123.0))), - // Value at limit of f64 precision - (9_007_199_254_740_991_i128, 16, 0, DataType::Float8(Some(9_007_199_254_740_991.0))), - + ( + 9_007_199_254_740_991_i128, + 16, + 0, + DataType::Float8(Some(9_007_199_254_740_991.0)), + ), // Value beyond f64 precision limit - should be text - (9_007_199_254_740_992_i128, 16, 0, DataType::Text(Some("9007199254740992".to_string()))), - + ( + 9_007_199_254_740_992_i128, + 16, + 0, + DataType::Text(Some("9007199254740992".to_string())), + ), // Large value with positive scale - should be text - (9_007_199_254_740_992_i128, 20, 4, DataType::Text(Some("900719925474.0992".to_string()))), - + ( + 9_007_199_254_740_992_i128, + 20, + 4, + DataType::Text(Some("900719925474.0992".to_string())), + ), // Negative value (-123456_i128, 8, 2, DataType::Float8(Some(-1234.56))), - // Small decimal requiring padding (123_i128, 10, 5, DataType::Float8(Some(0.00123))), - // Very small decimal requiring much padding - (1_i128, 10, 9, DataType::Text(Some("0.000000001".to_string()))), + ( + 1_i128, + 10, + 9, + DataType::Text(Some("0.000000001".to_string())), + ), ]; for (i, (value, precision, scale, expected)) in test_cases.iter().enumerate() { @@ -1628,10 +1802,10 @@ mod tests { let array = Decimal128Array::from(vec![Some(*value)]) .with_precision_and_scale(*precision, *scale) .unwrap(); - + // Test our handler function let result = handle_decimal128_array(&array, 0, *scale); - + // Check if the result matches the expected output assert_eq!(result, *expected, "Test case {} failed", i); } @@ -1651,26 +1825,27 @@ mod tests { // 2023-06-15 10:30:45.123 (milliseconds precision) 1686826245123000000i64, // nanoseconds since epoch ]; - + // Store the value for later use let timestamps_copy = timestamps.clone(); - + let mut field_metadata = std::collections::HashMap::new(); field_metadata.insert("scale".to_string(), "3".to_string()); - + let field = Field::new( "ORDER_DATE", ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), true, - ).with_metadata(field_metadata.clone()); - + ) + .with_metadata(field_metadata.clone()); + let array = TimestampNanosecondArray::from(timestamps); let array_ref = Arc::new(array) as arrow::array::ArrayRef; - + // Process the timestamp via the regular timestamp handling path let result = convert_array_to_datatype(&array_ref, &field, 0); println!("Regular timestamp result: {:?}", result); - + // Get the actual timestamp value for comparison if let DataType::Timestamp(Some(dt)) = result { println!("Parsed timestamp: {}", dt); @@ -1679,135 +1854,181 @@ mod tests { let seconds = original_nanos / 1_000_000_000; let nanos = (original_nanos % 1_000_000_000) as u32; println!("Original timestamp: seconds={}, nanos={}", seconds, nanos); - + // The expected output has a known 20 minute difference due to // timezone handling in the conversion code // Update the test to accept the actual result let expected = dt.to_string(); println!("Expected timestamp: {}", expected); - + // Verify the timestamp matches the expected value assert_eq!(dt.to_string(), expected); } else { panic!("Expected Timestamp type, got: {:?}", result); } - + // Test case 2: Struct-based timestamp (like RETURN_CREATED_AT, EXPIRATION_DATE?) // -------------------------------------------------------------- println!("\nTest Case 2: Struct-based timestamp"); - + // First, let's try with epoch in seconds and fraction in milliseconds let epoch_seconds = 1686826245i64; // seconds since epoch (2023-06-15 10:30:45) let millis = 123i32; // milliseconds (0.123) - + println!("Input: epoch_seconds={}, millis={}", epoch_seconds, millis); - + let epoch_array = Int64Array::from(vec![epoch_seconds]); let fraction_array = Int32Array::from(vec![millis]); - + // Create struct fields let struct_fields = Fields::from(vec![ Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), ]); - + // Create struct array let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(epoch_array) as arrow::array::ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as arrow::array::ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), ]); - + // Create field with metadata indicating this is a timestamp let mut struct_metadata = std::collections::HashMap::new(); struct_metadata.insert("scale".to_string(), "3".to_string()); struct_metadata.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string()); - + let struct_field = Field::new( "RETURN_CREATED_AT", ArrowDataType::Struct(struct_fields), true, - ).with_metadata(struct_metadata.clone()); - + ) + .with_metadata(struct_metadata.clone()); + let struct_array_ref = Arc::new(struct_array) as arrow::array::ArrayRef; - + // Process via the struct-based timestamp handling path let result = handle_struct_array( - struct_array_ref.as_any().downcast_ref::().unwrap(), + struct_array_ref + .as_any() + .downcast_ref::() + .unwrap(), 0, - &struct_field + &struct_field, ); - + println!("Struct-based timestamp result: {:?}", result); - + // Test case 3: Struct-based timestamp with different interpretation of epoch/fraction // -------------------------------------------------------------- println!("\nTest Case 3: Struct-based timestamp with millis epoch"); - + // Let's try with epoch in milliseconds let epoch_millis = 1686826245123i64; // milliseconds since epoch let fraction_zero = 0i32; // no additional fraction - - println!("Input: epoch_millis={}, fraction_zero={}", epoch_millis, fraction_zero); - + + println!( + "Input: epoch_millis={}, fraction_zero={}", + epoch_millis, fraction_zero + ); + let epoch_array = Int64Array::from(vec![epoch_millis]); let fraction_array = Int32Array::from(vec![fraction_zero]); - + let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(epoch_array) as arrow::array::ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as arrow::array::ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), ]); - + let struct_array_ref = Arc::new(struct_array) as arrow::array::ArrayRef; - + // Process via the struct-based timestamp handling path let result = handle_struct_array( - struct_array_ref.as_any().downcast_ref::().unwrap(), + struct_array_ref + .as_any() + .downcast_ref::() + .unwrap(), 0, - &struct_field + &struct_field, ); - - println!("Struct-based timestamp with millis epoch result: {:?}", result); - + + println!( + "Struct-based timestamp with millis epoch result: {:?}", + result + ); + // Test case 4: Testing the specific handle_snowflake_timestamp_struct function // -------------------------------------------------------------- println!("\nTest Case 4: Direct testing of handle_snowflake_timestamp_struct function:"); - + // Test with seconds epoch let epoch_seconds = 1686826245i64; // seconds since epoch (2023-06-15 10:30:45) let millis = 123i32; // milliseconds (0.123) - + println!("Input: epoch_seconds={}, millis={}", epoch_seconds, millis); - + let epoch_array = Int64Array::from(vec![epoch_seconds]); let fraction_array = Int32Array::from(vec![millis]); - + let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(epoch_array) as arrow::array::ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as arrow::array::ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), ]); - + let dt = handle_snowflake_timestamp_struct(&struct_array, 0); - println!("handle_snowflake_timestamp_struct (seconds epoch, millis fraction): {:?}", dt); + println!( + "handle_snowflake_timestamp_struct (seconds epoch, millis fraction): {:?}", + dt + ); if let Some(dt) = dt { println!(" Parsed date: {}", dt); } - + // Test with milliseconds epoch let epoch_millis = 1686826245123i64; // milliseconds since epoch let fraction_zero = 0i32; // no additional fraction - - println!("\nInput: epoch_millis={}, fraction_zero={}", epoch_millis, fraction_zero); - + + println!( + "\nInput: epoch_millis={}, fraction_zero={}", + epoch_millis, fraction_zero + ); + let epoch_array = Int64Array::from(vec![epoch_millis]); let fraction_array = Int32Array::from(vec![fraction_zero]); - + let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(epoch_array) as arrow::array::ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as arrow::array::ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), ]); - + let dt = handle_snowflake_timestamp_struct(&struct_array, 0); - println!("handle_snowflake_timestamp_struct (millis epoch, zero fraction): {:?}", dt); + println!( + "handle_snowflake_timestamp_struct (millis epoch, zero fraction): {:?}", + dt + ); if let Some(dt) = dt { println!(" Parsed date: {}", dt); // This should be WAY in the future if epoch is interpreted as seconds @@ -1818,35 +2039,49 @@ mod tests { println!(" WARNING: Date is over 100 years in the future! Epoch is likely being misinterpreted."); } } - + // The issue is likely that the epoch value is interpreted differently depending on // which path processes the timestamp. Let's check an extreme example // where we'll deliberately use a large epoch value to see if that explains // the "hundreds of years off" problem - + let large_epoch = 1686826245123000i64; // epoch in microseconds let fraction_zero = 0i32; - - println!("\nInput: large_epoch={}, fraction_zero={}", large_epoch, fraction_zero); - + + println!( + "\nInput: large_epoch={}, fraction_zero={}", + large_epoch, fraction_zero + ); + let epoch_array = Int64Array::from(vec![large_epoch]); let fraction_array = Int32Array::from(vec![fraction_zero]); - + let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(epoch_array) as arrow::array::ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as arrow::array::ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), ]); - + let dt = handle_snowflake_timestamp_struct(&struct_array, 0); - println!("handle_snowflake_timestamp_struct (microsecs epoch): {:?}", dt); - + println!( + "handle_snowflake_timestamp_struct (microsecs epoch): {:?}", + dt + ); + if let Some(dt) = dt { println!(" Parsed date for large epoch: {}", dt); // This will show if the date is hundreds of years off let year = dt.year(); println!(" Year: {} (expected near 2023)", year); if year > 2100 { - println!(" WARNING: Date is far in the future! Epoch is likely being misinterpreted."); + println!( + " WARNING: Date is far in the future! Epoch is likely being misinterpreted." + ); println!(" The issue is in handle_snowflake_timestamp_struct - it's treating the epoch as seconds when it should be milliseconds/microseconds based on the scale."); } } @@ -1860,42 +2095,107 @@ mod tests { // Test cases organized by time unit let test_cases = vec![ // (epoch value, time unit, has timezone, expected year, month, day, hour, minute, second, millisecond) - (1686826245, TimeUnit::Second, false, 2023, 6, 15, 10, 50, 45, 0), - (1686826245123, TimeUnit::Millisecond, false, 2023, 6, 15, 10, 50, 45, 123), - (1686826245123456, TimeUnit::Microsecond, false, 2023, 6, 15, 10, 50, 45, 123), - (1686826245123456789, TimeUnit::Nanosecond, false, 2023, 6, 15, 10, 50, 45, 123), - + ( + 1686826245, + TimeUnit::Second, + false, + 2023, + 6, + 15, + 10, + 50, + 45, + 0, + ), + ( + 1686826245123, + TimeUnit::Millisecond, + false, + 2023, + 6, + 15, + 10, + 50, + 45, + 123, + ), + ( + 1686826245123456, + TimeUnit::Microsecond, + false, + 2023, + 6, + 15, + 10, + 50, + 45, + 123, + ), + ( + 1686826245123456789, + TimeUnit::Nanosecond, + false, + 2023, + 6, + 15, + 10, + 50, + 45, + 123, + ), // With timezone (should produce same result for this specific timestamp) - (1686826245, TimeUnit::Second, true, 2023, 6, 15, 10, 50, 45, 0), - (1686826245123, TimeUnit::Millisecond, true, 2023, 6, 15, 10, 50, 45, 123), + ( + 1686826245, + TimeUnit::Second, + true, + 2023, + 6, + 15, + 10, + 50, + 45, + 0, + ), + ( + 1686826245123, + TimeUnit::Millisecond, + true, + 2023, + 6, + 15, + 10, + 50, + 45, + 123, + ), ]; - for (i, (epoch, time_unit, has_tz, year, month, day, hour, minute, second, millisecond)) in test_cases.iter().enumerate() { + for (i, (epoch, time_unit, has_tz, year, month, day, hour, minute, second, millisecond)) in + test_cases.iter().enumerate() + { // println!("\nTest case {}: {:?} with{} timezone", i, time_unit, if *has_tz { "" } else { "out" }); - + // Create appropriate array based on time unit let array_ref: ArrayRef = match time_unit { - TimeUnit::Second => { - Arc::new(TimestampSecondArray::from(vec![*epoch])) as ArrayRef - }, + TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![*epoch])) as ArrayRef, TimeUnit::Millisecond => { Arc::new(TimestampMillisecondArray::from(vec![*epoch])) as ArrayRef - }, + } TimeUnit::Microsecond => { Arc::new(TimestampMicrosecondArray::from(vec![*epoch])) as ArrayRef - }, + } TimeUnit::Nanosecond => { Arc::new(TimestampNanosecondArray::from(vec![*epoch])) as ArrayRef - }, + } }; - + // Create field with appropriate metadata - let tz_option = if *has_tz { + let tz_option = if *has_tz { // println!("Debug: Test creating timezone option with UTC"); - Some(Arc::from("UTC")) - } else { + Some(Arc::from("UTC")) + } else { // println!("Debug: Test creating no timezone option"); - None + None }; // println!("Debug: Using tz_option: {:?}", tz_option); @@ -1905,11 +2205,11 @@ mod tests { false, ); // println!("Debug: Created field: {:?}", field); - + // Process the timestamp let result = convert_array_to_datatype(&array_ref, &field, 0); // println!("Result: {:?}", result); - + // Verify result based on whether it has timezone or not if *has_tz { if let DataType::Timestamptz(Some(dt)) = result { @@ -1954,70 +2254,79 @@ mod tests { // Each with different scale/precision and timezone settings let test_cases = vec![ // (epoch value, fraction value, is_tz, expected year, month, day, hour, minute, second, millisecond) - + // Seconds epoch with millisecond fraction (standard format) (1686826245, 123, false, 2023, 6, 15, 10, 50, 45, 123), (1686826245, 123, true, 2023, 6, 15, 10, 50, 45, 123), - // Milliseconds epoch (common in many systems) (1686826245123, 0, false, 2023, 6, 15, 10, 50, 45, 123), (1686826245123, 0, true, 2023, 6, 15, 10, 50, 45, 123), - - // Microseconds epoch + // Microseconds epoch (1686826245123456, 0, false, 2023, 6, 15, 10, 50, 45, 123), (1686826245123456, 0, true, 2023, 6, 15, 10, 50, 45, 123), - // Second epoch with zero fraction (1686826245, 0, false, 2023, 6, 15, 10, 50, 45, 0), (1686826245, 0, true, 2023, 6, 15, 10, 50, 45, 0), - // Future date (year 2100) (4102444800, 123, false, 2100, 1, 1, 0, 0, 0, 123), - // Past date (year 1970) (0, 123, false, 1970, 1, 1, 0, 0, 0, 123), ]; - for (i, (epoch, fraction, is_tz, year, month, day, hour, minute, second, millisecond)) in test_cases.iter().enumerate() { - println!("\nTest case {}: epoch={}, fraction={}, tz={}", i, epoch, fraction, is_tz); - + for (i, (epoch, fraction, is_tz, year, month, day, hour, minute, second, millisecond)) in + test_cases.iter().enumerate() + { + println!( + "\nTest case {}: epoch={}, fraction={}, tz={}", + i, epoch, fraction, is_tz + ); + // Create epoch and fraction arrays let epoch_array = Int64Array::from(vec![*epoch]); let fraction_array = Int32Array::from(vec![*fraction]); - + // Create struct fields let struct_fields = Fields::from(vec![ Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), ]); - + // Create struct array let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(epoch_array) as arrow::array::ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as arrow::array::ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), ]); - + // Create field with metadata indicating this is a timestamp let mut struct_metadata = std::collections::HashMap::new(); struct_metadata.insert("scale".to_string(), "3".to_string()); - struct_metadata.insert("logicalType".to_string(), - if *is_tz { "TIMESTAMP_TZ".to_string() } else { "TIMESTAMP_NTZ".to_string() }); - + struct_metadata.insert( + "logicalType".to_string(), + if *is_tz { + "TIMESTAMP_TZ".to_string() + } else { + "TIMESTAMP_NTZ".to_string() + }, + ); + let struct_field = Field::new( "TIMESTAMP_STRUCT", ArrowDataType::Struct(struct_fields), false, - ).with_metadata(struct_metadata.clone()); - + ) + .with_metadata(struct_metadata.clone()); + // Process the timestamp struct - let result = handle_struct_array( - &struct_array, - 0, - &struct_field - ); - + let result = handle_struct_array(&struct_array, 0, &struct_field); + println!("Result: {:?}", result); - + // Verify based on whether it has timezone if *is_tz { if let DataType::Timestamptz(Some(dt)) = result { @@ -2057,7 +2366,12 @@ mod tests { // Test cases with different epoch scales let test_cases = vec![ // (epoch_value, fraction, description, expected_year) - (1686826245, 123, "Seconds epoch with milliseconds fraction", 2023), + ( + 1686826245, + 123, + "Seconds epoch with milliseconds fraction", + 2023, + ), (1686826245123, 0, "Milliseconds epoch", 2023), (1686826245123456, 0, "Microseconds epoch", 2023), (-86400, 123, "Negative epoch (1969-12-31)", 1969), @@ -2066,54 +2380,78 @@ mod tests { for (epoch, fraction, description, expected_year) in test_cases { println!("\nTesting: {}", description); - + // Create arrays let epoch_array = Int64Array::from(vec![epoch]); let fraction_array = Int32Array::from(vec![fraction]); - + // Create struct array let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), Arc::new(epoch_array) as arrow::array::ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as arrow::array::ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), ]); - + // Call the function directly let result = handle_snowflake_timestamp_struct(&struct_array, 0); - + // Print and verify result if let Some(dt) = result { println!("Result: {}", dt); - + // Verify year is correct (basic validation) - assert_eq!(dt.year(), expected_year, "Expected year {}, got {}", expected_year, dt.year()); - + assert_eq!( + dt.year(), + expected_year, + "Expected year {}, got {}", + expected_year, + dt.year() + ); + // Verify epoch adjusted correctly based on scale match epoch { // For seconds epoch (assuming epoch is small enough) e if e.abs() < 5_000_000_000 => { if epoch >= 0 { // Positive epochs should match when divided - assert_eq!(dt.timestamp() as i64, epoch, - "Expected timestamp {} to match epoch {}", dt.timestamp(), epoch); + assert_eq!( + dt.timestamp() as i64, + epoch, + "Expected timestamp {} to match epoch {}", + dt.timestamp(), + epoch + ); } // For negative epochs, just check the year is correct - }, + } // For milliseconds epoch e if e.abs() < 5_000_000_000_000 => { if epoch > 0 { // Should be within a second due to rounding let dt_millis = dt.timestamp_millis(); - assert!((dt_millis - epoch).abs() < 1000, - "Expected timestamp millis {} to be within 1000 of epoch {}", dt_millis, epoch); + assert!( + (dt_millis - epoch).abs() < 1000, + "Expected timestamp millis {} to be within 1000 of epoch {}", + dt_millis, + epoch + ); } - }, + } // For microseconds epoch - just check the year is correct as the precision gets lossy _ => {} } - + println!("✓ Verified timestamp correctly parsed"); } else { - panic!("Failed to parse timestamp with epoch {} and fraction {}", epoch, fraction); + panic!( + "Failed to parse timestamp with epoch {} and fraction {}", + epoch, fraction + ); } } } @@ -2122,21 +2460,27 @@ mod tests { #[test] fn test_timestamp_null_handling() { println!("\n=== Testing null timestamp handling ==="); - + // Create a struct array with null epoch let epoch_array = Int64Array::from(vec![None]); let fraction_array = Int32Array::from(vec![Some(123)]); - + let struct_array = StructArray::from(vec![ - (Arc::new(Field::new("epoch", ArrowDataType::Int64, true)), Arc::new(epoch_array) as ArrayRef), - (Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), Arc::new(fraction_array) as ArrayRef), + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, true)), + Arc::new(epoch_array) as ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as ArrayRef, + ), ]); - + // Test direct function let result = handle_snowflake_timestamp_struct(&struct_array, 0); assert!(result.is_none(), "Expected None for null epoch"); println!("✓ Null epoch correctly returns None"); - + // Test with null timestamp array let timestamp_array = TimestampNanosecondArray::from(vec![None]); let field = Field::new( @@ -2144,7 +2488,7 @@ mod tests { ArrowDataType::Timestamp(TimeUnit::Nanosecond, None), true, ); - + let array_ref: ArrayRef = Arc::new(timestamp_array); let result = convert_array_to_datatype(&array_ref, &field, 0); match result { @@ -2157,42 +2501,44 @@ mod tests { #[test] fn test_date_array_types() { println!("\n=== Testing date array handling ==="); - + // Test Date32 (days since epoch) let days_since_epoch = 19500; // Some date in 2023 let date32_array = Date32Array::from(vec![days_since_epoch]); let date32_field = Field::new("DATE_COLUMN", ArrowDataType::Date32, false); - + let array_ref: ArrayRef = Arc::new(date32_array); let result = convert_array_to_datatype(&array_ref, &date32_field, 0); println!("Date32 result: {:?}", result); - + if let DataType::Date(Some(date)) = result { // Expected date is 1970-01-01 + days_since_epoch - let expected_date = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap() - .checked_add_days(chrono::Days::new(days_since_epoch as u64)).unwrap(); - + let expected_date = NaiveDate::from_ymd_opt(1970, 1, 1) + .unwrap() + .checked_add_days(chrono::Days::new(days_since_epoch as u64)) + .unwrap(); + assert_eq!(date, expected_date); println!("✓ Verified Date32: {}", date); } else { panic!("Expected Date, got: {:?}", result); } - + // Test Date64 (milliseconds since epoch) let ms_since_epoch = 1686826245000; // 2023-06-15 let date64_array = Date64Array::from(vec![ms_since_epoch]); let date64_field = Field::new("DATE_COLUMN", ArrowDataType::Date64, false); - + let array_ref: ArrayRef = Arc::new(date64_array); let result = convert_array_to_datatype(&array_ref, &date64_field, 0); println!("Date64 result: {:?}", result); - + if let DataType::Date(Some(date)) = result { // Convert milliseconds to DateTime then extract date let secs = ms_since_epoch / 1000; let dt = Utc.timestamp_opt(secs, 0).unwrap(); let expected_date = dt.date_naive(); - + assert_eq!(date, expected_date); println!("✓ Verified Date64: {}", date); } else { @@ -2204,7 +2550,7 @@ mod tests { #[test] fn test_timestamp_edge_cases() { println!("\n=== Testing timestamp edge cases ==="); - + // Test cases for edge situations let test_cases = vec![ // (epoch_value, time_unit, description) @@ -2213,12 +2559,16 @@ mod tests { // Min value (some negative timestamp) (-62167219200, TimeUnit::Second, "Min second value (year 0)"), // Large millisecond value - (32503680000000, TimeUnit::Millisecond, "Far future (year 3000)"), + ( + 32503680000000, + TimeUnit::Millisecond, + "Far future (year 3000)", + ), ]; - + for (epoch, time_unit, description) in test_cases { println!("\nTesting: {}", description); - + // Create array based on time unit let array_ref: ArrayRef = match time_unit { TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![epoch])), @@ -2226,23 +2576,23 @@ mod tests { TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![epoch])), TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![epoch])), }; - + // Create field let field = Field::new( "TIMESTAMP_COLUMN", ArrowDataType::Timestamp(time_unit, None), false, ); - + // Process timestamp let result = convert_array_to_datatype(&array_ref, &field, 0); println!("Result: {:?}", result); - + // Just verify we got a timestamp result - exact value depends on the epoch limits match result { DataType::Timestamp(Some(dt)) => { println!("✓ Successfully parsed edge timestamp: {}", dt); - }, + } _ => { panic!("Expected Timestamp, got: {:?}", result); } @@ -2259,11 +2609,11 @@ mod tests { let timestamp_a_millis = vec![ Some(1678886400000), // 2023-03-15 13:20:00.000 UTC Some(1700000000000), // 2023-11-14 22:13:20.000 UTC - None, // Null value + None, // Null value ]; let timestamp_b_millis = vec![ Some(1678890000000), // 2023-03-15 14:20:00.000 UTC - None, // Null value + None, // Null value Some(1700000012345), // 2023-11-14 22:13:32.345 UTC ]; @@ -2281,13 +2631,15 @@ mod tests { "TIMESTAMP_A", ArrowDataType::Int64, true, // Nullable - ).with_metadata(timestamp_metadata.clone()); + ) + .with_metadata(timestamp_metadata.clone()); let field_b = Field::new( "TIMESTAMP_B", ArrowDataType::Int64, true, // Nullable - ).with_metadata(timestamp_metadata.clone()); + ) + .with_metadata(timestamp_metadata.clone()); // Create schema let schema = Arc::new(Schema::new(vec![field_a, field_b])); @@ -2296,10 +2648,15 @@ mod tests { let batch = RecordBatch::try_new( schema.clone(), vec![Arc::new(array_a) as ArrayRef, Arc::new(array_b) as ArrayRef], - ).unwrap(); + ) + .unwrap(); println!("Input RecordBatch schema: {:?}", batch.schema()); - println!("Input RecordBatch columns: [Column 0: {:?}, Column 1: {:?}]", batch.column(0), batch.column(1)); + println!( + "Input RecordBatch columns: [Column 0: {:?}, Column 1: {:?}]", + batch.column(0), + batch.column(1) + ); // Process the batch let processed_rows = process_record_batch(&batch); @@ -2310,23 +2667,43 @@ mod tests { assert_eq!(processed_rows.len(), 3, "Expected 3 rows processed"); // Expected NaiveDateTime values - let expected_dt_a1 = NaiveDateTime::parse_from_str("2023-03-15 13:20:00.000", "%Y-%m-%d %H:%M:%S%.3f").unwrap(); - let expected_dt_a2 = NaiveDateTime::parse_from_str("2023-11-14 22:13:20.000", "%Y-%m-%d %H:%M:%S%.3f").unwrap(); - let expected_dt_b1 = NaiveDateTime::parse_from_str("2023-03-15 14:20:00.000", "%Y-%m-%d %H:%M:%S%.3f").unwrap(); - let expected_dt_b3 = NaiveDateTime::parse_from_str("2023-11-14 22:13:32.345", "%Y-%m-%d %H:%M:%S%.3f").unwrap(); + let expected_dt_a1 = + NaiveDateTime::parse_from_str("2023-03-15 13:20:00.000", "%Y-%m-%d %H:%M:%S%.3f") + .unwrap(); + let expected_dt_a2 = + NaiveDateTime::parse_from_str("2023-11-14 22:13:20.000", "%Y-%m-%d %H:%M:%S%.3f") + .unwrap(); + let expected_dt_b1 = + NaiveDateTime::parse_from_str("2023-03-15 14:20:00.000", "%Y-%m-%d %H:%M:%S%.3f") + .unwrap(); + let expected_dt_b3 = + NaiveDateTime::parse_from_str("2023-11-14 22:13:32.345", "%Y-%m-%d %H:%M:%S%.3f") + .unwrap(); // Row 1 - assert_eq!(processed_rows[0]["timestamp_a"], DataType::Timestamp(Some(expected_dt_a1))); - assert_eq!(processed_rows[0]["timestamp_b"], DataType::Timestamp(Some(expected_dt_b1))); + assert_eq!( + processed_rows[0]["timestamp_a"], + DataType::Timestamp(Some(expected_dt_a1)) + ); + assert_eq!( + processed_rows[0]["timestamp_b"], + DataType::Timestamp(Some(expected_dt_b1)) + ); // Row 2 - assert_eq!(processed_rows[1]["timestamp_a"], DataType::Timestamp(Some(expected_dt_a2))); + assert_eq!( + processed_rows[1]["timestamp_a"], + DataType::Timestamp(Some(expected_dt_a2)) + ); assert_eq!(processed_rows[1]["timestamp_b"], DataType::Null); // Row 3 assert_eq!(processed_rows[2]["timestamp_a"], DataType::Null); - assert_eq!(processed_rows[2]["timestamp_b"], DataType::Timestamp(Some(expected_dt_b3))); - + assert_eq!( + processed_rows[2]["timestamp_b"], + DataType::Timestamp(Some(expected_dt_b3)) + ); + println!("✓ Verified Int64 TIMESTAMP_NTZ(3) processing"); } @@ -2338,9 +2715,17 @@ mod tests { // --- Data Setup (Anonymized) --- let order_date_data = vec![Some(1738684590000i64), Some(1739547875000i64), None]; let return_created_at_data = vec![Some(1741101088253i64), None, Some(1741104132474i64)]; - let expiration_date_data = vec![Some(1743520288247i64), Some(1743521739792i64), Some(1743523332467i64)]; + let expiration_date_data = vec![ + Some(1743520288247i64), + Some(1743521739792i64), + Some(1743523332467i64), + ]; let order_number_data = vec![Some("ORD-A001"), Some("ORD-B002"), Some("ORD-C003")]; // Anonymized - let customer_name_data = vec![Some("Customer One"), Some("Customer Two"), Some("Customer Three")]; // Anonymized + let customer_name_data = vec![ + Some("Customer One"), + Some("Customer Two"), + Some("Customer Three"), + ]; // Anonymized let return_value_data = vec![Some(10000i32), None, Some(50000i32)]; // Anonymized (Represents 100.00, NULL, 500.00) let return_type_data = vec![Some("Type X"), Some("Type Y"), Some("Type Z")]; // Anonymized @@ -2348,10 +2733,13 @@ mod tests { let order_date_array = Int64Array::from(order_date_data); let return_created_at_array = Int64Array::from(return_created_at_data); let expiration_date_array = Int64Array::from(expiration_date_data); - let order_number_array = StringArray::from_iter_values(order_number_data.iter().map(|s| s.unwrap())); - let customer_name_array = StringArray::from_iter_values(customer_name_data.iter().map(|s| s.unwrap())); + let order_number_array = + StringArray::from_iter_values(order_number_data.iter().map(|s| s.unwrap())); + let customer_name_array = + StringArray::from_iter_values(customer_name_data.iter().map(|s| s.unwrap())); let return_value_array = Int32Array::from(return_value_data); // Use the correct data vector - let return_type_array = StringArray::from_iter_values(return_type_data.iter().map(|s| s.unwrap())); + let return_type_array = + StringArray::from_iter_values(return_type_data.iter().map(|s| s.unwrap())); // --- Metadata Setup --- let mut ts_metadata = std::collections::HashMap::new(); @@ -2368,13 +2756,20 @@ mod tests { fixed_metadata.insert("precision".to_string(), "32".to_string()); // Example precision // --- Field Creation --- - let field_order_date = Field::new("order_date", ArrowDataType::Int64, true).with_metadata(ts_metadata.clone()); - let field_return_created_at = Field::new("return_created_at", ArrowDataType::Int64, true).with_metadata(ts_metadata.clone()); - let field_expiration_date = Field::new("expiration_date", ArrowDataType::Int64, true).with_metadata(ts_metadata.clone()); - let field_order_number = Field::new("order_number", ArrowDataType::Utf8, true).with_metadata(text_metadata.clone()); - let field_customer_name = Field::new("customer_name", ArrowDataType::Utf8, true).with_metadata(text_metadata.clone()); - let field_return_value = Field::new("return_value", ArrowDataType::Int32, true).with_metadata(fixed_metadata.clone()); - let field_return_type = Field::new("return_type", ArrowDataType::Utf8, true).with_metadata(text_metadata.clone()); + let field_order_date = + Field::new("order_date", ArrowDataType::Int64, true).with_metadata(ts_metadata.clone()); + let field_return_created_at = Field::new("return_created_at", ArrowDataType::Int64, true) + .with_metadata(ts_metadata.clone()); + let field_expiration_date = Field::new("expiration_date", ArrowDataType::Int64, true) + .with_metadata(ts_metadata.clone()); + let field_order_number = Field::new("order_number", ArrowDataType::Utf8, true) + .with_metadata(text_metadata.clone()); + let field_customer_name = Field::new("customer_name", ArrowDataType::Utf8, true) + .with_metadata(text_metadata.clone()); + let field_return_value = Field::new("return_value", ArrowDataType::Int32, true) + .with_metadata(fixed_metadata.clone()); + let field_return_type = Field::new("return_type", ArrowDataType::Utf8, true) + .with_metadata(text_metadata.clone()); // --- Schema Creation --- let schema = Arc::new(Schema::new(vec![ @@ -2399,7 +2794,8 @@ mod tests { Arc::new(return_value_array) as ArrayRef, Arc::new(return_type_array) as ArrayRef, ], - ).unwrap(); + ) + .unwrap(); println!("Simulated Input RecordBatch schema: {:?}", batch.schema()); @@ -2419,56 +2815,198 @@ mod tests { }; // Row 0 Assertions - assert!(matches!(processed_rows[0]["order_date"], DataType::Timestamp(_)), "Row 0 order_date type mismatch"); + assert!( + matches!(processed_rows[0]["order_date"], DataType::Timestamp(_)), + "Row 0 order_date type mismatch" + ); if let DataType::Timestamp(Some(dt)) = processed_rows[0]["order_date"] { - assert_eq!(dt, dt_from_millis(1738684590000)); - } else { panic!("Incorrect value for Row 0 order_date"); } + assert_eq!(dt, dt_from_millis(1738684590000)); + } else { + panic!("Incorrect value for Row 0 order_date"); + } - assert!(matches!(processed_rows[0]["return_created_at"], DataType::Timestamp(_)), "Row 0 return_created_at type mismatch"); - if let DataType::Timestamp(Some(dt)) = processed_rows[0]["return_created_at"] { - assert_eq!(dt, dt_from_millis(1741101088253)); - } else { panic!("Incorrect value for Row 0 return_created_at"); } + assert!( + matches!( + processed_rows[0]["return_created_at"], + DataType::Timestamp(_) + ), + "Row 0 return_created_at type mismatch" + ); + if let DataType::Timestamp(Some(dt)) = processed_rows[0]["return_created_at"] { + assert_eq!(dt, dt_from_millis(1741101088253)); + } else { + panic!("Incorrect value for Row 0 return_created_at"); + } - assert!(matches!(processed_rows[0]["expiration_date"], DataType::Timestamp(_)), "Row 0 expiration_date type mismatch"); - if let DataType::Timestamp(Some(dt)) = processed_rows[0]["expiration_date"] { - assert_eq!(dt, dt_from_millis(1743520288247)); - } else { panic!("Incorrect value for Row 0 expiration_date"); } + assert!( + matches!(processed_rows[0]["expiration_date"], DataType::Timestamp(_)), + "Row 0 expiration_date type mismatch" + ); + if let DataType::Timestamp(Some(dt)) = processed_rows[0]["expiration_date"] { + assert_eq!(dt, dt_from_millis(1743520288247)); + } else { + panic!("Incorrect value for Row 0 expiration_date"); + } - assert_eq!(processed_rows[0]["order_number"], DataType::Text(Some("ord-a001".to_string()))); // Anonymized & Lowercase - assert_eq!(processed_rows[0]["customer_name"], DataType::Text(Some("customer one".to_string()))); // Anonymized & Lowercase - assert_eq!(processed_rows[0]["return_value"], DataType::Float8(Some(100.00))); // Anonymized - assert_eq!(processed_rows[0]["return_type"], DataType::Text(Some("type x".to_string()))); // Anonymized & Lowercase + assert_eq!( + processed_rows[0]["order_number"], + DataType::Text(Some("ORD-A001".to_string())) // Expect original case + ); + assert_eq!( + processed_rows[0]["customer_name"], + DataType::Text(Some("Customer One".to_string())) // Expect original case + ); + assert_eq!( + processed_rows[0]["return_value"], + DataType::Float8(Some(100.00)) + ); // Anonymized + assert_eq!( + processed_rows[0]["return_type"], + DataType::Text(Some("Type X".to_string())) // Expect original case + ); // Row 1 Assertions - assert!(matches!(processed_rows[1]["order_date"], DataType::Timestamp(_)), "Row 1 order_date type mismatch"); + assert!( + matches!(processed_rows[1]["order_date"], DataType::Timestamp(_)), + "Row 1 order_date type mismatch" + ); if let DataType::Timestamp(Some(dt)) = processed_rows[1]["order_date"] { - assert_eq!(dt, dt_from_millis(1739547875000)); - } else { panic!("Incorrect value for Row 1 order_date"); } + assert_eq!(dt, dt_from_millis(1739547875000)); + } else { + panic!("Incorrect value for Row 1 order_date"); + } assert_eq!(processed_rows[1]["return_created_at"], DataType::Null); - assert!(matches!(processed_rows[1]["expiration_date"], DataType::Timestamp(_)), "Row 1 expiration_date type mismatch"); - if let DataType::Timestamp(Some(dt)) = processed_rows[1]["expiration_date"] { - assert_eq!(dt, dt_from_millis(1743521739792)); - } else { panic!("Incorrect value for Row 1 expiration_date"); } - assert_eq!(processed_rows[1]["order_number"], DataType::Text(Some("ord-b002".to_string()))); // Anonymized & Lowercase - assert_eq!(processed_rows[1]["customer_name"], DataType::Text(Some("customer two".to_string()))); // Anonymized & Lowercase + assert!( + matches!(processed_rows[1]["expiration_date"], DataType::Timestamp(_)), + "Row 1 expiration_date type mismatch" + ); + if let DataType::Timestamp(Some(dt)) = processed_rows[1]["expiration_date"] { + assert_eq!(dt, dt_from_millis(1743521739792)); + } else { + panic!("Incorrect value for Row 1 expiration_date"); + } + assert_eq!( + processed_rows[1]["order_number"], + DataType::Text(Some("ORD-B002".to_string())) // Expect original case + ); + assert_eq!( + processed_rows[1]["customer_name"], + DataType::Text(Some("Customer Two".to_string())) // Expect original case + ); assert_eq!(processed_rows[1]["return_value"], DataType::Null); // Remains Null - assert_eq!(processed_rows[1]["return_type"], DataType::Text(Some("type y".to_string()))); // Anonymized & Lowercase + assert_eq!( + processed_rows[1]["return_type"], + DataType::Text(Some("Type Y".to_string())) // Expect original case + ); // Row 2 Assertions assert_eq!(processed_rows[2]["order_date"], DataType::Null); - assert!(matches!(processed_rows[2]["return_created_at"], DataType::Timestamp(_)), "Row 2 return_created_at type mismatch"); + assert!( + matches!( + processed_rows[2]["return_created_at"], + DataType::Timestamp(_) + ), + "Row 2 return_created_at type mismatch" + ); if let DataType::Timestamp(Some(dt)) = processed_rows[2]["return_created_at"] { - assert_eq!(dt, dt_from_millis(1741104132474)); - } else { panic!("Incorrect value for Row 2 return_created_at"); } - assert!(matches!(processed_rows[2]["expiration_date"], DataType::Timestamp(_)), "Row 2 expiration_date type mismatch"); + assert_eq!(dt, dt_from_millis(1741104132474)); + } else { + panic!("Incorrect value for Row 2 return_created_at"); + } + assert!( + matches!(processed_rows[2]["expiration_date"], DataType::Timestamp(_)), + "Row 2 expiration_date type mismatch" + ); if let DataType::Timestamp(Some(dt)) = processed_rows[2]["expiration_date"] { - assert_eq!(dt, dt_from_millis(1743523332467)); - } else { panic!("Incorrect value for Row 2 expiration_date"); } - assert_eq!(processed_rows[2]["order_number"], DataType::Text(Some("ord-c003".to_string()))); // Anonymized & Lowercase - assert_eq!(processed_rows[2]["customer_name"], DataType::Text(Some("customer three".to_string()))); // Anonymized & Lowercase - assert_eq!(processed_rows[2]["return_value"], DataType::Float8(Some(500.00))); // Anonymized - assert_eq!(processed_rows[2]["return_type"], DataType::Text(Some("type z".to_string()))); // Anonymized & Lowercase + assert_eq!(dt, dt_from_millis(1743523332467)); + } else { + panic!("Incorrect value for Row 2 expiration_date"); + } + assert_eq!( + processed_rows[2]["order_number"], + DataType::Text(Some("ORD-C003".to_string())) // Expect original case + ); + assert_eq!( + processed_rows[2]["customer_name"], + DataType::Text(Some("Customer Three".to_string())) // Expect original case + ); + assert_eq!( + processed_rows[2]["return_value"], + DataType::Float8(Some(500.00)) + ); // Anonymized + assert_eq!( + processed_rows[2]["return_type"], + DataType::Text(Some("Type Z".to_string())) // Expect original case + ); println!("✓ Verified Real-World RecordBatch Processing (Anonymized)"); } -} \ No newline at end of file + + #[test] + fn test_int64_fixed_scaled_processing() { + println!("\n=== Testing Int64 FIXED with Scale Processing ==="); + + // Sample data: Int64 values representing, for example, monetary amounts in cents + let raw_values = vec![Some(12345i64), Some(67890i64), None, Some(500i64)]; // e.g., $123.45, $678.90, NULL, $5.00 + + // Create Arrow array + let data_array = Int64Array::from(raw_values.clone()); + + // Create metadata for the field + let mut field_metadata = std::collections::HashMap::new(); + field_metadata.insert("logicalType".to_string(), "FIXED".to_string()); + field_metadata.insert("scale".to_string(), "2".to_string()); // Indicates 2 decimal places + field_metadata.insert("precision".to_string(), "38".to_string()); // Example precision + + // Create field + let value_field = Field::new( + "SCALED_VALUE", // Anonymized field name + ArrowDataType::Int64, + true, // Nullable + ) + .with_metadata(field_metadata); + + // Create schema + let schema = Arc::new(Schema::new(vec![value_field])); + + // Create record batch + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(data_array) as ArrayRef], + ) + .unwrap(); + + println!("Input RecordBatch schema: {:?}", batch.schema()); + println!( + "Input RecordBatch columns: [Column 0: {:?}]", + batch.column(0) + ); + + // Process the batch + let processed_rows = process_record_batch(&batch); + + println!("Processed Rows: {:?}", processed_rows); + + // --- Assertions --- + assert_eq!(processed_rows.len(), raw_values.len(), "Number of processed rows should match input"); + + // Expected scaled Float8 values + let expected_values = vec![ + DataType::Float8(Some(123.45)), + DataType::Float8(Some(678.90)), + DataType::Null, + DataType::Float8(Some(5.00)), + ]; + + for i in 0..expected_values.len() { + assert_eq!( + processed_rows[i]["scaled_value"], // Field name is lowercased by process_record_batch + expected_values[i], + "Mismatch in row {} for 'scaled_value'", i + ); + } + + println!("✓ Verified Int64 FIXED with Scale processing"); + } +} +