mirror of https://github.com/buster-so/buster.git
commit ba25ada7c1

@@ -21,16 +21,42 @@ fn process_string_value(value: String) -> String {
     value.to_lowercase()
 }
 
+fn handle_snowflake_timestamp(value: &Value) -> Option<Value> {
+    if let Value::Object(map) = value {
+        if map.contains_key("epoch") {
+            // If epoch is null, return null
+            if map["epoch"].is_null() {
+                return Some(Value::Null);
+            }
+
+            // If we have a valid epoch, convert it
+            if let Some(epoch) = map["epoch"].as_i64() {
+                match parse_snowflake_timestamp(epoch, 0) {
+                    Ok(dt) => return Some(Value::String(dt.to_rfc3339())),
+                    Err(_) => return Some(Value::Null),
+                }
+            }
+        }
+    }
+    None
+}
+
 fn process_json_value(value: Value) -> Value {
     match value {
         Value::String(s) => Value::String(s.to_lowercase()),
         Value::Array(arr) => Value::Array(arr.into_iter().map(process_json_value).collect()),
         Value::Object(map) => {
-            let new_map = map
-                .into_iter()
-                .map(|(k, v)| (k.to_lowercase(), process_json_value(v)))
-                .collect();
-            Value::Object(new_map)
+            // First check if this object might be a Snowflake timestamp
+            if let Some(processed) = handle_snowflake_timestamp(&Value::Object(map.clone())) {
+                processed
+            } else {
+                // Otherwise process it as a normal object
+                let new_map = map
+                    .into_iter()
+                    .map(|(k, v)| (k.to_lowercase(), process_json_value(v)))
+                    .collect();
+                Value::Object(new_map)
+            }
         }
         _ => value,
     }
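
For illustration, a sketch of how the new helper behaves (not part of the commit: it assumes handle_snowflake_timestamp and parse_snowflake_timestamp from the hunk above are in scope, and the exact string produced depends on parse_snowflake_timestamp's epoch unit):

    // Hedged usage sketch, not from the diff.
    #[cfg(test)]
    mod handle_snowflake_timestamp_sketch {
        use super::*;
        use serde_json::{json, Value};

        #[test]
        fn epoch_objects_are_rewritten() {
            // An object with a non-null "epoch" becomes an RFC 3339 string
            let ts = json!({ "epoch": 1_700_000_000 });
            assert!(matches!(handle_snowflake_timestamp(&ts), Some(Value::String(_))));

            // A null "epoch" collapses to JSON null
            let null_ts = json!({ "epoch": null });
            assert_eq!(handle_snowflake_timestamp(&null_ts), Some(Value::Null));

            // Objects without an "epoch" key are left to normal key/value processing
            assert_eq!(handle_snowflake_timestamp(&json!({ "x": 1 })), None);
        }
    }
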
@@ -43,6 +69,49 @@ fn parse_snowflake_timestamp(epoch_data: i64, subsec_nanos: u32) -> Result<DateT
     }
 }
 
+// Add this helper function before the snowflake_query function
+fn handle_snowflake_timestamp_struct(
+    struct_array: &arrow::array::StructArray,
+    row_idx: usize,
+) -> Option<DateTime<Utc>> {
+    if struct_array.is_null(row_idx) {
+        return None;
+    }
+
+    // Get the epoch field
+    let epoch_array = struct_array
+        .column_by_name("epoch")
+        .and_then(|col| col.as_any().downcast_ref::<Int64Array>());
+
+    // Get the fraction field
+    let fraction_array = struct_array
+        .column_by_name("fraction")
+        .and_then(|col| col.as_any().downcast_ref::<Int32Array>());
+
+    match (epoch_array, fraction_array) {
+        (Some(epoch), Some(fraction)) if !epoch.is_null(row_idx) => {
+            let epoch_value = epoch.value(row_idx);
+            let fraction_value = if fraction.is_null(row_idx) {
+                0
+            } else {
+                fraction.value(row_idx)
+            };
+
+            // Convert fraction to nanoseconds if needed
+            let nanos = (fraction_value as u32).min(999_999_999);
+
+            match parse_snowflake_timestamp(epoch_value, nanos) {
+                Ok(dt) => Some(dt),
+                Err(e) => {
+                    tracing::error!("Failed to parse timestamp: {}", e);
+                    None
+                }
+            }
+        }
+        _ => None,
+    }
+}
+
 pub async fn snowflake_query(
     mut snowflake_client: SnowflakeApi,
     query: String,
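
The body of parse_snowflake_timestamp sits outside this diff; the hunk header only shows its (truncated) signature. One plausible reading, offered purely as a hypothetical sketch since the real function may use a different epoch unit or error type, is whole seconds combined with subsecond nanoseconds via chrono:

    use chrono::{DateTime, TimeZone, Utc};

    // Hypothetical sketch only; not the function from the commit.
    fn parse_snowflake_timestamp_sketch(
        epoch_data: i64,
        subsec_nanos: u32,
    ) -> Result<DateTime<Utc>, String> {
        // timestamp_opt rejects out-of-range seconds instead of panicking
        Utc.timestamp_opt(epoch_data, subsec_nanos)
            .single()
            .ok_or_else(|| format!("epoch out of range: {epoch_data}"))
    }
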
@@ -63,6 +132,7 @@ pub async fn snowflake_query(
 
+    // Process each batch in order
     for batch in result.iter() {
 
         let schema = batch.schema();
         for row_idx in 0..batch.num_rows() {
             let row = schema

@@ -481,36 +551,46 @@ pub async fn snowflake_query(
                             .as_any()
                             .downcast_ref::<arrow::array::StructArray>()
                             .unwrap();
-                        if struct_array.is_null(row_idx) {
-                            DataType::Null
-                        } else {
-                            let mut map = serde_json::Map::new();
-                            for (field, col) in
-                                fields.iter().zip(struct_array.columns().iter())
-                            {
-                                let field_name = field.name();
-                                let value = match col.data_type() {
-                                    arrow::datatypes::DataType::Int32 => {
-                                        let array = col
-                                            .as_any()
-                                            .downcast_ref::<Int32Array>()
-                                            .unwrap();
-                                        if array.is_null(row_idx) {
-                                            Value::Null
-                                        } else {
-                                            Value::Number(
-                                                array.value(row_idx).into(),
-                                            )
-                                        }
-                                    }
-                                    // Add more field types as needed
-                                    _ => Value::Null,
-                                };
-                                map.insert(field_name.to_string(), value);
-                            }
-                            DataType::Json(Some(process_json_value(Value::Object(
-                                map,
-                            ))))
-                        }
+
+                        // Check if this is a Snowflake timestamp struct
+                        if fields.len() == 2
+                            && fields.iter().any(|f| f.name() == "epoch")
+                            && fields.iter().any(|f| f.name() == "fraction")
+                            && field.metadata().get("logicalType").map_or(false, |v| v.contains("TIMESTAMP"))
+                        {
+                            if let Some(dt) = handle_snowflake_timestamp_struct(struct_array, row_idx) {
+                                if field.metadata().get("logicalType").map_or(false, |v| v.contains("_TZ")) {
+                                    DataType::Timestamptz(Some(dt))
+                                } else {
+                                    DataType::Timestamp(Some(dt.naive_utc()))
+                                }
+                            } else {
+                                DataType::Null
+                            }
+                        } else {
+                            // Original struct handling for non-timestamp structs
+                            if struct_array.is_null(row_idx) {
+                                DataType::Null
+                            } else {
+                                let mut map = serde_json::Map::new();
+                                for (field, col) in fields.iter().zip(struct_array.columns().iter()) {
+                                    let field_name = field.name();
+                                    let value = match col.data_type() {
+                                        arrow::datatypes::DataType::Int32 => {
+                                            let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
+                                            if array.is_null(row_idx) {
+                                                Value::Null
+                                            } else {
+                                                Value::Number(array.value(row_idx).into())
+                                            }
+                                        }
+                                        // Add more field types as needed
+                                        _ => Value::Null,
+                                    };
+                                    map.insert(field_name.to_string(), value);
+                                }
+                                DataType::Json(Some(process_json_value(Value::Object(map))))
+                            }
+                        }
                     }
                     arrow::datatypes::DataType::Union(_, _) => {
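
For reference, a hedged sketch of driving handle_snowflake_timestamp_struct directly (not from the commit; the StructArray construction assumes arrow's TryFrom<Vec<(&str, ArrayRef)>> impl, and the Some/None outcomes assume parse_snowflake_timestamp accepts the epoch values shown):

    use arrow::array::{ArrayRef, Int32Array, Int64Array, StructArray};
    use std::sync::Arc;

    fn demo() -> Result<(), arrow::error::ArrowError> {
        // Two rows: a populated epoch/fraction pair, and a null epoch.
        let epoch: ArrayRef = Arc::new(Int64Array::from(vec![Some(1_700_000_000), None]));
        let fraction: ArrayRef = Arc::new(Int32Array::from(vec![Some(500_000_000), Some(0)]));
        let ts = StructArray::try_from(vec![("epoch", epoch), ("fraction", fraction)])?;

        // Row 0 carries a valid epoch, so the helper should yield a DateTime;
        // row 1's null epoch falls through the match guard and yields None.
        assert!(handle_snowflake_timestamp_struct(&ts, 0).is_some());
        assert!(handle_snowflake_timestamp_struct(&ts, 1).is_none());
        Ok(())
    }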