mirror of https://github.com/buster-so/buster.git
clean up src
This commit is contained in:
parent
dba826d874
commit
8bfd0f04af
|
@ -9,24 +9,31 @@ use serde::{Deserialize, Serialize};
|
|||
use serde_json::Value;
|
||||
use uuid::Uuid;
|
||||
|
||||
use database::{enums::{AssetPermissionRole, AssetType}, models::{Dashboard, MessageDeprecated}, pool::get_pg_pool, schema::{asset_permissions, dashboards, messages_deprecated, threads_to_dashboards, users_to_organizations}, vault::read_secret};
|
||||
use crate::{
|
||||
utils::{
|
||||
clients::{sentry_utils::send_sentry_error},
|
||||
query_engine::data_types::DataType,
|
||||
sharing::asset_sharing::{
|
||||
get_asset_collections, get_asset_sharing_info, CollectionNameAndId,
|
||||
IndividualPermission, TeamPermissions,
|
||||
},
|
||||
user::user_info::get_user_organization_id,
|
||||
use crate::utils::{
|
||||
clients::sentry_utils::send_sentry_error,
|
||||
query_engine::data_types::DataType,
|
||||
sharing::asset_sharing::{
|
||||
get_asset_collections, get_asset_sharing_info, CollectionNameAndId, IndividualPermission,
|
||||
TeamPermissions,
|
||||
},
|
||||
user::user_info::get_user_organization_id,
|
||||
};
|
||||
use database::{
|
||||
enums::{AssetPermissionRole, AssetType},
|
||||
models::{Dashboard, MessageDeprecated},
|
||||
pool::get_pg_pool,
|
||||
schema::{
|
||||
asset_permissions, dashboards, messages_deprecated, threads_to_dashboards,
|
||||
users_to_organizations,
|
||||
},
|
||||
vault::read_secret,
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct Metric {
|
||||
pub id: Uuid, // This is the thread id
|
||||
#[serde(skip_serializing)]
|
||||
pub message_id: Uuid,
|
||||
pub _message_id: Uuid,
|
||||
pub name: String,
|
||||
pub time_frame: String,
|
||||
#[serde(skip_serializing)]
|
||||
|
@ -186,7 +193,7 @@ pub async fn get_dashboard_state_by_id(
|
|||
match read_secret(&secret_id).await {
|
||||
Ok(password) => Some(password),
|
||||
Err(e) => {
|
||||
tracing::error!("Error getting dashboard password: {}", e);
|
||||
tracing::error!("Error getting dashboard password: {}", e);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
@ -438,7 +445,10 @@ async fn get_dashboard_metrics(dashboard_id: Arc<Uuid>) -> Result<Vec<Metric>> {
|
|||
threads_to_dashboards::table
|
||||
.on(messages_deprecated::thread_id.eq(threads_to_dashboards::thread_id)),
|
||||
)
|
||||
.select((threads_to_dashboards::thread_id, messages_deprecated::all_columns))
|
||||
.select((
|
||||
threads_to_dashboards::thread_id,
|
||||
messages_deprecated::all_columns,
|
||||
))
|
||||
.filter(threads_to_dashboards::dashboard_id.eq(dashboard_id.as_ref()))
|
||||
.filter(messages_deprecated::deleted_at.is_null())
|
||||
.filter(messages_deprecated::draft_session_id.is_null())
|
||||
|
@ -471,7 +481,7 @@ async fn get_dashboard_metrics(dashboard_id: Arc<Uuid>) -> Result<Vec<Metric>> {
|
|||
for (thread_id, message) in thread_messages {
|
||||
let metric = Metric {
|
||||
id: thread_id,
|
||||
message_id: message.id,
|
||||
_message_id: message.id,
|
||||
name: message.title.unwrap_or_default(),
|
||||
time_frame: message.time_frame.unwrap_or_default(),
|
||||
sql: message.code.unwrap_or_default(),
|
||||
|
|
|
@ -291,7 +291,7 @@ async fn update_dashboard_record(
|
|||
public_password: Option<Option<String>>,
|
||||
public_expiry_date: Option<Option<chrono::NaiveDateTime>>,
|
||||
) -> Result<()> {
|
||||
let password_secret_id = match public_password {
|
||||
let _password_secret_id = match public_password {
|
||||
Some(Some(password)) => match create_secret(&dashboard_id, &password).await {
|
||||
Ok(secret_id) => Some(Some(secret_id)),
|
||||
Err(e) => {
|
||||
|
|
|
@ -7,12 +7,12 @@ use diesel_async::RunQueryDsl;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use database::{enums::{DataSourceType, UserOrganizationRole},
|
||||
pool::get_pg_pool,
|
||||
models::Dataset,
|
||||
schema::{data_sources, datasets, organizations, users, users_to_organizations},};
|
||||
use crate::{
|
||||
utils::query_engine::credentials::{get_data_source_credentials, Credential},
|
||||
use crate::utils::query_engine::credentials::{get_data_source_credentials, Credential};
|
||||
use database::{
|
||||
enums::{DataSourceType, UserOrganizationRole},
|
||||
models::Dataset,
|
||||
pool::get_pg_pool,
|
||||
schema::{data_sources, datasets, organizations, users, users_to_organizations},
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
|
@ -42,7 +42,9 @@ pub struct DataSourceRecord {
|
|||
pub type_: DataSourceType,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
pub created_at: DateTime<Utc>,
|
||||
#[allow(dead_code)]
|
||||
pub created_by: Uuid,
|
||||
#[allow(dead_code)]
|
||||
pub secret_id: Uuid,
|
||||
pub user_id: Uuid,
|
||||
pub user_name: Option<String>,
|
||||
|
|
|
@ -3,7 +3,6 @@ use uuid::Uuid;
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
|
||||
use crate::{
|
||||
routes::ws::{
|
||||
datasets::datasets_router::{DatasetEvent, DatasetRoute},
|
||||
|
@ -58,7 +57,9 @@ pub async fn get_dataset(user: &AuthenticatedUser, req: GetDatasetReq) -> Result
|
|||
let sql = format!("SELECT * FROM {}.{} LIMIT 25", schema, database_name);
|
||||
match query_engine(&req.id, &sql).await {
|
||||
Ok(data) => data,
|
||||
Err(e) => Vec::new(),
|
||||
Err(_) => {
|
||||
Vec::new()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Vec::new()
|
||||
|
|
|
@ -1,22 +1,12 @@
|
|||
use anyhow::{anyhow, Result};
|
||||
use chrono::{DateTime, Utc};
|
||||
use diesel::{
|
||||
BoolExpressionMethods, ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl,
|
||||
};
|
||||
use diesel::{ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl};
|
||||
use diesel_async::RunQueryDsl;
|
||||
use uuid::Uuid;
|
||||
use tokio;
|
||||
use uuid::Uuid;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use database::{enums::{IdentityType, UserOrganizationRole},
|
||||
pool::get_pg_pool,
|
||||
models::UserToOrganization,
|
||||
schema::{
|
||||
data_sources, dataset_permissions, datasets, datasets_to_permission_groups, messages_deprecated,
|
||||
permission_groups_to_identities, permission_groups_to_users, teams_to_users, users,
|
||||
users_to_organizations,
|
||||
},};
|
||||
use crate::{
|
||||
routes::ws::{
|
||||
datasets::datasets_router::{DatasetEvent, DatasetRoute},
|
||||
|
@ -26,6 +16,15 @@ use crate::{
|
|||
},
|
||||
utils::clients::sentry_utils::send_sentry_error,
|
||||
};
|
||||
use database::{
|
||||
enums::UserOrganizationRole,
|
||||
models::UserToOrganization,
|
||||
pool::get_pg_pool,
|
||||
schema::{
|
||||
data_sources, dataset_permissions, datasets, messages_deprecated,
|
||||
permission_groups_to_users, users, users_to_organizations,
|
||||
},
|
||||
};
|
||||
use middleware::AuthenticatedUser;
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
|
@ -141,13 +140,13 @@ async fn list_datasets_handler(
|
|||
admin_view: Option<bool>,
|
||||
enabled: Option<bool>,
|
||||
imported: Option<bool>,
|
||||
permission_group_id: Option<Uuid>,
|
||||
_permission_group_id: Option<Uuid>,
|
||||
_belongs_to: Option<bool>,
|
||||
data_source_id: Option<Uuid>,
|
||||
) -> Result<Vec<ListDatasetObject>> {
|
||||
let page = page.unwrap_or(0);
|
||||
let page_size = page_size.unwrap_or(25);
|
||||
let admin_view = admin_view.unwrap_or(false);
|
||||
let _admin_view = admin_view.unwrap_or(false);
|
||||
|
||||
let mut conn = match get_pg_pool().get().await {
|
||||
Ok(conn) => conn,
|
||||
|
@ -189,133 +188,6 @@ async fn list_datasets_handler(
|
|||
Ok(list_of_datasets)
|
||||
}
|
||||
|
||||
async fn get_user_permissioned_datasets(
|
||||
user_id: &Uuid,
|
||||
page: i64,
|
||||
page_size: i64,
|
||||
) -> Result<Vec<ListDatasetObject>> {
|
||||
let mut conn = match get_pg_pool().get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
|
||||
};
|
||||
|
||||
let list_dataset_records = match datasets::table
|
||||
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
|
||||
.inner_join(
|
||||
datasets_to_permission_groups::table.on(datasets::id
|
||||
.eq(datasets_to_permission_groups::dataset_id)
|
||||
.and(datasets_to_permission_groups::deleted_at.is_null())),
|
||||
)
|
||||
.inner_join(
|
||||
permission_groups_to_identities::table.on(
|
||||
datasets_to_permission_groups::permission_group_id
|
||||
.eq(permission_groups_to_identities::permission_group_id)
|
||||
.and(permission_groups_to_identities::deleted_at.is_null()),
|
||||
),
|
||||
)
|
||||
.left_join(
|
||||
teams_to_users::table.on(permission_groups_to_identities::identity_id
|
||||
.eq(teams_to_users::team_id)
|
||||
.and(permission_groups_to_identities::identity_type.eq(IdentityType::Team))
|
||||
.and(teams_to_users::deleted_at.is_null())),
|
||||
)
|
||||
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
|
||||
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
|
||||
.select((
|
||||
datasets::id,
|
||||
datasets::name,
|
||||
datasets::created_at,
|
||||
datasets::updated_at,
|
||||
datasets::enabled,
|
||||
datasets::imported,
|
||||
users::id,
|
||||
users::name.nullable(),
|
||||
users::email,
|
||||
data_sources::id,
|
||||
data_sources::name,
|
||||
))
|
||||
.group_by((
|
||||
datasets::id,
|
||||
datasets::name,
|
||||
datasets::created_at,
|
||||
datasets::updated_at,
|
||||
datasets::enabled,
|
||||
datasets::imported,
|
||||
users::id,
|
||||
users::name,
|
||||
users::email,
|
||||
data_sources::id,
|
||||
data_sources::name,
|
||||
))
|
||||
.filter(datasets::deleted_at.is_null())
|
||||
.filter(
|
||||
permission_groups_to_identities::identity_id
|
||||
.eq(user_id)
|
||||
.or(teams_to_users::user_id.eq(user_id)),
|
||||
)
|
||||
.limit(page_size)
|
||||
.offset(page * page_size)
|
||||
.load::<(
|
||||
Uuid,
|
||||
String,
|
||||
DateTime<Utc>,
|
||||
DateTime<Utc>,
|
||||
bool,
|
||||
bool,
|
||||
Uuid,
|
||||
Option<String>,
|
||||
String,
|
||||
Uuid,
|
||||
String,
|
||||
)>(&mut conn)
|
||||
.await
|
||||
{
|
||||
Ok(datasets) => datasets,
|
||||
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
|
||||
};
|
||||
|
||||
let list_dataset_objects: Vec<ListDatasetObject> = list_dataset_records
|
||||
.into_iter()
|
||||
.map(
|
||||
|(
|
||||
id,
|
||||
name,
|
||||
created_at,
|
||||
updated_at,
|
||||
enabled,
|
||||
imported,
|
||||
user_id,
|
||||
user_name,
|
||||
user_email,
|
||||
data_source_id,
|
||||
data_source_name,
|
||||
)| {
|
||||
ListDatasetObject {
|
||||
id,
|
||||
name,
|
||||
created_at: Some(created_at),
|
||||
updated_at: Some(updated_at),
|
||||
enabled: Some(enabled),
|
||||
imported: Some(imported),
|
||||
data_source: ListDatasetDataSource {
|
||||
id: data_source_id,
|
||||
name: data_source_name,
|
||||
},
|
||||
last_queried: None,
|
||||
owner: Some(ListDatasetOwner {
|
||||
id: user_id,
|
||||
name: user_name.unwrap_or(user_email),
|
||||
avatar_url: None,
|
||||
}),
|
||||
belongs_to: None,
|
||||
}
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
|
||||
Ok(list_dataset_objects)
|
||||
}
|
||||
|
||||
async fn get_org_datasets(
|
||||
organization_id: &Uuid,
|
||||
page: i64,
|
||||
|
@ -332,7 +204,10 @@ async fn get_org_datasets(
|
|||
let mut query = datasets::table
|
||||
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
|
||||
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
|
||||
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
|
||||
.left_join(
|
||||
messages_deprecated::table
|
||||
.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())),
|
||||
)
|
||||
.select((
|
||||
datasets::id,
|
||||
datasets::name,
|
||||
|
@ -439,68 +314,6 @@ async fn get_org_datasets(
|
|||
Ok(list_dataset_objects)
|
||||
}
|
||||
|
||||
async fn list_permission_group_datasets(
|
||||
organization_id: Uuid,
|
||||
page: i64,
|
||||
page_size: i64,
|
||||
permission_group_id: Uuid,
|
||||
) -> Result<Vec<ListDatasetObject>> {
|
||||
let mut conn = match get_pg_pool().get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
|
||||
};
|
||||
|
||||
let list_dataset_records = match datasets::table
|
||||
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
|
||||
.left_join(
|
||||
datasets_to_permission_groups::table.on(datasets::id
|
||||
.eq(datasets_to_permission_groups::dataset_id)
|
||||
.and(datasets_to_permission_groups::permission_group_id.eq(permission_group_id))
|
||||
.and(datasets_to_permission_groups::deleted_at.is_null())),
|
||||
)
|
||||
.select((
|
||||
datasets::id,
|
||||
datasets::name,
|
||||
data_sources::id,
|
||||
data_sources::name,
|
||||
datasets_to_permission_groups::permission_group_id.nullable(),
|
||||
))
|
||||
.filter(datasets::organization_id.eq(organization_id))
|
||||
.filter(datasets::deleted_at.is_null())
|
||||
.filter(datasets::enabled.eq(true))
|
||||
.limit(page_size)
|
||||
.offset(page * page_size)
|
||||
.load::<(Uuid, String, Uuid, String, Option<Uuid>)>(&mut conn)
|
||||
.await
|
||||
{
|
||||
Ok(datasets) => datasets,
|
||||
Err(e) => return Err(anyhow!("Unable to get datasets from database: {}", e)),
|
||||
};
|
||||
|
||||
let list_dataset_objects: Vec<ListDatasetObject> = list_dataset_records
|
||||
.into_iter()
|
||||
.map(
|
||||
|(id, name, data_source_id, data_source_name, permission_group_id)| ListDatasetObject {
|
||||
id,
|
||||
name,
|
||||
created_at: None,
|
||||
updated_at: None,
|
||||
enabled: None,
|
||||
imported: None,
|
||||
data_source: ListDatasetDataSource {
|
||||
id: data_source_id,
|
||||
name: data_source_name,
|
||||
},
|
||||
last_queried: None,
|
||||
owner: None,
|
||||
belongs_to: Some(permission_group_id.is_some()),
|
||||
},
|
||||
)
|
||||
.collect();
|
||||
|
||||
Ok(list_dataset_objects)
|
||||
}
|
||||
|
||||
async fn get_restricted_user_datasets(
|
||||
user_id: &Uuid,
|
||||
page: i64,
|
||||
|
@ -520,7 +333,10 @@ async fn get_restricted_user_datasets(
|
|||
.inner_join(
|
||||
dataset_permissions::table.on(dataset_permissions::dataset_id.eq(datasets::id)),
|
||||
)
|
||||
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
|
||||
.left_join(
|
||||
messages_deprecated::table
|
||||
.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())),
|
||||
)
|
||||
.select((
|
||||
datasets::id,
|
||||
datasets::name,
|
||||
|
@ -607,7 +423,10 @@ async fn get_restricted_user_datasets(
|
|||
permission_groups_to_users::table
|
||||
.on(permission_groups_to_users::user_id.eq(user_id)),
|
||||
)
|
||||
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
|
||||
.left_join(
|
||||
messages_deprecated::table
|
||||
.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())),
|
||||
)
|
||||
.select((
|
||||
datasets::id,
|
||||
datasets::name,
|
||||
|
|
|
@ -5,8 +5,6 @@ use diesel_async::RunQueryDsl;
|
|||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use database::{pool::get_pg_pool,
|
||||
schema::organizations,};
|
||||
use crate::{
|
||||
routes::ws::{
|
||||
organizations::organization_router::{OrganizationEvent, OrganizationRoute},
|
||||
|
@ -16,6 +14,7 @@ use crate::{
|
|||
},
|
||||
utils::clients::sentry_utils::send_sentry_error,
|
||||
};
|
||||
use database::{pool::get_pg_pool, schema::organizations};
|
||||
use middleware::AuthenticatedUser;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
@ -24,7 +23,10 @@ pub struct UpdateOrganizationRequest {
|
|||
pub name: String,
|
||||
}
|
||||
|
||||
pub async fn update_organization(user: &AuthenticatedUser, req: UpdateOrganizationRequest) -> Result<()> {
|
||||
pub async fn update_organization(
|
||||
user: &AuthenticatedUser,
|
||||
req: UpdateOrganizationRequest,
|
||||
) -> Result<()> {
|
||||
let org_state = match update_organization_handler(user, req.id, req.name).await {
|
||||
Ok(state) => state,
|
||||
Err(e) => {
|
||||
|
@ -65,7 +67,20 @@ pub async fn update_organization(user: &AuthenticatedUser, req: UpdateOrganizati
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_organization_handler(user: &AuthenticatedUser, id: Uuid, name: String) -> Result<()> {
|
||||
async fn update_organization_handler(
|
||||
user: &AuthenticatedUser,
|
||||
id: Uuid,
|
||||
name: String,
|
||||
) -> Result<()> {
|
||||
let organization_id = match user.organizations.get(0) {
|
||||
Some(organization) => organization.id,
|
||||
None => return Err(anyhow!("User is not a member of any organization")),
|
||||
};
|
||||
|
||||
if id != organization_id {
|
||||
return Err(anyhow!("User is not a member of this organization"));
|
||||
}
|
||||
|
||||
let mut conn = match get_pg_pool().get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => return Err(anyhow!("Error getting pg connection: {}", e)),
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use anyhow::{anyhow, Result};
|
||||
use diesel::{update, ExpressionMethods, QueryDsl};
|
||||
use diesel::{update, ExpressionMethods};
|
||||
use diesel_async::RunQueryDsl;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::sync::Arc;
|
||||
|
@ -135,7 +135,6 @@ async fn delete_thread_handler(
|
|||
err
|
||||
})?;
|
||||
|
||||
let user_id = user.id.clone();
|
||||
let id = id.clone();
|
||||
|
||||
let delete_thread_task = tokio::task::spawn(async move {
|
||||
|
|
|
@ -10,14 +10,11 @@ use uuid::Uuid;
|
|||
|
||||
use crate::{
|
||||
routes::ws::{
|
||||
ws::{SubscriptionRwLock, WsErrorCode, WsEvent, WsResponseMessage, WsSendMethod},
|
||||
ws::{SubscriptionRwLock, WsEvent, WsResponseMessage, WsSendMethod},
|
||||
ws_router::WsRoutes,
|
||||
ws_utils::{send_error_message, send_ws_message, subscribe_to_stream},
|
||||
},
|
||||
utils::{
|
||||
clients::sentry_utils::send_sentry_error,
|
||||
query_engine::{data_types::DataType, query_engine::query_engine},
|
||||
ws_utils::{send_ws_message, subscribe_to_stream},
|
||||
},
|
||||
utils::{clients::sentry_utils::send_sentry_error, query_engine::data_types::DataType},
|
||||
};
|
||||
use database::models::StepProgress;
|
||||
|
||||
|
@ -36,6 +33,7 @@ pub struct FetchingData {
|
|||
#[derive(Deserialize, Debug, Clone)]
|
||||
pub struct GetThreadRequest {
|
||||
pub id: Uuid,
|
||||
#[allow(dead_code)]
|
||||
pub password: Option<String>,
|
||||
}
|
||||
|
||||
|
@ -82,113 +80,3 @@ pub async fn get_thread_ws(
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn send_fetching_data_in_progress_to_sub(
|
||||
subscription: &String,
|
||||
user: &AuthenticatedUser,
|
||||
thread_id: &Uuid,
|
||||
message_id: &Uuid,
|
||||
sql: &String,
|
||||
) -> Result<()> {
|
||||
let fetching_data_body = FetchingData {
|
||||
progress: StepProgress::InProgress,
|
||||
data: None,
|
||||
thread_id: thread_id.clone(),
|
||||
message_id: message_id.clone(),
|
||||
chart_config: None,
|
||||
code: Some(sql.clone()),
|
||||
};
|
||||
|
||||
let identify_dataset_ws_response = WsResponseMessage::new(
|
||||
WsRoutes::Threads(ThreadRoute::Get),
|
||||
WsEvent::Threads(ThreadEvent::FetchingData),
|
||||
Some(fetching_data_body),
|
||||
None,
|
||||
user,
|
||||
WsSendMethod::SenderOnly,
|
||||
);
|
||||
|
||||
match send_ws_message(subscription, &identify_dataset_ws_response).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn fetch_data_handler(
|
||||
subscription: &String,
|
||||
user: &AuthenticatedUser,
|
||||
sql: &String,
|
||||
dataset_id: &Uuid,
|
||||
thread_id: &Uuid,
|
||||
message_id: &Uuid,
|
||||
) -> Result<()> {
|
||||
match send_fetching_data_in_progress_to_sub(subscription, user, thread_id, message_id, sql)
|
||||
.await
|
||||
{
|
||||
Ok(_) => (),
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Unable to send fetching data in progress to subscription: {:?}",
|
||||
e
|
||||
);
|
||||
send_sentry_error(&e.to_string(), Some(&user.id));
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
let data = match query_engine(&dataset_id, &sql).await {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
tracing::error!("Unable to query engine: {:?}", e);
|
||||
send_sentry_error(&e.to_string(), Some(&user.id));
|
||||
send_error_message(
|
||||
&subscription,
|
||||
WsRoutes::Threads(ThreadRoute::Get),
|
||||
WsEvent::Threads(ThreadEvent::FetchingData),
|
||||
WsErrorCode::InternalServerError,
|
||||
"Failed to fetch results.".to_string(),
|
||||
user,
|
||||
)
|
||||
.await?;
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let fetching_data_body = FetchingData {
|
||||
progress: StepProgress::Completed,
|
||||
data: if data.is_empty() {
|
||||
Some(vec![])
|
||||
} else {
|
||||
Some(data)
|
||||
},
|
||||
code: Some(sql.clone()),
|
||||
thread_id: thread_id.clone(),
|
||||
message_id: message_id.clone(),
|
||||
chart_config: None,
|
||||
};
|
||||
|
||||
let fetching_data_ws_response = WsResponseMessage::new(
|
||||
WsRoutes::Threads(ThreadRoute::Get),
|
||||
WsEvent::Threads(ThreadEvent::FetchingData),
|
||||
Some(fetching_data_body),
|
||||
None,
|
||||
user,
|
||||
WsSendMethod::SenderOnly,
|
||||
);
|
||||
|
||||
match send_ws_message(&subscription, &fetching_data_ws_response).await {
|
||||
Ok(_) => (),
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Unable to send fetching data success to subscription: {:?}",
|
||||
e
|
||||
);
|
||||
send_sentry_error(&e.to_string(), Some(&user.id));
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ use crate::{
|
|||
pub struct ListThreadsFilters {
|
||||
#[serde(rename = "status")]
|
||||
pub verification: Option<Vec<Verification>>,
|
||||
pub user_id: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Clone)]
|
||||
|
|
|
@ -1,23 +1,19 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use handlers::chats::post_chat_handler::ChatCreateNewChat;
|
||||
use handlers::chats::post_chat_handler::{self, ThreadEvent};
|
||||
use handlers::chats::types::ChatWithMessages;
|
||||
use middleware::AuthenticatedUser;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::routes::ws::{
|
||||
threads_and_messages::threads_router::{ThreadEvent as WSThreadEvent, ThreadRoute},
|
||||
ws::{SubscriptionRwLock, WsEvent, WsResponseMessage, WsSendMethod},
|
||||
ws::{WsEvent, WsResponseMessage, WsSendMethod},
|
||||
ws_router::WsRoutes,
|
||||
ws_utils::send_ws_message,
|
||||
};
|
||||
|
||||
/// Creates a new thread for a user and processes their request using the shared handler
|
||||
pub async fn post_thread(
|
||||
subscriptions: &Arc<SubscriptionRwLock>,
|
||||
user_group: &String,
|
||||
user: &AuthenticatedUser,
|
||||
request: ChatCreateNewChat,
|
||||
) -> Result<()> {
|
||||
|
@ -75,21 +71,4 @@ pub async fn post_thread(
|
|||
post_chat_handler::post_chat_handler(request, user.clone(), Some(tx)).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sends the chat response to the client via WebSocket
|
||||
async fn send_ws_response(subscription: &str, chat_with_messages: &ChatWithMessages) -> Result<()> {
|
||||
let response = WsResponseMessage::new_no_user(
|
||||
WsRoutes::Threads(ThreadRoute::Post),
|
||||
WsEvent::Threads(WSThreadEvent::InitializeChat),
|
||||
chat_with_messages,
|
||||
None,
|
||||
WsSendMethod::All,
|
||||
);
|
||||
|
||||
if let Err(e) = send_ws_message(&subscription.to_string(), &response).await {
|
||||
tracing::error!("Failed to send websocket message: {}", e);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,4 +1,3 @@
|
|||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
@ -9,7 +8,6 @@ use diesel::{
|
|||
QueryDsl,
|
||||
};
|
||||
use diesel_async::RunQueryDsl;
|
||||
use indexmap::IndexMap;
|
||||
use serde::Serialize;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -17,7 +15,6 @@ use crate::{
|
|||
routes::ws::threads_and_messages::messages_utils::MessageDraftState,
|
||||
utils::{
|
||||
clients::sentry_utils::send_sentry_error,
|
||||
query_engine::{data_types::DataType, query_engine::query_engine},
|
||||
sharing::asset_sharing::{
|
||||
get_asset_collections, get_asset_sharing_info, CollectionNameAndId,
|
||||
IndividualPermission, TeamPermissions,
|
||||
|
@ -26,7 +23,6 @@ use crate::{
|
|||
};
|
||||
use database::{
|
||||
enums::{AssetPermissionRole, AssetType, UserOrganizationRole},
|
||||
models::{ColumnMetadata, DataMetadataJsonBody, MinMaxValue},
|
||||
models::{MessageDeprecated, ThreadDeprecated},
|
||||
pool::get_pg_pool,
|
||||
schema::{
|
||||
|
@ -77,8 +73,6 @@ pub struct ThreadState {
|
|||
pub draft_session_id: Option<Uuid>,
|
||||
}
|
||||
|
||||
const MAX_UNIQUE_VALUES: usize = 1000;
|
||||
|
||||
pub async fn get_thread_state_by_id(
|
||||
user_id: &Uuid,
|
||||
thread_id: &Uuid,
|
||||
|
@ -156,7 +150,7 @@ pub async fn get_thread_state_by_id(
|
|||
}
|
||||
};
|
||||
|
||||
let public_password = if let Some(password_secret_id) = thread.password_secret_id {
|
||||
let public_password = if let Some(_password_secret_id) = thread.password_secret_id {
|
||||
let public_password = match read_secret(&thread.id).await {
|
||||
Ok(public_password) => public_password,
|
||||
Err(e) => {
|
||||
|
@ -539,56 +533,6 @@ async fn is_organization_admin_or_owner(user_id: Arc<Uuid>, thread_id: Arc<Uuid>
|
|||
Ok(is_organization_adminig)
|
||||
}
|
||||
|
||||
pub async fn get_bulk_user_thread_permission(
|
||||
user_id: &Uuid,
|
||||
thread_ids: &Vec<Uuid>,
|
||||
) -> Result<HashMap<Uuid, AssetPermissionRole>> {
|
||||
let mut conn = match get_pg_pool().get().await {
|
||||
Ok(conn) => conn,
|
||||
Err(e) => {
|
||||
tracing::error!("Error getting pg connection: {}", e);
|
||||
return Err(anyhow!("Error getting pg connection: {}", e));
|
||||
}
|
||||
};
|
||||
|
||||
let permissions = match asset_permissions::table
|
||||
.left_join(
|
||||
teams_to_users::table.on(asset_permissions::identity_id.eq(teams_to_users::team_id)),
|
||||
)
|
||||
.select((asset_permissions::asset_id, asset_permissions::role))
|
||||
.filter(
|
||||
asset_permissions::identity_id
|
||||
.eq(&user_id)
|
||||
.or(teams_to_users::user_id.eq(&user_id)),
|
||||
)
|
||||
.filter(asset_permissions::asset_id.eq_any(thread_ids))
|
||||
.filter(asset_permissions::deleted_at.is_null())
|
||||
.load::<(Uuid, AssetPermissionRole)>(&mut conn)
|
||||
.await
|
||||
{
|
||||
Ok(permissions) => permissions,
|
||||
Err(diesel::result::Error::NotFound) => {
|
||||
tracing::error!("thread not found");
|
||||
return Err(anyhow!("thread not found"));
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error querying thread by ID: {}", e);
|
||||
return Err(anyhow!("Error querying thread by ID: {}", e));
|
||||
}
|
||||
};
|
||||
|
||||
let mut permission_map: HashMap<Uuid, AssetPermissionRole> = HashMap::new();
|
||||
|
||||
for (asset_id, role) in permissions {
|
||||
permission_map
|
||||
.entry(asset_id)
|
||||
.and_modify(|e| *e = AssetPermissionRole::max(e.clone(), role.clone()))
|
||||
.or_insert(role);
|
||||
}
|
||||
|
||||
Ok(permission_map)
|
||||
}
|
||||
|
||||
async fn get_thread_and_check_permissions(
|
||||
user_id: Arc<Uuid>,
|
||||
thread_id: Arc<Uuid>,
|
||||
|
@ -759,7 +703,7 @@ async fn get_thread_messages(
|
|||
None
|
||||
};
|
||||
|
||||
let sql_evaluation = if let Some(context) = &message.context {
|
||||
let _sql_evaluation = if let Some(context) = &message.context {
|
||||
match context.get("sql_evaluation") {
|
||||
Some(sql_evaluation) => Some(sql_evaluation.clone()),
|
||||
None => None,
|
||||
|
@ -857,198 +801,6 @@ async fn get_thread_dashboards(thread_id: Arc<Uuid>) -> Result<Vec<DashboardName
|
|||
Ok(dashboards)
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DataObject {
|
||||
pub data: Vec<IndexMap<String, DataType>>,
|
||||
pub data_metadata: DataMetadataJsonBody,
|
||||
}
|
||||
|
||||
pub async fn fetch_data(sql: &String, dataset_id: &Uuid) -> Result<DataObject> {
|
||||
let data = match query_engine(&dataset_id, &sql).await {
|
||||
Ok(data) => data,
|
||||
Err(e) => {
|
||||
return Err(anyhow!("Unable to query engine: {}", e));
|
||||
}
|
||||
};
|
||||
|
||||
let data_metadata = match process_data_metadata(&data).await {
|
||||
Ok(data_metadata) => data_metadata,
|
||||
Err(e) => return Err(anyhow!("Unable to process data metadata: {}", e)),
|
||||
};
|
||||
|
||||
Ok(DataObject {
|
||||
data,
|
||||
data_metadata,
|
||||
})
|
||||
}
|
||||
|
||||
async fn process_data_metadata(
|
||||
data: &Vec<IndexMap<String, DataType>>,
|
||||
) -> Result<DataMetadataJsonBody> {
|
||||
if data.is_empty() {
|
||||
return Ok(DataMetadataJsonBody {
|
||||
column_count: 0,
|
||||
row_count: 0,
|
||||
column_metadata: vec![],
|
||||
});
|
||||
}
|
||||
|
||||
let first_row = &data[0];
|
||||
let columns: Vec<_> = first_row.keys().cloned().collect();
|
||||
|
||||
let column_metadata: Vec<_> = columns
|
||||
.par_iter() // Parallel iterator
|
||||
.map(|column_name| {
|
||||
let mut unique_values = Vec::with_capacity(MAX_UNIQUE_VALUES);
|
||||
let mut min_value = None;
|
||||
let mut max_value = None;
|
||||
let mut unique_values_exceeded = false;
|
||||
let mut is_date_type = false;
|
||||
let mut min_value_str: Option<String> = None;
|
||||
let mut max_value_str: Option<String> = None;
|
||||
|
||||
for row in data {
|
||||
if let Some(value) = row.get(column_name) {
|
||||
if !unique_values_exceeded && unique_values.len() < MAX_UNIQUE_VALUES {
|
||||
if !unique_values.iter().any(|x| x == value) {
|
||||
unique_values.push(value.clone());
|
||||
}
|
||||
} else {
|
||||
unique_values_exceeded = true;
|
||||
}
|
||||
|
||||
// Update min/max for numeric types
|
||||
match value {
|
||||
DataType::Int8(Some(n)) => {
|
||||
let n = *n as f64;
|
||||
min_value = Some(min_value.map_or(n, |min: f64| min.min(n)));
|
||||
max_value = Some(max_value.map_or(n, |max: f64| max.max(n)));
|
||||
}
|
||||
DataType::Int4(Some(n)) => {
|
||||
let n = *n as f64;
|
||||
min_value = Some(min_value.map_or(n, |min: f64| min.min(n)));
|
||||
max_value = Some(max_value.map_or(n, |max: f64| max.max(n)));
|
||||
}
|
||||
DataType::Int2(Some(n)) => {
|
||||
let n = *n as f64;
|
||||
min_value = Some(min_value.map_or(n, |min: f64| min.min(n)));
|
||||
max_value = Some(max_value.map_or(n, |max: f64| max.max(n)));
|
||||
}
|
||||
DataType::Float4(Some(n)) => {
|
||||
let n = *n as f64;
|
||||
min_value = Some(min_value.map_or(n, |min: f64| min.min(n)));
|
||||
max_value = Some(max_value.map_or(n, |max: f64| max.max(n)));
|
||||
}
|
||||
DataType::Float8(Some(n)) => {
|
||||
let n = *n as f64;
|
||||
min_value = Some(min_value.map_or(n, |min: f64| min.min(n)));
|
||||
max_value = Some(max_value.map_or(n, |max: f64| max.max(n)));
|
||||
}
|
||||
DataType::Date(Some(date)) => {
|
||||
is_date_type = true;
|
||||
let date_str = date.to_string();
|
||||
min_value = match min_value {
|
||||
None => Some(date_str.parse::<f64>().unwrap_or(0.0)),
|
||||
Some(_) => None, // Clear numeric min/max since we'll use strings
|
||||
};
|
||||
max_value = None;
|
||||
if let Some(current_min) = &min_value_str {
|
||||
if date_str < *current_min {
|
||||
min_value_str = Some(date_str.clone());
|
||||
}
|
||||
} else {
|
||||
min_value_str = Some(date_str.clone());
|
||||
}
|
||||
if let Some(current_max) = &max_value_str {
|
||||
if date_str > *current_max {
|
||||
max_value_str = Some(date_str);
|
||||
}
|
||||
} else {
|
||||
max_value_str = Some(date_str);
|
||||
}
|
||||
}
|
||||
DataType::Timestamp(Some(ts)) => {
|
||||
is_date_type = true;
|
||||
let ts_str = ts.to_string();
|
||||
min_value = match min_value {
|
||||
None => Some(ts_str.parse::<f64>().unwrap_or(0.0)),
|
||||
Some(_) => None,
|
||||
};
|
||||
max_value = None;
|
||||
if let Some(current_min) = &min_value_str {
|
||||
if ts_str < *current_min {
|
||||
min_value_str = Some(ts_str.clone());
|
||||
}
|
||||
} else {
|
||||
min_value_str = Some(ts_str.clone());
|
||||
}
|
||||
if let Some(current_max) = &max_value_str {
|
||||
if ts_str > *current_max {
|
||||
max_value_str = Some(ts_str);
|
||||
}
|
||||
} else {
|
||||
max_value_str = Some(ts_str);
|
||||
}
|
||||
}
|
||||
DataType::Timestamptz(Some(ts)) => {
|
||||
is_date_type = true;
|
||||
let ts_str = ts.naive_utc().to_string();
|
||||
min_value = match min_value {
|
||||
None => Some(ts_str.parse::<f64>().unwrap_or(0.0)),
|
||||
Some(_) => None,
|
||||
};
|
||||
max_value = None;
|
||||
if let Some(current_min) = &min_value_str {
|
||||
if ts_str < *current_min {
|
||||
min_value_str = Some(ts_str.clone());
|
||||
}
|
||||
} else {
|
||||
min_value_str = Some(ts_str.clone());
|
||||
}
|
||||
if let Some(current_max) = &max_value_str {
|
||||
if ts_str > *current_max {
|
||||
max_value_str = Some(ts_str);
|
||||
}
|
||||
} else {
|
||||
max_value_str = Some(ts_str);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let column_type = first_row.get(column_name).unwrap();
|
||||
ColumnMetadata {
|
||||
name: column_name.clone(),
|
||||
type_: column_type.to_string(),
|
||||
simple_type: column_type.simple_type(),
|
||||
unique_values: if !unique_values_exceeded {
|
||||
unique_values.len() as i32
|
||||
} else {
|
||||
MAX_UNIQUE_VALUES as i32
|
||||
},
|
||||
min_value: if is_date_type {
|
||||
min_value_str.map(MinMaxValue::String)
|
||||
} else {
|
||||
min_value.map(MinMaxValue::Number)
|
||||
},
|
||||
max_value: if is_date_type {
|
||||
max_value_str.map(MinMaxValue::String)
|
||||
} else {
|
||||
max_value.map(MinMaxValue::Number)
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(DataMetadataJsonBody {
|
||||
column_count: first_row.len() as i32,
|
||||
row_count: data.len() as i32,
|
||||
column_metadata,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn check_if_thread_saved(thread_id: &Uuid) -> Result<bool> {
|
||||
let threads_to_collections_id = thread_id.clone();
|
||||
|
||||
|
|
|
@ -100,7 +100,7 @@ pub async fn threads_router(
|
|||
ThreadRoute::Post => {
|
||||
let req = serde_json::from_value(data)?;
|
||||
|
||||
post_thread(subscriptions, user_group, user, req).await?;
|
||||
post_thread(user, req).await?;
|
||||
}
|
||||
ThreadRoute::Update => {
|
||||
let req = serde_json::from_value(data)?;
|
||||
|
|
|
@ -268,7 +268,7 @@ async fn update_thread_record(
|
|||
save_to_dashboard: Option<Uuid>,
|
||||
remove_from_dashboard: Option<Uuid>,
|
||||
) -> Result<()> {
|
||||
let password_secret_id = match public_password {
|
||||
let _password_secret_id = match public_password {
|
||||
Some(Some(password)) => {
|
||||
// Password provided - create new secret
|
||||
match create_secret(&thread_id, &password).await {
|
||||
|
|
Loading…
Reference in New Issue