Merge pull request #212 from buster-so/evals

buster email system fixes
This commit is contained in:
dal 2025-04-22 08:18:41 -07:00 committed by GitHub
commit 68aa7449d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 269 additions and 47 deletions

View File

@ -630,7 +630,6 @@ diesel::joinable!(api_keys -> organizations (organization_id));
diesel::joinable!(api_keys -> users (owner_id)); diesel::joinable!(api_keys -> users (owner_id));
diesel::joinable!(chats -> organizations (organization_id)); diesel::joinable!(chats -> organizations (organization_id));
diesel::joinable!(collections -> organizations (organization_id)); diesel::joinable!(collections -> organizations (organization_id));
diesel::joinable!(dashboard_files -> users (publicly_enabled_by));
diesel::joinable!(dashboard_versions -> dashboards (dashboard_id)); diesel::joinable!(dashboard_versions -> dashboards (dashboard_id));
diesel::joinable!(dashboards -> organizations (organization_id)); diesel::joinable!(dashboards -> organizations (organization_id));
diesel::joinable!(data_sources -> organizations (organization_id)); diesel::joinable!(data_sources -> organizations (organization_id));
@ -648,11 +647,10 @@ diesel::joinable!(datasets_to_permission_groups -> permission_groups (permission
diesel::joinable!(messages -> chats (chat_id)); diesel::joinable!(messages -> chats (chat_id));
diesel::joinable!(messages -> users (created_by)); diesel::joinable!(messages -> users (created_by));
diesel::joinable!(messages_deprecated -> datasets (dataset_id)); diesel::joinable!(messages_deprecated -> datasets (dataset_id));
diesel::joinable!(messages_deprecated -> users (sent_by));
diesel::joinable!(messages_to_files -> messages (message_id)); diesel::joinable!(messages_to_files -> messages (message_id));
diesel::joinable!(metric_files -> users (publicly_enabled_by));
diesel::joinable!(metric_files_to_dashboard_files -> dashboard_files (dashboard_file_id)); diesel::joinable!(metric_files_to_dashboard_files -> dashboard_files (dashboard_file_id));
diesel::joinable!(metric_files_to_dashboard_files -> metric_files (metric_file_id)); diesel::joinable!(metric_files_to_dashboard_files -> metric_files (metric_file_id));
diesel::joinable!(metric_files_to_dashboard_files -> users (created_by));
diesel::joinable!(permission_groups -> organizations (organization_id)); diesel::joinable!(permission_groups -> organizations (organization_id));
diesel::joinable!(permission_groups_to_users -> permission_groups (permission_group_id)); diesel::joinable!(permission_groups_to_users -> permission_groups (permission_group_id));
diesel::joinable!(permission_groups_to_users -> users (user_id)); diesel::joinable!(permission_groups_to_users -> users (user_id));

View File

@ -28,6 +28,7 @@ query_engine = { path = "../query_engine" }
middleware = { path = "../middleware" } middleware = { path = "../middleware" }
sharing = { path = "../sharing" } sharing = { path = "../sharing" }
search = { path = "../search" } search = { path = "../search" }
email = { path = "../email" }
# Add any handler-specific dependencies here # Add any handler-specific dependencies here
dashmap = "5.5.3" dashmap = "5.5.3"

View File

@ -3,104 +3,157 @@ use chrono::Utc;
use database::{ use database::{
self, self,
enums::{SharingSetting, UserOrganizationRole, UserOrganizationStatus}, enums::{SharingSetting, UserOrganizationRole, UserOrganizationStatus},
models::{User, UserToOrganization}, models::{Organization, User, UserToOrganization},
pool::get_pg_pool, pool::get_pg_pool,
schema::{users, users_to_organizations}, schema::{organizations, users, users_to_organizations},
}; };
use diesel_async::{AsyncPgConnection, RunQueryDsl}; use diesel::prelude::*;
use diesel_async::RunQueryDsl;
use email::{send_email, EmailType, InviteToBuster};
use middleware::AuthenticatedUser; use middleware::AuthenticatedUser;
use serde_json::json; use serde_json::json;
use std::collections::HashSet;
use tracing;
use uuid::Uuid; use uuid::Uuid;
/// Invites multiple users by creating user records and adding them to the inviter's organization. /// Invites multiple users by creating user records, adding them to the inviter's organization,
/// This function requires an active database transaction. /// and sending an invitation email.
pub async fn invite_user_handler( pub async fn invite_user_handler(
inviting_user: &AuthenticatedUser, inviting_user: &AuthenticatedUser,
emails: Vec<String>, emails: Vec<String>,
) -> Result<()> { ) -> Result<()> {
let pool = get_pg_pool();
let mut conn = pool
.get()
.await
.context("Failed to get database connection")?;
// Fetch inviter details
let inviter = users::table
.find(inviting_user.id)
.select(User::as_select())
.first::<User>(&mut conn)
.await
.context("Failed to find inviting user")?;
let inviter_name = inviter.name.unwrap_or_else(|| inviter.email.clone()); // Use email if name is None
let organization_id = inviting_user let organization_id = inviting_user
.organizations .organizations
.get(0) // Accessing the vector of organizations .get(0)
.map(|m| m.id) // Use .id as confirmed by search .map(|m| m.id)
.context("Inviting user is not associated with any organization")?; .context("Inviting user is not associated with any organization")?;
let inviter_id = inviting_user.id; // For created_by/updated_by // Fetch organization details
let organization = organizations::table
.first::<Organization>(&mut conn)
.await
.context("Failed to find organization")?;
let organization_name = organization.name;
let inviter_id = inviting_user.id;
let now = Utc::now(); let now = Utc::now();
let mut successful_emails: Vec<String> = Vec::new();
for email in emails { for email in emails {
// Use a separate connection for each user to avoid holding one connection for the whole loop
let mut user_conn = match pool.get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!(error = %e, email = %email, "Failed to get DB connection for inviting user");
continue; // Skip this user if connection fails
}
};
// 1. Generate ID and construct attributes first // 1. Generate ID and construct attributes first
let new_user_id = Uuid::new_v4(); let new_user_id = Uuid::new_v4();
let user_email = email.clone(); // Clone email for ownership let user_email = email.clone();
let assigned_role = UserOrganizationRole::RestrictedQuerier; let assigned_role = UserOrganizationRole::RestrictedQuerier;
let user_attributes = json!({ let user_attributes = json!({
"user_id": new_user_id.to_string(), "user_id": new_user_id.to_string(),
"user_email": user_email, "user_email": user_email,
"organization_id": organization_id.to_string(), "organization_id": organization_id.to_string(),
"organization_role": format!("{:?}", assigned_role) // Use variable for role "organization_role": format!("{:?}", assigned_role)
}); });
// 2. Create User struct instance using the generated ID and attributes // 2. Create User struct instance
let user_to_insert = User { let user_to_insert = User {
id: new_user_id, // Use the generated ID id: new_user_id,
email: email.clone(), // Use the original email variable again or the cloned one email: email.clone(),
name: None, name: None,
config: json!({}), config: json!({}),
created_at: now, created_at: now,
updated_at: now, updated_at: now,
attributes: user_attributes, // Use the constructed attributes attributes: user_attributes,
avatar_url: None, avatar_url: None,
}; };
let mut conn = match get_pg_pool().get().await {
Ok(mut conn) => conn,
Err(e) => {
return Err(e.into());
}
};
// 3. Insert user // 3. Insert user
match diesel::insert_into(users::table) let user_insert_result = diesel::insert_into(users::table)
.values(&user_to_insert) .values(&user_to_insert)
.execute(&mut conn) .execute(&mut user_conn)
.await .await;
{
Ok(_) => (), if let Err(e) = user_insert_result {
Err(e) => { tracing::error!(error = %e, email = %email, "Failed to insert user record");
return Err(e.into()); continue; // Skip this user if insertion fails
} }
};
// 4. Create UserToOrganization struct instance // 4. Create UserToOrganization struct instance
let user_org_to_insert = UserToOrganization { let user_org_to_insert = UserToOrganization {
user_id: new_user_id, // Use the generated ID user_id: new_user_id,
organization_id, organization_id,
role: assigned_role, // Use the role variable role: assigned_role,
sharing_setting: SharingSetting::None, // Default setting sharing_setting: SharingSetting::None,
edit_sql: false, // Default permission edit_sql: false,
upload_csv: false, // Default permission upload_csv: false,
export_assets: false, // Default permission export_assets: false,
email_slack_enabled: false, // Default setting email_slack_enabled: false,
created_at: now, created_at: now,
updated_at: now, updated_at: now,
deleted_at: None, deleted_at: None,
created_by: inviter_id, created_by: inviter_id,
updated_by: inviter_id, updated_by: inviter_id,
deleted_by: None, deleted_by: None,
status: UserOrganizationStatus::Active, // Default status status: UserOrganizationStatus::Active,
}; };
// 5. Insert user organization mapping // 5. Insert user organization mapping
match diesel::insert_into(users_to_organizations::table) let user_org_insert_result = diesel::insert_into(users_to_organizations::table)
.values(&user_org_to_insert) .values(&user_org_to_insert)
.execute(&mut conn) .execute(&mut user_conn)
.await .await;
{
Ok(_) => (), match user_org_insert_result {
Ok(_) => {
// Only add email if both inserts were successful
successful_emails.push(email.clone());
}
Err(e) => { Err(e) => {
return Err(e.into()); tracing::error!(error = %e, email = %email, user_id = %new_user_id, "Failed to insert user_to_organization record");
// Consider rolling back the user insert here if desired, although it's complex without a transaction per user.
// For now, we just log and don't add the email to the success list.
} }
}; };
} }
// 6. Send batch email if there were any successful invites
if !successful_emails.is_empty() {
let invite_details = InviteToBuster {
inviter_name, // Use fetched inviter name
organization_name, // Use fetched organization name
};
let emails_set: HashSet<String> = successful_emails.into_iter().collect();
if let Err(e) = send_email(emails_set, EmailType::InviteToBuster(invite_details)).await {
// Log the error but don't fail the entire handler,
// as the core user creation logic succeeded.
tracing::error!(error = %e, "Failed to send invitation emails");
// Optionally, return a specific error or warning here if needed.
} else {
tracing::info!("Successfully sent invitation emails");
}
}
Ok(()) Ok(())
} }

View File

@ -0,0 +1,84 @@
-- This file should undo anything in `up.sql`
-- Recreate the helper function to revert the changes
CREATE OR REPLACE FUNCTION alter_fk_on_update(
p_table_name TEXT,
p_column_name TEXT,
p_foreign_table_name TEXT,
p_foreign_column_name TEXT,
p_on_update_action TEXT -- 'CASCADE' or 'NO ACTION'
)
RETURNS VOID AS $$
DECLARE
v_constraint_name TEXT;
BEGIN
-- Find the existing constraint name (should now be the one added by up.sql)
SELECT conname
INTO v_constraint_name
FROM pg_constraint
WHERE conrelid = p_table_name::regclass
AND conname = p_table_name || '_' || p_column_name || '_fkey' -- Match exact name from up.sql
AND confrelid = p_foreign_table_name::regclass
AND contype = 'f'
AND p_column_name = ANY(SELECT attname FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(conkey))
LIMIT 1;
-- If constraint exists, drop it
IF v_constraint_name IS NOT NULL THEN
EXECUTE 'ALTER TABLE ' || quote_ident(p_table_name) || ' DROP CONSTRAINT ' || quote_ident(v_constraint_name);
END IF;
-- Add the constraint back with the specified (original) ON UPDATE action
EXECUTE 'ALTER TABLE ' || quote_ident(p_table_name) ||
' ADD CONSTRAINT ' || quote_ident(p_table_name || '_' || p_column_name || '_fkey') || -- Re-add with standard name
' FOREIGN KEY (' || quote_ident(p_column_name) || ')' ||
' REFERENCES ' || quote_ident(p_foreign_table_name) || '(' || quote_ident(p_foreign_column_name) || ')' ||
' ON UPDATE ' || p_on_update_action || ' ON DELETE NO ACTION';
END;
$$ LANGUAGE plpgsql;
-- Revert all foreign keys to ON UPDATE NO ACTION (the default)
SELECT alter_fk_on_update('api_keys', 'owner_id', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('asset_permissions', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('asset_permissions', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('chats', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('chats', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('chats', 'publicly_enabled_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('collections', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('collections', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('collections_to_assets', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('collections_to_assets', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('dashboard_files', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('dashboard_files', 'publicly_enabled_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('dashboards', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('dashboards', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('data_sources', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('data_sources', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('datasets', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('datasets', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('messages', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('messages_deprecated', 'sent_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('metric_files', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('metric_files', 'publicly_enabled_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('metric_files_to_dashboard_files', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('permission_groups', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('permission_groups', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('permission_groups_to_identities', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('permission_groups_to_identities', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('permission_groups_to_users', 'user_id', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('teams', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('teams_to_users', 'user_id', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('terms', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('terms', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('threads_deprecated', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('threads_deprecated', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('threads_deprecated', 'publicly_enabled_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('threads_to_dashboards', 'added_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('user_favorites', 'user_id', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('users_to_organizations', 'user_id', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('users_to_organizations', 'created_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('users_to_organizations', 'updated_by', 'users', 'id', 'NO ACTION');
SELECT alter_fk_on_update('users_to_organizations', 'deleted_by', 'users', 'id', 'NO ACTION');
-- Drop the helper function
DROP FUNCTION alter_fk_on_update(TEXT, TEXT, TEXT, TEXT, TEXT);

View File

@ -0,0 +1,86 @@
-- Your SQL goes here
-- Function to safely drop and add foreign key constraints
-- We need this because constraint names might vary slightly depending on how they were created
-- or if they were manually named. This function finds the constraint by table and column.
CREATE OR REPLACE FUNCTION alter_fk_on_update(
p_table_name TEXT,
p_column_name TEXT,
p_foreign_table_name TEXT,
p_foreign_column_name TEXT,
p_on_update_action TEXT -- 'CASCADE' or 'NO ACTION'
)
RETURNS VOID AS $$
DECLARE
v_constraint_name TEXT;
BEGIN
-- Find the existing constraint name
SELECT conname
INTO v_constraint_name
FROM pg_constraint
WHERE conrelid = p_table_name::regclass
AND conname LIKE p_table_name || '_' || p_column_name || '_fkey%' -- Handle potential suffix variations
AND confrelid = p_foreign_table_name::regclass
AND contype = 'f'
AND p_column_name = ANY(SELECT attname FROM pg_attribute WHERE attrelid = conrelid AND attnum = ANY(conkey))
LIMIT 1;
-- If constraint exists, drop it
IF v_constraint_name IS NOT NULL THEN
EXECUTE 'ALTER TABLE ' || quote_ident(p_table_name) || ' DROP CONSTRAINT ' || quote_ident(v_constraint_name);
END IF;
-- Add the new constraint with the specified ON UPDATE action
EXECUTE 'ALTER TABLE ' || quote_ident(p_table_name) ||
' ADD CONSTRAINT ' || quote_ident(p_table_name || '_' || p_column_name || '_fkey') ||
' FOREIGN KEY (' || quote_ident(p_column_name) || ')' ||
' REFERENCES ' || quote_ident(p_foreign_table_name) || '(' || quote_ident(p_foreign_column_name) || ')' ||
' ON UPDATE ' || p_on_update_action || ' ON DELETE NO ACTION'; -- Assuming default ON DELETE NO ACTION
END;
$$ LANGUAGE plpgsql;
-- Apply ON UPDATE CASCADE to all identified foreign keys referencing users.id
SELECT alter_fk_on_update('api_keys', 'owner_id', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('asset_permissions', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('asset_permissions', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('chats', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('chats', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('chats', 'publicly_enabled_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('collections', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('collections', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('collections_to_assets', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('collections_to_assets', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('dashboard_files', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('dashboard_files', 'publicly_enabled_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('dashboards', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('dashboards', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('data_sources', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('data_sources', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('datasets', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('datasets', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('messages', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('messages_deprecated', 'sent_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('metric_files', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('metric_files', 'publicly_enabled_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('metric_files_to_dashboard_files', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('permission_groups', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('permission_groups', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('permission_groups_to_identities', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('permission_groups_to_identities', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('permission_groups_to_users', 'user_id', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('teams', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('teams_to_users', 'user_id', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('terms', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('terms', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('threads_deprecated', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('threads_deprecated', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('threads_deprecated', 'publicly_enabled_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('threads_to_dashboards', 'added_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('user_favorites', 'user_id', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('users_to_organizations', 'user_id', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('users_to_organizations', 'created_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('users_to_organizations', 'updated_by', 'users', 'id', 'CASCADE');
SELECT alter_fk_on_update('users_to_organizations', 'deleted_by', 'users', 'id', 'CASCADE');
-- Drop the helper function
DROP FUNCTION alter_fk_on_update(TEXT, TEXT, TEXT, TEXT, TEXT);