Merge branch 'main' into staging

commit efa8ff8df0
dal committed 2025-05-21 23:57:37 -07:00 (committed by GitHub)
1 changed file with 566 additions and 55 deletions


@@ -84,6 +84,7 @@ fn handle_snowflake_timestamp(value: &Value) -> Option<Value> {
fn handle_snowflake_timestamp_struct(
struct_array: &arrow::array::StructArray,
row_idx: usize,
scale: Option<i32>, // Snowflake scale (fractional-second digits) from field metadata
) -> Option<DateTime<Utc>> {
if struct_array.is_null(row_idx) {
return None;
@@ -108,36 +109,54 @@ fn handle_snowflake_timestamp_struct(
fraction.value(row_idx)
};
// Determine epoch/nanos from epoch_value magnitude AND scale: epoch values
// larger than ~50 years' worth of seconds are likely milliseconds or microseconds
let (adjusted_epoch, adjusted_nanos) = if epoch_value > 5_000_000_000 {
// Epoch is likely in milliseconds or microseconds; determine which
if epoch_value > 5_000_000_000_000 {
// Microseconds
(
epoch_value / 1_000_000,
(epoch_value % 1_000_000 * 1000).try_into().unwrap_or(0), // Convert to u32 safely
)
} else {
// Milliseconds
(
epoch_value / 1000,
(epoch_value % 1000 * 1_000_000).try_into().unwrap_or(0), // Convert to u32 safely
)
}
} else {
// Epoch is likely in seconds; use scale to interpret the fraction field
let calculated_nanos = match scale {
Some(3) => (fraction_value as i64 * 1_000_000).try_into().unwrap_or(0), // Milliseconds to nanos
Some(6) => (fraction_value as i64 * 1000).try_into().unwrap_or(0), // Microseconds to nanos
Some(9) => fraction_value.try_into().unwrap_or(0), // Fraction IS nanos
_ => { // Default or unknown scale, assume fraction is nanos if < 1B, else 0
if fraction_value >= 0 && fraction_value < 1_000_000_000 {
fraction_value as u32
} else {
tracing::warn!(
"Unhandled scale ({:?}) or invalid fraction ({}) for seconds epoch, defaulting nanos to 0",
scale,
fraction_value
);
0
}
}
};
(epoch_value, calculated_nanos)
};
match parse_snowflake_timestamp(adjusted_epoch, adjusted_nanos) {
Ok(dt) => Some(dt),
Err(e) => {
tracing::error!("Failed to parse timestamp: {}", e);
tracing::error!("Failed to parse timestamp: {}. adjusted_epoch={}, adjusted_nanos={}. Original epoch={}, fraction={}, scale={:?}",
e, adjusted_epoch, adjusted_nanos, epoch_value, fraction_value, scale);
None
}
}
}
_ => None, // Epoch or fraction array missing, or epoch is null
}
}
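
For reference outside the diff, the two decisions above can be exercised in isolation; a minimal sketch with illustrative names (split_epoch and fraction_nanos are not the crate's API):

fn split_epoch(epoch: i64) -> (i64, u32) {
    if epoch > 5_000_000_000_000 {
        // Treated as microseconds since the Unix epoch
        (epoch / 1_000_000, ((epoch % 1_000_000) * 1_000) as u32)
    } else if epoch > 5_000_000_000 {
        // Treated as milliseconds since the Unix epoch
        (epoch / 1_000, ((epoch % 1_000) * 1_000_000) as u32)
    } else {
        // Treated as seconds; the sub-second part comes from `fraction`
        (epoch, 0)
    }
}

fn fraction_nanos(fraction: i32, scale: Option<i32>) -> u32 {
    match scale {
        Some(3) => (fraction as i64 * 1_000_000).try_into().unwrap_or(0), // ms -> ns
        Some(6) => (fraction as i64 * 1_000).try_into().unwrap_or(0),     // us -> ns
        Some(9) => fraction.try_into().unwrap_or(0),                      // already ns
        // Unknown scale: accept the value only if it is a plausible nano count
        _ => u32::try_from(fraction).ok().filter(|n| *n < 1_000_000_000).unwrap_or(0),
    }
}

fn main() {
    assert_eq!(split_epoch(1_700_000_000), (1_700_000_000, 0));               // seconds
    assert_eq!(split_epoch(1_700_000_000_123), (1_700_000_000, 123_000_000)); // ms
    assert_eq!(fraction_nanos(123, Some(3)), 123_000_000);                    // 123 ms
    assert_eq!(fraction_nanos(969_000_000, Some(9)), 969_000_000);            // already nanos
}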
@@ -1253,9 +1272,13 @@ fn handle_struct_array(
// Check if this is a Snowflake timestamp struct
let fields = match field.data_type() {
ArrowDataType::Struct(fields) => fields,
_ => return DataType::Null, // Should not happen if called with a struct field
};
// Try to get scale from metadata
let scale_meta_str = field.metadata().get("scale");
let scale: Option<i32> = scale_meta_str.and_then(|s| s.parse::<i32>().ok());
if fields.len() == 2
&& fields.iter().any(|f| f.name() == "epoch")
&& fields.iter().any(|f| f.name() == "fraction")
@@ -1264,7 +1287,7 @@ fn handle_struct_array(
.get("logicalType")
.map_or(false, |v| v.contains("TIMESTAMP"))
{
if let Some(dt) = handle_snowflake_timestamp_struct(array, row_idx, scale) {
if field
.metadata()
.get("logicalType")
@@ -1283,21 +1306,23 @@ fn handle_struct_array(
DataType::Null
} else {
let mut map = JsonMap::new();
for (struct_field_def, col) in fields.iter().zip(array.columns().iter()) {
let field_name = struct_field_def.name();
let value = if col.is_null(row_idx) {
Value::Null
} else if let Some(arr) = col.as_any().downcast_ref::<Int32Array>() {
Value::Number(arr.value(row_idx).into())
} else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
Value::Number(arr.value(row_idx).into())
} else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
serde_json::Number::from_f64(arr.value(row_idx))
.map(Value::Number)
.unwrap_or(Value::Null)
} else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
Value::String(arr.value(row_idx).to_string())
} else {
// Attempt to handle nested structs recursively or other types if needed
// For now, defaulting to Null for unhandled types within generic structs
Value::Null
};
map.insert(field_name.to_string(), value);
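
The downcast chain above is the standard arrow-rs pattern for dynamically typed columns: try each concrete array type via as_any(). A minimal sketch, with cell_to_json as an illustrative stand-in for the loop body:

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
use serde_json::Value;

fn cell_to_json(col: &ArrayRef, row: usize) -> Value {
    if col.is_null(row) {
        Value::Null
    } else if let Some(a) = col.as_any().downcast_ref::<Int64Array>() {
        Value::Number(a.value(row).into())
    } else if let Some(a) = col.as_any().downcast_ref::<StringArray>() {
        Value::String(a.value(row).to_string())
    } else {
        Value::Null // unhandled types fall through, as in the diff
    }
}

fn main() {
    let col: ArrayRef = Arc::new(Int64Array::from(vec![Some(7)]));
    assert_eq!(cell_to_json(&col, 0), Value::Number(7.into()));
}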
@@ -1384,81 +1409,106 @@ fn convert_array_to_datatype(
}
ArrowDataType::Int64 => {
let field_name = field.name(); // Get field name for logging
// println!("Debug: Processing Int64 field: {}", field_name);
// **NEW LOGGING START**
tracing::debug!(
"Processing Int64 field: '{}', row_idx: {}, Data Type: {:?}, Metadata: {:?}",
field_name,
row_idx,
column.data_type(),
field.metadata()
);
// **NEW LOGGING END**
// Check if this is actually a timestamp in disguise
let logical_type = field.metadata().get("logicalType");
let scale_str = field.metadata().get("scale");
if logical_type.map_or(false, |t| t.contains("TIMESTAMP")) {
// println!("Debug [{}]: Detected as timestamp.", field_name);
// **MODIFIED LOGGING**
tracing::debug!("[{}]: Detected as timestamp. logicalType={:?}, scale={:?}", field_name, logical_type, scale_str);
// If it has a timestamp logical type, determine the time unit based on scale
let unit = match scale_str.map(|s| s.parse::<i32>().unwrap_or(3)) { // Unparsable scale defaults to 3 (ms)
Some(0) => TimeUnit::Second,
Some(6) => TimeUnit::Microsecond,
Some(9) => TimeUnit::Nanosecond,
_ => TimeUnit::Millisecond, // Scale 3, missing, or unrecognized scales default to milliseconds
};
// println!("Debug [{}]: Determined unit: {:?}", field_name, unit);
// **MODIFIED LOGGING**
tracing::debug!("[{}]: Determined unit: {:?}", field_name, unit);
// Check if there's timezone info
let has_tz = logical_type.map_or(false, |t| t.contains("_TZ"));
// println!("Debug [{}]: has_tz: {}", field_name, has_tz);
let _tz: Option<std::sync::Arc<String>> = if has_tz {
Some(Arc::new(String::from("UTC")))
} else {
None
};
// **MODIFIED LOGGING**
tracing::debug!("[{}]: has_tz: {}", field_name, has_tz);
let _tz: Option<std::sync::Arc<String>> = if has_tz { Some(Arc::new(String::from("UTC"))) } else { None };
// Process as timestamp
if let Some(array) = column.as_any().downcast_ref::<Int64Array>() {
if array.is_null(row_idx) {
// println!("Debug [{}]: Value is null at row_idx {}.", field_name, row_idx);
tracing::debug!("[{}]: Value is null at row_idx {}.", field_name, row_idx);
return DataType::Null;
}
let value = array.value(row_idx);
// println!("Debug [{}]: Raw value at row_idx {}: {}", field_name, row_idx, value);
// **MODIFIED LOGGING**
tracing::debug!("[{}]: Raw value at row_idx {}: {}", field_name, row_idx, value);
let (secs, subsec_nanos) = match unit {
TimeUnit::Second => (value, 0),
TimeUnit::Millisecond => (value / 1000, (value % 1000) * 1_000_000),
TimeUnit::Microsecond => (value / 1_000_000, (value % 1_000_000) * 1000),
TimeUnit::Nanosecond => (value / 1_000_000_000, value % 1_000_000_000),
};
// println!("Debug [{}]: Calculated secs={}, nanos={}", field_name, secs, subsec_nanos);
tracing::debug!("[{}]: Calculated secs={}, nanos={}", field_name, secs, subsec_nanos);
// **NEW LOGGING END**
tracing::debug!(
"[{}]: Calling Utc.timestamp_opt({}, {})",
field_name,
secs,
subsec_nanos
);
match Utc.timestamp_opt(secs, subsec_nanos as u32) {
LocalResult::Single(dt) => {
// println!("Debug [{}]: Successfully created DateTime: {}", field_name, dt);
tracing::debug!("[{}]: Successfully created DateTime: {}", field_name, dt);
if has_tz {
// println!("Debug [{}]: Returning Timestamptz.", field_name);
tracing::debug!("[{}]: Returning Timestamptz.", field_name);
DataType::Timestamptz(Some(dt))
} else {
// println!("Debug [{}]: Returning Timestamp.", field_name);
tracing::debug!("[{}]: Returning Timestamp.", field_name);
DataType::Timestamp(Some(dt.naive_utc()))
}
}
LocalResult::None | LocalResult::Ambiguous(_, _) => {
// Handle None and Ambiguous explicitly
tracing::error!("Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", secs, subsec_nanos);
// println!("Debug [{}]: Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", field_name, secs, subsec_nanos);
// **NEW LOGGING START**
tracing::error!(
"[{}]: Utc.timestamp_opt failed (returned None or Ambiguous) for secs={}, nanos={}. Raw value was {}. Returning Null.",
field_name, secs, subsec_nanos, value
);
// **NEW LOGGING END**
DataType::Null
}
}
} else {
// println!("Debug [{}]: Failed to downcast to Int64Array.", field_name);
// **MODIFIED LOGGING**
tracing::warn!("[{}]: Failed to downcast Int64 (identified as timestamp) to Int64Array.", field_name);
DataType::Null
}
} else {
// Not a timestamp, so delegate to handle_int64_array which can handle scaling or default to Int8
tracing::debug!("[{}]: Not identified as timestamp based on metadata. Delegating to handle_int64_array.", field_name);
if let Some(array) = column.as_any().downcast_ref::<Int64Array>() {
handle_int64_array(array, row_idx, scale_str.map(|s| s.as_str()), field)
} else {
tracing::warn!("[{}]: Failed to downcast Int64 (non-timestamp) to Int64Array.", field_name);
DataType::Null
}
}
}
}
ArrowDataType::UInt8 => {
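
For reference, the scale-to-unit mapping plus the (secs, nanos) split in the Int64 arm above reduces to integer arithmetic; a minimal chrono-only sketch (int64_to_utc is an illustrative name, and negative pre-1970 values are not handled, matching the `as u32` cast above):

use chrono::{DateTime, LocalResult, TimeZone, Utc};

// Mirrors the diff: scale 0 => seconds, 6 => micros, 9 => nanos, else millis.
fn int64_to_utc(value: i64, scale: i32) -> Option<DateTime<Utc>> {
    let (secs, nanos) = match scale {
        0 => (value, 0),
        6 => (value / 1_000_000, (value % 1_000_000) * 1_000),
        9 => (value / 1_000_000_000, value % 1_000_000_000),
        _ => (value / 1_000, (value % 1_000) * 1_000_000),
    };
    match Utc.timestamp_opt(secs, nanos as u32) {
        LocalResult::Single(dt) => Some(dt),
        _ => None, // out-of-range or ambiguous
    }
}

fn main() {
    let dt = int64_to_utc(1_700_000_000_123, 3).unwrap();
    assert_eq!(dt.timestamp(), 1_700_000_000);
    assert_eq!(dt.timestamp_subsec_millis(), 123);
}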
@@ -1645,7 +1695,7 @@ fn prepare_query(query: &str) -> String {
}
fn process_record_batch(batch: &RecordBatch) -> Vec<IndexMap<String, DataType>> {
// println!("Processing record batch with {:?} rows", batch);
println!("Processing record batch with {:?} rows", batch);
let mut rows = Vec::with_capacity(batch.num_rows());
let schema = batch.schema();
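
For context, process_record_batch walks rows and then columns of the Arrow RecordBatch; a minimal sketch of that shape (the crate's actual row type, IndexMap<String, DataType>, is replaced here by a simple print):

use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array};
use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("n", ArrowDataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef],
    )
    .unwrap();
    // Row-major walk, the same shape the function above uses.
    for row in 0..batch.num_rows() {
        for (col_idx, field) in batch.schema().fields().iter().enumerate() {
            let col = batch.column(col_idx);
            println!("{} row {} null? {}", field.name(), row, col.is_null(row));
        }
    }
}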
@@ -1992,7 +2042,7 @@ mod tests {
),
]);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::<i32>().ok()));
println!(
"handle_snowflake_timestamp_struct (seconds epoch, millis fraction): {:?}",
dt
@@ -2024,7 +2074,7 @@ mod tests {
),
]);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::<i32>().ok()));
println!(
"handle_snowflake_timestamp_struct (millis epoch, zero fraction): {:?}",
dt
@@ -2067,7 +2117,7 @@ mod tests {
),
]);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::<i32>().ok()));
println!(
"handle_snowflake_timestamp_struct (microsecs epoch): {:?}",
dt
@@ -2398,7 +2448,7 @@ mod tests {
]);
// Call the function directly
let result = handle_snowflake_timestamp_struct(&struct_array, 0, None);
// Print and verify result
if let Some(dt) = result {
@@ -2476,8 +2526,8 @@ mod tests {
),
]);
// Test the function directly, passing None for scale
let result = handle_snowflake_timestamp_struct(&struct_array, 0, None);
assert!(result.is_none(), "Expected None for null epoch");
println!("✓ Null epoch correctly returns None");
@@ -3008,5 +3058,466 @@ mod tests {
println!("✓ Verified Int64 FIXED with Scale processing");
}
/// Tests processing a RecordBatch with Struct-based timestamps (scale 9)
/// based on real-world log output.
#[test]
fn test_struct_timestamp_scale9_processing() {
println!("\n=== Testing Struct TIMESTAMP_NTZ(9) processing ===");
// --- Sample Data (Anonymized, based on provided log) ---
// Selected a few representative rows
let epoch_data = vec![
Some(1736442980i64), // Row 0
Some(1736443293i64), // Row 4
None, // Simulate a potential null row
Some(1737408291i64), // Last row
];
let fraction_data = vec![
Some(969000000i32), // Row 0 (0.969 seconds)
Some(555000000i32), // Row 4 (0.555 seconds)
Some(123456789i32), // Null epoch row, fraction irrelevant but needs value
Some(504000000i32), // Last row (0.504 seconds)
];
let product_name_data = vec![
Some("Product A"),
Some("Product B"),
Some("Product C"),
Some("Product D"),
];
let sku_data = vec![Some("SKU-A"), Some("SKU-B"), Some("SKU-C"), Some("SKU-D")];
let order_number_data = vec![
Some("ORD-111"),
Some("ORD-222"),
Some("ORD-333"),
Some("ORD-444"),
];
// --- Array Creation ---
let epoch_array = Int64Array::from(epoch_data.clone());
let fraction_array = Int32Array::from(fraction_data.clone());
let product_name_array = StringArray::from(product_name_data);
let sku_array = StringArray::from(sku_data);
let order_number_array = StringArray::from(order_number_data);
// --- Struct Array for Timestamp ---
let mut timestamp_metadata = std::collections::HashMap::new();
timestamp_metadata.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string());
timestamp_metadata.insert("scale".to_string(), "9".to_string()); // Crucial: scale is 9
let struct_fields = Fields::from(vec![
Field::new(
"epoch",
ArrowDataType::Int64,
true, // epoch is nullable
)
.with_metadata(timestamp_metadata.clone()), // Metadata might be on inner fields too
Field::new(
"fraction",
ArrowDataType::Int32,
true, // fraction is nullable
)
.with_metadata(timestamp_metadata.clone()),
]);
// Create the StructArray
let struct_array = StructArray::new(
struct_fields.clone(),
vec![
Arc::new(epoch_array) as ArrayRef,
Arc::new(fraction_array) as ArrayRef,
],
// Set the validity based on the epoch_data's nulls
Some(arrow::buffer::NullBuffer::from(
epoch_data.iter().map(|x| x.is_some()).collect::<Vec<_>>(),
)),
);
// --- Field Creation ---
let field_returned_processed_date = Field::new(
"RETURNED_PROCESSED_DATE",
ArrowDataType::Struct(struct_fields),
true, // The struct itself is nullable
)
.with_metadata(timestamp_metadata.clone()); // Metadata on the struct field itself
let field_product_name = Field::new("PRODUCT_NAME", ArrowDataType::Utf8, true);
let field_sku = Field::new("SKU", ArrowDataType::Utf8, true);
let field_order_number = Field::new("ORDER_NUMBER", ArrowDataType::Utf8, true);
// --- Schema Creation ---
let schema = Arc::new(Schema::new(vec![
field_returned_processed_date,
field_product_name,
field_sku,
field_order_number,
]));
// --- RecordBatch Creation ---
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(struct_array) as ArrayRef,
Arc::new(product_name_array) as ArrayRef,
Arc::new(sku_array) as ArrayRef,
Arc::new(order_number_array) as ArrayRef,
],
)
.unwrap();
println!("Simulated Input RecordBatch schema: {:?}", batch.schema());
// --- Process Batch ---
let processed_rows = process_record_batch(&batch);
println!("Processed Rows (Struct Timestamp Scale 9): {:?}", processed_rows);
// --- Assertions ---
assert_eq!(processed_rows.len(), 4, "Expected 4 rows processed");
// Helper to create expected NaiveDateTime from secs/nanos
let dt_from_parts = |secs: i64, nanos: u32| Utc.timestamp_opt(secs, nanos).unwrap().naive_utc();
// Row 0 Assertions (epoch=1736442980, fraction=969000000) -> nanos=969000000
let expected_dt_0 = dt_from_parts(1736442980, 969000000);
assert_eq!(
processed_rows[0]["returned_processed_date"], // field name is lowercased
DataType::Timestamp(Some(expected_dt_0)),
"Row 0 timestamp mismatch"
);
assert_eq!(
processed_rows[0]["product_name"],
DataType::Text(Some("Product A".to_string()))
);
assert_eq!(
processed_rows[0]["sku"],
DataType::Text(Some("SKU-A".to_string()))
);
assert_eq!(
processed_rows[0]["order_number"],
DataType::Text(Some("ORD-111".to_string()))
);
// Row 1 Assertions (epoch=1736443293, fraction=555000000) -> nanos=555000000
let expected_dt_1 = dt_from_parts(1736443293, 555000000);
assert_eq!(
processed_rows[1]["returned_processed_date"],
DataType::Timestamp(Some(expected_dt_1)),
"Row 1 timestamp mismatch"
);
assert_eq!(
processed_rows[1]["product_name"],
DataType::Text(Some("Product B".to_string()))
);
assert_eq!(
processed_rows[1]["sku"],
DataType::Text(Some("SKU-B".to_string()))
);
assert_eq!(
processed_rows[1]["order_number"],
DataType::Text(Some("ORD-222".to_string()))
);
// Row 2 Assertions (Null epoch)
assert_eq!(
processed_rows[2]["returned_processed_date"],
DataType::Null,
"Row 2 timestamp should be Null"
);
assert_eq!(
processed_rows[2]["product_name"],
DataType::Text(Some("Product C".to_string()))
);
assert_eq!(
processed_rows[2]["sku"],
DataType::Text(Some("SKU-C".to_string()))
);
assert_eq!(
processed_rows[2]["order_number"],
DataType::Text(Some("ORD-333".to_string()))
);
// Row 3 Assertions (epoch=1737408291, fraction=504000000) -> nanos=504000000
let expected_dt_3 = dt_from_parts(1737408291, 504000000);
assert_eq!(
processed_rows[3]["returned_processed_date"],
DataType::Timestamp(Some(expected_dt_3)),
"Row 3 timestamp mismatch"
);
assert_eq!(
processed_rows[3]["product_name"],
DataType::Text(Some("Product D".to_string()))
);
assert_eq!(
processed_rows[3]["sku"],
DataType::Text(Some("SKU-D".to_string()))
);
assert_eq!(
processed_rows[3]["order_number"],
DataType::Text(Some("ORD-444".to_string()))
);
println!("✓ Verified Struct TIMESTAMP_NTZ(9) processing");
}
/// Tests processing Int64 arrays with TIMESTAMP_NTZ metadata and various scales.
#[test]
fn test_int64_timestamp_ntz_various_scales() {
println!("\n=== Testing Int64 TIMESTAMP_NTZ with various scales ===");
// --- Sample Data ---
// Use a consistent base time for easier verification
let base_secs = 1700000000i64; // 2023-11-14 22:13:20 UTC
// Define expected nanos carefully
let expected_nanos = 123456789u32;
// Calculate Int64 values based on scale
let data_scale0 = vec![Some(base_secs)]; // Value is seconds
let data_scale6 = vec![Some(base_secs * 1_000_000 + (expected_nanos / 1000) as i64)]; // Value is microseconds
let data_scale9 = vec![Some(base_secs * 1_000_000_000 + expected_nanos as i64)]; // Value is nanoseconds
let data_null = vec![None::<i64>];
// --- Array Creation ---
let array_scale0 = Int64Array::from(data_scale0);
let array_scale6 = Int64Array::from(data_scale6);
let array_scale9 = Int64Array::from(data_scale9);
let array_null = Int64Array::from(data_null);
// --- Metadata and Field Creation ---
let mut meta_ntz_scale0 = std::collections::HashMap::new();
meta_ntz_scale0.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string());
meta_ntz_scale0.insert("scale".to_string(), "0".to_string());
let field_scale0 = Field::new("ts_scale0", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale0);
let mut meta_ntz_scale6 = std::collections::HashMap::new();
meta_ntz_scale6.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string());
meta_ntz_scale6.insert("scale".to_string(), "6".to_string());
let field_scale6 = Field::new("ts_scale6", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale6);
let mut meta_ntz_scale9 = std::collections::HashMap::new();
meta_ntz_scale9.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string());
meta_ntz_scale9.insert("scale".to_string(), "9".to_string());
let field_scale9 = Field::new("ts_scale9", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale9.clone()); // Clone for null field
// Field for null test (metadata shouldn't matter but use one)
let field_null = Field::new("ts_null", ArrowDataType::Int64, true).with_metadata(meta_ntz_scale9);
// --- Schema and RecordBatch ---
let schema = Arc::new(Schema::new(vec![field_scale0, field_scale6, field_scale9, field_null]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(array_scale0) as ArrayRef,
Arc::new(array_scale6) as ArrayRef,
Arc::new(array_scale9) as ArrayRef,
Arc::new(array_null) as ArrayRef,
],
).unwrap();
// --- Process Batch ---
let processed_rows = process_record_batch(&batch);
// --- Assertions ---
assert_eq!(processed_rows.len(), 1, "Expected 1 row");
let row = &processed_rows[0];
// Calculate the final expected NaiveDateTime based ONLY on base_secs and expected_nanos
// Note: For scale 0, the input data doesn't contain nano precision, so the expected result *should* reflect that loss.
let expected_dt_s0 = Utc.timestamp_opt(base_secs, 0).unwrap().naive_utc(); // Scale 0 loses nanos
// For scale 6, we only have microsecond precision (the last 3 digits of nanos are truncated)
let microsecond_nanos = (expected_nanos / 1000) * 1000; // Truncate to microsecond precision
let expected_dt_s6 = Utc.timestamp_opt(base_secs, microsecond_nanos).unwrap().naive_utc();
// For scale 9, we have full nanosecond precision
let expected_dt_s9 = Utc.timestamp_opt(base_secs, expected_nanos).unwrap().naive_utc();
// Scale 0 (Seconds) - Loses nanosecond precision from original `expected_nanos`
assert_eq!(row["ts_scale0"], DataType::Timestamp(Some(expected_dt_s0)));
// Scale 6 (Microseconds) - Should only have microsecond precision (truncated)
assert_eq!(row["ts_scale6"], DataType::Timestamp(Some(expected_dt_s6)));
// Scale 9 (Nanoseconds) - Should retain full nanosecond precision
assert_eq!(row["ts_scale9"], DataType::Timestamp(Some(expected_dt_s9)));
// Null value
assert_eq!(row["ts_null"], DataType::Null);
println!("✓ Verified Int64 TIMESTAMP_NTZ with various scales");
}
/// Tests processing Int64 arrays with TIMESTAMP_TZ metadata and scale 3.
#[test]
fn test_int64_timestamp_tz_scale3() {
println!("\n=== Testing Int64 TIMESTAMP_TZ with scale 3 ===");
// --- Sample Data ---
let base_secs = 1700000000i64;
let base_millis = 123i64;
let data_millis = vec![Some(base_secs * 1000 + base_millis)]; // Milliseconds since epoch
let data_null = vec![None::<i64>];
// --- Array Creation ---
let array_data = Int64Array::from(data_millis);
let array_null = Int64Array::from(data_null);
// --- Metadata and Field Creation ---
let mut meta_tz_scale3 = std::collections::HashMap::new();
meta_tz_scale3.insert("logicalType".to_string(), "TIMESTAMP_TZ".to_string());
meta_tz_scale3.insert("scale".to_string(), "3".to_string());
let field_data = Field::new("ts_tz_scale3", ArrowDataType::Int64, true).with_metadata(meta_tz_scale3.clone());
let field_null = Field::new("ts_null", ArrowDataType::Int64, true).with_metadata(meta_tz_scale3);
// --- Schema and RecordBatch ---
let schema = Arc::new(Schema::new(vec![field_data, field_null]));
let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(array_data) as ArrayRef,
Arc::new(array_null) as ArrayRef,
],
).unwrap();
// --- Process Batch ---
let processed_rows = process_record_batch(&batch);
// --- Assertions ---
assert_eq!(processed_rows.len(), 1, "Expected 1 row");
let row = &processed_rows[0];
let expected_dt_utc = Utc.timestamp_millis_opt(base_secs * 1000 + base_millis).unwrap();
// TZ Scale 3 (Milliseconds)
assert_eq!(row["ts_tz_scale3"], DataType::Timestamptz(Some(expected_dt_utc)));
// Null value
assert_eq!(row["ts_null"], DataType::Null);
println!("✓ Verified Int64 TIMESTAMP_TZ with scale 3");
}
/// Tests processing Struct timestamps with various scales and TZ/NTZ metadata.
#[test]
fn test_struct_timestamp_various_scales_and_tz() {
println!("\n=== Testing Struct Timestamps with various scales and TZ/NTZ ===");
// Helper function to create test structs with different scales
fn create_test_case(epoch_value: i64, fraction_value: i32, scale: i32, is_tz: bool) -> (StructArray, Field) {
let epoch_array = Int64Array::from(vec![epoch_value]);
let fraction_array = Int32Array::from(vec![fraction_value]);
let struct_fields = Fields::from(vec![
Arc::new(Field::new("epoch", ArrowDataType::Int64, false)),
Arc::new(Field::new("fraction", ArrowDataType::Int32, false)),
]);
let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("epoch", ArrowDataType::Int64, false)),
Arc::new(epoch_array) as arrow::array::ArrayRef,
),
(
Arc::new(Field::new("fraction", ArrowDataType::Int32, false)),
Arc::new(fraction_array) as arrow::array::ArrayRef,
),
]);
// Create field with metadata indicating this is a timestamp
let mut struct_metadata = std::collections::HashMap::new();
struct_metadata.insert("scale".to_string(), scale.to_string());
struct_metadata.insert(
"logicalType".to_string(),
if is_tz {
"TIMESTAMP_TZ".to_string()
} else {
"TIMESTAMP_NTZ".to_string()
},
);
let struct_field = Field::new(
"TIMESTAMP_STRUCT",
ArrowDataType::Struct(struct_fields),
false,
).with_metadata(struct_metadata);
(struct_array, struct_field)
}
// Base timestamp values for testing
let base_secs = 1700000000i64; // 2023-11-14 22:13:20 UTC
// Test cases for different scales
// (epoch, fraction, scale, is_tz, expected_subsec_nanos)
let test_cases = vec![
// Scale 3 (milliseconds)
(base_secs, 123, 3, false, 123_000_000), // 123 milliseconds → 123,000,000 nanos
(base_secs, 123, 3, true, 123_000_000), // Same with TZ
// Scale 6 (microseconds)
(base_secs, 123456, 6, false, 123_456_000), // 123,456 microseconds → 123,456,000 nanos
(base_secs, 123456, 6, true, 123_456_000), // Same with TZ
// Scale 9 (nanoseconds) - most important case to test
(base_secs, 123456789, 9, false, 123_456_789), // 123,456,789 nanoseconds directly
(base_secs, 123456789, 9, true, 123_456_789), // Same with TZ
// Edge cases
(base_secs, 0, 9, false, 0), // Zero fraction
(base_secs, 999_999_999, 9, false, 999_999_999), // Max nanoseconds
];
// Process each test case
for (idx, (epoch, fraction, scale, is_tz, expected_nanos)) in test_cases.iter().enumerate() {
println!("\nTest case {}: epoch={}, fraction={}, scale={}, tz={}",
idx, epoch, fraction, scale, is_tz);
let (struct_array, struct_field) = create_test_case(*epoch, *fraction, *scale, *is_tz);
// Test direct function call
let dt_result = handle_snowflake_timestamp_struct(&struct_array, 0, Some(*scale));
// Verify result
assert!(dt_result.is_some(),
"handle_snowflake_timestamp_struct returned None for case {}", idx);
let dt = dt_result.unwrap();
let expected_dt = Utc.timestamp_opt(*epoch, *expected_nanos).unwrap();
assert_eq!(dt.timestamp(), expected_dt.timestamp(),
"Incorrect timestamp seconds for case {}", idx);
assert_eq!(dt.timestamp_subsec_nanos(), *expected_nanos,
"Incorrect nanoseconds for case {}: got {} expected {}",
idx, dt.timestamp_subsec_nanos(), expected_nanos);
// Additionally test through handle_struct_array
let struct_array_ref = Arc::new(struct_array) as arrow::array::ArrayRef;
let result = handle_struct_array(
struct_array_ref.as_any().downcast_ref::<StructArray>().unwrap(),
0,
&struct_field,
);
// Check result type and value
if *is_tz {
match &result {
DataType::Timestamptz(Some(result_dt)) => {
assert_eq!(result_dt.timestamp(), expected_dt.timestamp());
assert_eq!(result_dt.timestamp_subsec_nanos(), *expected_nanos);
}
_ => panic!("Expected DataType::Timestamptz, got: {:?}", result),
}
} else {
match &result {
DataType::Timestamp(Some(result_naive_dt)) => {
assert_eq!(result_naive_dt.and_utc().timestamp(), expected_dt.timestamp());
assert_eq!(result_naive_dt.and_utc().timestamp_subsec_nanos(), *expected_nanos);
}
_ => panic!("Expected DataType::Timestamp, got: {:?}", result),
}
}
println!("✓ Test case {} passed", idx);
}
println!("✓ Verified Struct Timestamps with various scales and TZ/NTZ");
}
}