snowflake api fix

This commit is contained in:
dal 2025-05-08 18:27:22 -06:00
parent 49ae7cee12
commit abfdde6973
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed files with 90 additions and 29 deletions

View File

@ -1452,18 +1452,11 @@ fn convert_array_to_datatype(
DataType::Null
}
} else {
// println!("Debug [{}]: Detected as regular Int64.", field_name);
// Regular Int64 processing
// Not a timestamp, so delegate to handle_int64_array which can handle scaling or default to Int8
if let Some(array) = column.as_any().downcast_ref::<Int64Array>() {
if array.is_null(row_idx) {
// println!("Debug [{}]: Regular Int64 is null at row_idx {}.", field_name, row_idx);
return DataType::Null;
}
let value = array.value(row_idx);
// println!("Debug [{}]: Returning Int8 with value {}.", field_name, value);
DataType::Int8(Some(value))
handle_int64_array(array, row_idx, scale_str.map(|s| s.as_str()), field)
} else {
// println!("Debug [{}]: Failed to downcast regular Int64 to Int64Array.", field_name);
// println!("Debug [{}]: Failed to downcast Int64 for non-timestamp to Int64Array.", field_name);
DataType::Null
}
}
@ -1652,7 +1645,7 @@ fn prepare_query(query: &str) -> String {
}
fn process_record_batch(batch: &RecordBatch) -> Vec<IndexMap<String, DataType>> {
println!("Processing record batch with {:?} rows", batch);
// println!("Processing record batch with {:?} rows", batch);
let mut rows = Vec::with_capacity(batch.num_rows());
let schema = batch.schema();
@ -2857,20 +2850,20 @@ mod tests {
assert_eq!(
processed_rows[0]["order_number"],
DataType::Text(Some("ord-a001".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("ORD-A001".to_string())) // Expect original case
);
assert_eq!(
processed_rows[0]["customer_name"],
DataType::Text(Some("customer one".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("Customer One".to_string())) // Expect original case
);
assert_eq!(
processed_rows[0]["return_value"],
DataType::Float8(Some(100.00))
); // Anonymized
assert_eq!(
processed_rows[0]["return_type"],
DataType::Text(Some("type x".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("Type X".to_string())) // Expect original case
);
// Row 1 Assertions
assert!(
@ -2894,17 +2887,17 @@ mod tests {
}
assert_eq!(
processed_rows[1]["order_number"],
DataType::Text(Some("ord-b002".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("ORD-B002".to_string())) // Expect original case
);
assert_eq!(
processed_rows[1]["customer_name"],
DataType::Text(Some("customer two".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("Customer Two".to_string())) // Expect original case
);
assert_eq!(processed_rows[1]["return_value"], DataType::Null); // Remains Null
assert_eq!(
processed_rows[1]["return_type"],
DataType::Text(Some("type y".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("Type Y".to_string())) // Expect original case
);
// Row 2 Assertions
assert_eq!(processed_rows[2]["order_date"], DataType::Null);
@ -2931,21 +2924,89 @@ mod tests {
}
assert_eq!(
processed_rows[2]["order_number"],
DataType::Text(Some("ord-c003".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("ORD-C003".to_string())) // Expect original case
);
assert_eq!(
processed_rows[2]["customer_name"],
DataType::Text(Some("customer three".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("Customer Three".to_string())) // Expect original case
);
assert_eq!(
processed_rows[2]["return_value"],
DataType::Float8(Some(500.00))
); // Anonymized
assert_eq!(
processed_rows[2]["return_type"],
DataType::Text(Some("type z".to_string()))
); // Anonymized & Lowercase
DataType::Text(Some("Type Z".to_string())) // Expect original case
);
println!("✓ Verified Real-World RecordBatch Processing (Anonymized)");
}
#[test]
fn test_int64_fixed_scaled_processing() {
println!("\n=== Testing Int64 FIXED with Scale Processing ===");
// Sample data: Int64 values representing, for example, monetary amounts in cents
let raw_values = vec![Some(12345i64), Some(67890i64), None, Some(500i64)]; // e.g., $123.45, $678.90, NULL, $5.00
// Create Arrow array
let data_array = Int64Array::from(raw_values.clone());
// Create metadata for the field
let mut field_metadata = std::collections::HashMap::new();
field_metadata.insert("logicalType".to_string(), "FIXED".to_string());
field_metadata.insert("scale".to_string(), "2".to_string()); // Indicates 2 decimal places
field_metadata.insert("precision".to_string(), "38".to_string()); // Example precision
// Create field
let value_field = Field::new(
"SCALED_VALUE", // Anonymized field name
ArrowDataType::Int64,
true, // Nullable
)
.with_metadata(field_metadata);
// Create schema
let schema = Arc::new(Schema::new(vec![value_field]));
// Create record batch
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(data_array) as ArrayRef],
)
.unwrap();
println!("Input RecordBatch schema: {:?}", batch.schema());
println!(
"Input RecordBatch columns: [Column 0: {:?}]",
batch.column(0)
);
// Process the batch
let processed_rows = process_record_batch(&batch);
println!("Processed Rows: {:?}", processed_rows);
// --- Assertions ---
assert_eq!(processed_rows.len(), raw_values.len(), "Number of processed rows should match input");
// Expected scaled Float8 values
let expected_values = vec![
DataType::Float8(Some(123.45)),
DataType::Float8(Some(678.90)),
DataType::Null,
DataType::Float8(Some(5.00)),
];
for i in 0..expected_values.len() {
assert_eq!(
processed_rows[i]["scaled_value"], // Field name is lowercased by process_record_batch
expected_values[i],
"Mismatch in row {} for 'scaled_value'", i
);
}
println!("✓ Verified Int64 FIXED with Scale processing");
}
}