From 7fc4be939cb31a799f83364b6ecad7d95fe77812 Mon Sep 17 00:00:00 2001 From: Nate Kelley Date: Wed, 22 Jan 2025 10:26:40 -0700 Subject: [PATCH 1/2] add datasets to permission groups page --- .../buster-rest/permission_groups/requests.ts | 2 +- .../permission_groups/responseInterfaces.ts | 1 + .../PermissionSearchAndListWrapper.tsx | 2 +- .../PermissionGroupDatasetSelectedPopup.tsx | 42 +++++ .../PermissionGroupDatasetsController.tsx | 56 +++++++ .../PermissionGroupDatasetsListContainer.tsx | 144 ++++++++++++++++++ .../[permissionGroupId]/datasets/page.tsx | 18 ++- .../PermissionGroupUsersListContainer.tsx | 9 +- 8 files changed, 262 insertions(+), 12 deletions(-) create mode 100644 web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetSelectedPopup.tsx create mode 100644 web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsController.tsx create mode 100644 web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsListContainer.tsx diff --git a/web/src/api/buster-rest/permission_groups/requests.ts b/web/src/api/buster-rest/permission_groups/requests.ts index 2b4480dbc..0ff43cc92 100644 --- a/web/src/api/buster-rest/permission_groups/requests.ts +++ b/web/src/api/buster-rest/permission_groups/requests.ts @@ -72,7 +72,7 @@ export const getPermissionGroupDatasets = async ({ id }: { id: string; -}): Promise => { +}): Promise => { return await mainApi.get(`/permission_groups/${id}/datasets`).then((res) => res.data); }; diff --git a/web/src/api/buster-rest/permission_groups/responseInterfaces.ts b/web/src/api/buster-rest/permission_groups/responseInterfaces.ts index 0b354df82..a2896d55b 100644 --- a/web/src/api/buster-rest/permission_groups/responseInterfaces.ts +++ b/web/src/api/buster-rest/permission_groups/responseInterfaces.ts @@ -20,6 +20,7 @@ export interface GetPermissionGroupUsersResponse { export interface GetPermissionGroupDatasetsResponse { id: string; assigned: boolean; + name: string; } export interface GetPermissionGroupDatasetGroupsResponse { diff --git a/web/src/app/app/_components/PermissionComponents/PermissionSearchAndListWrapper.tsx b/web/src/app/app/_components/PermissionComponents/PermissionSearchAndListWrapper.tsx index 3fb2e2475..c025eff2d 100644 --- a/web/src/app/app/_components/PermissionComponents/PermissionSearchAndListWrapper.tsx +++ b/web/src/app/app/_components/PermissionComponents/PermissionSearchAndListWrapper.tsx @@ -19,7 +19,7 @@ export const PermissionSearchAndListWrapper: React.FC<{ /> {searchChildren} - {children} +
{children}
); } diff --git a/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetSelectedPopup.tsx b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetSelectedPopup.tsx new file mode 100644 index 000000000..13a54f22b --- /dev/null +++ b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetSelectedPopup.tsx @@ -0,0 +1,42 @@ +import { + useUpdatePermissionGroupDatasets, + useUpdatePermissionGroupUsers, + useUpdateUserDatasets +} from '@/api/buster-rest'; +import { PermissionAssignedButton } from '@/app/app/_components/PermissionComponents'; +import { BusterListSelectedOptionPopupContainer } from '@/components/list'; +import { useMemoizedFn } from 'ahooks'; +import React from 'react'; + +export const PermissionGroupDatasetSelectedPopup: React.FC<{ + selectedRowKeys: string[]; + onSelectChange: (selectedRowKeys: string[]) => void; + permissionGroupId: string; +}> = React.memo(({ selectedRowKeys, onSelectChange, permissionGroupId }) => { + const { mutateAsync: updatePermissionGroupDatasets } = useUpdatePermissionGroupDatasets(); + + const onSelectAssigned = useMemoizedFn(async (params: { id: string; assigned: boolean }[]) => { + await updatePermissionGroupDatasets({ + permissionGroupId, + data: params + }); + }); + + return ( + + ]} + /> + ); +}); + +PermissionGroupDatasetSelectedPopup.displayName = 'PermissionGroupDatasetSelectedPopup'; diff --git a/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsController.tsx b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsController.tsx new file mode 100644 index 000000000..c99d7a041 --- /dev/null +++ b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsController.tsx @@ -0,0 +1,56 @@ +'use client'; + +import { useGetPermissionGroupDatasets } from '@/api/buster-rest'; +import { useDebounceSearch } from '@/hooks/useDebounceSearch'; +import { PermissionSearchAndListWrapper } from '@appComponents/PermissionComponents'; +import React, { useMemo, useState } from 'react'; +import { Button } from 'antd'; +import { AppMaterialIcons } from '@/components/icons'; +import { PermissionGroupDatasetsListContainer } from './PermissionGroupDatasetsListContainer'; +import { useMemoizedFn } from 'ahooks'; +import { NewDatasetModal } from '@/app/app/_components/NewDatasetModal'; + +export const PermissionGroupDatasetsController: React.FC<{ + permissionGroupId: string; +}> = ({ permissionGroupId }) => { + const { data } = useGetPermissionGroupDatasets(permissionGroupId); + const [isNewDatasetModalOpen, setIsNewDatasetModalOpen] = useState(false); + + const { filteredItems, handleSearchChange, searchText } = useDebounceSearch({ + items: data || [], + searchPredicate: (item, searchText) => item.name.includes(searchText) + }); + + const onCloseNewDatasetModal = useMemoizedFn(() => { + setIsNewDatasetModalOpen(false); + }); + + const onOpenNewDatasetModal = useMemoizedFn(() => { + setIsNewDatasetModalOpen(true); + }); + + const NewDatasetButton: React.ReactNode = useMemo(() => { + return ( + + ); + }, []); + + return ( + <> + + + + + + + ); +}; diff --git a/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsListContainer.tsx b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsListContainer.tsx new file mode 100644 index 000000000..855a07c10 --- /dev/null +++ b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/PermissionGroupDatasetsListContainer.tsx @@ -0,0 +1,144 @@ +import { + GetPermissionGroupDatasetsResponse, + GetPermissionGroupUsersResponse, + useUpdatePermissionGroupDatasets, + useUpdatePermissionGroupUsers +} from '@/api/buster-rest'; +import { PermissionAssignedCell } from '@/app/app/_components/PermissionComponents'; +import { + BusterInfiniteList, + BusterListColumn, + BusterListRowItem, + EmptyStateList, + InfiniteListContainer +} from '@/components/list'; +import { BusterRoutes, createBusterRoute } from '@/routes'; +import { useMemoizedFn } from 'ahooks'; +import React, { useMemo, useState } from 'react'; +import { ListUserItem } from '@/app/app/_components/ListContent'; +import { PermissionGroupDatasetSelectedPopup } from './PermissionGroupDatasetSelectedPopup'; + +export const PermissionGroupDatasetsListContainer: React.FC<{ + filteredDatasets: GetPermissionGroupDatasetsResponse[]; + permissionGroupId: string; +}> = React.memo(({ filteredDatasets, permissionGroupId }) => { + const [selectedRowKeys, setSelectedRowKeys] = useState([]); + const { mutateAsync: updatePermissionGroupDatasets } = useUpdatePermissionGroupDatasets(); + + const onSelectAssigned = useMemoizedFn(async (params: { id: string; assigned: boolean }) => { + await updatePermissionGroupDatasets({ + permissionGroupId, + data: [params] + }); + }); + + const columns: BusterListColumn[] = useMemo( + () => [ + { + title: 'Name', + dataIndex: 'name' + }, + { + title: 'Assigned', + dataIndex: 'assigned', + width: 130 + 85, + render: (assigned: boolean, permissionGroup: GetPermissionGroupUsersResponse) => { + return ( +
+ +
+ ); + } + } + ], + [] + ); + + const { cannotQueryPermissionUsers, canQueryPermissionUsers } = useMemo(() => { + const result: { + cannotQueryPermissionUsers: BusterListRowItem[]; + canQueryPermissionUsers: BusterListRowItem[]; + } = filteredDatasets.reduce<{ + cannotQueryPermissionUsers: BusterListRowItem[]; + canQueryPermissionUsers: BusterListRowItem[]; + }>( + (acc, dataset) => { + const datasetItem: BusterListRowItem = { + id: dataset.id, + data: dataset, + link: createBusterRoute({ + route: BusterRoutes.APP_SETTINGS_USERS_ID, + userId: dataset.id + }) + }; + if (dataset.assigned) { + acc.canQueryPermissionUsers.push(datasetItem); + } else { + acc.cannotQueryPermissionUsers.push(datasetItem); + } + return acc; + }, + { + cannotQueryPermissionUsers: [] as BusterListRowItem[], + canQueryPermissionUsers: [] as BusterListRowItem[] + } + ); + return result; + }, [filteredDatasets]); + + const rows = useMemo( + () => + [ + { + id: 'header-assigned', + data: {}, + hidden: canQueryPermissionUsers.length === 0, + rowSection: { + title: 'Assigned', + secondaryTitle: canQueryPermissionUsers.length.toString() + } + }, + ...canQueryPermissionUsers, + { + id: 'header-not-assigned', + data: {}, + hidden: cannotQueryPermissionUsers.length === 0, + rowSection: { + title: 'Not Assigned', + secondaryTitle: cannotQueryPermissionUsers.length.toString() + } + }, + ...cannotQueryPermissionUsers + ].filter((row) => !(row as any).hidden), + [canQueryPermissionUsers, cannotQueryPermissionUsers] + ); + + return ( + + }> + } + /> + + ); +}); + +PermissionGroupDatasetsListContainer.displayName = 'PermissionGroupUsersListContainer'; diff --git a/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/page.tsx b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/page.tsx index bc6961f43..817ccb341 100644 --- a/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/page.tsx +++ b/web/src/app/app/settings/permission-groups/[permissionGroupId]/datasets/page.tsx @@ -1,3 +1,17 @@ -export default function Page() { - return
Datasets
; +import { prefetchPermissionGroupDatasets } from '@/api/buster-rest'; +import { HydrationBoundary, dehydrate } from '@tanstack/react-query'; +import { PermissionGroupDatasetsController } from './PermissionGroupDatasetsController'; + +export default async function Page({ + params: { permissionGroupId } +}: { + params: { permissionGroupId: string }; +}) { + const queryClient = await prefetchPermissionGroupDatasets(permissionGroupId); + + return ( + + + + ); } diff --git a/web/src/app/app/settings/permission-groups/[permissionGroupId]/users/PermissionGroupUsersListContainer.tsx b/web/src/app/app/settings/permission-groups/[permissionGroupId]/users/PermissionGroupUsersListContainer.tsx index 4ae031e1c..75ddd1517 100644 --- a/web/src/app/app/settings/permission-groups/[permissionGroupId]/users/PermissionGroupUsersListContainer.tsx +++ b/web/src/app/app/settings/permission-groups/[permissionGroupId]/users/PermissionGroupUsersListContainer.tsx @@ -1,9 +1,4 @@ -import { - BusterUserDatasetGroup, - GetPermissionGroupUsersResponse, - useUpdatePermissionGroupUsers, - useUpdateUserDatasetGroups -} from '@/api/buster-rest'; +import { GetPermissionGroupUsersResponse, useUpdatePermissionGroupUsers } from '@/api/buster-rest'; import { PermissionAssignedCell } from '@/app/app/_components/PermissionComponents'; import { BusterInfiniteList, @@ -15,8 +10,6 @@ import { import { BusterRoutes, createBusterRoute } from '@/routes'; import { useMemoizedFn } from 'ahooks'; import React, { useMemo, useState } from 'react'; -import pluralize from 'pluralize'; -import { Text } from '@/components/text'; import { ListUserItem } from '@/app/app/_components/ListContent'; import { PermissionGroupUsersSelectedPopup } from './PermissionGroupUsersSelectedPopup'; From 04a745f0addca53ce1fc47f1bff94d63b9af8106 Mon Sep 17 00:00:00 2001 From: dal Date: Wed, 22 Jan 2025 10:31:02 -0700 Subject: [PATCH 2/2] Update dependencies and refactor Snowflake query handling - Downgraded the `base64` crate version in `Cargo.toml` from `0.22.1` to `0.21`. - Refactored the `snowflake_query` function in `snowflake_query.rs` to improve data type handling, including support for additional Arrow data types and enhanced null value checks. - Updated the `route_to_query` function in `query_router.rs` to use mutable `snowflake_client` for better state management during query execution. - Improved error handling for closing the Snowflake client session, ensuring proper logging of any issues encountered. --- api/Cargo.toml | 2 +- .../data_source_query_routes/query_router.rs | 5 +- .../snowflake_query.rs | 420 ++++++++++++++---- 3 files changed, 332 insertions(+), 95 deletions(-) diff --git a/api/Cargo.toml b/api/Cargo.toml index d48493e05..10e2190fd 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -11,7 +11,7 @@ anyhow = "1.0.86" arrow = { version = "54.0.0", features = ["json"] } async-compression = { version = "0.4.11", features = ["tokio"] } axum = { version = "0.7.5", features = ["ws"] } -base64 = "0.22.1" +base64 = "0.21" bb8-redis = "0.18.0" chrono = { version = "0.4.38", features = ["serde"] } cohere-rust = "0.6.0" diff --git a/api/src/utils/query_engine/data_source_query_routes/query_router.rs b/api/src/utils/query_engine/data_source_query_routes/query_router.rs index f35e2dbe4..4aa1377c7 100644 --- a/api/src/utils/query_engine/data_source_query_routes/query_router.rs +++ b/api/src/utils/query_engine/data_source_query_routes/query_router.rs @@ -8,7 +8,8 @@ use crate::{ clients::supabase_vault::read_secret, query_engine::{ credentials::{ - BigqueryCredentials, DatabricksCredentials, MySqlCredentials, PostgresCredentials, SnowflakeCredentials, SqlServerCredentials, + BigqueryCredentials, DatabricksCredentials, MySqlCredentials, PostgresCredentials, + SnowflakeCredentials, SqlServerCredentials, }, data_source_connections::{ get_bigquery_client::get_bigquery_client, @@ -228,7 +229,7 @@ async fn route_to_query( DataSourceType::Snowflake => { let credentials: SnowflakeCredentials = serde_json::from_str(&credentials_string)?; - let snowflake_client = match get_snowflake_client(&credentials).await { + let mut snowflake_client = match get_snowflake_client(&credentials).await { Ok(snowflake_client) => snowflake_client, Err(e) => { tracing::error!("There was an issue while establishing a connection to the parent data source: {}", e); diff --git a/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs b/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs index 94f74d092..e1d73050b 100644 --- a/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs +++ b/api/src/utils/query_engine/data_source_query_routes/snowflake_query.rs @@ -1,8 +1,17 @@ -use arrow::array::Array; +use arrow::array::{Array, AsArray}; use indexmap::IndexMap; +use arrow::array::{ + BooleanArray, Int8Array, Int16Array, Int32Array, Int64Array, + UInt8Array, UInt16Array, UInt32Array, UInt64Array, + Float32Array, Float64Array, StringArray, LargeStringArray, + BinaryArray, LargeBinaryArray, Date32Array, Date64Array, + TimestampNanosecondArray, Decimal128Array, Decimal256Array, + FixedSizeBinaryArray, +}; +use arrow::datatypes::TimeUnit; use anyhow::{anyhow, Error}; -use chrono::{LocalResult, TimeZone, Utc}; +use chrono::{LocalResult, TimeZone, Utc, NaiveTime}; use snowflake_api::SnowflakeApi; use serde_json::Value; @@ -10,7 +19,7 @@ use serde_json::Value; use crate::utils::query_engine::data_types::DataType; pub async fn snowflake_query( - snowflake_client: SnowflakeApi, + mut snowflake_client: SnowflakeApi, query: String, ) -> Result>, Error> { let rows = match snowflake_client.exec(&query).await { @@ -27,51 +36,324 @@ pub async fn snowflake_query( .enumerate() .map(|(col_idx, field)| { let column = batch.column(col_idx); - let value = match column.data_type() { + let data_type = match column.data_type() { + arrow::datatypes::DataType::Boolean => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bool(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Int8 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int2(Some(array.value(row_idx) as i16)) } + } + arrow::datatypes::DataType::Int16 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int2(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Int32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int4(Some(array.value(row_idx))) } + } arrow::datatypes::DataType::Int64 => { - let array = column - .as_any() - .downcast_ref::() - .unwrap(); - if array.is_null(row_idx) { - Value::Null - } else { - Value::Number(array.value(row_idx).into()) - } + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int8(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::UInt8 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int2(Some(array.value(row_idx) as i16)) } + } + arrow::datatypes::DataType::UInt16 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int4(Some(array.value(row_idx) as i32)) } + } + arrow::datatypes::DataType::UInt32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int8(Some(array.value(row_idx) as i64)) } + } + arrow::datatypes::DataType::UInt64 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Int8(Some(array.value(row_idx) as i64)) } + } + arrow::datatypes::DataType::Float32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float4(Some(array.value(row_idx))) } } arrow::datatypes::DataType::Float64 => { - let array = column - .as_any() - .downcast_ref::() - .unwrap(); - if array.is_null(row_idx) { - Value::Null - } else { - Value::Number( - serde_json::Number::from_f64( - array.value(row_idx), - ) - .unwrap_or(serde_json::Number::from(0)), - ) - } + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float8(Some(array.value(row_idx))) } } arrow::datatypes::DataType::Utf8 => { - let array = column - .as_any() - .downcast_ref::() - .unwrap(); - if array.is_null(row_idx) { - Value::Null - } else { - Value::String(array.value(row_idx).to_string()) + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(array.value(row_idx).to_string())) } + } + arrow::datatypes::DataType::LargeUtf8 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(array.value(row_idx).to_string())) } + } + arrow::datatypes::DataType::Binary => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::LargeBinary => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::Date32 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let days = array.value(row_idx); + let timestamp = days as i64 * 24 * 60 * 60; + match Utc.timestamp_opt(timestamp, 0) { + LocalResult::Single(dt) => DataType::Date(Some(dt.date_naive())), + _ => DataType::Null, + } } } - // Add other data types as needed - _ => Value::Null, + arrow::datatypes::DataType::Date64 => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let millis = array.value(row_idx); + let secs = millis / 1000; + let nanos = ((millis % 1000) * 1_000_000) as u32; + match Utc.timestamp_opt(secs, nanos) { + LocalResult::Single(dt) => DataType::Date(Some(dt.date_naive())), + _ => DataType::Null, + } + } + } + arrow::datatypes::DataType::Timestamp(unit, tz) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let nanos = array.value(row_idx); + let (secs, subsec_nanos) = match unit { + TimeUnit::Second => (nanos, 0), + TimeUnit::Millisecond => (nanos / 1000, (nanos % 1000) * 1_000_000), + TimeUnit::Microsecond => (nanos / 1_000_000, (nanos % 1_000_000) * 1000), + TimeUnit::Nanosecond => (nanos / 1_000_000_000, nanos % 1_000_000_000), + }; + match Utc.timestamp_opt(secs as i64, subsec_nanos as u32) { + LocalResult::Single(dt) => match tz { + Some(_) => DataType::Timestamptz(Some(dt)), + None => DataType::Timestamp(Some(dt.naive_utc())), + }, + _ => DataType::Null, + } + } + } + arrow::datatypes::DataType::Decimal128(precision, scale) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + let scale_factor = 10_f64.powi(-(*scale as i32)); + let float_val = val as f64 * scale_factor; + DataType::Float8(Some(float_val)) + } + } + arrow::datatypes::DataType::Decimal256(precision, scale) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + // Convert the i256 to string first to handle large numbers + let val_str = val.to_string(); + if let Ok(float_val) = val_str.parse::() { + let scale_factor = 10_f64.powi(-(*scale as i32)); + DataType::Float8(Some(float_val * scale_factor)) + } else { + DataType::Null + } + } + } + arrow::datatypes::DataType::Null => DataType::Null, + arrow::datatypes::DataType::Float16 => { + let array = column.as_any().downcast_ref::().unwrap(); // Float16 gets converted to Float32 in Arrow + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float4(Some(array.value(row_idx))) } + } + arrow::datatypes::DataType::Time32(time_unit) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + let nanos = match time_unit { + TimeUnit::Second => val as i64 * 1_000_000_000, + TimeUnit::Millisecond => val as i64 * 1_000_000, + _ => val as i64, + }; + let time = NaiveTime::from_num_seconds_from_midnight_opt( + (nanos / 1_000_000_000) as u32, + (nanos % 1_000_000_000) as u32, + ); + match time { + Some(t) => DataType::Time(Some(t)), + None => DataType::Null, + } + } + } + arrow::datatypes::DataType::Time64(time_unit) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { + let val = array.value(row_idx); + let nanos = match time_unit { + TimeUnit::Microsecond => val * 1000, + TimeUnit::Nanosecond => val, + _ => val * 1_000_000_000, + }; + let time = NaiveTime::from_num_seconds_from_midnight_opt( + (nanos / 1_000_000_000) as u32, + (nanos % 1_000_000_000) as u32, + ); + match time { + Some(t) => DataType::Time(Some(t)), + None => DataType::Null, + } + } + } + arrow::datatypes::DataType::Duration(_) => { + // Convert duration to milliseconds as float for consistency + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Float8(Some(array.value(row_idx) as f64)) } + } + arrow::datatypes::DataType::Interval(_) => { + // Convert interval to a string representation + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(array.value(row_idx).to_string())) } + } + arrow::datatypes::DataType::FixedSizeBinary(_) => { + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::BinaryView => { + // BinaryView is similar to Binary + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Bytea(Some(array.value(row_idx).to_vec())) } + } + arrow::datatypes::DataType::Utf8View => { + // Utf8View is similar to Utf8 + let array = column.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { DataType::Null } + else { DataType::Text(Some(array.value(row_idx).to_string())) } + } + arrow::datatypes::DataType::List(_) | + arrow::datatypes::DataType::ListView(_) | + arrow::datatypes::DataType::FixedSizeList(_, _) | + arrow::datatypes::DataType::LargeList(_) | + arrow::datatypes::DataType::LargeListView(_) => { + let list_array = column.as_any().downcast_ref::().unwrap(); + if list_array.is_null(row_idx) { DataType::Null } + else { + let values = list_array.value(row_idx); + let json_array = Value::Array( + (0..values.len()) + .filter_map(|i| { + if values.is_null(i) { + None + } else if let Some(num) = values.as_any().downcast_ref::() { + Some(Value::Number(num.value(i).into())) + } else if let Some(num) = values.as_any().downcast_ref::() { + Some(Value::Number(num.value(i).into())) + } else if let Some(str) = values.as_any().downcast_ref::() { + Some(Value::String(str.value(i).to_string())) + } else { + None + } + }) + .collect() + ); + DataType::Json(Some(json_array)) + } + } + arrow::datatypes::DataType::Struct(fields) => { + let struct_array = column.as_any().downcast_ref::().unwrap(); + if struct_array.is_null(row_idx) { DataType::Null } + else { + let mut map = serde_json::Map::new(); + for (field, col) in fields.iter().zip(struct_array.columns().iter()) { + let field_name = field.name(); + let value = match col.data_type() { + arrow::datatypes::DataType::Int32 => { + let array = col.as_any().downcast_ref::().unwrap(); + if array.is_null(row_idx) { Value::Null } + else { Value::Number(array.value(row_idx).into()) } + } + // Add more field types as needed + _ => Value::Null, + }; + map.insert(field_name.to_string(), value); + } + DataType::Json(Some(Value::Object(map))) + } + } + arrow::datatypes::DataType::Union(_, _) => { + // Unions are complex - convert to string representation + DataType::Text(Some("Union type not fully supported".to_string())) + } + arrow::datatypes::DataType::Dictionary(_, _) => { + let dict_array = column.as_any().downcast_ref::>().unwrap(); + if dict_array.is_null(row_idx) { DataType::Null } + else { + let values = dict_array.values(); + match values.data_type() { + arrow::datatypes::DataType::Utf8 => { + let string_values = values.as_any().downcast_ref::().unwrap(); + let key = dict_array.keys().value(row_idx); + DataType::Text(Some(string_values.value(key as usize).to_string())) + } + _ => DataType::Text(Some("Unsupported dictionary type".to_string())), + } + } + } + arrow::datatypes::DataType::Map(_, _) => { + // Convert map to JSON object + let map_array = column.as_map(); + if map_array.is_null(row_idx) { DataType::Null } + else { + let entries = map_array.value(row_idx); + let mut json_map = serde_json::Map::new(); + // Assuming string keys and numeric values for simplicity + for i in 0..entries.len() { + if let (Some(key), Some(value)) = ( + entries.column(0).as_any().downcast_ref::().map(|arr| arr.value(i)), + entries.column(1).as_any().downcast_ref::().map(|arr| arr.value(i)) + ) { + json_map.insert(key.to_string(), Value::Number(value.into())); + } + } + DataType::Json(Some(Value::Object(json_map))) + } + } + arrow::datatypes::DataType::RunEndEncoded(_, _) => { + // Convert run-length encoded data to its base type + // This is a simplified handling + DataType::Text(Some("Run-length encoded type not fully supported".to_string())) + } }; - (field.name().clone(), value) + (field.name().clone(), data_type) }) - .collect::>() + .collect::>() }) }) .collect() @@ -84,58 +366,12 @@ pub async fn snowflake_query( } }; - let result = rows - .iter() - .map(|row| { - row.iter() - .map(|(key, value)| { - ( - key.clone(), - match value { - Value::Null => DataType::Null, - Value::Bool(val) => DataType::Bool(Some(val.clone())), - Value::Number(val) => match val.as_i64() { - Some(int_val) => DataType::Int8(Some(int_val)), - None => match val.as_f64() { - Some(float_val) => DataType::Float8(Some(float_val)), - None => DataType::Null, - }, - }, - Value::String(val) => DataType::Text(Some(val.clone())), - Value::Array(_) => DataType::Null, - Value::Object(val) => { - tracing::debug!("OBJECT: {:#?}", val); - if let ( - Some(&Value::Number(ref epoch)), - Some(&Value::Number(ref fraction)), - Some(&Value::Number(ref timezone)), - ) = (val.get("epoch"), val.get("fraction"), val.get("timezone")) - { - if let (Some(epoch), Some(fraction), Some(_)) = - (epoch.as_i64(), fraction.as_i64(), timezone.as_i64()) - { - match Utc.timestamp_opt(epoch, 0) { - LocalResult::Single(dt) => { - let nanos = fraction as u32; - let dt = dt - + chrono::Duration::nanoseconds(nanos as i64); - DataType::Timestamptz(Some(dt)) - } - _ => DataType::Null, - } - } else { - DataType::Null - } - } else { - DataType::Null - } - } - }, - ) - }) - .collect::>() - }) - .collect::>>(); + match snowflake_client.close_session().await { + Ok(_) => (), + Err(e) => { + tracing::error!("There was an issue while closing the snowflake client: {}", e); + } + } - Ok(result) + Ok(rows) }