snowflake scale 9

This commit is contained in:
dal 2025-05-12 17:05:41 -06:00
parent 4669ba92b0
commit 84d25a1456
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed file with 306 additions and 54 deletions

View File

@ -84,6 +84,7 @@ fn handle_snowflake_timestamp(value: &Value) -> Option<Value> {
fn handle_snowflake_timestamp_struct(
struct_array: &arrow::array::StructArray,
row_idx: usize,
scale: Option<i32>, // Add scale parameter
) -> Option<DateTime<Utc>> {
if struct_array.is_null(row_idx) {
return None;
@ -108,36 +109,54 @@ fn handle_snowflake_timestamp_struct(
fraction.value(row_idx)
};
// Important: Check if epoch might be in milliseconds instead of seconds
// If the epoch value is larger than typical Unix timestamps (e.g., > 50 years worth of seconds)
// it's likely in milliseconds or microseconds
// Determine epoch/nanos based on epoch_value magnitude AND scale
let (adjusted_epoch, adjusted_nanos) = if epoch_value > 5_000_000_000 {
// Likely milliseconds or microseconds - determine which
// Epoch likely in ms or us
if epoch_value > 5_000_000_000_000 {
// Microseconds
(
epoch_value / 1_000_000,
(epoch_value % 1_000_000 * 1000) as u32,
(epoch_value % 1_000_000 * 1000).try_into().unwrap_or(0), // Convert to u32 safely
)
} else {
// Milliseconds
(epoch_value / 1000, (epoch_value % 1000 * 1_000_000) as u32)
(
epoch_value / 1000,
(epoch_value % 1000 * 1_000_000).try_into().unwrap_or(0), // Convert to u32 safely
)
}
} else {
// Seconds - use fraction for nanoseconds
// For scale 3 (milliseconds), multiply by 10^6 to get nanoseconds
(epoch_value, (fraction_value as u32) * 1_000_000)
// Epoch is likely in seconds, use scale to interpret fraction
let calculated_nanos = match scale {
Some(3) => (fraction_value as i64 * 1_000_000).try_into().unwrap_or(0), // Milliseconds to nanos
Some(6) => (fraction_value as i64 * 1000).try_into().unwrap_or(0), // Microseconds to nanos
Some(9) => fraction_value.try_into().unwrap_or(0), // Fraction IS nanos
_ => { // Default or unknown scale, assume fraction is nanos if < 1B, else 0
if fraction_value >= 0 && fraction_value < 1_000_000_000 {
fraction_value as u32
} else {
tracing::warn!(
"Unhandled scale ({:?}) or invalid fraction ({}) for seconds epoch, defaulting nanos to 0",
scale,
fraction_value
);
0
}
}
};
(epoch_value, calculated_nanos)
};
match parse_snowflake_timestamp(adjusted_epoch, adjusted_nanos) {
Ok(dt) => Some(dt),
Err(e) => {
tracing::error!("Failed to parse timestamp: {}", e);
tracing::error!("Failed to parse timestamp: {}. adjusted_epoch={}, adjusted_nanos={}. Original epoch={}, fraction={}, scale={:?}",
e, adjusted_epoch, adjusted_nanos, epoch_value, fraction_value, scale);
None
}
}
}
_ => None,
_ => None, // Epoch or fraction array missing or epoch is null
}
}
@ -1253,9 +1272,13 @@ fn handle_struct_array(
// Check if this is a Snowflake timestamp struct
let fields = match field.data_type() {
ArrowDataType::Struct(fields) => fields,
_ => return DataType::Null,
_ => return DataType::Null, // Should not happen if called with a struct field
};
// Try to get scale from metadata
let scale_meta_str = field.metadata().get("scale");
let scale: Option<i32> = scale_meta_str.and_then(|s| s.parse::<i32>().ok());
if fields.len() == 2
&& fields.iter().any(|f| f.name() == "epoch")
&& fields.iter().any(|f| f.name() == "fraction")
@ -1264,7 +1287,7 @@ fn handle_struct_array(
.get("logicalType")
.map_or(false, |v| v.contains("TIMESTAMP"))
{
if let Some(dt) = handle_snowflake_timestamp_struct(array, row_idx) {
if let Some(dt) = handle_snowflake_timestamp_struct(array, row_idx, scale) { // Pass scale here
if field
.metadata()
.get("logicalType")
@ -1283,21 +1306,23 @@ fn handle_struct_array(
DataType::Null
} else {
let mut map = JsonMap::new();
for (field, col) in fields.iter().zip(array.columns().iter()) {
let field_name = field.name();
for (struct_field_def, col) in fields.iter().zip(array.columns().iter()) {
let field_name = struct_field_def.name(); // Use name from struct_field_def
let value = if col.is_null(row_idx) {
Value::Null
} else if let Some(array) = col.as_any().downcast_ref::<Int32Array>() {
Value::Number(array.value(row_idx).into())
} else if let Some(array) = col.as_any().downcast_ref::<Int64Array>() {
Value::Number(array.value(row_idx).into())
} else if let Some(array) = col.as_any().downcast_ref::<Float64Array>() {
serde_json::Number::from_f64(array.value(row_idx))
} else if let Some(arr) = col.as_any().downcast_ref::<Int32Array>() {
Value::Number(arr.value(row_idx).into())
} else if let Some(arr) = col.as_any().downcast_ref::<Int64Array>() {
Value::Number(arr.value(row_idx).into())
} else if let Some(arr) = col.as_any().downcast_ref::<Float64Array>() {
serde_json::Number::from_f64(arr.value(row_idx))
.map(Value::Number)
.unwrap_or(Value::Null)
} else if let Some(array) = col.as_any().downcast_ref::<StringArray>() {
Value::String(array.value(row_idx).to_string())
} else if let Some(arr) = col.as_any().downcast_ref::<StringArray>() {
Value::String(arr.value(row_idx).to_string())
} else {
// Attempt to handle nested structs recursively or other types if needed
// For now, defaulting to Null for unhandled types within generic structs
Value::Null
};
map.insert(field_name.to_string(), value);
@ -1384,81 +1409,106 @@ fn convert_array_to_datatype(
}
ArrowDataType::Int64 => {
let field_name = field.name(); // Get field name for logging
// println!("Debug: Processing Int64 field: {}", field_name);
// **NEW LOGGING START**
tracing::debug!(
"Processing Int64 field: '{}', row_idx: {}, Data Type: {:?}, Metadata: {:?}",
field_name,
row_idx,
column.data_type(),
field.metadata()
);
// **NEW LOGGING END**
// Check if this is actually a timestamp in disguise
let logical_type = field.metadata().get("logicalType");
let scale_str = field.metadata().get("scale"); // Get scale_str here as well
// println!("Debug [{}]: logicalType={:?}, scale={:?}", field_name, logical_type, scale_str);
if logical_type.map_or(false, |t| t.contains("TIMESTAMP")) {
// println!("Debug [{}]: Detected as timestamp.", field_name);
// **MODIFIED LOGGING**
tracing::debug!("[{}]: Detected as timestamp. logicalType={:?}, scale={:?}", field_name, logical_type, scale_str);
// If it has a timestamp logical type, determine the time unit based on scale
let unit = match scale_str.map(|s| s.parse::<i32>().unwrap_or(3)) {
// Default parse to 3 (ms)
let unit = match scale_str.map(|s| s.parse::<i32>().unwrap_or(3)) { // Default parse to 3 (ms)
Some(0) => TimeUnit::Second,
Some(6) => TimeUnit::Microsecond,
Some(9) => TimeUnit::Nanosecond,
Some(3) | None | Some(_) => TimeUnit::Millisecond, // Default to millisecond
};
// println!("Debug [{}]: Determined unit: {:?}", field_name, unit);
// **MODIFIED LOGGING**
tracing::debug!("[{}]: Determined unit: {:?}", field_name, unit);
// Check if there's timezone info
let has_tz = logical_type.map_or(false, |t| t.contains("_TZ"));
// println!("Debug [{}]: has_tz: {}", field_name, has_tz);
let _tz: Option<std::sync::Arc<String>> = if has_tz {
Some(Arc::new(String::from("UTC")))
} else {
None
};
// **MODIFIED LOGGING**
tracing::debug!("[{}]: has_tz: {}", field_name, has_tz);
let _tz: Option<std::sync::Arc<String>> = if has_tz { Some(Arc::new(String::from("UTC"))) } else { None };
// Process as timestamp
if let Some(array) = column.as_any().downcast_ref::<Int64Array>() {
if array.is_null(row_idx) {
// println!("Debug [{}]: Value is null at row_idx {}.", field_name, row_idx);
tracing::debug!("[{}]: Value is null at row_idx {}.", field_name, row_idx);
return DataType::Null;
}
let value = array.value(row_idx);
// println!("Debug [{}]: Raw value at row_idx {}: {}", field_name, row_idx, value);
// **MODIFIED LOGGING**
tracing::debug!("[{}]: Raw value at row_idx {}: {}", field_name, row_idx, value);
// **NEW LOGGING START**
let (secs, subsec_nanos) = match unit {
TimeUnit::Second => (value, 0),
TimeUnit::Millisecond => (value / 1000, (value % 1000) * 1_000_000),
TimeUnit::Microsecond => (value / 1_000_000, (value % 1_000_000) * 1000),
TimeUnit::Nanosecond => (value / 1_000_000_000, value % 1_000_000_000),
};
// println!("Debug [{}]: Calculated secs={}, nanos={}", field_name, secs, subsec_nanos);
tracing::debug!("[{}]: Calculated secs={}, nanos={}", field_name, secs, subsec_nanos);
// **NEW LOGGING END**
// **NEW LOGGING START**
tracing::debug!(
"[{}]: Calling Utc.timestamp_opt({}, {})",
field_name,
secs,
subsec_nanos
);
// **NEW LOGGING END**
match Utc.timestamp_opt(secs, subsec_nanos as u32) {
LocalResult::Single(dt) => {
// println!("Debug [{}]: Successfully created DateTime: {}", field_name, dt);
tracing::debug!("[{}]: Successfully created DateTime: {}", field_name, dt);
if has_tz {
// println!("Debug [{}]: Returning Timestamptz.", field_name);
tracing::debug!("[{}]: Returning Timestamptz.", field_name);
DataType::Timestamptz(Some(dt))
} else {
// println!("Debug [{}]: Returning Timestamp.", field_name);
tracing::debug!("[{}]: Returning Timestamp.", field_name);
DataType::Timestamp(Some(dt.naive_utc()))
}
}
LocalResult::None | LocalResult::Ambiguous(_, _) => {
// Handle None and Ambiguous explicitly
tracing::error!("Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", secs, subsec_nanos);
// println!("Debug [{}]: Failed to create DateTime (None or Ambiguous) from timestamp: secs={}, nanos={}", field_name, secs, subsec_nanos);
// **NEW LOGGING START**
tracing::error!(
"[{}]: Utc.timestamp_opt failed (returned None or Ambiguous) for secs={}, nanos={}. Raw value was {}. Returning Null.",
field_name, secs, subsec_nanos, value
);
// **NEW LOGGING END**
DataType::Null
}
}
} else {
// println!("Debug [{}]: Failed to downcast to Int64Array.", field_name);
// **MODIFIED LOGGING**
tracing::warn!("[{}]: Failed to downcast Int64 (identified as timestamp) to Int64Array.", field_name);
DataType::Null
}
} else {
// Not a timestamp, so delegate to handle_int64_array which can handle scaling or default to Int8
if let Some(array) = column.as_any().downcast_ref::<Int64Array>() {
// **MODIFIED LOGGING**
tracing::debug!("[{}]: Not identified as timestamp based on metadata. Delegating to handle_int64_array.", field_name);
if let Some(array) = column.as_any().downcast_ref::<Int64Array>() {
handle_int64_array(array, row_idx, scale_str.map(|s| s.as_str()), field)
} else {
// println!("Debug [{}]: Failed to downcast Int64 for non-timestamp to Int64Array.", field_name);
} else {
// **MODIFIED LOGGING**
tracing::warn!("[{}]: Failed to downcast Int64 (non-timestamp) to Int64Array.", field_name);
DataType::Null
}
}
}
}
ArrowDataType::UInt8 => {
@ -1992,7 +2042,7 @@ mod tests {
),
]);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::<i32>().ok()));
println!(
"handle_snowflake_timestamp_struct (seconds epoch, millis fraction): {:?}",
dt
@ -2024,7 +2074,7 @@ mod tests {
),
]);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::<i32>().ok()));
println!(
"handle_snowflake_timestamp_struct (millis epoch, zero fraction): {:?}",
dt
@ -2067,7 +2117,7 @@ mod tests {
),
]);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0);
let dt = handle_snowflake_timestamp_struct(&struct_array, 0, field.metadata().get("scale").and_then(|s| s.parse::<i32>().ok()));
println!(
"handle_snowflake_timestamp_struct (microsecs epoch): {:?}",
dt
@ -2398,7 +2448,7 @@ mod tests {
]);
// Call the function directly
let result = handle_snowflake_timestamp_struct(&struct_array, 0);
let result = handle_snowflake_timestamp_struct(&struct_array, 0, None);
// Print and verify result
if let Some(dt) = result {
@ -2476,8 +2526,8 @@ mod tests {
),
]);
// Test direct function
let result = handle_snowflake_timestamp_struct(&struct_array, 0);
// Test direct function - Pass None for scale
let result = handle_snowflake_timestamp_struct(&struct_array, 0, None);
assert!(result.is_none(), "Expected None for null epoch");
println!("✓ Null epoch correctly returns None");
@ -3008,5 +3058,207 @@ mod tests {
println!("✓ Verified Int64 FIXED with Scale processing");
}
/// Tests processing a RecordBatch with Struct-based timestamps (scale 9)
/// based on real-world log output.
///
/// Builds a 4-row batch whose first column is a Snowflake-style
/// `{epoch, fraction}` struct tagged with `logicalType=TIMESTAMP_NTZ` and
/// `scale=9`, then verifies that `process_record_batch` converts each row to
/// the expected `DataType::Timestamp` (or `DataType::Null` for a null epoch)
/// and preserves the accompanying string columns.
#[test]
fn test_struct_timestamp_scale9_processing() {
println!("\n=== Testing Struct TIMESTAMP_NTZ(9) processing ===");
// --- Sample Data (Anonymized, based on provided log) ---
// Selected a few representative rows
// Epoch values are whole seconds; with scale 9 the fraction column below
// carries the sub-second part directly in nanoseconds.
let epoch_data = vec![
Some(1736442980i64), // Row 0
Some(1736443293i64), // Row 4
None, // Simulate a potential null row
Some(1737408291i64), // Last row
];
let fraction_data = vec![
Some(969000000i32), // Row 0 (0.969 seconds)
Some(555000000i32), // Row 4 (0.555 seconds)
Some(123456789i32), // Null epoch row, fraction irrelevant but needs value
Some(504000000i32), // Last row (0.504 seconds)
];
let product_name_data = vec![
Some("Product A"),
Some("Product B"),
Some("Product C"),
Some("Product D"),
];
let sku_data = vec![Some("SKU-A"), Some("SKU-B"), Some("SKU-C"), Some("SKU-D")];
let order_number_data = vec![
Some("ORD-111"),
Some("ORD-222"),
Some("ORD-333"),
Some("ORD-444"),
];
// --- Array Creation ---
let epoch_array = Int64Array::from(epoch_data.clone());
let fraction_array = Int32Array::from(fraction_data.clone());
let product_name_array = StringArray::from(product_name_data);
let sku_array = StringArray::from(sku_data);
let order_number_array = StringArray::from(order_number_data);
// --- Struct Array for Timestamp ---
// Metadata mirrors what Snowflake's Arrow output attaches: the logical
// type drives timestamp detection, and scale=9 means the fraction is
// already in nanoseconds (no further scaling needed).
let mut timestamp_metadata = std::collections::HashMap::new();
timestamp_metadata.insert("logicalType".to_string(), "TIMESTAMP_NTZ".to_string());
timestamp_metadata.insert("scale".to_string(), "9".to_string()); // Crucial: scale is 9
let struct_fields = Fields::from(vec![
Field::new(
"epoch",
ArrowDataType::Int64,
true, // epoch is nullable
)
.with_metadata(timestamp_metadata.clone()), // Metadata might be on inner fields too
Field::new(
"fraction",
ArrowDataType::Int32,
true, // fraction is nullable
)
.with_metadata(timestamp_metadata.clone()),
]);
// Create the StructArray
let struct_array = StructArray::new(
struct_fields.clone(),
vec![
Arc::new(epoch_array) as ArrayRef,
Arc::new(fraction_array) as ArrayRef,
],
// Set the validity based on the epoch_data's nulls
// (row 2's None epoch makes the whole struct entry null)
Some(arrow::buffer::NullBuffer::from(
epoch_data.iter().map(|x| x.is_some()).collect::<Vec<_>>(),
)),
);
// --- Field Creation ---
// The scale/logicalType metadata is placed on the outer struct field as
// well, since that is where the processing code looks it up.
let field_returned_processed_date = Field::new(
"RETURNED_PROCESSED_DATE",
ArrowDataType::Struct(struct_fields),
true, // The struct itself is nullable
)
.with_metadata(timestamp_metadata.clone()); // Metadata on the struct field itself
let field_product_name = Field::new("PRODUCT_NAME", ArrowDataType::Utf8, true);
let field_sku = Field::new("SKU", ArrowDataType::Utf8, true);
let field_order_number = Field::new("ORDER_NUMBER", ArrowDataType::Utf8, true);
// --- Schema Creation ---
let schema = Arc::new(Schema::new(vec![
field_returned_processed_date,
field_product_name,
field_sku,
field_order_number,
]));
// --- RecordBatch Creation ---
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(struct_array) as ArrayRef,
Arc::new(product_name_array) as ArrayRef,
Arc::new(sku_array) as ArrayRef,
Arc::new(order_number_array) as ArrayRef,
],
)
.unwrap();
println!("Simulated Input RecordBatch schema: {:?}", batch.schema());
// --- Process Batch ---
let processed_rows = process_record_batch(&batch);
println!("Processed Rows (Struct Timestamp Scale 9): {:?}", processed_rows);
// --- Assertions ---
assert_eq!(processed_rows.len(), 4, "Expected 4 rows processed");
// Helper to create expected NaiveDateTime from secs/nanos
// (unwrap is safe: all test epochs are valid, in-range Unix timestamps)
let dt_from_parts = |secs: i64, nanos: u32| Utc.timestamp_opt(secs, nanos).unwrap().naive_utc();
// Row 0 Assertions (epoch=1736442980, fraction=969000000) -> nanos=969000000
let expected_dt_0 = dt_from_parts(1736442980, 969000000);
assert_eq!(
processed_rows[0]["returned_processed_date"], // field name is lowercased
DataType::Timestamp(Some(expected_dt_0)),
"Row 0 timestamp mismatch"
);
assert_eq!(
processed_rows[0]["product_name"],
DataType::Text(Some("Product A".to_string()))
);
assert_eq!(
processed_rows[0]["sku"],
DataType::Text(Some("SKU-A".to_string()))
);
assert_eq!(
processed_rows[0]["order_number"],
DataType::Text(Some("ORD-111".to_string()))
);
// Row 1 Assertions (epoch=1736443293, fraction=555000000) -> nanos=555000000
let expected_dt_1 = dt_from_parts(1736443293, 555000000);
assert_eq!(
processed_rows[1]["returned_processed_date"],
DataType::Timestamp(Some(expected_dt_1)),
"Row 1 timestamp mismatch"
);
assert_eq!(
processed_rows[1]["product_name"],
DataType::Text(Some("Product B".to_string()))
);
assert_eq!(
processed_rows[1]["sku"],
DataType::Text(Some("SKU-B".to_string()))
);
assert_eq!(
processed_rows[1]["order_number"],
DataType::Text(Some("ORD-222".to_string()))
);
// Row 2 Assertions (Null epoch)
// The struct's validity buffer marks this row null, so the timestamp
// must come back as Null while the sibling string columns are intact.
assert_eq!(
processed_rows[2]["returned_processed_date"],
DataType::Null,
"Row 2 timestamp should be Null"
);
assert_eq!(
processed_rows[2]["product_name"],
DataType::Text(Some("Product C".to_string()))
);
assert_eq!(
processed_rows[2]["sku"],
DataType::Text(Some("SKU-C".to_string()))
);
assert_eq!(
processed_rows[2]["order_number"],
DataType::Text(Some("ORD-333".to_string()))
);
// Row 3 Assertions (epoch=1737408291, fraction=504000000) -> nanos=504000000
let expected_dt_3 = dt_from_parts(1737408291, 504000000);
assert_eq!(
processed_rows[3]["returned_processed_date"],
DataType::Timestamp(Some(expected_dt_3)),
"Row 3 timestamp mismatch"
);
assert_eq!(
processed_rows[3]["product_name"],
DataType::Text(Some("Product D".to_string()))
);
assert_eq!(
processed_rows[3]["sku"],
DataType::Text(Some("SKU-D".to_string()))
);
assert_eq!(
processed_rows[3]["order_number"],
DataType::Text(Some("ORD-444".to_string()))
);
println!("✓ Verified Struct TIMESTAMP_NTZ(9) processing");
}
}