diff --git a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs index cd9a14837..59adb3873 100644 --- a/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs +++ b/api/libs/query_engine/src/data_source_query_routes/snowflake_query.rs @@ -84,6 +84,7 @@ fn handle_snowflake_timestamp(value: &Value) -> Option { fn handle_snowflake_timestamp_struct( struct_array: &arrow::array::StructArray, row_idx: usize, + scale: Option, // Add scale parameter ) -> Option> { if struct_array.is_null(row_idx) { return None; @@ -108,36 +109,54 @@ fn handle_snowflake_timestamp_struct( fraction.value(row_idx) }; - // Important: Check if epoch might be in milliseconds instead of seconds - // If the epoch value is larger than typical Unix timestamps (e.g., > 50 years worth of seconds) - // it's likely in milliseconds or microseconds + // Determine epoch/nanos based on epoch_value magnitude AND scale let (adjusted_epoch, adjusted_nanos) = if epoch_value > 5_000_000_000 { - // Likely milliseconds or microseconds - determine which + // Epoch likely in ms or us if epoch_value > 5_000_000_000_000 { // Microseconds ( epoch_value / 1_000_000, - (epoch_value % 1_000_000 * 1000) as u32, + (epoch_value % 1_000_000 * 1000).try_into().unwrap_or(0), // Convert to u32 safely ) } else { // Milliseconds - (epoch_value / 1000, (epoch_value % 1000 * 1_000_000) as u32) + ( + epoch_value / 1000, + (epoch_value % 1000 * 1_000_000).try_into().unwrap_or(0), // Convert to u32 safely + ) } } else { - // Seconds - use fraction for nanoseconds - // For scale 3 (milliseconds), multiply by 10^6 to get nanoseconds - (epoch_value, (fraction_value as u32) * 1_000_000) + // Epoch is likely in seconds, use scale to interpret fraction + let calculated_nanos = match scale { + Some(3) => (fraction_value as i64 * 1_000_000).try_into().unwrap_or(0), // Milliseconds to nanos + Some(6) => (fraction_value as i64 * 1000).try_into().unwrap_or(0), // Microseconds to nanos + Some(9) => fraction_value.try_into().unwrap_or(0), // Fraction IS nanos + _ => { // Default or unknown scale, assume fraction is nanos if < 1B, else 0 + if fraction_value >= 0 && fraction_value < 1_000_000_000 { + fraction_value as u32 + } else { + tracing::warn!( + "Unhandled scale ({:?}) or invalid fraction ({}) for seconds epoch, defaulting nanos to 0", + scale, + fraction_value + ); + 0 + } + } + }; + (epoch_value, calculated_nanos) }; match parse_snowflake_timestamp(adjusted_epoch, adjusted_nanos) { Ok(dt) => Some(dt), Err(e) => { - tracing::error!("Failed to parse timestamp: {}", e); + tracing::error!("Failed to parse timestamp: {}. adjusted_epoch={}, adjusted_nanos={}. Original epoch={}, fraction={}, scale={:?}", + e, adjusted_epoch, adjusted_nanos, epoch_value, fraction_value, scale); None } } } - _ => None, + _ => None, // Epoch or fraction array missing or epoch is null } } @@ -1253,9 +1272,13 @@ fn handle_struct_array( // Check if this is a Snowflake timestamp struct let fields = match field.data_type() { ArrowDataType::Struct(fields) => fields, - _ => return DataType::Null, + _ => return DataType::Null, // Should not happen if called with a struct field }; + // Try to get scale from metadata + let scale_meta_str = field.metadata().get("scale"); + let scale: Option = scale_meta_str.and_then(|s| s.parse::().ok()); + if fields.len() == 2 && fields.iter().any(|f| f.name() == "epoch") && fields.iter().any(|f| f.name() == "fraction") @@ -1264,7 +1287,7 @@ fn handle_struct_array( .get("logicalType") .map_or(false, |v| v.contains("TIMESTAMP")) { - if let Some(dt) = handle_snowflake_timestamp_struct(array, row_idx) { + if let Some(dt) = handle_snowflake_timestamp_struct(array, row_idx, scale) { // Pass scale here if field .metadata() .get("logicalType") @@ -1283,21 +1306,23 @@ fn handle_struct_array( DataType::Null } else { let mut map = JsonMap::new(); - for (field, col) in fields.iter().zip(array.columns().iter()) { - let field_name = field.name(); + for (struct_field_def, col) in fields.iter().zip(array.columns().iter()) { + let field_name = struct_field_def.name(); // Use name from struct_field_def let value = if col.is_null(row_idx) { Value::Null - } else if let Some(array) = col.as_any().downcast_ref::() { - Value::Number(array.value(row_idx).into()) - } else if let Some(array) = col.as_any().downcast_ref::() { - Value::Number(array.value(row_idx).into()) - } else if let Some(array) = col.as_any().downcast_ref::() { - serde_json::Number::from_f64(array.value(row_idx)) + } else if let Some(arr) = col.as_any().downcast_ref::() { + Value::Number(arr.value(row_idx).into()) + } else if let Some(arr) = col.as_any().downcast_ref::() { + Value::Number(arr.value(row_idx).into()) + } else if let Some(arr) = col.as_any().downcast_ref::() { + serde_json::Number::from_f64(arr.value(row_idx)) .map(Value::Number) .unwrap_or(Value::Null) - } else if let Some(array) = col.as_any().downcast_ref::() { - Value::String(array.value(row_idx).to_string()) + } else if let Some(arr) = col.as_any().downcast_ref::() { + Value::String(arr.value(row_idx).to_string()) } else { + // Attempt to handle nested structs recursively or other types if needed + // For now, defaulting to Null for unhandled types within generic structs Value::Null }; map.insert(field_name.to_string(), value); @@ -1384,81 +1409,106 @@ fn convert_array_to_datatype( } ArrowDataType::Int64 => { let field_name = field.name(); // Get field name for logging - // println!("Debug: Processing Int64 field: {}", field_name); + // **NEW LOGGING START** + tracing::debug!( + "Processing Int64 field: '{}', row_idx: {}, Data Type: {:?}, Metadata: {:?}", + field_name, + row_idx, + column.data_type(), + field.metadata() + ); + // **NEW LOGGING END** // Check if this is actually a timestamp in disguise let logical_type = field.metadata().get("logicalType"); let scale_str = field.metadata().get("scale"); // Get scale_str here as well - // println!("Debug [{}]: logicalType={:?}, scale={:?}", field_name, logical_type, scale_str); if logical_type.map_or(false, |t| t.contains("TIMESTAMP")) { - // println!("Debug [{}]: Detected as timestamp.", field_name); + // **MODIFIED LOGGING** + tracing::debug!("[{}]: Detected as timestamp. logicalType={:?}, scale={:?}", field_name, logical_type, scale_str); // If it has a timestamp logical type, determine the time unit based on scale - let unit = match scale_str.map(|s| s.parse::().unwrap_or(3)) { - // Default parse to 3 (ms) + let unit = match scale_str.map(|s| s.parse::().unwrap_or(3)) { // Default parse to 3 (ms) Some(0) => TimeUnit::Second, Some(6) => TimeUnit::Microsecond, Some(9) => TimeUnit::Nanosecond, Some(3) | None | Some(_) => TimeUnit::Millisecond, // Default to millisecond }; - // println!("Debug [{}]: Determined unit: {:?}", field_name, unit); + // **MODIFIED LOGGING** + tracing::debug!("[{}]: Determined unit: {:?}", field_name, unit); // Check if there's timezone info let has_tz = logical_type.map_or(false, |t| t.contains("_TZ")); - // println!("Debug [{}]: has_tz: {}", field_name, has_tz); - let _tz: Option> = if has_tz { - Some(Arc::new(String::from("UTC"))) - } else { - None - }; + // **MODIFIED LOGGING** + tracing::debug!("[{}]: has_tz: {}", field_name, has_tz); + let _tz: Option> = if has_tz { Some(Arc::new(String::from("UTC"))) } else { None }; // Process as timestamp if let Some(array) = column.as_any().downcast_ref::() { if array.is_null(row_idx) { - // println!("Debug [{}]: Value is null at row_idx {}.", field_name, row_idx); + tracing::debug!("[{}]: Value is null at row_idx {}.", field_name, row_idx); return DataType::Null; } let value = array.value(row_idx); - // println!("Debug [{}]: Raw value at row_idx {}: {}", field_name, row_idx, value); + // **MODIFIED LOGGING** + tracing::debug!("[{}]: Raw value at row_idx {}: {}", field_name, row_idx, value); + + // **NEW LOGGING START** let (secs, subsec_nanos) = match unit { TimeUnit::Second => (value, 0), TimeUnit::Millisecond => (value / 1000, (value % 1000) * 1_000_000), TimeUnit::Microsecond => (value / 1_000_000, (value % 1_000_000) * 1000), TimeUnit::Nanosecond => (value / 1_000_000_000, value % 1_000_000_000), }; - // println!("Debug [{}]: Calculated secs={}, nanos={}", field_name, secs, subsec_nanos); + tracing::debug!("[{}]: Calculated secs={}, nanos={}", field_name, secs, subsec_nanos); + // **NEW LOGGING END** + // **NEW LOGGING START** + tracing::debug!( + "[{}]: Calling Utc.timestamp_opt({}, {})", + field_name, + secs, + subsec_nanos + ); + // **NEW LOGGING END** match Utc.timestamp_opt(secs, subsec_nanos as u32) { LocalResult::Single(dt) => { - // println!("Debug [{}]: Successfully created DateTime: {}", field_name, dt); + tracing::debug!("[{}]: Successfully created DateTime: {}", field_name, dt); if has_tz { - // println!("Debug [{}]: Returning Timestamptz.", field_name); + tracing::debug!("[{}]: Returning Timestamptz.", field_name); DataType::Timestamptz(Some(dt)) } else { - // println!("Debug [{}]: Returning Timestamp.", field_name); + tracing::debug!("[{}]: Returning Timestamp.", field_name); DataType::Timestamp(Some(dt.naive_utc())) } } LocalResult::None | LocalResult::Ambiguous(_, _) => { // Handle None and Ambiguous explicitly - tracing::error!("Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", secs, subsec_nanos); - // println!("Debug [{}]: Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", field_name, secs, subsec_nanos); + // **NEW LOGGING START** + tracing::error!( + "[{}]: Utc.timestamp_opt failed (returned None or Ambiguous) for secs={}, nanos={}. Raw value was {}. Returning Null.", + field_name, secs, subsec_nanos, value + ); + // **NEW LOGGING END** DataType::Null } } } else { - // println!("Debug [{}]: Failed to downcast to Int64Array.", field_name); + // **MODIFIED LOGGING** + tracing::warn!("[{}]: Failed to downcast Int64 (identified as timestamp) to Int64Array.", field_name); DataType::Null } } else { // Not a timestamp, so delegate to handle_int64_array which can handle scaling or default to Int8 - if let Some(array) = column.as_any().downcast_ref::() { + // **MODIFIED LOGGING** + tracing::debug!("[{}]: Not identified as timestamp based on metadata. Delegating to handle_int64_array.", field_name); + if let Some(array) = column.as_any().downcast_ref::() { handle_int64_array(array, row_idx, scale_str.map(|s| s.as_str()), field) - } else { - // println!("Debug [{}]: Failed to downcast Int64 for non-timestamp to Int64Array.", field_name); + } else { + // **MODIFIED LOGGING** + tracing::warn!("[{}]: Failed to downcast Int64 (non-timestamp) to Int64Array.", field_name); DataType::Null - } + } } } ArrowDataType::UInt8 => { @@ -1645,7 +1695,7 @@ fn prepare_query(query: &str) -> String { } fn process_record_batch(batch: &RecordBatch) -> Vec> { - // println!("Processing record batch with {:?} rows", batch); + println!("Processing record batch with {:?} rows", batch); let mut rows = Vec::with_capacity(batch.num_rows()); let schema = batch.schema(); @@ -1992,7 +2042,7 @@ mod tests { ), ]); - let dt = handle_snowflake_timestamp_struct(&struct_array, 0); + let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::().ok())); println!( "handle_snowflake_timestamp_struct (seconds epoch, millis fraction): {:?}", dt @@ -2024,7 +2074,7 @@ mod tests { ), ]); - let dt = handle_snowflake_timestamp_struct(&struct_array, 0); + let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::().ok())); println!( "handle_snowflake_timestamp_struct (millis epoch, zero fraction): {:?}", dt @@ -2067,7 +2117,7 @@ mod tests { ), ]); - let dt = handle_snowflake_timestamp_struct(&struct_array, 0); + let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::().ok())); println!( "handle_snowflake_timestamp_struct (microsecs epoch): {:?}", dt @@ -2398,7 +2448,7 @@ mod tests { ]); // Call the function directly - let result = handle_snowflake_timestamp_struct(&struct_array, 0); + let result = handle_snowflake_timestamp_struct(&struct_array, 0, None); // Print and verify result if let Some(dt) = result { @@ -2476,8 +2526,8 @@ mod tests { ), ]); - // Test direct function - let result = handle_snowflake_timestamp_struct(&struct_array, 0); + // Test direct function - Pass None for scale + let result = handle_snowflake_timestamp_struct(&struct_array, 0, None); assert!(result.is_none(), "Expected None for null epoch"); println!("✓ Null epoch correctly returns None"); @@ -3008,5 +3058,466 @@ mod tests { println!("✓ Verified Int64 FIXED with Scale processing"); } + + /// Tests processing a RecordBatch with Struct-based timestamps (scale 9) + /// based on real-world log output. + #[test] + fn test_struct_timestamp_scale9_processing() { + println!("\n=== Testing Struct TIMESTAMP_NTZ(9) processing ==="); + + // --- Sample Data (Anonymized, based on provided log) --- + // Selected a few representative rows + let epoch_data = vec![ + Some(1736442980i64), // Row 0 + Some(1736443293i64), // Row 4 + None, // Simulate a potential null row + Some(1737408291i64), // Last row + ]; + let fraction_data = vec![ + Some(969000000i32), // Row 0 (0.969 seconds) + Some(555000000i32), // Row 4 (0.555 seconds) + Some(123456789i32), // Null epoch row, fraction irrelevant but needs value + Some(504000000i32), // Last row (0.504 seconds) + ]; + let product_name_data = vec![ + Some("Product A"), + Some("Product B"), + Some("Product C"), + Some("Product D"), + ]; + let sku_data = vec![Some("SKU-A"), Some("SKU-B"), Some("SKU-C"), Some("SKU-D")]; + let order_number_data = vec![ + Some("ORD-111"), + Some("ORD-222"), + Some("ORD-333"), + Some("ORD-444"), + ]; + + // --- Array Creation --- + let epoch_array = Int64Array::from(epoch_data.clone()); + let fraction_array = Int32Array::from(fraction_data.clone()); + let product_name_array = StringArray::from(product_name_data); + let sku_array = StringArray::from(sku_data); + let order_number_array = StringArray::from(order_number_data); + + // --- Struct Array for Timestamp --- + let mut timestamp_metadata = std::collections::HashMap::new(); + timestamp_metadata.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string()); + timestamp_metadata.insert("scale".to_string(), "9".to_string()); // Crucial: scale is 9 + + let struct_fields = Fields::from(vec![ + Field::new( + "epoch", + ArrowDataType::Int64, + true, // epoch is nullable + ) + .with_metadata(timestamp_metadata.clone()), // Metadata might be on inner fields too + Field::new( + "fraction", + ArrowDataType::Int32, + true, // fraction is nullable + ) + .with_metadata(timestamp_metadata.clone()), + ]); + + // Create the StructArray + let struct_array = StructArray::new( + struct_fields.clone(), + vec![ + Arc::new(epoch_array) as ArrayRef, + Arc::new(fraction_array) as ArrayRef, + ], + // Set the validity based on the epoch_data's nulls + Some(arrow::buffer::NullBuffer::from( + epoch_data.iter().map(|x| x.is_some()).collect::>(), + )), + ); + + // --- Field Creation --- + let field_returned_processed_date = Field::new( + "RETURNED_PROCESSED_DATE", + ArrowDataType::Struct(struct_fields), + true, // The struct itself is nullable + ) + .with_metadata(timestamp_metadata.clone()); // Metadata on the struct field itself + + let field_product_name = Field::new("PRODUCT_NAME", ArrowDataType::Utf8, true); + let field_sku = Field::new("SKU", ArrowDataType::Utf8, true); + let field_order_number = Field::new("ORDER_NUMBER", ArrowDataType::Utf8, true); + + // --- Schema Creation --- + let schema = Arc::new(Schema::new(vec![ + field_returned_processed_date, + field_product_name, + field_sku, + field_order_number, + ])); + + // --- RecordBatch Creation --- + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(struct_array) as ArrayRef, + Arc::new(product_name_array) as ArrayRef, + Arc::new(sku_array) as ArrayRef, + Arc::new(order_number_array) as ArrayRef, + ], + ) + .unwrap(); + + println!("Simulated Input RecordBatch schema: {:?}", batch.schema()); + + // --- Process Batch --- + let processed_rows = process_record_batch(&batch); + + println!("Processed Rows (Struct Timestamp Scale 9): {:?}", processed_rows); + + // --- Assertions --- + assert_eq!(processed_rows.len(), 4, "Expected 4 rows processed"); + + // Helper to create expected NaiveDateTime from secs/nanos + let dt_from_parts = |secs: i64, nanos: u32| Utc.timestamp_opt(secs, nanos).unwrap().naive_utc(); + + // Row 0 Assertions (epoch=1736442980, fraction=969000000) -> nanos=969000000 + let expected_dt_0 = dt_from_parts(1736442980, 969000000); + assert_eq!( + processed_rows[0]["returned_processed_date"], // field name is lowercased + DataType::Timestamp(Some(expected_dt_0)), + "Row 0 timestamp mismatch" + ); + assert_eq!( + processed_rows[0]["product_name"], + DataType::Text(Some("Product A".to_string())) + ); + assert_eq!( + processed_rows[0]["sku"], + DataType::Text(Some("SKU-A".to_string())) + ); + assert_eq!( + processed_rows[0]["order_number"], + DataType::Text(Some("ORD-111".to_string())) + ); + + // Row 1 Assertions (epoch=1736443293, fraction=555000000) -> nanos=555000000 + let expected_dt_1 = dt_from_parts(1736443293, 555000000); + assert_eq!( + processed_rows[1]["returned_processed_date"], + DataType::Timestamp(Some(expected_dt_1)), + "Row 1 timestamp mismatch" + ); + assert_eq!( + processed_rows[1]["product_name"], + DataType::Text(Some("Product B".to_string())) + ); + assert_eq!( + processed_rows[1]["sku"], + DataType::Text(Some("SKU-B".to_string())) + ); + assert_eq!( + processed_rows[1]["order_number"], + DataType::Text(Some("ORD-222".to_string())) + ); + + // Row 2 Assertions (Null epoch) + assert_eq!( + processed_rows[2]["returned_processed_date"], + DataType::Null, + "Row 2 timestamp should be Null" + ); + assert_eq!( + processed_rows[2]["product_name"], + DataType::Text(Some("Product C".to_string())) + ); + assert_eq!( + processed_rows[2]["sku"], + DataType::Text(Some("SKU-C".to_string())) + ); + assert_eq!( + processed_rows[2]["order_number"], + DataType::Text(Some("ORD-333".to_string())) + ); + + + // Row 3 Assertions (epoch=1737408291, fraction=504000000) -> nanos=504000000 + let expected_dt_3 = dt_from_parts(1737408291, 504000000); + assert_eq!( + processed_rows[3]["returned_processed_date"], + DataType::Timestamp(Some(expected_dt_3)), + "Row 3 timestamp mismatch" + ); + assert_eq!( + processed_rows[3]["product_name"], + DataType::Text(Some("Product D".to_string())) + ); + assert_eq!( + processed_rows[3]["sku"], + DataType::Text(Some("SKU-D".to_string())) + ); + assert_eq!( + processed_rows[3]["order_number"], + DataType::Text(Some("ORD-444".to_string())) + ); + + println!("✓ Verified Struct TIMESTAMP_NTZ(9) processing"); + } + + /// Tests processing Int64 arrays with TIMESTAMP_NTZ metadata and various scales. + #[test] + fn test_int64_timestamp_ntz_various_scales() { + println!("\n=== Testing Int64 TIMESTAMP_NTZ with various scales ==="); + + // --- Sample Data --- + // Use a consistent base time for easier verification + let base_secs = 1700000000i64; // 2023-11-14 22:13:20 UTC + // Define expected nanos carefully + let expected_nanos = 123456789u32; + + // Calculate Int64 values based on scale + let data_scale0 = vec![Some(base_secs)]; // Value is seconds + let data_scale6 = vec![Some(base_secs * 1_000_000 + (expected_nanos / 1000) as i64)]; // Value is microseconds + let data_scale9 = vec![Some(base_secs * 1_000_000_000 + expected_nanos as i64)]; // Value is nanoseconds + let data_null = vec![None::]; + + // --- Array Creation --- + let array_scale0 = Int64Array::from(data_scale0); + let array_scale6 = Int64Array::from(data_scale6); + let array_scale9 = Int64Array::from(data_scale9); + let array_null = Int64Array::from(data_null); + + // --- Metadata and Field Creation --- + let mut meta_ntz_scale0 = std::collections::HashMap::new(); + meta_ntz_scale0.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string()); + meta_ntz_scale0.insert("scale".to_string(), "0".to_string()); + let field_scale0 = Field::new("ts_scale0", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale0); + + let mut meta_ntz_scale6 = std::collections::HashMap::new(); + meta_ntz_scale6.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string()); + meta_ntz_scale6.insert("scale".to_string(), "6".to_string()); + let field_scale6 = Field::new("ts_scale6", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale6); + + let mut meta_ntz_scale9 = std::collections::HashMap::new(); + meta_ntz_scale9.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string()); + meta_ntz_scale9.insert("scale".to_string(), "9".to_string()); + let field_scale9 = Field::new("ts_scale9", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale9.clone()); // Clone for null field + + // Field for null test (metadata shouldn't matter but use one) + let field_null = Field::new("ts_null", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale9); + + // --- Schema and RecordBatch --- + let schema = Arc::new(Schema::new(vec![field_scale0, field_scale6, field_scale9, field_null])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(array_scale0) as ArrayRef, + Arc::new(array_scale6) as ArrayRef, + Arc::new(array_scale9) as ArrayRef, + Arc::new(array_null) as ArrayRef, + ], + ).unwrap(); + + // --- Process Batch --- + let processed_rows = process_record_batch(&batch); + + // --- Assertions --- + assert_eq!(processed_rows.len(), 1, "Expected 1 row"); + let row = &processed_rows[0]; + + // Calculate the final expected NaiveDateTime based ONLY on base_secs and expected_nanos + // Note: For scale 0, the input data doesn't contain nano precision, so the expected result *should* reflect that loss. + let expected_dt_s0 = Utc.timestamp_opt(base_secs, 0).unwrap().naive_utc(); // Scale 0 loses nanos + + // For scale 6, we only have microsecond precision (the last 3 digits of nanos are truncated) + let microsecond_nanos = (expected_nanos / 1000) * 1000; // Truncate to microsecond precision + let expected_dt_s6 = Utc.timestamp_opt(base_secs, microsecond_nanos).unwrap().naive_utc(); + + // For scale 9, we have full nanosecond precision + let expected_dt_s9 = Utc.timestamp_opt(base_secs, expected_nanos).unwrap().naive_utc(); + + // Scale 0 (Seconds) - Loses nanosecond precision from original `expected_nanos` + assert_eq!(row["ts_scale0"], DataType::Timestamp(Some(expected_dt_s0))); + // Scale 6 (Microseconds) - Should only have microsecond precision (truncated) + assert_eq!(row["ts_scale6"], DataType::Timestamp(Some(expected_dt_s6))); + // Scale 9 (Nanoseconds) - Should retain full nanosecond precision + assert_eq!(row["ts_scale9"], DataType::Timestamp(Some(expected_dt_s9))); + // Null value + assert_eq!(row["ts_null"], DataType::Null); + + println!("✓ Verified Int64 TIMESTAMP_NTZ with various scales"); + } + + /// Tests processing Int64 arrays with TIMESTAMP_TZ metadata and scale 3. + #[test] + fn test_int64_timestamp_tz_scale3() { + println!("\n=== Testing Int64 TIMESTAMP_TZ with scale 3 ==="); + + // --- Sample Data --- + let base_secs = 1700000000i64; + let base_millis = 123i64; + let data_millis = vec![Some(base_secs * 1000 + base_millis)]; // Milliseconds since epoch + let data_null = vec![None::]; + + // --- Array Creation --- + let array_data = Int64Array::from(data_millis); + let array_null = Int64Array::from(data_null); + + // --- Metadata and Field Creation --- + let mut meta_tz_scale3 = std::collections::HashMap::new(); + meta_tz_scale3.insert("logicalType".to_string(), "TIMESTAMP_TZ".to_string()); + meta_tz_scale3.insert("scale".to_string(), "3".to_string()); + let field_data = Field::new("ts_tz_scale3", ArrowDataType::Int64, true).with_metadata(meta_tz_scale3.clone()); + let field_null = Field::new("ts_null", ArrowDataType::Int64, true).with_metadata(meta_tz_scale3); + + // --- Schema and RecordBatch --- + let schema = Arc::new(Schema::new(vec![field_data, field_null])); + let batch = RecordBatch::try_new( + schema, + vec![ + Arc::new(array_data) as ArrayRef, + Arc::new(array_null) as ArrayRef, + ], + ).unwrap(); + + // --- Process Batch --- + let processed_rows = process_record_batch(&batch); + + // --- Assertions --- + assert_eq!(processed_rows.len(), 1, "Expected 1 row"); + let row = &processed_rows[0]; + + let expected_dt_utc = Utc.timestamp_millis_opt(base_secs * 1000 + base_millis).unwrap(); + + // TZ Scale 3 (Milliseconds) + assert_eq!(row["ts_tz_scale3"], DataType::Timestamptz(Some(expected_dt_utc))); + // Null value + assert_eq!(row["ts_null"], DataType::Null); + + println!("✓ Verified Int64 TIMESTAMP_TZ with scale 3"); + } + + /// Tests processing Struct timestamps with various scales and TZ/NTZ metadata. + #[test] + fn test_struct_timestamp_various_scales_and_tz() { + println!("\n=== Testing Struct Timestamps with various scales and TZ/NTZ ==="); + + // Helper function to create test structs with different scales + fn create_test_case(epoch_value: i64, fraction_value: i32, scale: i32, is_tz: bool) -> (StructArray, Field) { + let epoch_array = Int64Array::from(vec![epoch_value]); + let fraction_array = Int32Array::from(vec![fraction_value]); + + let struct_fields = Fields::from(vec![ + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + ]); + + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("epoch", ArrowDataType::Int64, false)), + Arc::new(epoch_array) as arrow::array::ArrayRef, + ), + ( + Arc::new(Field::new("fraction", ArrowDataType::Int32, false)), + Arc::new(fraction_array) as arrow::array::ArrayRef, + ), + ]); + + // Create field with metadata indicating this is a timestamp + let mut struct_metadata = std::collections::HashMap::new(); + struct_metadata.insert("scale".to_string(), scale.to_string()); + struct_metadata.insert( + "logicalType".to_string(), + if is_tz { + "TIMESTAMP_TZ".to_string() + } else { + "TIMESTAMP_NTZ".to_string() + }, + ); + + let struct_field = Field::new( + "TIMESTAMP_STRUCT", + ArrowDataType::Struct(struct_fields), + false, + ).with_metadata(struct_metadata); + + (struct_array, struct_field) + } + + // Base timestamp values for testing + let base_secs = 1700000000i64; // 2023-11-14 22:13:20 UTC + + // Test cases for different scales + // (epoch, fraction, scale, is_tz, expected_subsec_nanos) + let test_cases = vec![ + // Scale 3 (milliseconds) + (base_secs, 123, 3, false, 123_000_000), // 123 milliseconds → 123,000,000 nanos + (base_secs, 123, 3, true, 123_000_000), // Same with TZ + + // Scale 6 (microseconds) + (base_secs, 123456, 6, false, 123_456_000), // 123,456 microseconds → 123,456,000 nanos + (base_secs, 123456, 6, true, 123_456_000), // Same with TZ + + // Scale 9 (nanoseconds) - most important case to test + (base_secs, 123456789, 9, false, 123_456_789), // 123,456,789 nanoseconds directly + (base_secs, 123456789, 9, true, 123_456_789), // Same with TZ + + // Edge cases + (base_secs, 0, 9, false, 0), // Zero fraction + (base_secs, 999_999_999, 9, false, 999_999_999), // Max nanoseconds + ]; + + // Process each test case + for (idx, (epoch, fraction, scale, is_tz, expected_nanos)) in test_cases.iter().enumerate() { + println!("\nTest case {}: epoch={}, fraction={}, scale={}, tz={}", + idx, epoch, fraction, scale, is_tz); + + let (struct_array, struct_field) = create_test_case(*epoch, *fraction, *scale, *is_tz); + + // Test direct function call + let dt_result = handle_snowflake_timestamp_struct(&struct_array, 0, Some(*scale)); + + // Verify result + assert!(dt_result.is_some(), + "handle_snowflake_timestamp_struct returned None for case {}", idx); + + let dt = dt_result.unwrap(); + let expected_dt = Utc.timestamp_opt(*epoch, *expected_nanos).unwrap(); + + assert_eq!(dt.timestamp(), expected_dt.timestamp(), + "Incorrect timestamp seconds for case {}", idx); + assert_eq!(dt.timestamp_subsec_nanos(), *expected_nanos, + "Incorrect nanoseconds for case {}: got {} expected {}", + idx, dt.timestamp_subsec_nanos(), expected_nanos); + + // Additionally test through handle_struct_array + let struct_array_ref = Arc::new(struct_array) as arrow::array::ArrayRef; + let result = handle_struct_array( + struct_array_ref.as_any().downcast_ref::().unwrap(), + 0, + &struct_field, + ); + + // Check result type and value + if *is_tz { + match &result { + DataType::Timestamptz(Some(result_dt)) => { + assert_eq!(result_dt.timestamp(), expected_dt.timestamp()); + assert_eq!(result_dt.timestamp_subsec_nanos(), *expected_nanos); + } + _ => panic!("Expected DataType::Timestamptz, got: {:?}", result), + } + } else { + match &result { + DataType::Timestamp(Some(result_naive_dt)) => { + assert_eq!(result_naive_dt.and_utc().timestamp(), expected_dt.timestamp()); + assert_eq!(result_naive_dt.and_utc().timestamp_subsec_nanos(), *expected_nanos); + } + _ => panic!("Expected DataType::Timestamp, got: {:?}", result), + } + } + + println!("✓ Test case {} passed", idx); + } + + println!("✓ Verified Struct Timestamps with various scales and TZ/NTZ"); + } }