diff --git a/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs b/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs index fc31cd58f..c6607be6b 100644 --- a/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs +++ b/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs @@ -56,343 +56,345 @@ pub async fn snowflake_query( let rows = match snowflake_client.exec(&limited_query).await { Ok(result) => match result { snowflake_api::QueryResult::Arrow(result) => { - result - .iter() - .flat_map(|batch| { - let schema = batch.schema(); - (0..batch.num_rows()).map(move |row_idx| { - schema - .fields() - .iter() - .enumerate() - .map(|(col_idx, field)| { - let column = batch.column(col_idx); - let data_type = match column.data_type() { - arrow::datatypes::DataType::Boolean => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Bool(Some(array.value(row_idx))) } - } - arrow::datatypes::DataType::Int8 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int2(Some(array.value(row_idx) as i16)) } - } - arrow::datatypes::DataType::Int16 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int2(Some(array.value(row_idx))) } - } - arrow::datatypes::DataType::Int32 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int4(Some(array.value(row_idx))) } - } - arrow::datatypes::DataType::Int64 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int8(Some(array.value(row_idx))) } - } - arrow::datatypes::DataType::UInt8 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int2(Some(array.value(row_idx) as i16)) } - } - arrow::datatypes::DataType::UInt16 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int4(Some(array.value(row_idx) as i32)) } - } - arrow::datatypes::DataType::UInt32 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int8(Some(array.value(row_idx) as i64)) } - } - arrow::datatypes::DataType::UInt64 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Int8(Some(array.value(row_idx) as i64)) } - } - arrow::datatypes::DataType::Float32 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Float4(Some(array.value(row_idx))) } - } - arrow::datatypes::DataType::Float64 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Float8(Some(array.value(row_idx))) } - } - arrow::datatypes::DataType::Utf8 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Text(Some(process_string_value(array.value(row_idx).to_string()))) } - } - arrow::datatypes::DataType::LargeUtf8 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Text(Some(process_string_value(array.value(row_idx).to_string()))) } - } - arrow::datatypes::DataType::Binary => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } - } - arrow::datatypes::DataType::LargeBinary => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } - } - arrow::datatypes::DataType::Date32 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { - let days = array.value(row_idx); - let timestamp = days as i64 * 24 * 60 * 60; - match Utc.timestamp_opt(timestamp, 0) { - LocalResult::Single(dt) => DataType::Date(Some(dt.date_naive())), - _ => DataType::Null, - } + let mut all_rows = Vec::new(); + + // Process each batch in order + for batch in result.iter() { + let schema = batch.schema(); + for row_idx in 0..batch.num_rows() { + let row = schema + .fields() + .iter() + .enumerate() + .map(|(col_idx, field)| { + let column = batch.column(col_idx); + let data_type = match column.data_type() { + arrow::datatypes::DataType::Boolean => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bool(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Int8 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int2(Some(array.value(row_idx) as i16)) } + } + arrow::datatypes::DataType::Int16 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int2(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Int32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int4(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Int64 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int8(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::UInt8 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int2(Some(array.value(row_idx) as i16)) } + } + arrow::datatypes::DataType::UInt16 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int4(Some(array.value(row_idx) as i32)) } + } + arrow::datatypes::DataType::UInt32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int8(Some(array.value(row_idx) as i64)) } + } + arrow::datatypes::DataType::UInt64 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int8(Some(array.value(row_idx) as i64)) } + } + arrow::datatypes::DataType::Float32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float4(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Float64 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float8(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Utf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(process_string_value(array.value(row_idx).to_string()))) } + } + arrow::datatypes::DataType::LargeUtf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(process_string_value(array.value(row_idx).to_string()))) } + } + arrow::datatypes::DataType::Binary => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::LargeBinary => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::Date32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let days = array.value(row_idx); + let timestamp = days as i64 * 24 * 60 * 60; + match Utc.timestamp_opt(timestamp, 0) { + LocalResult::Single(dt) => DataType::Date(Some(dt.date_naive())), + _ => DataType::Null, } } - arrow::datatypes::DataType::Date64 => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { - let millis = array.value(row_idx); - let secs = millis / 1000; - let nanos = ((millis % 1000) * 1_000_000) as u32; - match Utc.timestamp_opt(secs, nanos) { - LocalResult::Single(dt) => DataType::Date(Some(dt.date_naive())), - _ => DataType::Null, - } + } + arrow::datatypes::DataType::Date64 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let millis = array.value(row_idx); + let secs = millis / 1000; + let nanos = ((millis % 1000) * 1_000_000) as u32; + match Utc.timestamp_opt(secs, nanos) { + LocalResult::Single(dt) => DataType::Date(Some(dt.date_naive())), + _ => DataType::Null, } } - arrow::datatypes::DataType::Timestamp(unit, tz) => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { - DataType::Null - } else { - let nanos = array.value(row_idx); - let (secs, subsec_nanos) = match unit { - TimeUnit::Second => (nanos, 0), - TimeUnit::Millisecond => (nanos / 1000, (nanos % 1000) * 1_000_000), - TimeUnit::Microsecond => (nanos / 1_000_000, (nanos % 1_000_000) * 1000), - TimeUnit::Nanosecond => (nanos / 1_000_000_000, nanos % 1_000_000_000), - }; - - match parse_snowflake_timestamp(secs as i64, subsec_nanos as u32) { - Ok(dt) => match tz { - Some(_) => DataType::Timestamptz(Some(dt)), - None => DataType::Timestamp(Some(dt.naive_utc())), - }, - Err(e) => { - tracing::error!("Failed to parse timestamp: {}", e); - DataType::Null - } - } - } - } - arrow::datatypes::DataType::Decimal128(precision, scale) => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { - let val = array.value(row_idx); - let scale_factor = 10_f64.powi(-(*scale as i32)); - let float_val = val as f64 * scale_factor; - DataType::Float8(Some(float_val)) - } - } - arrow::datatypes::DataType::Decimal256(precision, scale) => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { - let val = array.value(row_idx); - // Convert the i256 to string first to handle large numbers - let val_str = val.to_string(); - if let Ok(float_val) = val_str.parse::() { - let scale_factor = 10_f64.powi(-(*scale as i32)); - DataType::Float8(Some(float_val * scale_factor)) - } else { + } + arrow::datatypes::DataType::Timestamp(unit, tz) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { + DataType::Null + } else { + let nanos = array.value(row_idx); + let (secs, subsec_nanos) = match unit { + TimeUnit::Second => (nanos, 0), + TimeUnit::Millisecond => (nanos / 1000, (nanos % 1000) * 1_000_000), + TimeUnit::Microsecond => (nanos / 1_000_000, (nanos % 1_000_000) * 1000), + TimeUnit::Nanosecond => (nanos / 1_000_000_000, nanos % 1_000_000_000), + }; + + match parse_snowflake_timestamp(secs as i64, subsec_nanos as u32) { + Ok(dt) => match tz { + Some(_) => DataType::Timestamptz(Some(dt)), + None => DataType::Timestamp(Some(dt.naive_utc())), + }, + Err(e) => { + tracing::error!("Failed to parse timestamp: {}", e); DataType::Null } } } - arrow::datatypes::DataType::Null => DataType::Null, - arrow::datatypes::DataType::Float16 => { - let array = column.as_any().downcast_ref::().unwrap(); // Float16 gets converted to Float32 in Arrow - if array.is_null(row_idx) { DataType::Null } - else { DataType::Float4(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Decimal128(precision, scale) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + let scale_factor = 10_f64.powi(-(*scale as i32)); + let float_val = val as f64 * scale_factor; + DataType::Float8(Some(float_val)) } - arrow::datatypes::DataType::Time32(time_unit) => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { - let val = array.value(row_idx); - let nanos = match time_unit { - TimeUnit::Second => val as i64 * 1_000_000_000, - TimeUnit::Millisecond => val as i64 * 1_000_000, - _ => val as i64, - }; - let time = NaiveTime::from_num_seconds_from_midnight_opt( - (nanos / 1_000_000_000) as u32, - (nanos % 1_000_000_000) as u32, - ); - match time { - Some(t) => DataType::Time(Some(t)), - None => DataType::Null, - } + } + arrow::datatypes::DataType::Decimal256(precision, scale) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + // Convert the i256 to string first to handle large numbers + let val_str = val.to_string(); + if let Ok(float_val) = val_str.parse::() { + let scale_factor = 10_f64.powi(-(*scale as i32)); + DataType::Float8(Some(float_val * scale_factor)) + } else { + DataType::Null } } - arrow::datatypes::DataType::Time64(time_unit) => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { - let val = array.value(row_idx); - let nanos = match time_unit { - TimeUnit::Microsecond => val * 1000, - TimeUnit::Nanosecond => val, - _ => val * 1_000_000_000, - }; - let time = NaiveTime::from_num_seconds_from_midnight_opt( - (nanos / 1_000_000_000) as u32, - (nanos % 1_000_000_000) as u32, - ); - match time { - Some(t) => DataType::Time(Some(t)), - None => DataType::Null, - } + } + arrow::datatypes::DataType::Null => DataType::Null, + arrow::datatypes::DataType::Float16 => { + let array = column.as_any().downcast_ref::().unwrap(); // Float16 gets converted to Float32 in Arrow + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float4(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Time32(time_unit) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + let nanos = match time_unit { + TimeUnit::Second => val as i64 * 1_000_000_000, + TimeUnit::Millisecond => val as i64 * 1_000_000, + _ => val as i64, + }; + let time = NaiveTime::from_num_seconds_from_midnight_opt( + (nanos / 1_000_000_000) as u32, + (nanos % 1_000_000_000) as u32, + ); + match time { + Some(t) => DataType::Time(Some(t)), + None => DataType::Null, } } - arrow::datatypes::DataType::Duration(_) => { - // Convert duration to milliseconds as float for consistency - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Float8(Some(array.value(row_idx) as f64)) } - } - arrow::datatypes::DataType::Interval(_) => { - // Convert interval to a string representation - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Text(Some(array.value(row_idx).to_string())) } - } - arrow::datatypes::DataType::FixedSizeBinary(_) => { - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } - } - arrow::datatypes::DataType::BinaryView => { - // BinaryView is similar to Binary - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } - } - arrow::datatypes::DataType::Utf8View => { - // Utf8View is similar to Utf8 - let array = column.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { DataType::Null } - else { DataType::Text(Some(array.value(row_idx).to_string())) } - } - arrow::datatypes::DataType::List(_) | - arrow::datatypes::DataType::ListView(_) | - arrow::datatypes::DataType::FixedSizeList(_, _) | - arrow::datatypes::DataType::LargeList(_) | - arrow::datatypes::DataType::LargeListView(_) => { - let list_array = column.as_any().downcast_ref::().unwrap(); - if list_array.is_null(row_idx) { DataType::Null } - else { - let values = list_array.value(row_idx); - let json_array = Value::Array( - (0..values.len()) - .filter_map(|i| { - if values.is_null(i) { - None - } else if let Some(num) = values.as_any().downcast_ref::() { - Some(Value::Number(num.value(i).into())) - } else if let Some(num) = values.as_any().downcast_ref::() { - Some(Value::Number(num.value(i).into())) - } else if let Some(str) = values.as_any().downcast_ref::() { - Some(Value::String(process_string_value(str.value(i).to_string()))) - } else { - None - } - }) - .collect() - ); - DataType::Json(Some(process_json_value(json_array))) + } + arrow::datatypes::DataType::Time64(time_unit) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + let nanos = match time_unit { + TimeUnit::Microsecond => val * 1000, + TimeUnit::Nanosecond => val, + _ => val * 1_000_000_000, + }; + let time = NaiveTime::from_num_seconds_from_midnight_opt( + (nanos / 1_000_000_000) as u32, + (nanos % 1_000_000_000) as u32, + ); + match time { + Some(t) => DataType::Time(Some(t)), + None => DataType::Null, } } - arrow::datatypes::DataType::Struct(fields) => { - let struct_array = column.as_any().downcast_ref::().unwrap(); - if struct_array.is_null(row_idx) { DataType::Null } - else { - let mut map = serde_json::Map::new(); - for (field, col) in fields.iter().zip(struct_array.columns().iter()) { - let field_name = field.name(); - let value = match col.data_type() { - arrow::datatypes::DataType::Int32 => { - let array = col.as_any().downcast_ref::().unwrap(); - if array.is_null(row_idx) { Value::Null } - else { Value::Number(array.value(row_idx).into()) } + } + arrow::datatypes::DataType::Duration(_) => { + // Convert duration to milliseconds as float for consistency + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float8(Some(array.value(row_idx) as f64)) } + } + arrow::datatypes::DataType::Interval(_) => { + // Convert interval to a string representation + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(array.value(row_idx).to_string())) } + } + arrow::datatypes::DataType::FixedSizeBinary(_) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::BinaryView => { + // BinaryView is similar to Binary + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::Utf8View => { + // Utf8View is similar to Utf8 + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(array.value(row_idx).to_string())) } + } + arrow::datatypes::DataType::List(_) | + arrow::datatypes::DataType::ListView(_) | + arrow::datatypes::DataType::FixedSizeList(_, _) | + arrow::datatypes::DataType::LargeList(_) | + arrow::datatypes::DataType::LargeListView(_) => { + let list_array = column.as_any().downcast_ref::().unwrap(); + if list_array.is_null(row_idx) { DataType::Null } + else { + let values = list_array.value(row_idx); + let json_array = Value::Array( + (0..values.len()) + .filter_map(|i| { + if values.is_null(i) { + None + } else if let Some(num) = values.as_any().downcast_ref::() { + Some(Value::Number(num.value(i).into())) + } else if let Some(num) = values.as_any().downcast_ref::() { + Some(Value::Number(num.value(i).into())) + } else if let Some(str) = values.as_any().downcast_ref::() { + Some(Value::String(process_string_value(str.value(i).to_string()))) + } else { + None } - // Add more field types as needed - _ => Value::Null, - }; - map.insert(field_name.to_string(), value); - } - DataType::Json(Some(process_json_value(Value::Object(map)))) - } + }) + .collect() + ); + DataType::Json(Some(process_json_value(json_array))) } - arrow::datatypes::DataType::Union(_, _) => { - // Unions are complex - convert to string representation - DataType::Text(Some("Union type not fully supported".to_string())) - } - arrow::datatypes::DataType::Dictionary(_, _) => { - let dict_array = column.as_any().downcast_ref::>().unwrap(); - if dict_array.is_null(row_idx) { DataType::Null } - else { - let values = dict_array.values(); - match values.data_type() { - arrow::datatypes::DataType::Utf8 => { - let string_values = values.as_any().downcast_ref::().unwrap(); - let key = dict_array.keys().value(row_idx); - DataType::Text(Some(string_values.value(key as usize).to_string())) + } + arrow::datatypes::DataType::Struct(fields) => { + let struct_array = column.as_any().downcast_ref::().unwrap(); + if struct_array.is_null(row_idx) { DataType::Null } + else { + let mut map = serde_json::Map::new(); + for (field, col) in fields.iter().zip(struct_array.columns().iter()) { + let field_name = field.name(); + let value = match col.data_type() { + arrow::datatypes::DataType::Int32 => { + let array = col.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { Value::Null } + else { Value::Number(array.value(row_idx).into()) } } - _ => DataType::Text(Some("Unsupported dictionary type".to_string())), + // Add more field types as needed + _ => Value::Null, + }; + map.insert(field_name.to_string(), value); + } + DataType::Json(Some(process_json_value(Value::Object(map)))) + } + } + arrow::datatypes::DataType::Union(_, _) => { + // Unions are complex - convert to string representation + DataType::Text(Some("Union type not fully supported".to_string())) + } + arrow::datatypes::DataType::Dictionary(_, _) => { + let dict_array = column.as_any().downcast_ref::>().unwrap(); + if dict_array.is_null(row_idx) { DataType::Null } + else { + let values = dict_array.values(); + match values.data_type() { + arrow::datatypes::DataType::Utf8 => { + let string_values = values.as_any().downcast_ref::().unwrap(); + let key = dict_array.keys().value(row_idx); + DataType::Text(Some(string_values.value(key as usize).to_string())) } + _ => DataType::Text(Some("Unsupported dictionary type".to_string())), } } - arrow::datatypes::DataType::Map(_, _) => { - // Convert map to JSON object - let map_array = column.as_map(); - if map_array.is_null(row_idx) { DataType::Null } - else { - let entries = map_array.value(row_idx); - let mut json_map = serde_json::Map::new(); - // Assuming string keys and numeric values for simplicity - for i in 0..entries.len() { - if let (Some(key), Some(value)) = ( - entries.column(0).as_any().downcast_ref::().map(|arr| arr.value(i)), - entries.column(1).as_any().downcast_ref::().map(|arr| arr.value(i)) - ) { - json_map.insert(key.to_string(), Value::Number(value.into())); - } + } + arrow::datatypes::DataType::Map(_, _) => { + // Convert map to JSON object + let map_array = column.as_map(); + if map_array.is_null(row_idx) { DataType::Null } + else { + let entries = map_array.value(row_idx); + let mut json_map = serde_json::Map::new(); + // Assuming string keys and numeric values for simplicity + for i in 0..entries.len() { + if let (Some(key), Some(value)) = ( + entries.column(0).as_any().downcast_ref::().map(|arr| arr.value(i)), + entries.column(1).as_any().downcast_ref::().map(|arr| arr.value(i)) + ) { + json_map.insert(key.to_string(), Value::Number(value.into())); } - DataType::Json(Some(process_json_value(Value::Object(json_map)))) } + DataType::Json(Some(process_json_value(Value::Object(json_map)))) } - arrow::datatypes::DataType::RunEndEncoded(_, _) => { - // Convert run-length encoded data to its base type - // This is a simplified handling - DataType::Text(Some("Run-length encoded type not fully supported".to_string())) - } - }; - (field.name().clone(), data_type) - }) - .collect::>() - }) - }) - .collect() + } + arrow::datatypes::DataType::RunEndEncoded(_, _) => { + // Convert run-length encoded data to its base type + // This is a simplified handling + DataType::Text(Some("Run-length encoded type not fully supported".to_string())) + } + }; + (field.name().clone(), data_type) + }) + .collect::>(); + all_rows.push(row); + } + } + all_rows } _ => Vec::new(), },