added in collections routes, need to clean up

This commit is contained in:
dal 2025-03-11 12:16:28 -06:00
parent 7656e9039a
commit fbfea49253
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
17 changed files with 1551 additions and 6 deletions

View File

@ -990,9 +990,9 @@ fn transform_tool_message(
let messages = match name.as_str() {
"search_data_catalog" => tool_data_catalog_search(id.clone(), content)?,
"create_metrics" => tool_create_metrics(id.clone(), content)?,
"modify_metrics" => tool_modify_metrics(id.clone(), content)?,
"update_metrics" => tool_modify_metrics(id.clone(), content)?,
"create_dashboards" => tool_create_dashboards(id.clone(), content)?,
"modify_dashboards" => tool_modify_dashboards(id.clone(), content)?,
"update_dashboards" => tool_modify_dashboards(id.clone(), content)?,
"create_plan" => tool_create_plan(id.clone(), content)?,
_ => return Err(anyhow::anyhow!("Unknown tool name: {}", name)),
};
@ -1361,7 +1361,7 @@ fn transform_assistant_tool_message(
progress.clone(),
initial,
)?,
"modify_metrics" => assistant_modify_metrics(
"update_metrics" => assistant_modify_metrics(
tool_id,
tool_call.function.arguments.clone(),
progress.clone(),
@ -1373,7 +1373,7 @@ fn transform_assistant_tool_message(
progress.clone(),
initial,
)?,
"modify_dashboards" => assistant_modify_dashboards(
"update_dashboards" => assistant_modify_dashboards(
tool_id,
tool_call.function.arguments.clone(),
progress.clone(),

View File

@ -0,0 +1,150 @@
use anyhow::{anyhow, Result};
use chrono::Utc;
use database::{
enums::{AssetPermissionRole, AssetType, IdentityType},
models::{AssetPermission, Collection},
pool::get_pg_pool,
schema::{asset_permissions, collections},
};
use diesel::insert_into;
use diesel_async::RunQueryDsl;
use tokio;
use uuid::Uuid;
use crate::collections::types::{CollectionState, CreateCollectionRequest};
/// Handler for creating a new collection
///
/// # Arguments
/// * `user_id` - The ID of the user creating the collection
/// * `req` - The request containing the collection details
///
/// # Returns
/// * `Result<CollectionState>` - The created collection state
pub async fn create_collection_handler(
user_id: &Uuid,
organization_id: &Uuid,
req: CreateCollectionRequest,
) -> Result<CollectionState> {
let collection_id = Uuid::new_v4();
// Create collection object
let collection = Collection {
id: collection_id,
name: req.name,
description: None,
created_at: Utc::now(),
updated_at: Utc::now(),
created_by: *user_id,
updated_by: *user_id,
deleted_at: None,
organization_id,
};
let insert_task_user_id = *user_id;
let insert_task_collection = collection.clone();
// Insert collection and permissions
let collection_insert = tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Error getting pg connection: {}", e);
return Err(anyhow!("Error getting pg connection: {}", e));
}
};
let asset_permissions = AssetPermission {
identity_id: insert_task_user_id,
identity_type: IdentityType::User,
asset_id: insert_task_collection.id,
asset_type: AssetType::Collection,
role: AssetPermissionRole::Owner,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
created_by: insert_task_user_id,
updated_by: insert_task_user_id,
};
match insert_into(collections::table)
.values(&insert_task_collection)
.execute(&mut conn)
.await
{
Ok(_) => (),
Err(e) => {
tracing::error!("Error inserting collection: {}", e);
return Err(anyhow!("Error inserting collection: {}", e));
}
};
match insert_into(asset_permissions::table)
.values(asset_permissions)
.execute(&mut conn)
.await
{
Ok(_) => (),
Err(e) => {
tracing::error!("Error inserting asset permissions: {}", e);
return Err(anyhow!("Error inserting asset permissions: {}", e));
}
}
Ok(())
});
// Update search index
let collection_id_for_search = collection_id;
let collection_name = collection.name.clone();
let organization_id_for_search = organization_id;
let collection_search_handle = tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Unable to get connection from pool: {:?}", e);
return Err(anyhow!("Unable to get connection from pool: {:?}", e));
}
};
let query = diesel::sql_query(
"INSERT INTO asset_search (asset_id, asset_type, content, organization_id)
VALUES ($1, 'collection', $2, $3)
ON CONFLICT (asset_id, asset_type)
DO UPDATE SET
content = EXCLUDED.content,
updated_at = NOW()",
)
.bind::<diesel::sql_types::Uuid, _>(collection_id_for_search)
.bind::<diesel::sql_types::Text, _>(collection_name)
.bind::<diesel::sql_types::Uuid, _>(organization_id_for_search);
match query.execute(&mut conn).await {
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Failed to update asset search: {:?}", e);
Err(anyhow!("Failed to update asset search: {:?}", e))
}
}
});
// Wait for both tasks to complete
if let Err(e) = collection_insert.await? {
return Err(anyhow!("Error in collection insert: {:?}", e));
}
if let Err(e) = collection_search_handle.await? {
return Err(anyhow!("Error in collection search insert: {:?}", e));
}
// Return the collection state
Ok(CollectionState {
collection,
assets: None,
permission: AssetPermissionRole::Owner,
individual_permissions: None,
team_permissions: None,
organization_permissions: false,
})
}

View File

@ -0,0 +1,62 @@
use anyhow::{anyhow, Result};
use chrono::Utc;
use database::{
enums::AssetPermissionRole,
pool::get_pg_pool,
schema::collections,
};
use diesel::{update, ExpressionMethods};
use diesel_async::RunQueryDsl;
use uuid::Uuid;
use crate::collections::types::DeleteCollectionResponse;
/// Handler for deleting collections
///
/// # Arguments
/// * `user_id` - The ID of the user deleting the collections
/// * `ids` - The IDs of the collections to delete
///
/// # Returns
/// * `Result<DeleteCollectionResponse>` - The IDs of the collections that were successfully deleted
pub async fn delete_collection_handler(
user_id: &Uuid,
organization_id: &Uuid,
ids: Vec<Uuid>,
) -> Result<DeleteCollectionResponse> {
// Filter out collections where the user only has viewer permission
let filtered_ids_to_delete: Vec<Uuid> = ids
.into_iter()
.filter(|id| match roles.get(id) {
Some(role) if *role != AssetPermissionRole::Viewer => true,
_ => false,
})
.collect();
// Get database connection
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
return Err(anyhow!("Error getting database connection: {}", e));
}
};
// Soft delete the collections
match update(collections::table)
.filter(collections::id.eq_any(&filtered_ids_to_delete))
.set(collections::deleted_at.eq(Some(Utc::now())))
.execute(&mut conn)
.await
{
Ok(_) => {}
Err(e) => {
return Err(anyhow!("Error deleting collections: {}", e));
}
};
// Return the IDs of the deleted collections
Ok(DeleteCollectionResponse {
ids: filtered_ids_to_delete,
})
}

View File

@ -0,0 +1,22 @@
use anyhow::Result;
use uuid::Uuid;
use crate::collections::types::{CollectionState, GetCollectionRequest};
/// Handler for getting a single collection by ID
///
/// # Arguments
/// * `user_id` - The ID of the user requesting the collection
/// * `req` - The request containing the collection ID
///
/// # Returns
/// * `Result<CollectionState>` - The collection state if found and accessible
pub async fn get_collection_handler(
user_id: &Uuid,
req: GetCollectionRequest,
) -> Result<CollectionState> {
// Reuse the existing collection_utils function
let collection = database::utils::collections::get_collection_by_id(user_id, &req.id).await?;
Ok(collection)
}

View File

@ -0,0 +1,153 @@
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use database::{
enums::{AssetPermissionRole, AssetType, IdentityType},
pool::get_pg_pool,
schema::{asset_permissions, collections, teams_to_users, users},
};
use diesel::{
BoolExpressionMethods, ExpressionMethods, JoinOnDsl, NullableExpressionMethods, QueryDsl,
};
use diesel_async::RunQueryDsl;
use tracing;
use uuid::Uuid;
use crate::collections::types::{
ListCollectionsCollection, ListCollectionsRequest, ListCollectionsUser,
};
/// Handler for listing collections with pagination and filtering
///
/// # Arguments
/// * `user_id` - The ID of the user requesting the collections
/// * `req` - The request containing pagination and filtering options
///
/// # Returns
/// * `Result<Vec<ListCollectionsCollection>>` - A list of collections the user has access to
pub async fn list_collections_handler(
user_id: &Uuid,
req: ListCollectionsRequest,
) -> Result<Vec<ListCollectionsCollection>> {
let page = req.page.unwrap_or(0);
let page_size = req.page_size.unwrap_or(25);
let list_of_collections = get_permissioned_collections(user_id, page, page_size, req).await?;
Ok(list_of_collections)
}
/// Get collections that the user has permission to access
///
/// # Arguments
/// * `user_id` - The ID of the user requesting the collections
/// * `page` - The page number for pagination
/// * `page_size` - The number of items per page
/// * `req` - The request containing filtering options
///
/// # Returns
/// * `Result<Vec<ListCollectionsCollection>>` - A list of collections the user has access to
async fn get_permissioned_collections(
user_id: &Uuid,
page: i64,
page_size: i64,
req: ListCollectionsRequest,
) -> Result<Vec<ListCollectionsCollection>> {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => return Err(anyhow!("Unable to get connection from pool: {}", e)),
};
let mut collections_statement = collections::table
.inner_join(
asset_permissions::table.on(collections::id
.eq(asset_permissions::asset_id)
.and(asset_permissions::asset_type.eq(AssetType::Collection))
.and(asset_permissions::deleted_at.is_null())),
)
.left_join(
teams_to_users::table.on(asset_permissions::identity_id
.eq(teams_to_users::user_id)
.and(asset_permissions::identity_type.eq(IdentityType::Team))
.and(teams_to_users::deleted_at.is_null())),
)
.inner_join(users::table.on(users::id.eq(collections::created_by)))
.select((
collections::id,
collections::name,
collections::updated_at,
collections::created_at,
asset_permissions::role,
users::id,
users::name.nullable(),
users::email,
))
.filter(collections::deleted_at.is_null())
.filter(
asset_permissions::identity_id
.eq(user_id)
.or(teams_to_users::user_id.eq(user_id)),
)
.distinct()
.order((collections::updated_at.desc(), collections::id.asc()))
.offset(page * page_size)
.limit(page_size)
.into_boxed();
if let Some(filters) = req.filters {
tracing::info!("Filters: {:?}", filters);
if filters.shared_with_me.unwrap_or(false) {
tracing::info!("Filtering for shared with me");
collections_statement = collections_statement
.filter(asset_permissions::role.ne(AssetPermissionRole::Owner));
}
if filters.owned_by_me.unwrap_or(false) {
collections_statement = collections_statement
.filter(asset_permissions::role.eq(AssetPermissionRole::Owner));
}
}
let sql = diesel::debug_query::<diesel::pg::Pg, _>(&collections_statement).to_string();
tracing::info!("SQL: {}", sql);
tracing::info!("User ID: {}", user_id);
let collection_results = match collections_statement
.load::<(
Uuid,
String,
DateTime<Utc>,
DateTime<Utc>,
AssetPermissionRole,
Uuid,
Option<String>,
String,
)>(&mut conn)
.await
{
Ok(collection_results) => collection_results,
Err(e) => return Err(anyhow!("Error getting collection results: {}", e)),
};
let mut collections: Vec<ListCollectionsCollection> = Vec::new();
for (id, name, updated_at, created_at, role, user_id, user_name, email) in collection_results {
let owner = ListCollectionsUser {
id: user_id,
name: user_name.unwrap_or(email),
avatar_url: None,
};
let collection = ListCollectionsCollection {
id,
name,
last_edited: updated_at,
created_at,
owner,
description: "".to_string(),
};
collections.push(collection);
}
Ok(collections)
}

View File

@ -0,0 +1,17 @@
// Collections handlers module
// mod create_collection_handler;
// mod delete_collection_handler;
// mod get_collection_handler;
mod list_collections_handler;
mod types;
// mod update_collection_handler;
// Re-export types
pub use types::*;
// Re-export handlers
// pub use create_collection_handler::create_collection_handler;
// pub use delete_collection_handler::delete_collection_handler;
// pub use get_collection_handler::get_collection_handler;
pub use list_collections_handler::list_collections_handler;
// pub use update_collection_handler::update_collection_handler;

View File

@ -0,0 +1,120 @@
use chrono::{DateTime, Utc};
use database::{
enums::{AssetPermissionRole, AssetType},
models::Collection,
};
use diesel::AsChangeset;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
// List collections types
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListCollectionsFilter {
pub shared_with_me: Option<bool>,
pub owned_by_me: Option<bool>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListCollectionsRequest {
pub page: Option<i64>,
pub page_size: Option<i64>,
#[serde(flatten)]
pub filters: Option<ListCollectionsFilter>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListCollectionsUser {
pub id: Uuid,
pub name: String,
pub avatar_url: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListCollectionsCollection {
pub id: Uuid,
pub name: String,
pub description: String,
pub last_edited: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub owner: ListCollectionsUser,
// TODO implement member
}
// Get collection types
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GetCollectionRequest {
pub id: Uuid,
}
// Collection state types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AssetUser {
pub name: Option<String>,
pub email: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionAsset {
pub id: Uuid,
pub name: String,
pub created_by: AssetUser,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub asset_type: AssetType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CollectionState {
#[serde(flatten)]
pub collection: Collection,
pub permission: AssetPermissionRole,
// pub individual_permissions: Option<Vec<database::utils::sharing::asset_sharing::IndividualPermission>>,
// pub team_permissions: Option<Vec<database::utils::sharing::asset_sharing::TeamPermissions>>,
pub organization_permissions: bool,
pub assets: Option<Vec<CollectionAsset>>,
}
// Create collection types
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CreateCollectionRequest {
pub name: String,
pub description: Option<String>,
}
// Update collection types
#[derive(Debug, Clone, Deserialize, Serialize, AsChangeset)]
#[diesel(table_name = database::schema::collections)]
pub struct UpdateCollectionObject {
pub name: Option<String>,
pub description: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct UpdateCollectionAssetsRequest {
pub id: Uuid,
#[serde(rename = "type")]
pub type_: AssetType,
}
// #[derive(Debug, Clone, Deserialize, Serialize)]
// pub struct UpdateCollectionRequest {
// pub id: Uuid,
// #[serde(flatten)]
// pub collection: Option<UpdateCollectionObject>,
// pub assets: Option<Vec<UpdateCollectionAssetsRequest>>,
// pub team_permissions: Option<Vec<database::utils::sharing::asset_sharing::ShareWithTeamsReqObject>>,
// pub user_permissions: Option<Vec<database::utils::sharing::asset_sharing::ShareWithUsersReqObject>>,
// pub remove_teams: Option<Vec<Uuid>>,
// pub remove_users: Option<Vec<Uuid>>,
// }
// Delete collection types
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DeleteCollectionRequest {
pub ids: Vec<Uuid>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DeleteCollectionResponse {
pub ids: Vec<Uuid>,
}

View File

@ -0,0 +1,321 @@
use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use database::{
enums::{AssetPermissionRole, AssetType},
models::CollectionToAsset,
pool::get_pg_pool,
schema::{collections, collections_to_assets},
};
use diesel::{dsl::not, update, AsChangeset, BoolExpressionMethods, ExpressionMethods};
use diesel_async::RunQueryDsl;
use std::sync::Arc;
use tokio;
use uuid::Uuid;
use crate::collections::types::{
CollectionState, UpdateCollectionAssetsRequest, UpdateCollectionObject, UpdateCollectionRequest,
};
/// Handler for updating a collection
///
/// # Arguments
/// * `user_id` - The ID of the user updating the collection
/// * `req` - The request containing the collection updates
///
/// # Returns
/// * `Result<CollectionState>` - The updated collection state
pub async fn update_collection_handler(
user_id: &Uuid,
req: UpdateCollectionRequest,
) -> Result<CollectionState> {
let user_id = Arc::new(*user_id);
let collection_id = Arc::new(req.id);
// Update collection record if provided
let update_collection_record_handle = if let Some(collection) = req.collection {
if collection.name.is_some() || collection.description.is_some() {
let user_id = Arc::clone(&user_id);
let collection_id = Arc::clone(&collection_id);
Some(tokio::spawn(async move {
update_collection_record(user_id, collection_id, collection).await
}))
} else {
None
}
} else {
None
};
// Update collection assets if provided
let update_collection_assets_handle = if let Some(assets) = req.assets {
let user_id = Arc::clone(&user_id);
let collection_id = Arc::clone(&collection_id);
Some(tokio::spawn(async move {
update_collection_assets(user_id, collection_id, assets).await
}))
} else {
None
};
// Wait for all update operations to complete
if let Some(update_collection_permissions_handle) = update_collection_permissions_handle {
match update_collection_permissions_handle.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => {
tracing::error!("Error updating collection permissions: {}", e);
return Err(anyhow!("Error updating collection permissions: {}", e));
}
Err(e) => {
tracing::error!("Error updating collection permissions: {}", e);
return Err(anyhow!("Error updating collection permissions: {}", e));
}
}
}
if let Some(update_collection_record_handle) = update_collection_record_handle {
match update_collection_record_handle.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => {
tracing::error!("Error updating collection record: {}", e);
return Err(anyhow!("Error updating collection record: {}", e));
}
Err(e) => {
tracing::error!("Error updating collection record: {}", e);
return Err(anyhow!("Error updating collection record: {}", e));
}
}
}
if let Some(update_collection_assets_handle) = update_collection_assets_handle {
match update_collection_assets_handle.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => {
tracing::error!("Error updating collection assets: {}", e);
return Err(anyhow!("Error updating collection assets: {}", e));
}
Err(e) => {
tracing::error!("Error updating collection assets: {}", e);
return Err(anyhow!("Error updating collection assets: {}", e));
}
}
}
// Get the updated collection
let collection = database::utils::collections::get_collection_by_id(user_id.as_ref(), &req.id).await?;
Ok(collection)
}
/// Update collection record in the database
///
/// # Arguments
/// * `user_id` - The ID of the user updating the collection
/// * `collection_id` - The ID of the collection to update
/// * `collection` - The collection update object
///
/// # Returns
/// * `Result<()>` - Success or error
async fn update_collection_record(
user_id: Arc<Uuid>,
collection_id: Arc<Uuid>,
collection: UpdateCollectionObject,
) -> Result<()> {
let collection_update = {
let collection_id = collection_id.clone();
let user_id = user_id.clone();
let collection = collection.clone();
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Error getting pg connection: {}", e);
return Err(anyhow!("Error getting pg connection: {}", e));
}
};
match update(collections::table)
.filter(collections::id.eq(collection_id.as_ref()))
.set((
collection,
collections::updated_at.eq(Utc::now()),
collections::updated_by.eq(*user_id),
))
.execute(&mut conn)
.await
{
Ok(updated_rows) => {
if updated_rows == 0 {
let err = anyhow!(
"User does not have write access to this collection or collection not found"
);
tracing::error!("{}", err);
return Err(err);
}
Ok(())
}
Err(e) => {
tracing::error!("Error updating collection: {}", e);
Err(anyhow!("Error updating collection: {}", e))
}
}
})
};
let collection_search_handle = {
let collection_id = collection_id.clone();
let collection_name = collection.name.unwrap_or_default();
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Unable to get connection from pool: {:?}", e);
return Err(anyhow!("Unable to get connection from pool: {:?}", e));
}
};
let query = diesel::sql_query(
"UPDATE asset_search
SET content = $1, updated_at = NOW()
WHERE asset_id = $2 AND asset_type = 'collection'"
)
.bind::<diesel::sql_types::Text, _>(collection_name)
.bind::<diesel::sql_types::Uuid, _>(*collection_id);
match query.execute(&mut conn).await {
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Failed to update asset search: {:?}", e);
Err(anyhow!("Failed to update asset search: {:?}", e))
}
}
})
};
match collection_update.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err(anyhow!("Error in collection update: {:?}", e)),
Err(e) => return Err(anyhow!("Error in collection update: {:?}", e)),
}
match collection_search_handle.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err(anyhow!("Error in collection search update: {:?}", e)),
Err(e) => return Err(anyhow!("Error in collection search update: {:?}", e)),
}
Ok(())
}
/// Update collection assets in the database
///
/// # Arguments
/// * `user_id` - The ID of the user updating the collection
/// * `collection_id` - The ID of the collection to update
/// * `assets` - The assets to add to the collection
///
/// # Returns
/// * `Result<()>` - Success or error
async fn update_collection_assets(
user_id: Arc<Uuid>,
collection_id: Arc<Uuid>,
assets: Vec<UpdateCollectionAssetsRequest>,
) -> Result<()> {
let assets = Arc::new(assets);
let upsert_handle = {
let assets = Arc::clone(&assets);
let collection_id = Arc::clone(&collection_id);
let user_id = Arc::clone(&user_id);
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Error getting pg connection: {}", e);
return Err(anyhow!("Error getting pg connection: {}", e));
}
};
let new_asset_records: Vec<CollectionToAsset> = assets
.iter()
.map(|asset| CollectionToAsset {
collection_id: *collection_id,
asset_id: asset.id,
asset_type: asset.type_,
created_at: Utc::now(),
updated_at: Utc::now(),
deleted_at: None,
created_by: *user_id,
updated_by: *user_id,
})
.collect();
match diesel::insert_into(collections_to_assets::table)
.values(&new_asset_records)
.on_conflict((
collections_to_assets::collection_id,
collections_to_assets::asset_id,
collections_to_assets::asset_type,
))
.do_update()
.set((
collections_to_assets::updated_at.eq(Utc::now()),
collections_to_assets::deleted_at.eq(Option::<DateTime<Utc>>::None),
))
.execute(&mut conn)
.await
{
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Error updating collection assets: {}", e);
Err(anyhow!("Unable to upsert assets to collection: {}", e))
}
}
})
};
let remove_handle = {
let assets = Arc::clone(&assets);
let collection_id = Arc::clone(&collection_id);
tokio::spawn(async move {
let mut conn = match get_pg_pool().get().await {
Ok(conn) => conn,
Err(e) => {
tracing::error!("Error getting pg connection: {}", e);
return Err(anyhow!("Error getting pg connection: {}", e));
}
};
match update(collections_to_assets::table)
.filter(collections_to_assets::collection_id.eq(*collection_id))
.filter(not(collections_to_assets::asset_id
.eq_any(assets.iter().map(|a| a.id))
.and(
collections_to_assets::asset_type.eq_any(assets.iter().map(|a| a.type_)),
)))
.set(collections_to_assets::deleted_at.eq(Some(Utc::now())))
.execute(&mut conn)
.await
{
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Error removing assets from collection: {}", e);
Err(anyhow!("Error removing assets from collection: {}", e))
}
}
})
};
match upsert_handle.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err(anyhow!("Error upserting assets to collection: {}", e)),
Err(e) => return Err(anyhow!("Error upserting assets to collection: {}", e)),
}
match remove_handle.await {
Ok(Ok(_)) => (),
Ok(Err(e)) => return Err(anyhow!("Error removing assets from collection: {}", e)),
Err(e) => return Err(anyhow!("Error removing assets from collection: {}", e)),
}
Ok(())
}

View File

@ -1,7 +1,8 @@
pub mod messages;
pub mod chats;
pub mod files;
pub mod collections;
pub mod favorites;
pub mod files;
pub mod messages;
pub mod metrics;
// Re-export commonly used types and functions

View File

@ -0,0 +1,472 @@
# API Collections REST Endpoints
## Problem Statement ✅
Currently, the collections functionality in our API is only accessible via WebSocket endpoints. This creates several limitations:
1. Clients must maintain WebSocket connections to interact with collections
2. REST API clients cannot access collections functionality
3. Business logic is tightly coupled with WebSocket-specific code
4. Code reuse between different transport mechanisms is limited
These limitations impact our API usability and maintainability:
### Current Limitations
- Collections can only be accessed via WebSocket connections
- Business logic is mixed with transport-specific code
- Duplicate code may be introduced when implementing new transport mechanisms
- Testing is more complex due to WebSocket dependencies
### Impact
- **User Impact**: Clients that prefer or require REST APIs cannot access collections functionality
- **System Impact**: Code maintenance is more difficult due to tight coupling
- **Business Impact**: Limited API access patterns may reduce API adoption and usage
## Requirements
### Functional Requirements ✅
#### Core Functionality
- Migrate WebSocket collections business logic to transport-agnostic handlers
- Details: Extract business logic from WebSocket handlers to reusable handlers
- Acceptance Criteria: Handlers can be used by both WebSocket and REST endpoints
- Dependencies: Existing WebSocket collections implementation
- Implement REST endpoints for collections CRUD operations
- Details: Create REST endpoints that use the new handlers
- Acceptance Criteria: All collection operations available via REST API
- Dependencies: New handler implementations
#### API Endpoints
- GET /users/collections
- Details: List collections with pagination and filtering
- Acceptance Criteria: Same functionality as WebSocket list endpoint
- Dependencies: List collections handler
- GET /users/collections/{id}
- Details: Get a single collection by ID
- Acceptance Criteria: Same functionality as WebSocket get endpoint
- Dependencies: Get collection handler
- POST /users/collections
- Details: Create a new collection
- Acceptance Criteria: Same functionality as WebSocket post endpoint
- Dependencies: Create collection handler
- PUT /users/collections/{id}
- Details: Update an existing collection
- Acceptance Criteria: Same functionality as WebSocket update endpoint
- Dependencies: Update collection handler
- DELETE /users/collections/{id}
- Details: Delete a collection
- Acceptance Criteria: Same functionality as WebSocket delete endpoint
- Dependencies: Delete collection handler
### Non-Functional Requirements ✅
- Performance Requirements
- REST endpoints should have comparable performance to WebSocket endpoints
- Handlers should efficiently reuse database connections and queries
- Maintainability Requirements
- Code should follow the handler guidelines in `.cursor/rules/handlers.mdc`
- Business logic should be separated from transport-specific code
- Types should be shared between handlers and endpoints where possible
- Compatibility Requirements
- Existing WebSocket endpoints must continue to function without changes to client code
- REST endpoints should use consistent naming and patterns with other REST APIs
## Technical Design ✅
### System Architecture
The new architecture separates business logic from transport-specific code:
```mermaid
graph TD
WS[WebSocket Endpoints] --> H[Handlers]
REST[REST Endpoints] --> H
H --> DB[Database]
H --> Utils[Utility Functions]
```
### Core Components ✅
#### Component 1: Collection Handlers
The handlers will implement the core business logic for collections operations:
```rust
// libs/handlers/src/collections/list_collections_handler.rs
pub async fn list_collections_handler(
user_id: &Uuid,
request: ListCollectionsRequest,
) -> Result<Vec<ListCollectionsCollection>> {
// Implementation of list collections logic
// Extracted from existing WebSocket handler
}
// Request and response types
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListCollectionsRequest {
pub page: Option<i64>,
pub page_size: Option<i64>,
#[serde(flatten)]
pub filters: Option<ListCollectionsFilter>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ListCollectionsCollection {
pub id: Uuid,
pub name: String,
pub last_edited: DateTime<Utc>,
pub created_at: DateTime<Utc>,
pub owner: ListCollectionsUser,
pub is_shared: bool,
}
```
#### Component 2: REST Endpoints
The REST endpoints will use the handlers to implement the API:
```rust
// src/routes/rest/routes/users/collections/list.rs
pub async fn list_collections(
Query(query): Query<ListCollectionsRequest>,
user: AuthenticatedUser,
) -> Result<Json<Vec<ListCollectionsCollection>>, AppError> {
let result = collections::list_collections_handler(&user.id, query).await?;
Ok(Json(result))
}
```
#### Component 3: Updated WebSocket Handlers
The WebSocket handlers will be updated to use the new handlers:
```rust
// src/routes/ws/collections/list_collections.rs
pub async fn list_collections(user: &AuthenticatedUser, req: ListCollectionsRequest) -> Result<()> {
let list_collections_res = match collections::list_collections_handler(&user.id, req).await {
Ok(res) => res,
Err(e) => {
// Error handling
return Err(e);
}
};
// WebSocket-specific response handling
// ...
}
```
### API Changes
#### New REST Endpoints
```rust
// GET /users/collections
struct ListCollectionsRequest {
page: Option<i64>, // Pagination page number
page_size: Option<i64>, // Items per page
shared_with_me: Option<bool>, // Filter for collections shared with user
owned_by_me: Option<bool>, // Filter for collections owned by user
}
// Response: Vec<ListCollectionsCollection>
// GET /users/collections/{id}
// Path parameter: id: Uuid
// Response: CollectionState
// POST /users/collections
struct CreateCollectionRequest {
name: String, // Collection name
// Other fields as needed
}
// Response: Collection
// PUT /users/collections/{id}
struct UpdateCollectionRequest {
name: Option<String>, // Updated collection name
// Other fields as needed
}
// Response: Collection
// DELETE /users/collections/{id}
// Path parameter: id: Uuid
// Response: ()
```
### File Changes
#### New Files
- `libs/handlers/src/collections/mod.rs`
- Purpose: Re-export handlers and types
- Key components: Module exports
- Dependencies: Handler implementations
- `libs/handlers/src/collections/types.rs`
- Purpose: Shared types for collections
- Key components: Request and response types
- Dependencies: None
- `libs/handlers/src/collections/list_collections_handler.rs`
- Purpose: Handler for listing collections
- Key components: `list_collections_handler` function
- Dependencies: Database, types
- `libs/handlers/src/collections/get_collection_handler.rs`
- Purpose: Handler for getting a single collection
- Key components: `get_collection_handler` function
- Dependencies: Database, types
- `libs/handlers/src/collections/create_collection_handler.rs`
- Purpose: Handler for creating a collection
- Key components: `create_collection_handler` function
- Dependencies: Database, types
- `libs/handlers/src/collections/update_collection_handler.rs`
- Purpose: Handler for updating a collection
- Key components: `update_collection_handler` function
- Dependencies: Database, types
- `libs/handlers/src/collections/delete_collection_handler.rs`
- Purpose: Handler for deleting a collection
- Key components: `delete_collection_handler` function
- Dependencies: Database, types
- `src/routes/rest/routes/users/collections/mod.rs`
- Purpose: Re-export REST routes
- Key components: Module exports
- Dependencies: Route implementations
- `src/routes/rest/routes/users/collections/list.rs`
- Purpose: REST endpoint for listing collections
- Key components: `list_collections` function
- Dependencies: Handlers, authentication
- `src/routes/rest/routes/users/collections/get.rs`
- Purpose: REST endpoint for getting a collection
- Key components: `get_collection` function
- Dependencies: Handlers, authentication
- `src/routes/rest/routes/users/collections/create.rs`
- Purpose: REST endpoint for creating a collection
- Key components: `create_collection` function
- Dependencies: Handlers, authentication
- `src/routes/rest/routes/users/collections/update.rs`
- Purpose: REST endpoint for updating a collection
- Key components: `update_collection` function
- Dependencies: Handlers, authentication
- `src/routes/rest/routes/users/collections/delete.rs`
- Purpose: REST endpoint for deleting a collection
- Key components: `delete_collection` function
- Dependencies: Handlers, authentication
#### Modified Files
- `src/routes/ws/collections/list_collections.rs`
- Changes: Update to use new handler
- Purpose: Maintain WebSocket functionality
- `src/routes/ws/collections/get_collection.rs`
- Changes: Update to use new handler
- Purpose: Maintain WebSocket functionality
- `src/routes/ws/collections/post_collection.rs`
- Changes: Update to use new handler
- Purpose: Maintain WebSocket functionality
- `src/routes/ws/collections/update_collection.rs`
- Changes: Update to use new handler
- Purpose: Maintain WebSocket functionality
- `src/routes/ws/collections/delete_collection.rs`
- Changes: Update to use new handler
- Purpose: Maintain WebSocket functionality
- `src/routes/rest/routes/mod.rs`
- Changes: Add collections routes
- Purpose: Register new REST endpoints
## Implementation Plan
### Phase 1: Create Handler Structure ⏳
1. Create directory structure for handlers
- [ ] Create `libs/handlers/src/collections/` directory
- [ ] Create `libs/handlers/src/collections/mod.rs`
- [ ] Create `libs/handlers/src/collections/types.rs`
2. Define shared types
- [ ] Extract types from WebSocket handlers
- [ ] Define request and response types
- [ ] Ensure compatibility with both WebSocket and REST
### Phase 2: Implement List Collections ⏳
1. Create list collections handler
- [ ] Create `libs/handlers/src/collections/list_collections_handler.rs`
- [ ] Extract business logic from WebSocket handler
- [ ] Implement handler function
2. Update WebSocket handler
- [ ] Update `src/routes/ws/collections/list_collections.rs` to use new handler
- [ ] Ensure backward compatibility
3. Create REST endpoint
- [ ] Create `src/routes/rest/routes/users/collections/list.rs`
- [ ] Implement REST endpoint using the handler
- [ ] Add route to router
### Phase 3: Implement Get Collection 🔜
1. Create get collection handler
- [ ] Create `libs/handlers/src/collections/get_collection_handler.rs`
- [ ] Extract business logic from WebSocket handler
- [ ] Implement handler function
2. Update WebSocket handler
- [ ] Update `src/routes/ws/collections/get_collection.rs` to use new handler
- [ ] Ensure backward compatibility
3. Create REST endpoint
- [ ] Create `src/routes/rest/routes/users/collections/get.rs`
- [ ] Implement REST endpoint using the handler
- [ ] Add route to router
### Phase 4: Implement Create Collection 🔜
1. Create create collection handler
- [ ] Create `libs/handlers/src/collections/create_collection_handler.rs`
- [ ] Extract business logic from WebSocket handler
- [ ] Implement handler function
2. Update WebSocket handler
- [ ] Update `src/routes/ws/collections/post_collection.rs` to use new handler
- [ ] Ensure backward compatibility
3. Create REST endpoint
- [ ] Create `src/routes/rest/routes/users/collections/create.rs`
- [ ] Implement REST endpoint using the handler
- [ ] Add route to router
### Phase 5: Implement Update Collection 🔜
1. Create update collection handler
- [ ] Create `libs/handlers/src/collections/update_collection_handler.rs`
- [ ] Extract business logic from WebSocket handler
- [ ] Implement handler function
2. Update WebSocket handler
- [ ] Update `src/routes/ws/collections/update_collection.rs` to use new handler
- [ ] Ensure backward compatibility
3. Create REST endpoint
- [ ] Create `src/routes/rest/routes/users/collections/update.rs`
- [ ] Implement REST endpoint using the handler
- [ ] Add route to router
### Phase 6: Implement Delete Collection 🔜
1. Create delete collection handler
- [ ] Create `libs/handlers/src/collections/delete_collection_handler.rs`
- [ ] Extract business logic from WebSocket handler
- [ ] Implement handler function
2. Update WebSocket handler
- [ ] Update `src/routes/ws/collections/delete_collection.rs` to use new handler
- [ ] Ensure backward compatibility
3. Create REST endpoint
- [ ] Create `src/routes/rest/routes/users/collections/delete.rs`
- [ ] Implement REST endpoint using the handler
- [ ] Add route to router
### Phase 7: Testing and Documentation 🔜
1. Write tests
- [ ] Unit tests for handlers
- [ ] Integration tests for REST endpoints
- [ ] Verify WebSocket backward compatibility
2. Update documentation
- [ ] Add API documentation for new endpoints
- [ ] Update internal documentation
## Testing Strategy
### Unit Tests
- Test each handler function with various inputs
- Test error handling and edge cases
- Mock database connections and external dependencies
```rust
#[tokio::test]
async fn test_list_collections_handler() {
// Setup test data
let user_id = Uuid::new_v4();
let request = ListCollectionsRequest {
page: Some(0),
page_size: Some(10),
filters: None,
};
// Call handler
let result = list_collections_handler(&user_id, request).await;
// Assert expectations
assert!(result.is_ok());
let collections = result.unwrap();
// Additional assertions
}
```
### Integration Tests
- Test REST endpoints with HTTP requests
- Verify response formats and status codes
- Test pagination, filtering, and error handling
```rust
#[tokio::test]
async fn test_list_collections_endpoint() {
// Setup test app
let app = test_app().await;
// Make request
let response = app
.get("/users/collections")
.header("Authorization", "Bearer test_token")
.send()
.await;
// Assert expectations
assert_eq!(response.status(), 200);
// Additional assertions
}
```
### Backward Compatibility Tests
- Verify that WebSocket endpoints continue to work as expected
- Test with existing client code
## Success Criteria
- All REST endpoints are implemented and working correctly
- WebSocket endpoints continue to work as before
- Code follows best practices and guidelines
- All tests pass
## Dependencies
- Existing WebSocket handlers
- Database schema and models
- Authentication middleware
## Checklist Before Submission
- [ ] All template sections completed
- [ ] Technical design is detailed and complete
- [ ] File changes are documented
- [ ] Implementation phases are clear
- [ ] Testing strategy is defined
- [ ] Security considerations addressed
- [ ] Dependencies listed
- [ ] File references included

View File

@ -0,0 +1,50 @@
use axum::{
extract::State,
http::StatusCode,
Json,
};
use handlers::collections::{create_collection_handler, CreateCollectionRequest, CollectionState};
use middleware::AuthenticatedUser;
use uuid::Uuid;
use database::utils::user::get_user_organization_id;
/// Create a new collection
///
/// This endpoint creates a new collection with the provided details.
pub async fn create_collection(
user: AuthenticatedUser,
Json(req): Json<CreateCollectionRequest>,
) -> Result<Json<CollectionState>, (StatusCode, String)> {
// Get the user's organization ID
let org_id = match get_user_organization_id(&user.id).await {
Ok(id) => id,
Err(e) => {
tracing::error!("Error getting user organization ID: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error getting user organization: {}", e),
));
}
};
// Call the handler
match create_collection_handler(&user.id, &org_id, req).await {
Ok(collection) => Ok(Json(collection)),
Err(e) => {
tracing::error!("Error creating collection: {}", e);
// Return appropriate error response based on the error
if e.to_string().contains("permission") {
Err((
StatusCode::FORBIDDEN,
format!("Permission denied: {}", e),
))
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error creating collection: {}", e),
))
}
}
}
}

View File

@ -0,0 +1,42 @@
use axum::{
http::StatusCode,
Json,
};
use handlers::collections::{delete_collection_handler, DeleteCollectionRequest, DeleteCollectionResponse};
use middleware::AuthenticatedUser;
use uuid::Uuid;
use database::utils::user::get_user_organization_id;
/// Delete a collection
///
/// This endpoint deletes one or more collections by their IDs.
pub async fn delete_collection(
Extension(user): Extension<AuthenticatedUser>,
Json(req): Json<DeleteCollectionRequest>,
) -> Result<Json<DeleteCollectionResponse>, (StatusCode, String)> {
// Call the handler
match delete_collection_handler(&user.id, &user.organization_id, req.ids).await {
Ok(response) => Ok(Json(response)),
Err(e) => {
tracing::error!("Error deleting collection: {}", e);
// Return appropriate error response based on the error
if e.to_string().contains("not found") {
Err((
StatusCode::NOT_FOUND,
format!("Collection not found: {}", e),
))
} else if e.to_string().contains("permission") {
Err((
StatusCode::FORBIDDEN,
format!("Permission denied: {}", e),
))
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error deleting collection: {}", e),
))
}
}
}
}

View File

@ -0,0 +1,43 @@
use axum::{
extract::{Path, State},
http::StatusCode,
Json,
};
use handlers::collections::{get_collection_handler, CollectionState};
use middleware::AuthenticatedUser;
use uuid::Uuid;
use axum::extract::Extension;
/// Get a collection by ID
///
/// This endpoint returns a collection by its ID.
pub async fn get_collection(
Extension(user): Extension<AuthenticatedUser>,
Path(id): Path<Uuid>,
) -> Result<Json<CollectionState>, (StatusCode, String)> {
// Call the handler
match get_collection_handler(&user.id, &id).await {
Ok(collection) => Ok(Json(collection)),
Err(e) => {
tracing::error!("Error getting collection: {}", e);
// Return appropriate error response based on the error
if e.to_string().contains("not found") {
Err((
StatusCode::NOT_FOUND,
format!("Collection not found: {}", e),
))
} else if e.to_string().contains("permission") {
Err((
StatusCode::FORBIDDEN,
format!("Permission denied: {}", e),
))
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error getting collection: {}", e),
))
}
}
}
}

View File

@ -0,0 +1,30 @@
use axum::{
extract::{Query, State},
http::StatusCode,
Extension, Json,
};
use handlers::collections::{
list_collections_handler, ListCollectionsCollection, ListCollectionsRequest,
};
use middleware::AuthenticatedUser;
use uuid::Uuid;
/// List collections
///
/// This endpoint returns a list of collections for the authenticated user.
pub async fn list_collections(
Extension(user): Extension<AuthenticatedUser>,
Query(query): Query<ListCollectionsRequest>,
) -> Result<Json<Vec<ListCollectionsCollection>>, (StatusCode, String)> {
// Call the handler
match list_collections_handler(&user.id, query).await {
Ok(collections) => Ok(Json(collections)),
Err(e) => {
tracing::error!("Error listing collections: {}", e);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error listing collections: {}", e),
))
}
}
}

View File

@ -0,0 +1,19 @@
use axum::{
routing::{get, post, put, delete},
Router,
};
mod list_collections;
// mod get_collection;
// mod create_collection;
// mod update_collection;
// mod delete_collection;
pub fn router() -> Router {
Router::new()
.route("/", get(list_collections::list_collections))
// .route("/", post(create_collection::create_collection))
// .route("/:id", get(get_collection::get_collection))
// .route("/:id", put(update_collection::update_collection))
// .route("/:id", delete(delete_collection::delete_collection))
}

View File

@ -0,0 +1,41 @@
use axum::{
http::StatusCode,
Json,
};
use handlers::collections::{update_collection_handler, UpdateCollectionRequest, CollectionState};
use middleware::AuthenticatedUser;
use uuid::Uuid;
/// Update a collection
///
/// This endpoint updates a collection with the provided details.
pub async fn update_collection(
Extension(user): Extension<AuthenticatedUser>,
Json(req): Json<UpdateCollectionRequest>,
) -> Result<Json<CollectionState>, (StatusCode, String)> {
// Call the handler
match update_collection_handler(&user.id, req).await {
Ok(collection) => Ok(Json(collection)),
Err(e) => {
tracing::error!("Error updating collection: {}", e);
// Return appropriate error response based on the error
if e.to_string().contains("not found") {
Err((
StatusCode::NOT_FOUND,
format!("Collection not found: {}", e),
))
} else if e.to_string().contains("permission") {
Err((
StatusCode::FORBIDDEN,
format!("Permission denied: {}", e),
))
} else {
Err((
StatusCode::INTERNAL_SERVER_ERROR,
format!("Error updating collection: {}", e),
))
}
}
}
}

View File

@ -11,6 +11,7 @@ mod organizations;
mod permission_groups;
mod sql;
mod users;
mod collections;
use axum::{middleware as axum_middleware, Router};
@ -31,6 +32,7 @@ pub fn router() -> Router {
.nest("/metrics", metrics::router())
.nest("/dashboards", dashboards::router())
.nest("/users", users::router())
.nest("/collections", collections::router())
.route_layer(axum_middleware::from_fn(auth)),
)
}