diff --git a/api/src/routes/ws/dashboards/dashboard_utils.rs b/api/src/routes/ws/dashboards/dashboard_utils.rs index 55f9451b8..21f636b88 100644 --- a/api/src/routes/ws/dashboards/dashboard_utils.rs +++ b/api/src/routes/ws/dashboards/dashboard_utils.rs @@ -9,24 +9,31 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; -use database::{enums::{AssetPermissionRole, AssetType}, models::{Dashboard, MessageDeprecated}, pool::get_pg_pool, schema::{asset_permissions, dashboards, messages_deprecated, threads_to_dashboards, users_to_organizations}, vault::read_secret}; -use crate::{ - utils::{ - clients::{sentry_utils::send_sentry_error}, - query_engine::data_types::DataType, - sharing::asset_sharing::{ - get_asset_collections, get_asset_sharing_info, CollectionNameAndId, - IndividualPermission, TeamPermissions, - }, - user::user_info::get_user_organization_id, +use crate::utils::{ + clients::sentry_utils::send_sentry_error, + query_engine::data_types::DataType, + sharing::asset_sharing::{ + get_asset_collections, get_asset_sharing_info, CollectionNameAndId, IndividualPermission, + TeamPermissions, }, + user::user_info::get_user_organization_id, +}; +use database::{ + enums::{AssetPermissionRole, AssetType}, + models::{Dashboard, MessageDeprecated}, + pool::get_pg_pool, + schema::{ + asset_permissions, dashboards, messages_deprecated, threads_to_dashboards, + users_to_organizations, + }, + vault::read_secret, }; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Metric { pub id: Uuid, // This is the thread id #[serde(skip_serializing)] - pub message_id: Uuid, + pub _message_id: Uuid, pub name: String, pub time_frame: String, #[serde(skip_serializing)] @@ -186,7 +193,7 @@ pub async fn get_dashboard_state_by_id( match read_secret(&secret_id).await { Ok(password) => Some(password), Err(e) => { - tracing::error!("Error getting dashboard password: {}", e); + tracing::error!("Error getting dashboard password: {}", e); None } } @@ -438,7 +445,10 @@ async fn get_dashboard_metrics(dashboard_id: Arc) -> Result> { threads_to_dashboards::table .on(messages_deprecated::thread_id.eq(threads_to_dashboards::thread_id)), ) - .select((threads_to_dashboards::thread_id, messages_deprecated::all_columns)) + .select(( + threads_to_dashboards::thread_id, + messages_deprecated::all_columns, + )) .filter(threads_to_dashboards::dashboard_id.eq(dashboard_id.as_ref())) .filter(messages_deprecated::deleted_at.is_null()) .filter(messages_deprecated::draft_session_id.is_null()) @@ -471,7 +481,7 @@ async fn get_dashboard_metrics(dashboard_id: Arc) -> Result> { for (thread_id, message) in thread_messages { let metric = Metric { id: thread_id, - message_id: message.id, + _message_id: message.id, name: message.title.unwrap_or_default(), time_frame: message.time_frame.unwrap_or_default(), sql: message.code.unwrap_or_default(), diff --git a/api/src/routes/ws/dashboards/update_dashboard.rs b/api/src/routes/ws/dashboards/update_dashboard.rs index 1fb50f62f..8ec0dde1b 100644 --- a/api/src/routes/ws/dashboards/update_dashboard.rs +++ b/api/src/routes/ws/dashboards/update_dashboard.rs @@ -291,7 +291,7 @@ async fn update_dashboard_record( public_password: Option>, public_expiry_date: Option>, ) -> Result<()> { - let password_secret_id = match public_password { + let _password_secret_id = match public_password { Some(Some(password)) => match create_secret(&dashboard_id, &password).await { Ok(secret_id) => Some(Some(secret_id)), Err(e) => { diff --git a/api/src/routes/ws/data_sources/data_source_utils/data_source_utils.rs b/api/src/routes/ws/data_sources/data_source_utils/data_source_utils.rs index 47d992dff..99cc64013 100644 --- a/api/src/routes/ws/data_sources/data_source_utils/data_source_utils.rs +++ b/api/src/routes/ws/data_sources/data_source_utils/data_source_utils.rs @@ -7,12 +7,12 @@ use diesel_async::RunQueryDsl; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use database::{enums::{DataSourceType, UserOrganizationRole}, - pool::get_pg_pool, - models::Dataset, - schema::{data_sources, datasets, organizations, users, users_to_organizations},}; -use crate::{ - utils::query_engine::credentials::{get_data_source_credentials, Credential}, +use crate::utils::query_engine::credentials::{get_data_source_credentials, Credential}; +use database::{ + enums::{DataSourceType, UserOrganizationRole}, + models::Dataset, + pool::get_pg_pool, + schema::{data_sources, datasets, organizations, users, users_to_organizations}, }; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -42,7 +42,9 @@ pub struct DataSourceRecord { pub type_: DataSourceType, pub updated_at: DateTime, pub created_at: DateTime, + #[allow(dead_code)] pub created_by: Uuid, + #[allow(dead_code)] pub secret_id: Uuid, pub user_id: Uuid, pub user_name: Option, diff --git a/api/src/routes/ws/datasets/get_dataset.rs b/api/src/routes/ws/datasets/get_dataset.rs index da1d8d300..8db43ecb9 100644 --- a/api/src/routes/ws/datasets/get_dataset.rs +++ b/api/src/routes/ws/datasets/get_dataset.rs @@ -3,7 +3,6 @@ use uuid::Uuid; use serde::{Deserialize, Serialize}; - use crate::{ routes::ws::{ datasets::datasets_router::{DatasetEvent, DatasetRoute}, @@ -58,7 +57,9 @@ pub async fn get_dataset(user: &AuthenticatedUser, req: GetDatasetReq) -> Result let sql = format!("SELECT * FROM {}.{} LIMIT 25", schema, database_name); match query_engine(&req.id, &sql).await { Ok(data) => data, - Err(e) => Vec::new(), + Err(_) => { + Vec::new() + } } } else { Vec::new() diff --git a/api/src/routes/ws/datasets/list_datasets.rs b/api/src/routes/ws/datasets/list_datasets.rs index aeac854fc..4d9da29d8 100644 --- a/api/src/routes/ws/datasets/list_datasets.rs +++ b/api/src/routes/ws/datasets/list_datasets.rs @@ -1,22 +1,12 @@ use anyhow::{anyhow, Result}; use chrono::{DateTime, Utc}; -use diesel::{ - BoolExpressionMethods, ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl, -}; +use diesel::{ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl}; use diesel_async::RunQueryDsl; -use uuid::Uuid; use tokio; +use uuid::Uuid; use serde::{Deserialize, Serialize}; -use database::{enums::{IdentityType, UserOrganizationRole}, - pool::get_pg_pool, - models::UserToOrganization, - schema::{ - data_sources, dataset_permissions, datasets, datasets_to_permission_groups, messages_deprecated, - permission_groups_to_identities, permission_groups_to_users, teams_to_users, users, - users_to_organizations, - },}; use crate::{ routes::ws::{ datasets::datasets_router::{DatasetEvent, DatasetRoute}, @@ -26,6 +16,15 @@ use crate::{ }, utils::clients::sentry_utils::send_sentry_error, }; +use database::{ + enums::UserOrganizationRole, + models::UserToOrganization, + pool::get_pg_pool, + schema::{ + data_sources, dataset_permissions, datasets, messages_deprecated, + permission_groups_to_users, users, users_to_organizations, + }, +}; use middleware::AuthenticatedUser; #[derive(Deserialize, Debug, Clone)] @@ -141,13 +140,13 @@ async fn list_datasets_handler( admin_view: Option, enabled: Option, imported: Option, - permission_group_id: Option, + _permission_group_id: Option, _belongs_to: Option, data_source_id: Option, ) -> Result> { let page = page.unwrap_or(0); let page_size = page_size.unwrap_or(25); - let admin_view = admin_view.unwrap_or(false); + let _admin_view = admin_view.unwrap_or(false); let mut conn = match get_pg_pool().get().await { Ok(conn) => conn, @@ -189,133 +188,6 @@ async fn list_datasets_handler( Ok(list_of_datasets) } -async fn get_user_permissioned_datasets( - user_id: &Uuid, - page: i64, - page_size: i64, -) -> Result> { - let mut conn = match get_pg_pool().get().await { - Ok(conn) => conn, - Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)), - }; - - let list_dataset_records = match datasets::table - .inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id))) - .inner_join( - datasets_to_permission_groups::table.on(datasets::id - .eq(datasets_to_permission_groups::dataset_id) - .and(datasets_to_permission_groups::deleted_at.is_null())), - ) - .inner_join( - permission_groups_to_identities::table.on( - datasets_to_permission_groups::permission_group_id - .eq(permission_groups_to_identities::permission_group_id) - .and(permission_groups_to_identities::deleted_at.is_null()), - ), - ) - .left_join( - teams_to_users::table.on(permission_groups_to_identities::identity_id - .eq(teams_to_users::team_id) - .and(permission_groups_to_identities::identity_type.eq(IdentityType::Team)) - .and(teams_to_users::deleted_at.is_null())), - ) - .inner_join(users::table.on(datasets::created_by.eq(users::id))) - .left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable()))) - .select(( - datasets::id, - datasets::name, - datasets::created_at, - datasets::updated_at, - datasets::enabled, - datasets::imported, - users::id, - users::name.nullable(), - users::email, - data_sources::id, - data_sources::name, - )) - .group_by(( - datasets::id, - datasets::name, - datasets::created_at, - datasets::updated_at, - datasets::enabled, - datasets::imported, - users::id, - users::name, - users::email, - data_sources::id, - data_sources::name, - )) - .filter(datasets::deleted_at.is_null()) - .filter( - permission_groups_to_identities::identity_id - .eq(user_id) - .or(teams_to_users::user_id.eq(user_id)), - ) - .limit(page_size) - .offset(page * page_size) - .load::<( - Uuid, - String, - DateTime, - DateTime, - bool, - bool, - Uuid, - Option, - String, - Uuid, - String, - )>(&mut conn) - .await - { - Ok(datasets) => datasets, - Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)), - }; - - let list_dataset_objects: Vec = list_dataset_records - .into_iter() - .map( - |( - id, - name, - created_at, - updated_at, - enabled, - imported, - user_id, - user_name, - user_email, - data_source_id, - data_source_name, - )| { - ListDatasetObject { - id, - name, - created_at: Some(created_at), - updated_at: Some(updated_at), - enabled: Some(enabled), - imported: Some(imported), - data_source: ListDatasetDataSource { - id: data_source_id, - name: data_source_name, - }, - last_queried: None, - owner: Some(ListDatasetOwner { - id: user_id, - name: user_name.unwrap_or(user_email), - avatar_url: None, - }), - belongs_to: None, - } - }, - ) - .collect(); - - Ok(list_dataset_objects) -} - async fn get_org_datasets( organization_id: &Uuid, page: i64, @@ -332,7 +204,10 @@ async fn get_org_datasets( let mut query = datasets::table .inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id))) .inner_join(users::table.on(datasets::created_by.eq(users::id))) - .left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable()))) + .left_join( + messages_deprecated::table + .on(messages_deprecated::dataset_id.eq(datasets::id.nullable())), + ) .select(( datasets::id, datasets::name, @@ -439,68 +314,6 @@ async fn get_org_datasets( Ok(list_dataset_objects) } -async fn list_permission_group_datasets( - organization_id: Uuid, - page: i64, - page_size: i64, - permission_group_id: Uuid, -) -> Result> { - let mut conn = match get_pg_pool().get().await { - Ok(conn) => conn, - Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)), - }; - - let list_dataset_records = match datasets::table - .inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id))) - .left_join( - datasets_to_permission_groups::table.on(datasets::id - .eq(datasets_to_permission_groups::dataset_id) - .and(datasets_to_permission_groups::permission_group_id.eq(permission_group_id)) - .and(datasets_to_permission_groups::deleted_at.is_null())), - ) - .select(( - datasets::id, - datasets::name, - data_sources::id, - data_sources::name, - datasets_to_permission_groups::permission_group_id.nullable(), - )) - .filter(datasets::organization_id.eq(organization_id)) - .filter(datasets::deleted_at.is_null()) - .filter(datasets::enabled.eq(true)) - .limit(page_size) - .offset(page * page_size) - .load::<(Uuid, String, Uuid, String, Option)>(&mut conn) - .await - { - Ok(datasets) => datasets, - Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)), - }; - - let list_dataset_objects: Vec = list_dataset_records - .into_iter() - .map( - |(id, name, data_source_id, data_source_name, permission_group_id)| ListDatasetObject { - id, - name, - created_at: None, - updated_at: None, - enabled: None, - imported: None, - data_source: ListDatasetDataSource { - id: data_source_id, - name: data_source_name, - }, - last_queried: None, - owner: None, - belongs_to: Some(permission_group_id.is_some()), - }, - ) - .collect(); - - Ok(list_dataset_objects) -} - async fn get_restricted_user_datasets( user_id: &Uuid, page: i64, @@ -520,7 +333,10 @@ async fn get_restricted_user_datasets( .inner_join( dataset_permissions::table.on(dataset_permissions::dataset_id.eq(datasets::id)), ) - .left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable()))) + .left_join( + messages_deprecated::table + .on(messages_deprecated::dataset_id.eq(datasets::id.nullable())), + ) .select(( datasets::id, datasets::name, @@ -607,7 +423,10 @@ async fn get_restricted_user_datasets( permission_groups_to_users::table .on(permission_groups_to_users::user_id.eq(user_id)), ) - .left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable()))) + .left_join( + messages_deprecated::table + .on(messages_deprecated::dataset_id.eq(datasets::id.nullable())), + ) .select(( datasets::id, datasets::name, diff --git a/api/src/routes/ws/organizations/update_organization.rs b/api/src/routes/ws/organizations/update_organization.rs index 92176635a..605216e9b 100644 --- a/api/src/routes/ws/organizations/update_organization.rs +++ b/api/src/routes/ws/organizations/update_organization.rs @@ -5,8 +5,6 @@ use diesel_async::RunQueryDsl; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use database::{pool::get_pg_pool, - schema::organizations,}; use crate::{ routes::ws::{ organizations::organization_router::{OrganizationEvent, OrganizationRoute}, @@ -16,6 +14,7 @@ use crate::{ }, utils::clients::sentry_utils::send_sentry_error, }; +use database::{pool::get_pg_pool, schema::organizations}; use middleware::AuthenticatedUser; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -24,7 +23,10 @@ pub struct UpdateOrganizationRequest { pub name: String, } -pub async fn update_organization(user: &AuthenticatedUser, req: UpdateOrganizationRequest) -> Result<()> { +pub async fn update_organization( + user: &AuthenticatedUser, + req: UpdateOrganizationRequest, +) -> Result<()> { let org_state = match update_organization_handler(user, req.id, req.name).await { Ok(state) => state, Err(e) => { @@ -65,7 +67,20 @@ pub async fn update_organization(user: &AuthenticatedUser, req: UpdateOrganizati Ok(()) } -async fn update_organization_handler(user: &AuthenticatedUser, id: Uuid, name: String) -> Result<()> { +async fn update_organization_handler( + user: &AuthenticatedUser, + id: Uuid, + name: String, +) -> Result<()> { + let organization_id = match user.organizations.get(0) { + Some(organization) => organization.id, + None => return Err(anyhow!("User is not a member of any organization")), + }; + + if id != organization_id { + return Err(anyhow!("User is not a member of this organization")); + } + let mut conn = match get_pg_pool().get().await { Ok(conn) => conn, Err(e) => return Err(anyhow!("Error getting pg connection: {}", e)), diff --git a/api/src/routes/ws/threads_and_messages/delete_thread.rs b/api/src/routes/ws/threads_and_messages/delete_thread.rs index 6a70eb4b1..cc670e4b6 100644 --- a/api/src/routes/ws/threads_and_messages/delete_thread.rs +++ b/api/src/routes/ws/threads_and_messages/delete_thread.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use diesel::{update, ExpressionMethods, QueryDsl}; +use diesel::{update, ExpressionMethods}; use diesel_async::RunQueryDsl; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -135,7 +135,6 @@ async fn delete_thread_handler( err })?; - let user_id = user.id.clone(); let id = id.clone(); let delete_thread_task = tokio::task::spawn(async move { diff --git a/api/src/routes/ws/threads_and_messages/get_thread.rs b/api/src/routes/ws/threads_and_messages/get_thread.rs index 08b68e81b..2a2162f53 100644 --- a/api/src/routes/ws/threads_and_messages/get_thread.rs +++ b/api/src/routes/ws/threads_and_messages/get_thread.rs @@ -10,14 +10,11 @@ use uuid::Uuid; use crate::{ routes::ws::{ - ws::{SubscriptionRwLock, WsErrorCode, WsEvent, WsResponseMessage, WsSendMethod}, + ws::{SubscriptionRwLock, WsEvent, WsResponseMessage, WsSendMethod}, ws_router::WsRoutes, - ws_utils::{send_error_message, send_ws_message, subscribe_to_stream}, - }, - utils::{ - clients::sentry_utils::send_sentry_error, - query_engine::{data_types::DataType, query_engine::query_engine}, + ws_utils::{send_ws_message, subscribe_to_stream}, }, + utils::{clients::sentry_utils::send_sentry_error, query_engine::data_types::DataType}, }; use database::models::StepProgress; @@ -36,6 +33,7 @@ pub struct FetchingData { #[derive(Deserialize, Debug, Clone)] pub struct GetThreadRequest { pub id: Uuid, + #[allow(dead_code)] pub password: Option, } @@ -82,113 +80,3 @@ pub async fn get_thread_ws( Ok(()) } - -async fn send_fetching_data_in_progress_to_sub( - subscription: &String, - user: &AuthenticatedUser, - thread_id: &Uuid, - message_id: &Uuid, - sql: &String, -) -> Result<()> { - let fetching_data_body = FetchingData { - progress: StepProgress::InProgress, - data: None, - thread_id: thread_id.clone(), - message_id: message_id.clone(), - chart_config: None, - code: Some(sql.clone()), - }; - - let identify_dataset_ws_response = WsResponseMessage::new( - WsRoutes::Threads(ThreadRoute::Get), - WsEvent::Threads(ThreadEvent::FetchingData), - Some(fetching_data_body), - None, - user, - WsSendMethod::SenderOnly, - ); - - match send_ws_message(subscription, &identify_dataset_ws_response).await { - Ok(_) => (), - Err(e) => return Err(e), - } - - Ok(()) -} - -async fn fetch_data_handler( - subscription: &String, - user: &AuthenticatedUser, - sql: &String, - dataset_id: &Uuid, - thread_id: &Uuid, - message_id: &Uuid, -) -> Result<()> { - match send_fetching_data_in_progress_to_sub(subscription, user, thread_id, message_id, sql) - .await - { - Ok(_) => (), - Err(e) => { - tracing::error!( - "Unable to send fetching data in progress to subscription: {:?}", - e - ); - send_sentry_error(&e.to_string(), Some(&user.id)); - return Err(e); - } - } - - let data = match query_engine(&dataset_id, &sql).await { - Ok(data) => data, - Err(e) => { - tracing::error!("Unable to query engine: {:?}", e); - send_sentry_error(&e.to_string(), Some(&user.id)); - send_error_message( - &subscription, - WsRoutes::Threads(ThreadRoute::Get), - WsEvent::Threads(ThreadEvent::FetchingData), - WsErrorCode::InternalServerError, - "Failed to fetch results.".to_string(), - user, - ) - .await?; - return Err(e); - } - }; - - let fetching_data_body = FetchingData { - progress: StepProgress::Completed, - data: if data.is_empty() { - Some(vec![]) - } else { - Some(data) - }, - code: Some(sql.clone()), - thread_id: thread_id.clone(), - message_id: message_id.clone(), - chart_config: None, - }; - - let fetching_data_ws_response = WsResponseMessage::new( - WsRoutes::Threads(ThreadRoute::Get), - WsEvent::Threads(ThreadEvent::FetchingData), - Some(fetching_data_body), - None, - user, - WsSendMethod::SenderOnly, - ); - - match send_ws_message(&subscription, &fetching_data_ws_response).await { - Ok(_) => (), - Err(e) => { - tracing::error!( - "Unable to send fetching data success to subscription: {:?}", - e - ); - send_sentry_error(&e.to_string(), Some(&user.id)); - return Err(e); - } - } - - Ok(()) -} diff --git a/api/src/routes/ws/threads_and_messages/list_threads.rs b/api/src/routes/ws/threads_and_messages/list_threads.rs index 4fcf982fd..d6b012827 100644 --- a/api/src/routes/ws/threads_and_messages/list_threads.rs +++ b/api/src/routes/ws/threads_and_messages/list_threads.rs @@ -39,7 +39,6 @@ use crate::{ pub struct ListThreadsFilters { #[serde(rename = "status")] pub verification: Option>, - pub user_id: Option, } #[derive(Deserialize, Debug, Clone)] diff --git a/api/src/routes/ws/threads_and_messages/post_thread.rs b/api/src/routes/ws/threads_and_messages/post_thread.rs index 2d6ce3951..4d29e77ae 100644 --- a/api/src/routes/ws/threads_and_messages/post_thread.rs +++ b/api/src/routes/ws/threads_and_messages/post_thread.rs @@ -1,23 +1,19 @@ -use std::sync::Arc; use anyhow::Result; use handlers::chats::post_chat_handler::ChatCreateNewChat; use handlers::chats::post_chat_handler::{self, ThreadEvent}; -use handlers::chats::types::ChatWithMessages; use middleware::AuthenticatedUser; use tokio::sync::mpsc; use crate::routes::ws::{ threads_and_messages::threads_router::{ThreadEvent as WSThreadEvent, ThreadRoute}, - ws::{SubscriptionRwLock, WsEvent, WsResponseMessage, WsSendMethod}, + ws::{WsEvent, WsResponseMessage, WsSendMethod}, ws_router::WsRoutes, ws_utils::send_ws_message, }; /// Creates a new thread for a user and processes their request using the shared handler pub async fn post_thread( - subscriptions: &Arc, - user_group: &String, user: &AuthenticatedUser, request: ChatCreateNewChat, ) -> Result<()> { @@ -75,21 +71,4 @@ pub async fn post_thread( post_chat_handler::post_chat_handler(request, user.clone(), Some(tx)).await?; Ok(()) -} - -/// Sends the chat response to the client via WebSocket -async fn send_ws_response(subscription: &str, chat_with_messages: &ChatWithMessages) -> Result<()> { - let response = WsResponseMessage::new_no_user( - WsRoutes::Threads(ThreadRoute::Post), - WsEvent::Threads(WSThreadEvent::InitializeChat), - chat_with_messages, - None, - WsSendMethod::All, - ); - - if let Err(e) = send_ws_message(&subscription.to_string(), &response).await { - tracing::error!("Failed to send websocket message: {}", e); - } - - Ok(()) -} +} \ No newline at end of file diff --git a/api/src/routes/ws/threads_and_messages/thread_utils.rs b/api/src/routes/ws/threads_and_messages/thread_utils.rs index 2676a76ce..07050c995 100644 --- a/api/src/routes/ws/threads_and_messages/thread_utils.rs +++ b/api/src/routes/ws/threads_and_messages/thread_utils.rs @@ -1,4 +1,3 @@ -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use serde_json::Value; use std::collections::HashMap; use std::sync::Arc; @@ -9,7 +8,6 @@ use diesel::{ QueryDsl, }; use diesel_async::RunQueryDsl; -use indexmap::IndexMap; use serde::Serialize; use uuid::Uuid; @@ -17,7 +15,6 @@ use crate::{ routes::ws::threads_and_messages::messages_utils::MessageDraftState, utils::{ clients::sentry_utils::send_sentry_error, - query_engine::{data_types::DataType, query_engine::query_engine}, sharing::asset_sharing::{ get_asset_collections, get_asset_sharing_info, CollectionNameAndId, IndividualPermission, TeamPermissions, @@ -26,7 +23,6 @@ use crate::{ }; use database::{ enums::{AssetPermissionRole, AssetType, UserOrganizationRole}, - models::{ColumnMetadata, DataMetadataJsonBody, MinMaxValue}, models::{MessageDeprecated, ThreadDeprecated}, pool::get_pg_pool, schema::{ @@ -77,8 +73,6 @@ pub struct ThreadState { pub draft_session_id: Option, } -const MAX_UNIQUE_VALUES: usize = 1000; - pub async fn get_thread_state_by_id( user_id: &Uuid, thread_id: &Uuid, @@ -156,7 +150,7 @@ pub async fn get_thread_state_by_id( } }; - let public_password = if let Some(password_secret_id) = thread.password_secret_id { + let public_password = if let Some(_password_secret_id) = thread.password_secret_id { let public_password = match read_secret(&thread.id).await { Ok(public_password) => public_password, Err(e) => { @@ -539,56 +533,6 @@ async fn is_organization_admin_or_owner(user_id: Arc, thread_id: Arc Ok(is_organization_adminig) } -pub async fn get_bulk_user_thread_permission( - user_id: &Uuid, - thread_ids: &Vec, -) -> Result> { - let mut conn = match get_pg_pool().get().await { - Ok(conn) => conn, - Err(e) => { - tracing::error!("Error getting pg connection: {}", e); - return Err(anyhow!("Error getting pg connection: {}", e)); - } - }; - - let permissions = match asset_permissions::table - .left_join( - teams_to_users::table.on(asset_permissions::identity_id.eq(teams_to_users::team_id)), - ) - .select((asset_permissions::asset_id, asset_permissions::role)) - .filter( - asset_permissions::identity_id - .eq(&user_id) - .or(teams_to_users::user_id.eq(&user_id)), - ) - .filter(asset_permissions::asset_id.eq_any(thread_ids)) - .filter(asset_permissions::deleted_at.is_null()) - .load::<(Uuid, AssetPermissionRole)>(&mut conn) - .await - { - Ok(permissions) => permissions, - Err(diesel::result::Error::NotFound) => { - tracing::error!("thread not found"); - return Err(anyhow!("thread not found")); - } - Err(e) => { - tracing::error!("Error querying thread by ID: {}", e); - return Err(anyhow!("Error querying thread by ID: {}", e)); - } - }; - - let mut permission_map: HashMap = HashMap::new(); - - for (asset_id, role) in permissions { - permission_map - .entry(asset_id) - .and_modify(|e| *e = AssetPermissionRole::max(e.clone(), role.clone())) - .or_insert(role); - } - - Ok(permission_map) -} - async fn get_thread_and_check_permissions( user_id: Arc, thread_id: Arc, @@ -759,7 +703,7 @@ async fn get_thread_messages( None }; - let sql_evaluation = if let Some(context) = &message.context { + let _sql_evaluation = if let Some(context) = &message.context { match context.get("sql_evaluation") { Some(sql_evaluation) => Some(sql_evaluation.clone()), None => None, @@ -857,198 +801,6 @@ async fn get_thread_dashboards(thread_id: Arc) -> Result>, - pub data_metadata: DataMetadataJsonBody, -} - -pub async fn fetch_data(sql: &String, dataset_id: &Uuid) -> Result { - let data = match query_engine(&dataset_id, &sql).await { - Ok(data) => data, - Err(e) => { - return Err(anyhow!("Unable to query engine: {}", e)); - } - }; - - let data_metadata = match process_data_metadata(&data).await { - Ok(data_metadata) => data_metadata, - Err(e) => return Err(anyhow!("Unable to process data metadata: {}", e)), - }; - - Ok(DataObject { - data, - data_metadata, - }) -} - -async fn process_data_metadata( - data: &Vec>, -) -> Result { - if data.is_empty() { - return Ok(DataMetadataJsonBody { - column_count: 0, - row_count: 0, - column_metadata: vec![], - }); - } - - let first_row = &data[0]; - let columns: Vec<_> = first_row.keys().cloned().collect(); - - let column_metadata: Vec<_> = columns - .par_iter() // Parallel iterator - .map(|column_name| { - let mut unique_values = Vec::with_capacity(MAX_UNIQUE_VALUES); - let mut min_value = None; - let mut max_value = None; - let mut unique_values_exceeded = false; - let mut is_date_type = false; - let mut min_value_str: Option = None; - let mut max_value_str: Option = None; - - for row in data { - if let Some(value) = row.get(column_name) { - if !unique_values_exceeded && unique_values.len() < MAX_UNIQUE_VALUES { - if !unique_values.iter().any(|x| x == value) { - unique_values.push(value.clone()); - } - } else { - unique_values_exceeded = true; - } - - // Update min/max for numeric types - match value { - DataType::Int8(Some(n)) => { - let n = *n as f64; - min_value = Some(min_value.map_or(n, |min: f64| min.min(n))); - max_value = Some(max_value.map_or(n, |max: f64| max.max(n))); - } - DataType::Int4(Some(n)) => { - let n = *n as f64; - min_value = Some(min_value.map_or(n, |min: f64| min.min(n))); - max_value = Some(max_value.map_or(n, |max: f64| max.max(n))); - } - DataType::Int2(Some(n)) => { - let n = *n as f64; - min_value = Some(min_value.map_or(n, |min: f64| min.min(n))); - max_value = Some(max_value.map_or(n, |max: f64| max.max(n))); - } - DataType::Float4(Some(n)) => { - let n = *n as f64; - min_value = Some(min_value.map_or(n, |min: f64| min.min(n))); - max_value = Some(max_value.map_or(n, |max: f64| max.max(n))); - } - DataType::Float8(Some(n)) => { - let n = *n as f64; - min_value = Some(min_value.map_or(n, |min: f64| min.min(n))); - max_value = Some(max_value.map_or(n, |max: f64| max.max(n))); - } - DataType::Date(Some(date)) => { - is_date_type = true; - let date_str = date.to_string(); - min_value = match min_value { - None => Some(date_str.parse::().unwrap_or(0.0)), - Some(_) => None, // Clear numeric min/max since we'll use strings - }; - max_value = None; - if let Some(current_min) = &min_value_str { - if date_str < *current_min { - min_value_str = Some(date_str.clone()); - } - } else { - min_value_str = Some(date_str.clone()); - } - if let Some(current_max) = &max_value_str { - if date_str > *current_max { - max_value_str = Some(date_str); - } - } else { - max_value_str = Some(date_str); - } - } - DataType::Timestamp(Some(ts)) => { - is_date_type = true; - let ts_str = ts.to_string(); - min_value = match min_value { - None => Some(ts_str.parse::().unwrap_or(0.0)), - Some(_) => None, - }; - max_value = None; - if let Some(current_min) = &min_value_str { - if ts_str < *current_min { - min_value_str = Some(ts_str.clone()); - } - } else { - min_value_str = Some(ts_str.clone()); - } - if let Some(current_max) = &max_value_str { - if ts_str > *current_max { - max_value_str = Some(ts_str); - } - } else { - max_value_str = Some(ts_str); - } - } - DataType::Timestamptz(Some(ts)) => { - is_date_type = true; - let ts_str = ts.naive_utc().to_string(); - min_value = match min_value { - None => Some(ts_str.parse::().unwrap_or(0.0)), - Some(_) => None, - }; - max_value = None; - if let Some(current_min) = &min_value_str { - if ts_str < *current_min { - min_value_str = Some(ts_str.clone()); - } - } else { - min_value_str = Some(ts_str.clone()); - } - if let Some(current_max) = &max_value_str { - if ts_str > *current_max { - max_value_str = Some(ts_str); - } - } else { - max_value_str = Some(ts_str); - } - } - _ => {} - } - } - } - - let column_type = first_row.get(column_name).unwrap(); - ColumnMetadata { - name: column_name.clone(), - type_: column_type.to_string(), - simple_type: column_type.simple_type(), - unique_values: if !unique_values_exceeded { - unique_values.len() as i32 - } else { - MAX_UNIQUE_VALUES as i32 - }, - min_value: if is_date_type { - min_value_str.map(MinMaxValue::String) - } else { - min_value.map(MinMaxValue::Number) - }, - max_value: if is_date_type { - max_value_str.map(MinMaxValue::String) - } else { - max_value.map(MinMaxValue::Number) - }, - } - }) - .collect(); - - Ok(DataMetadataJsonBody { - column_count: first_row.len() as i32, - row_count: data.len() as i32, - column_metadata, - }) -} - pub async fn check_if_thread_saved(thread_id: &Uuid) -> Result { let threads_to_collections_id = thread_id.clone(); diff --git a/api/src/routes/ws/threads_and_messages/threads_router.rs b/api/src/routes/ws/threads_and_messages/threads_router.rs index 4ba1db67f..ffee75e48 100644 --- a/api/src/routes/ws/threads_and_messages/threads_router.rs +++ b/api/src/routes/ws/threads_and_messages/threads_router.rs @@ -100,7 +100,7 @@ pub async fn threads_router( ThreadRoute::Post => { let req = serde_json::from_value(data)?; - post_thread(subscriptions, user_group, user, req).await?; + post_thread(user, req).await?; } ThreadRoute::Update => { let req = serde_json::from_value(data)?; diff --git a/api/src/routes/ws/threads_and_messages/update_thread.rs b/api/src/routes/ws/threads_and_messages/update_thread.rs index 4c5cc735a..24fb45436 100644 --- a/api/src/routes/ws/threads_and_messages/update_thread.rs +++ b/api/src/routes/ws/threads_and_messages/update_thread.rs @@ -268,7 +268,7 @@ async fn update_thread_record( save_to_dashboard: Option, remove_from_dashboard: Option, ) -> Result<()> { - let password_secret_id = match public_password { + let _password_secret_id = match public_password { Some(Some(password)) => { // Password provided - create new secret match create_secret(&thread_id, &password).await {