deprecated old threads and messages table

This commit is contained in:
dal 2025-01-28 12:18:59 -07:00
parent 22d75ae0b6
commit 6a73b59aa1
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
19 changed files with 209 additions and 210 deletions

View File

@ -292,10 +292,9 @@ pub struct User {
AsChangeset,
sqlx::FromRow,
)]
#[diesel(belongs_to(Thread))]
#[diesel(belongs_to(User, foreign_key = sent_by))]
#[diesel(belongs_to(Dataset))]
#[diesel(table_name = messages)]
#[diesel(table_name = messages_deprecated)]
pub struct Message {
pub id: Uuid,
pub thread_id: Uuid,
@ -380,8 +379,8 @@ pub struct Term {
AsChangeset,
)]
#[diesel(belongs_to(User, foreign_key = created_by, foreign_key = updated_by))]
#[diesel(table_name = threads)]
pub struct Thread {
#[diesel(table_name = threads_deprecated)]
pub struct ThreadDeprecated {
pub id: Uuid,
pub created_by: Uuid,
pub updated_by: Uuid,
@ -399,7 +398,7 @@ pub struct Thread {
}
#[derive(Queryable, Insertable, Associations, Debug)]
#[diesel(belongs_to(Thread, foreign_key = thread_id))]
#[diesel(belongs_to(ThreadDeprecated, foreign_key = thread_id))]
#[diesel(belongs_to(Dashboard, foreign_key = dashboard_id))]
#[diesel(belongs_to(User, foreign_key = added_by))]
#[diesel(table_name = threads_to_dashboards)]

View File

@ -14,7 +14,7 @@ use crate::database::enums::{AssetPermissionRole, AssetType, UserOrganizationRol
use crate::database::lib::{get_pg_pool, PgPool};
use crate::database::models::User;
use crate::database::schema::{
asset_permissions, collections_to_assets, dashboards, teams_to_users, threads,
asset_permissions, collections_to_assets, dashboards, teams_to_users, threads_deprecated,
threads_to_dashboards, users_to_organizations,
};
use crate::routes::rest::ApiResponse;
@ -93,15 +93,15 @@ async fn get_asset_access_handler(
AssetType::Thread => {
let mut conn = pg_pool.get().await?;
let thread_info = threads::table
let thread_info = threads_deprecated::table
.select((
threads::id,
threads::publicly_accessible,
threads::password_secret_id.is_not_null(),
threads::public_expiry_date,
threads_deprecated::id,
threads_deprecated::publicly_accessible,
threads_deprecated::password_secret_id.is_not_null(),
threads_deprecated::public_expiry_date,
))
.filter(threads::id.eq(&asset_id))
.filter(threads::deleted_at.is_null())
.filter(threads_deprecated::id.eq(&asset_id))
.filter(threads_deprecated::deleted_at.is_null())
.first::<(Uuid, bool, bool, Option<DateTime<Utc>>)>(&mut conn)
.await?;

View File

@ -16,7 +16,7 @@ use crate::{
lib::get_pg_pool,
models::{User, UserToOrganization},
schema::{
data_sources, dataset_groups, dataset_groups_permissions, dataset_permissions, datasets, messages, permission_groups_to_identities, users, users_to_organizations
data_sources, dataset_groups, dataset_groups_permissions, dataset_permissions, datasets, messages_deprecated, permission_groups_to_identities, users, users_to_organizations
},
},
routes::rest::ApiResponse,
@ -168,7 +168,7 @@ async fn get_org_datasets(
let mut query = datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,

View File

@ -1,5 +1,5 @@
use crate::database::lib::get_pg_pool;
use crate::database::schema::{datasets, messages};
use crate::database::schema::{datasets, messages_deprecated};
use crate::routes::rest::ApiResponse;
use crate::utils::clients::sentry_utils::send_sentry_error;
use crate::utils::clients::typesense::{
@ -64,10 +64,10 @@ async fn update_record(table: CollectionName, record: Value) -> Result<(), anyho
let organization_id = match table {
CollectionName::Messages => {
match messages::table
match messages_deprecated::table
.inner_join(datasets::table)
.select(datasets::organization_id)
.filter(messages::id.eq(id))
.filter(messages_deprecated::id.eq(id))
.first::<Uuid>(&mut conn)
.await
{

View File

@ -16,8 +16,8 @@ use crate::{
lib::get_pg_pool,
models::Collection,
schema::{
asset_permissions, collections, collections_to_assets, dashboards, messages,
teams_to_users, threads, users,
asset_permissions, collections, collections_to_assets, dashboards, messages_deprecated,
teams_to_users, threads_deprecated, users,
},
},
utils::{
@ -504,15 +504,15 @@ async fn get_thread_assets(collection_id: Arc<Uuid>) -> Result<Option<Vec<Collec
};
let thread_assets = match collections_to_assets::table
.inner_join(threads::table.on(collections_to_assets::asset_id.eq(threads::id)))
.inner_join(users::table.on(threads::created_by.eq(users::id)))
.inner_join(messages::table.on(threads::state_message_id.eq(messages::id.nullable())))
.inner_join(threads_deprecated::table.on(collections_to_assets::asset_id.eq(threads_deprecated::id)))
.inner_join(users::table.on(threads_deprecated::created_by.eq(users::id)))
.inner_join(messages_deprecated::table.on(threads_deprecated::state_message_id.eq(messages_deprecated::id.nullable())))
.select((
threads::id,
messages::title.nullable(),
messages::message,
threads::created_at,
threads::updated_at,
threads_deprecated::id,
messages_deprecated::title.nullable(),
messages_deprecated::message,
threads_deprecated::created_at,
threads_deprecated::updated_at,
users::name.nullable(),
users::email,
collections_to_assets::asset_type,

View File

@ -14,7 +14,7 @@ use crate::{
enums::{AssetPermissionRole, AssetType},
lib::get_pg_pool,
models::{Dashboard, Message},
schema::{asset_permissions, dashboards, messages, threads_to_dashboards, users_to_organizations},
schema::{asset_permissions, dashboards, messages_deprecated, threads_to_dashboards, users_to_organizations},
},
utils::{
clients::{sentry_utils::send_sentry_error, supabase_vault::read_secret},
@ -478,15 +478,15 @@ async fn get_dashboard_metrics(dashboard_id: Arc<Uuid>) -> Result<Vec<Metric>> {
}
};
let metric_records = match messages::table
let metric_records = match messages_deprecated::table
.inner_join(
threads_to_dashboards::table
.on(messages::thread_id.eq(threads_to_dashboards::thread_id)),
.on(messages_deprecated::thread_id.eq(threads_to_dashboards::thread_id)),
)
.select((threads_to_dashboards::thread_id, messages::all_columns))
.select((threads_to_dashboards::thread_id, messages_deprecated::all_columns))
.filter(threads_to_dashboards::dashboard_id.eq(dashboard_id.as_ref()))
.filter(messages::deleted_at.is_null())
.filter(messages::draft_session_id.is_null())
.filter(messages_deprecated::deleted_at.is_null())
.filter(messages_deprecated::draft_session_id.is_null())
.filter(threads_to_dashboards::deleted_at.is_null())
.load::<(Uuid, Message)>(&mut conn)
.await

View File

@ -17,7 +17,7 @@ use crate::{
lib::get_pg_pool,
models::{User, UserToOrganization},
schema::{
data_sources, dataset_permissions, datasets, datasets_to_permission_groups, messages,
data_sources, dataset_permissions, datasets, datasets_to_permission_groups, messages_deprecated,
permission_groups_to_identities, permission_groups_to_users, teams_to_users, users,
users_to_organizations,
},
@ -223,7 +223,7 @@ async fn get_user_permissioned_datasets(
.and(teams_to_users::deleted_at.is_null())),
)
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
@ -338,7 +338,7 @@ async fn get_org_datasets(
let mut query = datasets::table
.inner_join(data_sources::table.on(datasets::data_source_id.eq(data_sources::id)))
.inner_join(users::table.on(datasets::created_by.eq(users::id)))
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
@ -529,7 +529,7 @@ async fn get_restricted_user_datasets(
.inner_join(
dataset_permissions::table.on(dataset_permissions::dataset_id.eq(datasets::id)),
)
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,
@ -619,7 +619,7 @@ async fn get_restricted_user_datasets(
permission_groups_to_users::table
.on(permission_groups_to_users::user_id.eq(user_id)),
)
.left_join(messages::table.on(messages::dataset_id.eq(datasets::id.nullable())))
.left_join(messages_deprecated::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
.select((
datasets::id,
datasets::name,

View File

@ -28,7 +28,7 @@ use crate::database::{
enums::{TeamToUserRole, UserOrganizationRole},
models::{PermissionGroup, Team, User},
schema::{
datasets_to_permission_groups, messages, permission_groups,
datasets_to_permission_groups, messages_deprecated, permission_groups,
permission_groups_to_identities, teams, teams_to_users, users, users_to_organizations,
},
};
@ -505,10 +505,10 @@ async fn get_user_and_org_role(user_id: Arc<Uuid>) -> Result<(User, UserToOrgani
async fn get_user_queries_last_30_days(user_id: Arc<Uuid>) -> Result<i64> {
let mut conn = get_pg_pool().get().await?;
let query_count = match messages::table
.select(count(messages::id))
.filter(messages::sent_by.eq(user_id.as_ref()))
.filter(messages::created_at.ge(Utc::now() - chrono::Duration::days(30)))
let query_count = match messages_deprecated::table
.select(count(messages_deprecated::id))
.filter(messages_deprecated::sent_by.eq(user_id.as_ref()))
.filter(messages_deprecated::created_at.ge(Utc::now() - chrono::Duration::days(30)))
.first::<i64>(&mut conn)
.await
{

View File

@ -10,7 +10,7 @@ use crate::{
database::{
lib::get_pg_pool,
models::User,
schema::threads,
schema::threads_deprecated,
},
routes::ws::{
ws::{SubscriptionRwLock, WsErrorCode, WsEvent, WsResponseMessage, WsSendMethod},
@ -141,9 +141,9 @@ async fn delete_thread_handler(
let id = id.clone();
let delete_thread_task = tokio::task::spawn(async move {
match update(threads::table)
.filter(threads::id.eq(&id))
.set(threads::deleted_at.eq(chrono::Utc::now()))
match update(threads_deprecated::table)
.filter(threads_deprecated::id.eq(&id))
.set(threads_deprecated::deleted_at.eq(chrono::Utc::now()))
.execute(&mut conn)
.await
{

View File

@ -11,7 +11,7 @@ use crate::{
enums::{AssetPermissionRole, AssetType, IdentityType},
lib::{get_pg_pool, FetchingData, StepProgress},
models::{AssetPermission, Message, User},
schema::{asset_permissions, messages, threads},
schema::{asset_permissions, messages_deprecated, threads_deprecated},
},
routes::ws::{
ws::{SubscriptionRwLock, WsErrorCode, WsEvent, WsResponseMessage, WsSendMethod},
@ -373,7 +373,7 @@ async fn duplicate_thread_handler(
Err(e) => return Err(anyhow!("Error getting pg connection: {}", e)),
};
match insert_into(threads::table)
match insert_into(threads_deprecated::table)
.values(&thread_state.thread)
.execute(&mut conn)
.await
@ -394,7 +394,7 @@ async fn duplicate_thread_handler(
Err(e) => return Err(anyhow!("Error getting pg connection: {}", e)),
};
tokio::spawn(async move {
match insert_into(messages::table)
match insert_into(messages_deprecated::table)
.values(bulk_messages)
.execute(&mut conn)
.await

View File

@ -13,7 +13,7 @@ use crate::{
lib::get_pg_pool,
models::User,
schema::{
asset_permissions, datasets, messages, teams_to_users, threads, users,
asset_permissions, datasets, messages_deprecated, teams_to_users, threads_deprecated, users,
},
},
routes::ws::{
@ -232,16 +232,16 @@ async fn get_user_threads(
}
};
let mut messages_statement = threads::table
let mut messages_statement = threads_deprecated::table
.inner_join(
messages::table.on(messages::id
messages_deprecated::table.on(messages_deprecated::id
.nullable()
.eq(threads::state_message_id.nullable())
.and(messages::deleted_at.is_null())),
.eq(threads_deprecated::state_message_id.nullable())
.and(messages_deprecated::deleted_at.is_null())),
)
.inner_join(datasets::table.on(datasets::id.nullable().eq(messages::dataset_id)))
.inner_join(datasets::table.on(datasets::id.nullable().eq(messages_deprecated::dataset_id)))
.inner_join(
asset_permissions::table.on(threads::id
asset_permissions::table.on(threads_deprecated::id
.eq(asset_permissions::asset_id)
.and(asset_permissions::asset_type.eq(AssetType::Thread))
.and(asset_permissions::deleted_at.is_null())),
@ -252,13 +252,13 @@ async fn get_user_threads(
.and(asset_permissions::identity_type.eq(IdentityType::Team))
.and(asset_permissions::deleted_at.is_null())),
)
.inner_join(users::table.on(users::id.eq(threads::created_by)))
.inner_join(users::table.on(users::id.eq(threads_deprecated::created_by)))
.select((
(
messages::thread_id,
messages::title,
messages::verification,
messages::created_at,
messages_deprecated::thread_id,
messages_deprecated::title,
messages_deprecated::verification,
messages_deprecated::created_at,
),
(users::id, users::name.nullable(), users::email),
(datasets::id, datasets::name),
@ -268,12 +268,12 @@ async fn get_user_threads(
.eq(user_id)
.or(teams_to_users::user_id.eq(user_id)),
)
.filter(threads::deleted_at.is_null())
.filter(messages::deleted_at.is_null())
.filter(messages::draft_session_id.is_null())
.filter(messages::dataset_id.is_not_null())
.filter(messages::code.is_not_null())
.order_by(messages::created_at.desc())
.filter(threads_deprecated::deleted_at.is_null())
.filter(messages_deprecated::deleted_at.is_null())
.filter(messages_deprecated::draft_session_id.is_null())
.filter(messages_deprecated::dataset_id.is_not_null())
.filter(messages_deprecated::code.is_not_null())
.order_by(messages_deprecated::created_at.desc())
.offset(page * page_size)
.limit(page_size)
.into_boxed();
@ -281,7 +281,7 @@ async fn get_user_threads(
if let Some(filters) = filters {
if let Some(verification) = filters.verification {
messages_statement =
messages_statement.filter(messages::verification.eq_any(verification));
messages_statement.filter(messages_deprecated::verification.eq_any(verification));
}
}
@ -317,31 +317,31 @@ async fn get_admin_threads(
> {
let organization_id = get_user_organization_id(user_id).await?;
let mut messages_statement = threads::table
let mut messages_statement = threads_deprecated::table
.inner_join(
messages::table.on(messages::id
messages_deprecated::table.on(messages_deprecated::id
.nullable()
.eq(threads::state_message_id.nullable())),
.eq(threads_deprecated::state_message_id.nullable())),
)
.inner_join(datasets::table.on(datasets::id.nullable().eq(messages::dataset_id)))
.inner_join(users::table.on(users::id.eq(threads::created_by)))
.inner_join(datasets::table.on(datasets::id.nullable().eq(messages_deprecated::dataset_id)))
.inner_join(users::table.on(users::id.eq(threads_deprecated::created_by)))
.select((
(
messages::thread_id,
messages::title,
messages::verification,
messages::created_at,
messages_deprecated::thread_id,
messages_deprecated::title,
messages_deprecated::verification,
messages_deprecated::created_at,
),
(users::id, users::name.nullable(), users::email),
(datasets::id, datasets::name),
))
.filter(threads::organization_id.eq(organization_id))
.filter(threads::deleted_at.is_null())
.filter(messages::deleted_at.is_null())
.filter(messages::draft_session_id.is_null())
.filter(messages::dataset_id.is_not_null())
.filter(messages::code.is_not_null())
.order_by(messages::created_at.desc())
.filter(threads_deprecated::organization_id.eq(organization_id))
.filter(threads_deprecated::deleted_at.is_null())
.filter(messages_deprecated::deleted_at.is_null())
.filter(messages_deprecated::draft_session_id.is_null())
.filter(messages_deprecated::dataset_id.is_not_null())
.filter(messages_deprecated::code.is_not_null())
.order_by(messages_deprecated::created_at.desc())
.offset(page * page_size)
.limit(page_size)
.into_boxed();
@ -349,7 +349,7 @@ async fn get_admin_threads(
if let Some(filters) = filters {
if let Some(verification) = filters.verification {
messages_statement =
messages_statement.filter(messages::verification.eq_any(verification));
messages_statement.filter(messages_deprecated::verification.eq_any(verification));
}
}

View File

@ -5,7 +5,7 @@ use crate::database::{
lib::get_pg_pool,
models::Message,
schema::{
asset_permissions, data_sources, datasets, messages, teams_to_users, threads,
asset_permissions, data_sources, datasets, messages_deprecated, teams_to_users, threads_deprecated,
users_to_organizations,
},
};
@ -104,10 +104,10 @@ async fn get_message_by_id(message_id: Arc<Uuid>) -> Result<Message> {
}
};
let message = match messages::table
.filter(messages::id.eq(message_id.as_ref()))
.filter(messages::deleted_at.is_null())
.select(messages::all_columns)
let message = match messages_deprecated::table
.filter(messages_deprecated::id.eq(message_id.as_ref()))
.filter(messages_deprecated::deleted_at.is_null())
.select(messages_deprecated::all_columns)
.first::<Message>(&mut conn)
.await
{
@ -132,18 +132,18 @@ pub async fn check_public_thread(message_id: Arc<Uuid>) -> Result<bool> {
}
};
match threads::table
match threads_deprecated::table
.inner_join(
messages::table.on(threads::id.eq(messages::thread_id).and(
messages::id.eq(message_id.as_ref()).and(
messages::deleted_at
messages_deprecated::table.on(threads_deprecated::id.eq(messages_deprecated::thread_id).and(
messages_deprecated::id.eq(message_id.as_ref()).and(
messages_deprecated::deleted_at
.is_null()
.and(messages::draft_session_id.is_null()),
.and(messages_deprecated::draft_session_id.is_null()),
),
)),
)
.select((threads::publicly_accessible, threads::public_expiry_date))
.filter(threads::deleted_at.is_null())
.select((threads_deprecated::publicly_accessible, threads_deprecated::public_expiry_date))
.filter(threads_deprecated::deleted_at.is_null())
.first::<(bool, Option<DateTime<Utc>>)>(&mut conn)
.await
{
@ -246,14 +246,14 @@ async fn get_user_asset_role(
.left_join(
teams_to_users::table.on(asset_permissions::identity_id.eq(teams_to_users::team_id)),
)
.inner_join(messages::table.on(asset_permissions::asset_id.eq(messages::thread_id)))
.inner_join(messages_deprecated::table.on(asset_permissions::asset_id.eq(messages_deprecated::thread_id)))
.select(asset_permissions::role)
.filter(
asset_permissions::identity_id
.eq(user_id.as_ref())
.or(teams_to_users::user_id.eq(user_id.as_ref())),
)
.filter(messages::id.eq(message_id.as_ref()))
.filter(messages_deprecated::id.eq(message_id.as_ref()))
.filter(asset_permissions::deleted_at.is_null())
.load::<AssetPermissionRole>(&mut conn)
.await
@ -284,9 +284,9 @@ async fn is_organization_admin_or_owner(user_id: Arc<Uuid>, message_id: Arc<Uuid
.on(users_to_organizations::organization_id.eq(data_sources::organization_id)),
)
.inner_join(datasets::table.on(data_sources::id.eq(datasets::data_source_id)))
.inner_join(messages::table.on(datasets::id.nullable().eq(messages::dataset_id)))
.inner_join(messages_deprecated::table.on(datasets::id.nullable().eq(messages_deprecated::dataset_id)))
.select(users_to_organizations::role)
.filter(messages::id.eq(message_id.as_ref()))
.filter(messages_deprecated::id.eq(message_id.as_ref()))
.filter(users_to_organizations::user_id.eq(user_id.as_ref()))
.first::<UserOrganizationRole>(&mut conn)
.await

View File

@ -5,8 +5,8 @@ use crate::{
models::{AssetPermission, DataSource, Dataset, DatasetColumn, UserToOrganization},
schema::{
asset_permissions, data_sources, dataset_columns, dataset_groups, dataset_groups_permissions,
dataset_permissions, datasets, messages,
permission_groups_to_identities, threads,
dataset_permissions, datasets, messages_deprecated,
permission_groups_to_identities, threads_deprecated,
users_to_organizations,
},
},
@ -55,7 +55,7 @@ use uuid::Uuid;
use crate::{
database::{
enums::Verification,
models::{Message, Thread, User},
models::{Message, ThreadDeprecated, User},
},
routes::ws::{
ws::{WsEvent, WsResponseMessage},
@ -1236,14 +1236,14 @@ async fn follow_up_thread(
messages_to_upsert.push(new_message.clone());
// Perform bulk upsert
match diesel::insert_into(messages::table)
match diesel::insert_into(messages_deprecated::table)
.values(&messages_to_upsert)
.on_conflict(messages::id)
.on_conflict(messages_deprecated::id)
.do_update()
.set((
messages::draft_state.eq(excluded(messages::draft_state)),
messages::deleted_at.eq(excluded(messages::deleted_at)),
messages::updated_at.eq(Utc::now()),
messages_deprecated::draft_state.eq(excluded(messages_deprecated::draft_state)),
messages_deprecated::deleted_at.eq(excluded(messages_deprecated::deleted_at)),
messages_deprecated::updated_at.eq(Utc::now()),
))
.execute(&mut conn)
.await
@ -1366,7 +1366,7 @@ async fn create_thread(
) -> Result<(ThreadState, Message)> {
let message_uuid = Uuid::new_v4();
let thread = Thread {
let thread = ThreadDeprecated {
id: Uuid::new_v4(),
created_by: user.id,
updated_by: user.id,
@ -1422,7 +1422,7 @@ async fn create_thread(
return Err(err);
}
};
match insert_into(threads::table)
match insert_into(threads_deprecated::table)
.values(&thread_insert_body)
.execute(&mut conn)
.await
@ -1474,13 +1474,13 @@ async fn create_thread(
tokio::spawn(async move {
let mut conn = get_pg_pool().get().await?;
match diesel::insert_into(messages::table)
match diesel::insert_into(messages_deprecated::table)
.values(&messages_to_upsert)
.on_conflict(messages::id)
.on_conflict(messages_deprecated::id)
.do_update()
.set((
messages::deleted_at.eq(excluded(messages::deleted_at)),
messages::updated_at.eq(Utc::now()),
messages_deprecated::deleted_at.eq(excluded(messages_deprecated::deleted_at)),
messages_deprecated::updated_at.eq(Utc::now()),
))
.execute(&mut conn)
.await
@ -1664,8 +1664,8 @@ async fn update_thread_and_message(
}
};
match update(threads::table)
.filter(threads::id.eq(&update_thread.id))
match update(threads_deprecated::table)
.filter(threads_deprecated::id.eq(&update_thread.id))
.set(&update_thread)
.execute(&mut conn)
.await
@ -1692,20 +1692,20 @@ async fn update_thread_and_message(
};
// Explicitly specify all fields that need to be updated
match diesel::update(messages::table)
.filter(messages::id.eq(&update_message.id))
match diesel::update(messages_deprecated::table)
.filter(messages_deprecated::id.eq(&update_message.id))
.set((
messages::responses.eq(&update_message.responses),
messages::chart_config.eq(&update_message.chart_config),
messages::data_metadata.eq(&update_message.data_metadata),
messages::code.eq(&update_message.code),
messages::context.eq(&update_message.context),
messages::title.eq(&update_message.title),
messages::summary_question.eq(&update_message.summary_question),
messages::time_frame.eq(&update_message.time_frame),
messages::dataset_id.eq(&update_message.dataset_id),
messages::updated_at.eq(Utc::now()),
messages::sql_evaluation_id.eq(&update_message.sql_evaluation_id),
messages_deprecated::responses.eq(&update_message.responses),
messages_deprecated::chart_config.eq(&update_message.chart_config),
messages_deprecated::data_metadata.eq(&update_message.data_metadata),
messages_deprecated::code.eq(&update_message.code),
messages_deprecated::context.eq(&update_message.context),
messages_deprecated::title.eq(&update_message.title),
messages_deprecated::summary_question.eq(&update_message.summary_question),
messages_deprecated::time_frame.eq(&update_message.time_frame),
messages_deprecated::dataset_id.eq(&update_message.dataset_id),
messages_deprecated::updated_at.eq(Utc::now()),
messages_deprecated::sql_evaluation_id.eq(&update_message.sql_evaluation_id),
))
.execute(&mut conn)
.await

View File

@ -17,10 +17,10 @@ use crate::{
database::{
enums::{AssetPermissionRole, AssetType, UserOrganizationRole},
lib::{get_pg_pool, ColumnMetadata, DataMetadataJsonBody, MinMaxValue, PgPool},
models::{Message, Thread},
models::{Message, ThreadDeprecated},
schema::{
asset_permissions, collections_to_assets, dashboards, data_sources, datasets, messages,
sql_evaluations, teams_to_users, threads, threads_to_dashboards, users,
asset_permissions, collections_to_assets, dashboards, data_sources, datasets, messages_deprecated,
sql_evaluations, teams_to_users, threads_deprecated, threads_to_dashboards, users,
users_to_organizations,
},
},
@ -59,7 +59,7 @@ pub struct DashboardNameAndId {
#[derive(Serialize, Clone)]
pub struct ThreadState {
#[serde(flatten)]
pub thread: Thread,
pub thread: ThreadDeprecated,
pub title: String,
pub messages: Vec<MessageWithUserInfo>,
pub dashboards: Vec<DashboardNameAndId>,
@ -507,9 +507,9 @@ async fn is_organization_admin_or_owner(user_id: Arc<Uuid>, thread_id: Arc<Uuid>
.on(users_to_organizations::organization_id.eq(data_sources::organization_id)),
)
.inner_join(datasets::table.on(data_sources::id.eq(datasets::data_source_id)))
.inner_join(messages::table.on(datasets::id.nullable().eq(messages::dataset_id)))
.inner_join(messages_deprecated::table.on(datasets::id.nullable().eq(messages_deprecated::dataset_id)))
.select(users_to_organizations::role)
.filter(messages::thread_id.eq(thread_id.as_ref()))
.filter(messages_deprecated::thread_id.eq(thread_id.as_ref()))
.filter(users_to_organizations::user_id.eq(user_id.as_ref()))
.first::<UserOrganizationRole>(&mut conn)
.await
@ -587,7 +587,7 @@ pub async fn get_bulk_user_thread_permission(
async fn get_thread_and_check_permissions(
user_id: Arc<Uuid>,
thread_id: Arc<Uuid>,
) -> Result<(Thread, Option<AssetPermissionRole>)> {
) -> Result<(ThreadDeprecated, Option<AssetPermissionRole>)> {
let thread_handler = {
let id = Arc::clone(&thread_id);
tokio::spawn(async move { get_thread_by_id(id).await })
@ -622,7 +622,7 @@ async fn get_thread_and_check_permissions(
Ok((thread, permission))
}
async fn get_thread_by_id(thread_id: Arc<Uuid>) -> Result<Thread> {
async fn get_thread_by_id(thread_id: Arc<Uuid>) -> Result<ThreadDeprecated> {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
@ -631,11 +631,11 @@ async fn get_thread_by_id(thread_id: Arc<Uuid>) -> Result<Thread> {
}
};
let thread = match threads::table
.filter(threads::id.eq(thread_id.as_ref()))
.filter(threads::deleted_at.is_null())
.select(threads::all_columns)
.first::<Thread>(&mut conn)
let thread = match threads_deprecated::table
.filter(threads_deprecated::id.eq(thread_id.as_ref()))
.filter(threads_deprecated::deleted_at.is_null())
.select(threads_deprecated::all_columns)
.first::<ThreadDeprecated>(&mut conn)
.await
{
Ok(threads) => threads,
@ -662,15 +662,15 @@ async fn get_thread_messages(
}
};
let mut statement = messages::table
.inner_join(users::table.on(messages::sent_by.eq(users::id)))
.left_join(datasets::table.on(messages::dataset_id.eq(datasets::id.nullable())))
let mut statement = messages_deprecated::table
.inner_join(users::table.on(messages_deprecated::sent_by.eq(users::id)))
.left_join(datasets::table.on(messages_deprecated::dataset_id.eq(datasets::id.nullable())))
.left_join(
sql_evaluations::table
.on(messages::sql_evaluation_id.eq(sql_evaluations::id.nullable())),
.on(messages_deprecated::sql_evaluation_id.eq(sql_evaluations::id.nullable())),
)
.select((
messages::all_columns,
messages_deprecated::all_columns,
sql_evaluations::evaluation_summary.nullable(),
sql_evaluations::score.nullable(),
users::name.nullable(),
@ -678,21 +678,21 @@ async fn get_thread_messages(
users::id,
datasets::name.nullable(),
))
.filter(messages::thread_id.eq(thread_id.as_ref()))
.filter(messages::deleted_at.is_null())
.order(messages::created_at.asc())
.filter(messages_deprecated::thread_id.eq(thread_id.as_ref()))
.filter(messages_deprecated::deleted_at.is_null())
.order(messages_deprecated::created_at.asc())
.into_boxed();
if let Some(draft_session_id) = draft_session_id {
println!("draft_session_id: {:?}", draft_session_id);
statement = statement.filter(
messages::draft_session_id
messages_deprecated::draft_session_id
.eq(draft_session_id)
.or(messages::draft_session_id.is_null()),
.or(messages_deprecated::draft_session_id.is_null()),
);
} else {
println!("draft_session_id is null");
statement = statement.filter(messages::draft_session_id.is_null());
statement = statement.filter(messages_deprecated::draft_session_id.is_null());
}
let message_records = match statement

View File

@ -12,7 +12,7 @@ use crate::{
enums::{AssetPermissionRole, MessageFeedback, Verification},
lib::get_pg_pool,
models::User,
schema::{messages, threads},
schema::{messages_deprecated, threads_deprecated},
},
routes::ws::{
ws::{SubscriptionRwLock, WsErrorCode, WsEvent, WsResponseMessage, WsSendMethod},
@ -57,10 +57,10 @@ pub async fn update_message(
}
};
let thread_id = match threads::table
.select(threads::id)
.inner_join(messages::table.on(threads::id.eq(messages::thread_id)))
.filter(messages::id.eq(&req.id))
let thread_id = match threads_deprecated::table
.select(threads_deprecated::id)
.inner_join(messages_deprecated::table.on(threads_deprecated::id.eq(messages_deprecated::thread_id)))
.filter(messages_deprecated::id.eq(&req.id))
.first::<Uuid>(&mut conn)
.await
{
@ -276,9 +276,9 @@ async fn update_message_handler(
}
};
match update(messages::table)
match update(messages_deprecated::table)
.set(&message)
.filter(messages::id.eq(&message.id))
.filter(messages_deprecated::id.eq(&message.id))
.execute(&mut conn)
.await
{

View File

@ -15,7 +15,7 @@ use crate::{
enums::AssetType,
lib::get_pg_pool,
models::{Message, ThreadToDashboard, User},
schema::{messages, threads, threads_to_dashboards},
schema::{messages_deprecated, threads_deprecated, threads_to_dashboards},
},
routes::ws::{
ws::{SubscriptionRwLock, WsErrorCode, WsEvent, WsResponseMessage, WsSendMethod},
@ -242,7 +242,7 @@ pub async fn update_thread(
}
#[derive(AsChangeset)]
#[diesel(table_name = threads)]
#[diesel(table_name = threads_deprecated)]
pub struct ThreadChangeset {
pub updated_at: DateTime<Utc>,
pub updated_by: Uuid,
@ -317,8 +317,8 @@ async fn update_thread_record(
let thread_id = Arc::clone(&thread_id);
tokio::spawn(async move {
match update(threads::table)
.filter(threads::id.eq(*thread_id))
match update(threads_deprecated::table)
.filter(threads_deprecated::id.eq(*thread_id))
.set(changeset)
.execute(&mut conn)
.await
@ -538,15 +538,15 @@ async fn save_draft_handler(thread_id: Arc<Uuid>, draft_session_id: Uuid) -> Res
}
};
let mut most_recent_message = match messages::table
.select(messages::all_columns)
.filter(messages::thread_id.eq(*thread_id))
let mut most_recent_message = match messages_deprecated::table
.select(messages_deprecated::all_columns)
.filter(messages_deprecated::thread_id.eq(*thread_id))
.filter(
messages::draft_session_id
messages_deprecated::draft_session_id
.eq(&draft_session_id)
.or(messages::draft_session_id.is_null()),
.or(messages_deprecated::draft_session_id.is_null()),
)
.order(messages::created_at.desc())
.order(messages_deprecated::created_at.desc())
.first::<Message>(&mut conn)
.await
{
@ -561,9 +561,9 @@ async fn save_draft_handler(thread_id: Arc<Uuid>, draft_session_id: Uuid) -> Res
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Error getting connection from pool: {}", e)),
};
match update(threads::table)
.filter(threads::id.eq(*thread_id))
.set(threads::state_message_id.eq(most_recent_message.id))
match update(threads_deprecated::table)
.filter(threads_deprecated::id.eq(*thread_id))
.set(threads_deprecated::state_message_id.eq(most_recent_message.id))
.execute(&mut conn)
.await
{
@ -580,10 +580,10 @@ async fn save_draft_handler(thread_id: Arc<Uuid>, draft_session_id: Uuid) -> Res
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Error getting connection from pool: {}", e)),
};
match update(messages::table)
.filter(messages::thread_id.eq(*thread_id))
.filter(messages::draft_session_id.eq(Some(&draft_session_id)))
.set(messages::draft_session_id.eq(None::<Uuid>))
match update(messages_deprecated::table)
.filter(messages_deprecated::thread_id.eq(*thread_id))
.filter(messages_deprecated::draft_session_id.eq(Some(&draft_session_id)))
.set(messages_deprecated::draft_session_id.eq(None::<Uuid>))
.execute(&mut conn)
.await
{
@ -609,8 +609,8 @@ async fn save_draft_handler(thread_id: Arc<Uuid>, draft_session_id: Uuid) -> Res
};
tokio::spawn(async move {
match update(messages::table)
.filter(messages::id.eq(most_recent_message.id))
match update(messages_deprecated::table)
.filter(messages_deprecated::id.eq(most_recent_message.id))
.set(most_recent_message)
.execute(&mut conn)
.await

View File

@ -10,7 +10,7 @@ use crate::database::{
enums::AssetType,
lib::get_pg_pool,
models::{User, UserFavorite},
schema::{collections, collections_to_assets, dashboards, messages, threads, user_favorites},
schema::{collections, collections_to_assets, dashboards, messages_deprecated, threads_deprecated, user_favorites},
};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
@ -168,15 +168,15 @@ async fn get_favorite_threads(thread_ids: Arc<Vec<Uuid>>) -> Result<Vec<Favorite
Err(e) => return Err(anyhow!("Error getting connection from pool: {:?}", e)),
};
let thread_records: Vec<(Uuid, Option<String>)> = match threads::table
.inner_join(messages::table.on(threads::id.eq(messages::thread_id)))
.select((threads::id, messages::title))
.filter(threads::id.eq_any(thread_ids.as_ref()))
.filter(threads::deleted_at.is_null())
.filter(messages::deleted_at.is_null())
.filter(messages::draft_session_id.is_null())
.distinct_on(threads::id)
.order((threads::id, messages::created_at.desc()))
let thread_records: Vec<(Uuid, Option<String>)> = match threads_deprecated::table
.inner_join(messages_deprecated::table.on(threads_deprecated::id.eq(messages_deprecated::thread_id)))
.select((threads_deprecated::id, messages_deprecated::title))
.filter(threads_deprecated::id.eq_any(thread_ids.as_ref()))
.filter(threads_deprecated::deleted_at.is_null())
.filter(messages_deprecated::deleted_at.is_null())
.filter(messages_deprecated::draft_session_id.is_null())
.distinct_on(threads_deprecated::id)
.order((threads_deprecated::id, messages_deprecated::created_at.desc()))
.load::<(Uuid, Option<String>)>(&mut conn)
.await
{
@ -382,24 +382,24 @@ async fn get_threads_from_collections(
Err(e) => return Err(anyhow!("Error getting connection from pool: {:?}", e)),
};
let threads_records: Vec<(Uuid, Uuid, Option<String>)> = match threads::table
let threads_records: Vec<(Uuid, Uuid, Option<String>)> = match threads_deprecated::table
.inner_join(
collections_to_assets::table.on(threads::id.eq(collections_to_assets::asset_id)),
collections_to_assets::table.on(threads_deprecated::id.eq(collections_to_assets::asset_id)),
)
.inner_join(messages::table.on(threads::id.eq(messages::thread_id)))
.inner_join(messages_deprecated::table.on(threads_deprecated::id.eq(messages_deprecated::thread_id)))
.select((
collections_to_assets::collection_id,
threads::id,
messages::title,
threads_deprecated::id,
messages_deprecated::title,
))
.filter(collections_to_assets::asset_type.eq(AssetType::Thread))
.filter(collections_to_assets::collection_id.eq_any(collection_ids))
.filter(threads::deleted_at.is_null())
.filter(threads_deprecated::deleted_at.is_null())
.filter(collections_to_assets::deleted_at.is_null())
.filter(messages::deleted_at.is_null())
.filter(messages::draft_session_id.is_null())
.order((threads::id, messages::created_at.desc()))
.distinct_on(threads::id)
.filter(messages_deprecated::deleted_at.is_null())
.filter(messages_deprecated::draft_session_id.is_null())
.order((threads_deprecated::id, messages_deprecated::created_at.desc()))
.distinct_on(threads_deprecated::id)
.load::<(Uuid, Uuid, Option<String>)>(&mut conn)
.await
{

View File

@ -24,8 +24,8 @@ use crate::{
lib::get_pg_pool,
models::{AssetPermission, CollectionToAsset, User},
schema::{
asset_permissions, collections, collections_to_assets, dashboards, messages,
organizations, teams, teams_to_users, threads, user_favorites, users,
asset_permissions, collections, collections_to_assets, dashboards, messages_deprecated,
organizations, teams, teams_to_users, threads_deprecated, user_favorites, users,
},
},
utils::clients::{
@ -658,11 +658,11 @@ async fn get_asset_name(asset_id: Arc<Uuid>, asset_type: AssetType) -> Result<St
}
}
AssetType::Thread => {
match messages::table
.inner_join(threads::table.on(messages::thread_id.eq(threads::id)))
.filter(threads::id.eq(asset_id.as_ref()))
.select(messages::title.nullable())
.order(messages::created_at.desc())
match messages_deprecated::table
.inner_join(threads_deprecated::table.on(messages_deprecated::thread_id.eq(threads_deprecated::id)))
.filter(threads_deprecated::id.eq(asset_id.as_ref()))
.select(messages_deprecated::title.nullable())
.order(messages_deprecated::created_at.desc())
.first::<Option<String>>(&mut conn)
.await
{

View File

@ -27,7 +27,7 @@ pub struct SearchDataCatalogTool;
#[async_trait]
impl ToolExecutor for SearchDataCatalogTool {
async fn execute(&self, tool_call: &ToolCall) -> Result<Value> {
let params: SearchDataCatalogParams = serde_json::from_value(tool_call.function.arguments.clone())?;
let params: SearchDataCatalogParams = serde_json::from_str(&tool_call.function.arguments.clone())?;
// TODO: Implement actual data catalog search logic
Ok(Value::Array(vec![]))
}