merging data_source_endpoints

This commit is contained in:
dal 2025-03-24 13:54:35 -06:00
commit 13938a8140
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
20 changed files with 2176 additions and 244 deletions

View File

@ -514,7 +514,7 @@ impl DataSourceType {
}
}
pub fn to_string(&self) -> &'static str {
pub fn to_str(&self) -> &'static str {
match *self {
DataSourceType::BigQuery => "bigquery",
DataSourceType::Databricks => "databricks",
@ -527,6 +527,20 @@ impl DataSourceType {
DataSourceType::Supabase => "supabase",
}
}
pub fn to_string(&self) -> String {
String::from(match *self {
DataSourceType::BigQuery => "bigquery",
DataSourceType::Databricks => "databricks",
DataSourceType::MySql => "mysql",
DataSourceType::Mariadb => "mariadb",
DataSourceType::Postgres => "postgres",
DataSourceType::Redshift => "redshift",
DataSourceType::Snowflake => "snowflake",
DataSourceType::SqlServer => "sqlserver",
DataSourceType::Supabase => "supabase",
})
}
}
impl FromStr for DataSourceType {

View File

@ -0,0 +1,135 @@
use anyhow::{anyhow, Result};
use chrono::Utc;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use middleware::types::AuthenticatedUser;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use database::{
enums::{DataSourceOnboardingStatus, UserOrganizationRole},
models::DataSource,
pool::get_pg_pool,
schema::data_sources,
vault::create_secret,
};
use query_engine::credentials::Credential;
#[derive(Deserialize)]
pub struct CreateDataSourceRequest {
pub name: String,
#[serde(flatten)]
pub credential: Credential,
}
#[derive(Serialize)]
pub struct CreateDataSourceResponse {
pub id: String,
pub name: String,
pub r#type: String,
pub created_at: String,
pub updated_at: String,
pub created_by: CreatedByResponse,
pub credentials: Credential,
pub data_sets: Vec<DatasetResponse>,
}
#[derive(Serialize)]
pub struct CreatedByResponse {
pub id: String,
pub email: String,
pub name: String,
}
#[derive(Serialize)]
pub struct DatasetResponse {
pub id: String,
pub name: String,
}
pub async fn create_data_source_handler(
user: &AuthenticatedUser,
request: CreateDataSourceRequest,
) -> Result<CreateDataSourceResponse> {
// Verify user has an organization
if user.organizations.is_empty() {
return Err(anyhow!("User is not a member of any organization"));
}
// Get the first organization (users can only belong to one organization currently)
let user_org = &user.organizations[0];
// Verify user has appropriate permissions (admin role)
if user_org.role != UserOrganizationRole::WorkspaceAdmin && user_org.role != UserOrganizationRole::DataAdmin {
return Err(anyhow!("User does not have appropriate permissions to create data sources"));
}
let mut conn = get_pg_pool().get().await?;
// Check if data source with same name already exists in the organization
let existing_data_source = data_sources::table
.filter(data_sources::name.eq(&request.name))
.filter(data_sources::organization_id.eq(user_org.id))
.filter(data_sources::deleted_at.is_null())
.first::<DataSource>(&mut conn)
.await
.ok();
if existing_data_source.is_some() {
return Err(anyhow!(
"A data source with this name already exists in this organization and environment"
));
}
// Create new data source
let data_source_id = Uuid::new_v4();
let now = Utc::now();
let data_source = DataSource {
id: data_source_id,
name: request.name.clone(),
type_: request.credential.get_type(),
secret_id: data_source_id, // Use same ID for data source and secret
onboarding_status: DataSourceOnboardingStatus::NotStarted,
onboarding_error: None,
organization_id: user_org.id,
created_by: user.id,
updated_by: user.id,
created_at: now,
updated_at: now,
deleted_at: None,
env: "env".to_string(),
};
// Insert the data source
diesel::insert_into(data_sources::table)
.values(&data_source)
.execute(&mut conn)
.await
.map_err(|e| anyhow!("Error creating data source: {}", e))?;
// Store credentials in vault
let credential_json = serde_json::to_string(&request.credential)
.map_err(|e| anyhow!("Error serializing credentials: {}", e))?;
create_secret(&data_source_id, &credential_json)
.await
.map_err(|e| anyhow!("Error storing credentials in vault: {}", e))?;
// Build response using AuthenticatedUser info
let response = CreateDataSourceResponse {
id: data_source.id.to_string(),
name: data_source.name,
r#type: data_source.type_.to_string(),
created_at: data_source.created_at.to_rfc3339(),
updated_at: data_source.updated_at.to_rfc3339(),
created_by: CreatedByResponse {
id: user.id.to_string(),
email: user.email.clone(),
name: user.name.clone().unwrap_or_default(),
},
credentials: request.credential,
data_sets: Vec::new(), // Empty for new data sources
};
Ok(response)
}

View File

@ -0,0 +1,58 @@
use anyhow::{anyhow, Result};
use chrono::Utc;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use middleware::types::AuthenticatedUser;
use uuid::Uuid;
use database::enums::UserOrganizationRole;
use database::{
models::DataSource,
pool::get_pg_pool,
schema::data_sources,
vault::delete_secret,
};
pub async fn delete_data_source_handler(
user: &AuthenticatedUser,
data_source_id: &Uuid,
) -> Result<()> {
// Verify user has an organization
if user.organizations.is_empty() {
return Err(anyhow!("User is not a member of any organization"));
}
// Get the first organization (users can only belong to one organization currently)
let user_org = &user.organizations[0];
// Verify user has appropriate permissions (admin role)
if user_org.role != UserOrganizationRole::WorkspaceAdmin && user_org.role != UserOrganizationRole::DataAdmin {
return Err(anyhow!("User does not have appropriate permissions to delete data sources"));
}
let mut conn = get_pg_pool().get().await?;
// Get the data source to verify it exists and belongs to the user's organization
let _data_source = data_sources::table
.filter(data_sources::id.eq(data_source_id))
.filter(data_sources::organization_id.eq(user_org.id))
.filter(data_sources::deleted_at.is_null())
.first::<DataSource>(&mut conn)
.await
.map_err(|_| anyhow!("Data source not found or you don't have access to it"))?;
// Soft delete the data source
diesel::update(data_sources::table)
.filter(data_sources::id.eq(data_source_id))
.set(data_sources::deleted_at.eq(Some(Utc::now())))
.execute(&mut conn)
.await
.map_err(|e| anyhow!("Error deleting data source: {}", e))?;
// Delete credentials from vault
delete_secret(data_source_id)
.await
.map_err(|e| anyhow!("Error deleting credentials from vault: {}", e))?;
Ok(())
}

View File

@ -1,6 +1,6 @@
use anyhow::{anyhow, Result};
use database::{
enums::DataSourceType,
enums::{DataSourceType, UserOrganizationRole},
models::{DataSource, Dataset, User},
pool::get_pg_pool,
schema::{data_sources, datasets, users},
@ -8,6 +8,7 @@ use database::{
};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use middleware::types::AuthenticatedUser;
use query_engine::credentials::Credential;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
@ -44,14 +45,35 @@ pub struct DatasetResponse {
pub async fn get_data_source_handler(
request: GetDataSourceRequest,
_user_id: &Uuid,
user: &AuthenticatedUser,
) -> Result<DataSourceResponse> {
// Verify user has an organization
if user.organizations.is_empty() {
return Err(anyhow!("User is not a member of any organization"));
}
// Get the first organization (users can only belong to one organization currently)
let user_org = &user.organizations[0];
// Verify user has appropriate permissions (at least viewer role)
if user_org.role != UserOrganizationRole::WorkspaceAdmin
&& user_org.role != UserOrganizationRole::DataAdmin
&& user_org.role != UserOrganizationRole::Querier
&& user_org.role != UserOrganizationRole::RestrictedQuerier
&& user_org.role != UserOrganizationRole::Viewer
{
return Err(anyhow!(
"User does not have appropriate permissions to view data sources"
));
}
let pool = get_pg_pool();
let mut conn = pool.get().await?;
// Get the data source
// Get the data source (filter by organization)
let data_source: DataSource = data_sources::table
.filter(data_sources::id.eq(request.id))
.filter(data_sources::organization_id.eq(user_org.id))
.filter(data_sources::deleted_at.is_null())
.first(&mut conn)
.await

View File

@ -2,14 +2,13 @@ use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use middleware::types::AuthenticatedUser;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use database::{
enums::DataSourceType,
models::UserToOrganization,
enums::{DataSourceType, UserOrganizationRole},
pool::get_pg_pool,
schema::{data_sources, users_to_organizations},
schema::data_sources,
};
#[derive(Deserialize)]
@ -28,27 +27,35 @@ pub struct DataSourceListItem {
}
pub async fn list_data_sources_handler(
user_id: &Uuid,
user: &AuthenticatedUser,
page: Option<i64>,
page_size: Option<i64>,
) -> Result<Vec<DataSourceListItem>> {
// Verify user has an organization
if user.organizations.is_empty() {
return Err(anyhow!("User is not a member of any organization"));
}
// Get the first organization (users can only belong to one organization currently)
let user_org = &user.organizations[0];
// Verify user has appropriate permissions (at least viewer role)
if user_org.role != UserOrganizationRole::WorkspaceAdmin
&& user_org.role != UserOrganizationRole::DataAdmin
&& user_org.role != UserOrganizationRole::Querier
&& user_org.role != UserOrganizationRole::RestrictedQuerier
&& user_org.role != UserOrganizationRole::Viewer {
return Err(anyhow!("User does not have appropriate permissions to view data sources"));
}
let page = page.unwrap_or(0);
let page_size = page_size.unwrap_or(25);
let mut conn = get_pg_pool().get().await?;
// Get the user's organization
let user_organization = users_to_organizations::table
.filter(users_to_organizations::user_id.eq(user_id))
.filter(users_to_organizations::deleted_at.is_null())
.select(users_to_organizations::all_columns)
.first::<UserToOrganization>(&mut conn)
.await
.map_err(|e| anyhow!("Unable to get user organization: {}", e))?;
// Get data sources for the organization
// Get data sources for the organization (using organization ID from AuthenticatedUser)
let data_sources_list = data_sources::table
.filter(data_sources::organization_id.eq(user_organization.organization_id))
.filter(data_sources::organization_id.eq(user_org.id))
.filter(data_sources::deleted_at.is_null())
.order_by(data_sources::updated_at.desc())
.limit(page_size)

View File

@ -1,8 +1,22 @@
mod list_data_sources_handler;
mod create_data_source_handler;
mod delete_data_source_handler;
mod get_data_source_handler;
mod list_data_sources_handler;
mod update_data_source_handler;
// Explicitly re-export the specific items from each module
pub use list_data_sources_handler::{list_data_sources_handler, ListDataSourcesRequest, DataSourceListItem};
pub use get_data_source_handler::{get_data_source_handler, GetDataSourceRequest, DataSourceResponse, CreatedByResponse, DatasetResponse};
pub use update_data_source_handler::{update_data_source_handler, UpdateDataSourceRequest, DataSourceResponse as UpdateDataSourceResponse, CreatedBy, Credentials};
pub use create_data_source_handler::{
create_data_source_handler, CreateDataSourceRequest, CreateDataSourceResponse,
};
pub use delete_data_source_handler::delete_data_source_handler;
pub use get_data_source_handler::{
get_data_source_handler, CreatedByResponse, DataSourceResponse, DatasetResponse,
GetDataSourceRequest,
};
pub use list_data_sources_handler::{
list_data_sources_handler, DataSourceListItem, ListDataSourcesRequest,
};
pub use update_data_source_handler::{
update_data_source_handler, CreatedBy, DataSourceResponse as UpdateDataSourceResponse,
UpdateDataSourceRequest,
};

View File

@ -4,16 +4,18 @@ use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use diesel::{AsChangeset, ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use middleware::types::AuthenticatedUser;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use database::{
enums::DataSourceType,
models::{DataSource, User, UserToOrganization},
enums::{DataSourceType, UserOrganizationRole},
models::{DataSource, User},
pool::get_pg_pool,
schema::{data_sources, users, users_to_organizations},
schema::{data_sources, users},
vault::{read_secret, update_secret},
};
use query_engine::credentials::Credential;
/// Request for updating a data source
#[derive(Debug, Deserialize)]
@ -44,26 +46,6 @@ pub struct CreatedBy {
pub name: String,
}
/// Credentials information in the response
#[derive(Serialize)]
pub struct Credentials {
pub database: Option<String>,
pub host: String,
pub jump_host: Option<String>,
pub password: String,
pub port: u64,
pub schemas: Option<Vec<String>>,
pub ssh_private_key: Option<String>,
pub ssh_username: Option<String>,
pub username: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub project_id: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub dataset_ids: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub credentials_json: Option<serde_json::Value>,
}
/// Response for a data source
#[derive(Serialize)]
pub struct DataSourceResponse {
@ -73,37 +55,45 @@ pub struct DataSourceResponse {
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub created_by: CreatedBy,
pub credentials: Credentials,
pub credentials: Credential,
pub data_sets: Vec<serde_json::Value>, // Empty for now, could be populated if needed
}
/// Handler for updating a data source
pub async fn update_data_source_handler(
user_id: &Uuid,
user: &AuthenticatedUser,
data_source_id: &Uuid,
request: UpdateDataSourceRequest,
) -> Result<DataSourceResponse> {
let mut conn = get_pg_pool().get().await?;
// Verify user has an organization
if user.organizations.is_empty() {
return Err(anyhow!("User is not a member of any organization"));
}
// Verify user has access to the data source
let user_org = users_to_organizations::table
.filter(users_to_organizations::user_id.eq(user_id))
.filter(users_to_organizations::deleted_at.is_null())
.select(users_to_organizations::all_columns)
.first::<UserToOrganization>(&mut conn)
.await
.map_err(|e| anyhow!("Unable to get user organization: {}", e))?;
// Get the first organization (users can only belong to one organization currently)
let user_org = &user.organizations[0];
// Verify user has appropriate permissions (admin role)
if user_org.role != UserOrganizationRole::WorkspaceAdmin
&& user_org.role != UserOrganizationRole::DataAdmin
{
return Err(anyhow!(
"User does not have appropriate permissions to update data sources"
));
}
let mut conn = get_pg_pool().get().await?;
// Get current data source
let mut data_source = data_sources::table
.filter(data_sources::id.eq(data_source_id))
.filter(data_sources::organization_id.eq(user_org.organization_id))
.filter(data_sources::organization_id.eq(user_org.id))
.filter(data_sources::deleted_at.is_null())
.first::<DataSource>(&mut conn)
.await
.map_err(|e| anyhow!("Data source not found: {}", e))?;
// Extract type from credentials if present
// Extract type from credentials if present (can't change type in update)
let type_field = request
.credential
.as_ref()
@ -118,7 +108,7 @@ pub async fn update_data_source_handler(
name: request.name.clone(),
env: request.env.clone(),
updated_at: Utc::now(),
updated_by: *user_id,
updated_by: user.id,
type_field: type_field.clone(),
};
@ -145,23 +135,284 @@ pub async fn update_data_source_handler(
}
// Update credentials if provided
if let Some(credentials) = &request.credential {
if let Some(new_credentials) = &request.credential {
// Read existing secret
let existing_secret = read_secret(data_source_id).await?;
let mut existing_credential: serde_json::Value = serde_json::from_str(&existing_secret)?;
// Merge credential fields
if let (Some(existing_obj), Some(new_obj)) =
(existing_credential.as_object_mut(), credentials.as_object())
{
for (key, value) in new_obj {
existing_obj.insert(key.clone(), value.clone());
// Get the current credential structure
let current_credential: Credential = serde_json::from_str(&existing_secret)
.map_err(|e| anyhow!("Failed to parse existing credentials: {}", e))?;
// Create updated credential based on the type
let updated_credential = match &current_credential {
Credential::Postgres(creds) => {
let mut updated = creds.clone();
// Update fields if present in request
if let Some(host) = new_credentials.get("host").and_then(|v| v.as_str()) {
updated.host = host.to_string();
}
if let Some(port) = new_credentials
.get("port")
.and_then(|v| v.as_u64())
.map(|v| v as u16)
{
updated.port = port;
}
if let Some(username) = new_credentials.get("username").and_then(|v| v.as_str()) {
updated.username = username.to_string();
}
if let Some(password) = new_credentials.get("password").and_then(|v| v.as_str()) {
updated.password = password.to_string();
}
if let Some(default_database) = new_credentials
.get("default_database")
.and_then(|v| v.as_str())
{
updated.default_database = default_database.to_string();
}
if let Some(default_schema) = new_credentials
.get("default_schema")
.and_then(|v| v.as_str())
{
updated.default_schema = default_schema.to_string();
}
if let Some(jump_host) = new_credentials.get("jump_host").and_then(|v| v.as_str()) {
updated.jump_host = Some(jump_host.to_string());
}
if let Some(ssh_username) =
new_credentials.get("ssh_username").and_then(|v| v.as_str())
{
updated.ssh_username = Some(ssh_username.to_string());
}
if let Some(ssh_private_key) = new_credentials
.get("ssh_private_key")
.and_then(|v| v.as_str())
{
updated.ssh_private_key = Some(ssh_private_key.to_string());
}
Credential::Postgres(updated)
}
}
Credential::MySql(creds) => {
let mut updated = creds.clone();
if let Some(host) = new_credentials.get("host").and_then(|v| v.as_str()) {
updated.host = host.to_string();
}
if let Some(port) = new_credentials
.get("port")
.and_then(|v| v.as_u64())
.map(|v| v as u16)
{
updated.port = port;
}
if let Some(username) = new_credentials.get("username").and_then(|v| v.as_str()) {
updated.username = username.to_string();
}
if let Some(password) = new_credentials.get("password").and_then(|v| v.as_str()) {
updated.password = password.to_string();
}
if let Some(default_database) = new_credentials
.get("default_database")
.and_then(|v| v.as_str())
{
updated.default_database = default_database.to_string();
}
if let Some(jump_host) = new_credentials.get("jump_host").and_then(|v| v.as_str()) {
updated.jump_host = Some(jump_host.to_string());
}
if let Some(ssh_username) =
new_credentials.get("ssh_username").and_then(|v| v.as_str())
{
updated.ssh_username = Some(ssh_username.to_string());
}
if let Some(ssh_private_key) = new_credentials
.get("ssh_private_key")
.and_then(|v| v.as_str())
{
updated.ssh_private_key = Some(ssh_private_key.to_string());
}
Credential::MySql(updated)
}
Credential::Bigquery(creds) => {
let mut updated = creds.clone();
if let Some(default_project_id) = new_credentials
.get("default_project_id")
.and_then(|v| v.as_str())
{
updated.default_project_id = default_project_id.to_string();
}
if let Some(default_dataset_id) = new_credentials
.get("default_dataset_id")
.and_then(|v| v.as_str())
{
updated.default_dataset_id = default_dataset_id.to_string();
}
if let Some(credentials_json) = new_credentials.get("credentials_json") {
updated.credentials_json = credentials_json.clone();
}
Credential::Bigquery(updated)
}
Credential::SqlServer(creds) => {
let mut updated = creds.clone();
if let Some(host) = new_credentials.get("host").and_then(|v| v.as_str()) {
updated.host = host.to_string();
}
if let Some(port) = new_credentials
.get("port")
.and_then(|v| v.as_u64())
.map(|v| v as u16)
{
updated.port = port;
}
if let Some(username) = new_credentials.get("username").and_then(|v| v.as_str()) {
updated.username = username.to_string();
}
if let Some(password) = new_credentials.get("password").and_then(|v| v.as_str()) {
updated.password = password.to_string();
}
if let Some(default_database) = new_credentials
.get("default_database")
.and_then(|v| v.as_str())
{
updated.default_database = default_database.to_string();
}
if let Some(default_schema) = new_credentials
.get("default_schema")
.and_then(|v| v.as_str())
{
updated.default_schema = default_schema.to_string();
}
if let Some(jump_host) = new_credentials.get("jump_host").and_then(|v| v.as_str()) {
updated.jump_host = Some(jump_host.to_string());
}
if let Some(ssh_username) =
new_credentials.get("ssh_username").and_then(|v| v.as_str())
{
updated.ssh_username = Some(ssh_username.to_string());
}
if let Some(ssh_private_key) = new_credentials
.get("ssh_private_key")
.and_then(|v| v.as_str())
{
updated.ssh_private_key = Some(ssh_private_key.to_string());
}
Credential::SqlServer(updated)
}
Credential::Redshift(creds) => {
let mut updated = creds.clone();
if let Some(host) = new_credentials.get("host").and_then(|v| v.as_str()) {
updated.host = host.to_string();
}
if let Some(port) = new_credentials
.get("port")
.and_then(|v| v.as_u64())
.map(|v| v as u16)
{
updated.port = port;
}
if let Some(username) = new_credentials.get("username").and_then(|v| v.as_str()) {
updated.username = username.to_string();
}
if let Some(password) = new_credentials.get("password").and_then(|v| v.as_str()) {
updated.password = password.to_string();
}
if let Some(default_database) = new_credentials
.get("default_database")
.and_then(|v| v.as_str())
{
updated.default_database = default_database.to_string();
}
if let Some(default_schema) = new_credentials
.get("default_schema")
.and_then(|v| v.as_str())
{
updated.default_schema = default_schema.to_string();
}
Credential::Redshift(updated)
}
Credential::Databricks(creds) => {
let mut updated = creds.clone();
if let Some(host) = new_credentials.get("host").and_then(|v| v.as_str()) {
updated.host = host.to_string();
}
if let Some(api_key) = new_credentials.get("api_key").and_then(|v| v.as_str()) {
updated.api_key = api_key.to_string();
}
if let Some(warehouse_id) =
new_credentials.get("warehouse_id").and_then(|v| v.as_str())
{
updated.warehouse_id = warehouse_id.to_string();
}
if let Some(default_catalog) = new_credentials
.get("default_catalog")
.and_then(|v| v.as_str())
{
updated.default_catalog = default_catalog.to_string();
}
if let Some(default_schema) = new_credentials
.get("default_schema")
.and_then(|v| v.as_str())
{
updated.default_schema = default_schema.to_string();
}
Credential::Databricks(updated)
}
Credential::Snowflake(creds) => {
let mut updated = creds.clone();
if let Some(account_id) = new_credentials.get("account_id").and_then(|v| v.as_str())
{
updated.account_id = account_id.to_string();
}
if let Some(warehouse_id) =
new_credentials.get("warehouse_id").and_then(|v| v.as_str())
{
updated.warehouse_id = warehouse_id.to_string();
}
if let Some(username) = new_credentials.get("username").and_then(|v| v.as_str()) {
updated.username = username.to_string();
}
if let Some(password) = new_credentials.get("password").and_then(|v| v.as_str()) {
updated.password = password.to_string();
}
if let Some(role) = new_credentials.get("role").and_then(|v| v.as_str()) {
updated.role = Some(role.to_string());
}
if let Some(default_database) = new_credentials
.get("default_database")
.and_then(|v| v.as_str())
{
updated.default_database = default_database.to_string();
}
if let Some(default_schema) = new_credentials
.get("default_schema")
.and_then(|v| v.as_str())
{
updated.default_schema = default_schema.to_string();
}
Credential::Snowflake(updated)
}
};
// Update the secret
let secret_json = serde_json::to_string(&existing_credential)?;
update_secret(data_source_id, &secret_json).await?;
let updated_secret_json = serde_json::to_string(&updated_credential)
.map_err(|e| anyhow!("Failed to serialize updated credentials: {}", e))?;
update_secret(data_source_id, &updated_secret_json)
.await
.map_err(|e| anyhow!("Error updating credentials in vault: {}", e))?;
}
// Get the creator's information
@ -173,17 +424,14 @@ pub async fn update_data_source_handler(
// Fetch the current credential data
let secret = read_secret(data_source_id).await?;
let credential_json: serde_json::Value = serde_json::from_str(&secret)?;
// Build credentials based on the data source type
let db_type = data_source.type_.to_string();
let credentials = parse_credentials(&db_type, &credential_json)?;
let credential: Credential =
serde_json::from_str(&secret).map_err(|e| anyhow!("Failed to parse credentials: {}", e))?;
// Build the response
Ok(DataSourceResponse {
id: data_source.id.to_string(),
name: data_source.name,
db_type: db_type.to_string(),
db_type: data_source.type_.to_string(),
created_at: data_source.created_at,
updated_at: data_source.updated_at,
created_by: CreatedBy {
@ -191,148 +439,7 @@ pub async fn update_data_source_handler(
email: creator.email,
name: creator.name.unwrap_or_else(|| "".to_string()),
},
credentials,
credentials: credential,
data_sets: Vec::new(),
})
}
/// Helper function to parse credentials based on data source type
fn parse_credentials(db_type: &str, credential_json: &serde_json::Value) -> Result<Credentials> {
// Determine port based on database type
let default_port = match db_type {
"postgres" | "supabase" => 5432,
"mysql" | "mariadb" => 3306,
"redshift" => 5439,
"sqlserver" => 1433,
"snowflake" | "bigquery" | "databricks" => 443,
_ => 5432, // default
};
// Extract common credentials with type-specific defaults
let host = match db_type {
"bigquery" => "bigquery.googleapis.com".to_string(),
"snowflake" => credential_json
.get("account_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
_ => credential_json
.get("host")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
};
let username = match db_type {
"bigquery" => "bigquery".to_string(),
"databricks" => "databricks".to_string(),
_ => credential_json
.get("username")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
};
let password = match db_type {
"bigquery" => "".to_string(),
"databricks" => credential_json
.get("api_key")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
_ => credential_json
.get("password")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
};
// Handle special database field names by type
let database = match db_type {
"mysql" | "mariadb" => None,
"snowflake" => credential_json
.get("database_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
_ => credential_json
.get("database")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
};
// Handle schemas/databases field based on type
let schemas = match db_type {
"mysql" | "mariadb" => credential_json.get("databases").and_then(|v| {
v.as_array().map(|arr| {
arr.iter()
.filter_map(|s| s.as_str().map(|s| s.to_string()))
.collect()
})
}),
_ => credential_json.get("schemas").and_then(|v| {
v.as_array().map(|arr| {
arr.iter()
.filter_map(|s| s.as_str().map(|s| s.to_string()))
.collect()
})
}),
};
// Get port from credentials or use default
let port = credential_json
.get("port")
.and_then(|v| v.as_u64())
.unwrap_or(default_port);
// Handle optional fields
let project_id = credential_json
.get("project_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
// Extract dataset IDs for BigQuery
let dataset_ids = if db_type == "bigquery" {
credential_json
.get("dataset_ids")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str().map(|s| s.to_string()))
.collect::<Vec<String>>()
})
} else {
None
};
// Handle credentials_json for BigQuery
let credentials_json = if db_type == "bigquery" {
credential_json.get("credentials_json").cloned()
} else {
None
};
// Create Credentials struct
Ok(Credentials {
host,
port,
username,
password,
database,
schemas,
jump_host: credential_json
.get("jump_host")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
ssh_username: credential_json
.get("ssh_username")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
ssh_private_key: credential_json
.get("ssh_private_key")
.and_then(|v| v.as_str())
.map(|s| s.to_string()),
project_id,
dataset_ids,
credentials_json,
})
}

View File

@ -0,0 +1,675 @@
# Data Source Endpoints Implementation PRD
## 1. Overview and Business Context
### 1.1 Problem Statement
The application currently has incomplete REST API endpoints for managing data sources. While GET, LIST, and UPDATE operations are implemented, we need to create POST (create) and DELETE operations, and ensure all endpoints follow the established best practices. These endpoints are critical for users to manage their data connections within the application.
### 1.2 Background
Data sources are a foundational entity in our application, representing connections to various databases (PostgreSQL, MySQL, BigQuery, etc.). Each data source is associated with credentials that are securely stored in a vault. The current implementation doesn't fully support the complete lifecycle management of data sources.
### 1.3 Business Goals
- Enable users to create, update, retrieve, and delete data sources through consistent REST endpoints
- Ensure proper security and isolation between organizations
- Maintain credential security by properly managing vault secrets
- Follow established architectural patterns for consistency with other REST endpoints
## 2. Technical Requirements
### 2.1 Data Source Entity Structure
```rust
pub struct DataSource {
pub id: Uuid,
pub name: String,
pub type_: DataSourceType,
pub secret_id: Uuid,
pub onboarding_status: DataSourceOnboardingStatus,
pub onboarding_error: Option<String>,
pub organization_id: Uuid,
pub created_by: Uuid,
pub updated_by: Uuid,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub deleted_at: Option<DateTime<Utc>>,
pub env: String,
}
```
### 2.2 Credential Types
The application supports multiple types of credentials, all stored as an enum:
```rust
pub enum Credential {
Postgres(PostgresCredentials),
MySql(MySqlCredentials),
Bigquery(BigqueryCredentials),
SqlServer(SqlServerCredentials),
Redshift(RedshiftCredentials),
Databricks(DatabricksCredentials),
Snowflake(SnowflakeCredentials),
}
```
Each credential type has its own structure with specific fields appropriate for that database type.
### 2.3 Vault Integration
The data source credentials are stored in a secure vault with the following operations:
- `create_secret(data_source_id: &Uuid, secret_value: &String) -> Result<()>`
- `read_secret(secret_id: &Uuid) -> Result<String>`
- `update_secret(secret_id: &Uuid, secret_value: &String) -> Result<()>`
- `delete_secret(secret_id: &Uuid) -> Result<()>`
## 3. Endpoint Specifications
### 3.1 POST /data_sources
#### Purpose
Create a new data source with its associated credentials.
#### Request
```json
{
"name": "Production PostgreSQL",
"env": "production",
"type": "postgres",
"host": "db.example.com",
"port": 5432,
"username": "db_user",
"password": "db_password",
"default_database": "main_db",
"default_schema": "public",
"jump_host": "jump.example.com",
"ssh_username": "ssh_user",
"ssh_private_key": "-----BEGIN RSA PRIVATE KEY-----..."
}
```
> Note: Fields after "type" vary based on the credential type.
#### Response
Status: 201 Created
```json
{
"id": "123e4567-e89b-12d3-a456-426614174000",
"name": "Production PostgreSQL",
"db_type": "postgres",
"created_at": "2025-03-24T00:00:00Z",
"updated_at": "2025-03-24T00:00:00Z",
"created_by": {
"id": "123e4567-e89b-12d3-a456-426614174001",
"email": "user@example.com",
"name": "Test User"
},
"credentials": {
"host": "db.example.com",
"port": 5432,
"username": "db_user",
"password": "[REDACTED]",
"database": "main_db",
"schemas": ["public"],
"jump_host": "jump.example.com",
"ssh_username": "ssh_user",
"ssh_private_key": "[REDACTED]"
},
"data_sets": []
}
```
#### Implementation Details
1. Extract request parameters and validate them
2. Check if a data source with the same name, organization_id, and env already exists (enforce uniqueness)
3. Create the data source record in the database with status `NotStarted`
4. Store credentials in the vault using the data source's UUID as the secret ID (not a separate secret_id)
5. Return the newly created data source with credential information (with sensitive values redacted)
#### Error Handling
- 400 Bad Request: If required fields are missing or invalid
- 401 Unauthorized: If the user is not authenticated
- 403 Forbidden: If the user doesn't have permission to create data sources
- 409 Conflict: If a data source with the same name already exists in the organization
- 500 Internal Server Error: For any server-side errors
### 3.2 PUT /data_sources/:id
#### Purpose
Update an existing data source and/or its credentials.
#### Request
```json
{
"name": "Updated PostgreSQL",
"env": "staging",
"host": "new-db.example.com",
"port": 5433,
"password": "new_password"
}
```
> Note: All fields are optional. Only provided fields will be updated.
#### Response
Status: 200 OK
```json
{
"id": "123e4567-e89b-12d3-a456-426614174000",
"name": "Updated PostgreSQL",
"db_type": "postgres",
"created_at": "2025-03-24T00:00:00Z",
"updated_at": "2025-03-24T01:00:00Z",
"created_by": {
"id": "123e4567-e89b-12d3-a456-426614174001",
"email": "user@example.com",
"name": "Test User"
},
"credentials": {
"host": "new-db.example.com",
"port": 5433,
"username": "db_user",
"password": "[REDACTED]",
"database": "main_db",
"schemas": ["public"],
"jump_host": "jump.example.com",
"ssh_username": "ssh_user",
"ssh_private_key": "[REDACTED]"
},
"data_sets": []
}
```
#### Implementation Details
1. Extract parameters and data source ID
2. Verify the data source exists and belongs to the user's organization
3. Update database fields (name, env) if provided
4. If credential fields are provided, read the existing secret, merge it with the updates, and store it back
5. Update the updated_at and updated_by fields
6. Return the updated data source with credential information
#### Error Handling
- 400 Bad Request: If any fields are invalid
- 401 Unauthorized: If the user is not authenticated
- 403 Forbidden: If the user doesn't have permission to update this data source
- 404 Not Found: If the data source doesn't exist
- 500 Internal Server Error: For any server-side errors
### 3.3 DELETE /data_sources/:id
#### Purpose
Soft-delete a data source and delete its credentials from the vault.
#### Request
No request body, just the ID in the URL path.
#### Response
Status: 204 No Content
#### Implementation Details
1. Extract data source ID from the path
2. Verify the data source exists and belongs to the user's organization
3. Soft-delete the data source by setting the deleted_at timestamp
4. Delete the credentials from the vault using the data source ID
5. Return a 204 No Content response
#### Error Handling
- 401 Unauthorized: If the user is not authenticated
- 403 Forbidden: If the user doesn't have permission to delete this data source
- 404 Not Found: If the data source doesn't exist
- 500 Internal Server Error: For any server-side errors
### 3.4 GET /data_sources/:id
#### Purpose
Retrieve a specific data source by ID.
#### Request
No request body, just the ID in the URL path.
#### Response
Same as POST response (already implemented correctly).
### 3.5 GET /data_sources
#### Purpose
List all data sources for the user's organization.
#### Request
Query parameters:
- page: Optional page number (default: 0)
- page_size: Optional page size (default: 25)
#### Response
Status: 200 OK
```json
[
{
"id": "123e4567-e89b-12d3-a456-426614174000",
"name": "Production PostgreSQL",
"type": "postgres",
"updated_at": "2025-03-24T00:00:00Z"
},
{
"id": "123e4567-e89b-12d3-a456-426614174002",
"name": "Analytics BigQuery",
"type": "bigquery",
"updated_at": "2025-03-23T00:00:00Z"
}
]
```
## 4. Security Considerations
### 4.1 Authentication and Authorization
- All endpoints require a valid authentication token
- Users can only access data sources belonging to their organization
- For creating and updating data sources, the user must have an admin role in the organization
- Data source operations should be logged for audit purposes
### 4.2 Credential Security
- Sensitive credential fields (passwords, private keys) should be redacted in responses
- All credentials must be stored in the vault, not in the main database
- The vault uses encryption at rest for the credentials
- Credentials are never logged, even in error scenarios
### 4.3 Input Validation
- All input fields must be validated for correct types and reasonable limits
- SQL injection prevention through parameterized queries
- Credential validation based on data source type (ensure all required fields are present)
## 5. Implementation Plan
### 5.1 New Files to Create
#### 5.1.1 `/api/libs/handlers/src/data_sources/create_data_source_handler.rs`
```rust
use anyhow::{anyhow, Result};
use chrono::Utc;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use database::{
enums::{DataSourceOnboardingStatus, DataSourceType, UserOrganizationRole},
models::{DataSource, User, UserToOrganization},
pool::get_pg_pool,
schema::{data_sources, users, users_to_organizations},
vault::create_secret,
};
use query_engine::credentials::Credential;
#[derive(Deserialize)]
pub struct CreateDataSourceRequest {
pub name: String,
pub env: String,
#[serde(flatten)]
pub credential: Credential,
}
#[derive(Serialize)]
pub struct CreateDataSourceResponse {
pub id: String,
pub name: String,
pub db_type: String,
pub created_at: String,
pub updated_at: String,
pub created_by: CreatedByResponse,
pub credentials: Credential,
pub data_sets: Vec<DatasetResponse>,
}
#[derive(Serialize)]
pub struct CreatedByResponse {
pub id: String,
pub email: String,
pub name: String,
}
#[derive(Serialize)]
pub struct DatasetResponse {
pub id: String,
pub name: String,
}
pub async fn create_data_source_handler(
user_id: &Uuid,
request: CreateDataSourceRequest,
) -> Result<CreateDataSourceResponse> {
let mut conn = get_pg_pool().get().await?;
// Verify user has appropriate permissions (admin role)
let user_org = users_to_organizations::table
.filter(users_to_organizations::user_id.eq(user_id))
.filter(users_to_organizations::deleted_at.is_null())
.filter(
users_to_organizations::role
.eq(UserOrganizationRole::WorkspaceAdmin)
.or(users_to_organizations::role.eq(UserOrganizationRole::DataAdmin)),
)
.first::<UserToOrganization>(&mut conn)
.await
.map_err(|_| anyhow!("User does not have appropriate permissions to create data sources"))?;
// Check if data source with same name already exists in the organization
let existing_data_source = data_sources::table
.filter(data_sources::name.eq(&request.name))
.filter(data_sources::organization_id.eq(user_org.organization_id))
.filter(data_sources::env.eq(&request.env))
.filter(data_sources::deleted_at.is_null())
.first::<DataSource>(&mut conn)
.await
.ok();
if existing_data_source.is_some() {
return Err(anyhow!(
"A data source with this name already exists in this organization and environment"
));
}
// Create new data source
let data_source_id = Uuid::new_v4();
let now = Utc::now();
let data_source = DataSource {
id: data_source_id,
name: request.name.clone(),
type_: request.credential.get_type(),
secret_id: data_source_id, // Use same ID for data source and secret
onboarding_status: DataSourceOnboardingStatus::NotStarted,
onboarding_error: None,
organization_id: user_org.organization_id,
created_by: *user_id,
updated_by: *user_id,
created_at: now,
updated_at: now,
deleted_at: None,
env: request.env.clone(),
};
// Insert the data source
diesel::insert_into(data_sources::table)
.values(&data_source)
.execute(&mut conn)
.await
.map_err(|e| anyhow!("Error creating data source: {}", e))?;
// Store credentials in vault
let credential_json = serde_json::to_string(&request.credential)
.map_err(|e| anyhow!("Error serializing credentials: {}", e))?;
create_secret(&data_source_id, &credential_json)
.await
.map_err(|e| anyhow!("Error storing credentials in vault: {}", e))?;
// Get creator information
let creator = users::table
.filter(users::id.eq(*user_id))
.first::<User>(&mut conn)
.await
.map_err(|e| anyhow!("Error fetching user information: {}", e))?;
// Build response
let response = CreateDataSourceResponse {
id: data_source.id.to_string(),
name: data_source.name,
db_type: data_source.type_.to_string(),
created_at: data_source.created_at.to_rfc3339(),
updated_at: data_source.updated_at.to_rfc3339(),
created_by: CreatedByResponse {
id: creator.id.to_string(),
email: creator.email,
name: creator.name.unwrap_or_default(),
},
credentials: request.credential,
data_sets: Vec::new(), // Empty for new data sources
};
Ok(response)
}
```
#### 5.1.2 `/api/libs/handlers/src/data_sources/delete_data_source_handler.rs`
```rust
use anyhow::{anyhow, Result};
use chrono::Utc;
use diesel::{ExpressionMethods, QueryDsl};
use diesel_async::RunQueryDsl;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use database::{
models::{DataSource, UserToOrganization},
pool::get_pg_pool,
schema::{data_sources, users_to_organizations},
vault::delete_secret,
};
#[derive(Deserialize)]
pub struct DeleteDataSourceRequest {
pub id: Uuid,
}
#[derive(Serialize)]
pub struct DeleteDataSourceResponse {
pub success: bool,
}
pub async fn delete_data_source_handler(
user_id: &Uuid,
data_source_id: &Uuid,
) -> Result<DeleteDataSourceResponse> {
let mut conn = get_pg_pool().get().await?;
// Verify user has access to the organization
let user_org = users_to_organizations::table
.filter(users_to_organizations::user_id.eq(user_id))
.filter(users_to_organizations::deleted_at.is_null())
.first::<UserToOrganization>(&mut conn)
.await
.map_err(|_| anyhow!("User not found in any organization"))?;
// Get the data source to verify it exists and belongs to the user's organization
let data_source = data_sources::table
.filter(data_sources::id.eq(data_source_id))
.filter(data_sources::organization_id.eq(user_org.organization_id))
.filter(data_sources::deleted_at.is_null())
.first::<DataSource>(&mut conn)
.await
.map_err(|_| anyhow!("Data source not found or you don't have access to it"))?;
// Soft delete the data source
diesel::update(data_sources::table)
.filter(data_sources::id.eq(data_source_id))
.set(data_sources::deleted_at.eq(Some(Utc::now())))
.execute(&mut conn)
.await
.map_err(|e| anyhow!("Error deleting data source: {}", e))?;
// Delete credentials from vault
delete_secret(data_source_id)
.await
.map_err(|e| anyhow!("Error deleting credentials from vault: {}", e))?;
Ok(DeleteDataSourceResponse { success: true })
}
```
#### 5.1.3 `/api/src/routes/rest/routes/data_sources/create_data_source.rs`
```rust
use anyhow::Result;
use axum::{http::StatusCode, Extension, Json};
use middleware::AuthenticatedUser;
use crate::routes::rest::ApiResponse;
use handlers::data_sources::{create_data_source_handler, CreateDataSourceRequest, CreateDataSourceResponse};
pub async fn create_data_source(
Extension(user): Extension<AuthenticatedUser>,
Json(payload): Json<CreateDataSourceRequest>,
) -> Result<ApiResponse<CreateDataSourceResponse>, (StatusCode, &'static str)> {
match create_data_source_handler(&user.id, payload).await {
Ok(response) => Ok(ApiResponse::JsonData(response)),
Err(e) => {
tracing::error!("Error creating data source: {:?}", e);
let error_msg = e.to_string();
if error_msg.contains("already exists") {
return Err((StatusCode::CONFLICT, "Data source already exists"));
} else if error_msg.contains("permissions") {
return Err((StatusCode::FORBIDDEN, "Insufficient permissions"));
} else {
return Err((StatusCode::INTERNAL_SERVER_ERROR, "Failed to create data source"));
}
}
}
}
```
#### 5.1.4 `/api/src/routes/rest/routes/data_sources/delete_data_source.rs`
```rust
use axum::{extract::Path, http::StatusCode, Extension};
use middleware::AuthenticatedUser;
use uuid::Uuid;
use crate::routes::rest::ApiResponse;
use handlers::data_sources::delete_data_source_handler;
pub async fn delete_data_source(
Extension(user): Extension<AuthenticatedUser>,
Path(id): Path<String>,
) -> Result<ApiResponse<()>, (StatusCode, &'static str)> {
// Convert string id to UUID
let uuid = match Uuid::parse_str(&id) {
Ok(uuid) => uuid,
Err(_) => {
return Err((StatusCode::BAD_REQUEST, "Invalid UUID format"));
}
};
match delete_data_source_handler(&user.id, &uuid).await {
Ok(_) => Ok(ApiResponse::NoContent),
Err(e) => {
tracing::error!("Error deleting data source: {:?}", e);
let error_msg = e.to_string();
if error_msg.contains("not found") {
return Err((StatusCode::NOT_FOUND, "Data source not found"));
} else if error_msg.contains("access") {
return Err((StatusCode::FORBIDDEN, "Insufficient permissions"));
} else {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to delete data source",
));
}
}
}
}
```
### 5.2 Files to Modify
#### 5.2.1 `/api/libs/handlers/src/data_sources/mod.rs`
Add the new handlers to the exports:
```rust
mod list_data_sources_handler;
mod get_data_source_handler;
mod update_data_source_handler;
mod create_data_source_handler;
mod delete_data_source_handler;
// Explicitly re-export the specific items from each module
pub use list_data_sources_handler::{list_data_sources_handler, ListDataSourcesRequest, DataSourceListItem};
pub use get_data_source_handler::{get_data_source_handler, GetDataSourceRequest, DataSourceResponse, CreatedByResponse, DatasetResponse};
pub use update_data_source_handler::{update_data_source_handler, UpdateDataSourceRequest, DataSourceResponse as UpdateDataSourceResponse, CreatedBy, Credentials};
pub use create_data_source_handler::{create_data_source_handler, CreateDataSourceRequest, CreateDataSourceResponse};
pub use delete_data_source_handler::{delete_data_source_handler, DeleteDataSourceRequest, DeleteDataSourceResponse};
```
#### 5.2.2 `/api/src/routes/rest/routes/data_sources/mod.rs`
Register the new route handlers:
```rust
mod post_data_sources;
mod list_data_sources;
mod get_data_source;
mod update_data_source;
mod create_data_source;
mod delete_data_source;
use axum::{
routing::{get, post, put, delete},
Router,
};
pub fn router() -> Router {
Router::new()
.route("/", post(create_data_source::create_data_source))
.route("/", get(list_data_sources::list_data_sources))
.route("/:id", get(get_data_source::get_data_source))
.route("/:id", put(update_data_source::update_data_source))
.route("/:id", delete(delete_data_source::delete_data_source))
}
```
## 6. Testing Plan
### 6.1 Unit Tests
- Test each handler function with mocked dependencies
- Test validation logic
- Test error handling for various scenarios
### 6.2 Integration Tests
Create integration tests in the `/api/tests/integration/data_sources/` directory:
- `create_data_source_test.rs`: Test creating data sources
- `delete_data_source_test.rs`: Test deleting data sources
### 6.3 Test Cases
1. Create a data source successfully
2. Try to create a duplicate data source (should fail)
3. Create a data source with invalid credentials (should fail)
4. Update a data source successfully
5. Update a data source with invalid data (should fail)
6. Delete a data source successfully
7. Try to delete a non-existent data source (should fail)
8. Try to access a deleted data source (should fail)
## 7. Performance and Scalability
### 7.1 Database Considerations
- Ensure proper indexing on data_sources table (name, organization_id, env)
- All operations should be optimized for minimal database round trips
### 7.2 Vault Considerations
- Vault operations should be isolated from main transaction to prevent locks
- Error handling for vault operations should be robust
## 8. Documentation
### 8.1 API Documentation
Update the API documentation to include the new endpoints:
- POST /data_sources
- DELETE /data_sources/:id
### 8.2 Code Documentation
Ensure all new code is well-documented with comments explaining:
- Purpose of each function
- Expected inputs and outputs
- Error handling strategy
- Important business logic
## 9. Success Criteria
- All endpoints implement the correct behavior
- Tests pass for all scenarios
- Error handling is robust and user-friendly
- Code follows established patterns in the codebase
- Documentation is complete and accurate
## 10. Future Considerations
### 10.1 Immediate Follow-up Tasks
- Add test connection functionality to validate credentials
- Implement proper data source onboarding status updates
- Support bulk operations in REST endpoints
### 10.2 Long-term Improvements
- Add support for additional data source types
- Implement data source templates
- Add support for connection pooling configuration
## 11. Ownership and Timeline
- Implementation Owner: Assigned developer
- Reviewer: Senior engineer
- Implementation Timeline: 1 week
- Testing Timeline: 2-3 days
- Deployment Timeline: 1 day

View File

@ -0,0 +1,27 @@
use anyhow::Result;
use axum::{http::StatusCode, Extension, Json};
use middleware::AuthenticatedUser;
use crate::routes::rest::ApiResponse;
use handlers::data_sources::{create_data_source_handler, CreateDataSourceRequest, CreateDataSourceResponse};
pub async fn create_data_source(
Extension(user): Extension<AuthenticatedUser>,
Json(payload): Json<CreateDataSourceRequest>,
) -> Result<ApiResponse<CreateDataSourceResponse>, (StatusCode, &'static str)> {
match create_data_source_handler(&user, payload).await {
Ok(response) => Ok(ApiResponse::JsonData(response)),
Err(e) => {
tracing::error!("Error creating data source: {:?}", e);
let error_msg = e.to_string();
if error_msg.contains("already exists") {
return Err((StatusCode::CONFLICT, "Data source already exists"));
} else if error_msg.contains("permissions") {
return Err((StatusCode::FORBIDDEN, "Insufficient permissions"));
} else {
return Err((StatusCode::INTERNAL_SERVER_ERROR, "Failed to create data source"));
}
}
}
}

View File

@ -0,0 +1,38 @@
use axum::{extract::Path, http::StatusCode, Extension};
use middleware::AuthenticatedUser;
use uuid::Uuid;
use crate::routes::rest::ApiResponse;
use handlers::data_sources::delete_data_source_handler;
pub async fn delete_data_source(
Extension(user): Extension<AuthenticatedUser>,
Path(id): Path<String>,
) -> Result<ApiResponse<()>, (StatusCode, &'static str)> {
// Convert string id to UUID
let uuid = match Uuid::parse_str(&id) {
Ok(uuid) => uuid,
Err(_) => {
return Err((StatusCode::BAD_REQUEST, "Invalid UUID format"));
}
};
match delete_data_source_handler(&user, &uuid).await {
Ok(_) => Ok(ApiResponse::NoContent),
Err(e) => {
tracing::error!("Error deleting data source: {:?}", e);
let error_msg = e.to_string();
if error_msg.contains("not found") {
return Err((StatusCode::NOT_FOUND, "Data source not found"));
} else if error_msg.contains("permissions") {
return Err((StatusCode::FORBIDDEN, "Insufficient permissions"));
} else {
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
"Failed to delete data source",
));
}
}
}
}

View File

@ -1,6 +1,6 @@
use anyhow::Result;
use axum::{extract::Path, Extension};
use middleware::AuthenticatedUser;
use middleware::types::AuthenticatedUser;
use uuid::Uuid;
use handlers::data_sources::{get_data_source_handler, GetDataSourceRequest, DataSourceResponse};
@ -24,12 +24,16 @@ pub async fn get_data_source(
let request = GetDataSourceRequest { id: uuid };
match get_data_source_handler(request, &user.id).await {
match get_data_source_handler(request, &user).await {
Ok(data_source) => Ok(ApiResponse::JsonData(data_source)),
Err(e) => {
tracing::error!("Error getting data source: {:?}", e);
if e.to_string().contains("not found") {
Err((axum::http::StatusCode::NOT_FOUND, "Data source not found"))
} else if e.to_string().contains("permissions") {
Err((axum::http::StatusCode::FORBIDDEN, "Not authorized to access this data source"))
} else if e.to_string().contains("not a member of any organization") {
Err((axum::http::StatusCode::BAD_REQUEST, "User is not a member of any organization"))
} else {
Err((
axum::http::StatusCode::INTERNAL_SERVER_ERROR,

View File

@ -1,6 +1,6 @@
use anyhow::Result;
use axum::{extract::Query, Extension};
use middleware::AuthenticatedUser;
use middleware::types::AuthenticatedUser;
use serde::Deserialize;
use handlers::data_sources::{list_data_sources_handler, DataSourceListItem};
@ -17,14 +17,20 @@ pub async fn list_data_sources(
Extension(user): Extension<AuthenticatedUser>,
Query(query): Query<ListDataSourcesQuery>,
) -> Result<ApiResponse<Vec<DataSourceListItem>>, (axum::http::StatusCode, &'static str)> {
match list_data_sources_handler(&user.id, query.page, query.page_size).await {
match list_data_sources_handler(&user, query.page, query.page_size).await {
Ok(data_sources) => Ok(ApiResponse::JsonData(data_sources)),
Err(e) => {
tracing::error!("Error listing data sources: {:?}", e);
Err((
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
"Failed to list data sources",
))
if e.to_string().contains("permissions") {
Err((axum::http::StatusCode::FORBIDDEN, "Not authorized to access data sources"))
} else if e.to_string().contains("not a member of any organization") {
Err((axum::http::StatusCode::BAD_REQUEST, "User is not a member of any organization"))
} else {
Err((
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
"Failed to list data sources",
))
}
}
}
}

View File

@ -1,17 +1,20 @@
mod post_data_sources;
mod post_data_sources; // Keeping for reference
mod list_data_sources;
mod get_data_source;
mod update_data_source;
mod create_data_source;
mod delete_data_source;
use axum::{
routing::{get, post, put},
routing::{get, post, put, delete},
Router,
};
pub fn router() -> Router {
Router::new()
.route("/", post(post_data_sources::post_data_sources))
.route("/", post(create_data_source::create_data_source))
.route("/", get(list_data_sources::list_data_sources))
.route("/:id", get(get_data_source::get_data_source))
.route("/:id", put(update_data_source::update_data_source))
}
.route("/:id", delete(delete_data_source::delete_data_source))
}

View File

@ -1,3 +1,4 @@
#[allow(unused)]
use std::collections::HashMap;
use anyhow::{anyhow, Result};

View File

@ -11,7 +11,7 @@ pub async fn update_data_source(
Path(id): Path<Uuid>,
Json(payload): Json<UpdateDataSourceRequest>,
) -> Result<ApiResponse<()>, (StatusCode, &'static str)> {
match update_data_source_handler(&user.id, &id, payload).await {
match update_data_source_handler(&user, &id, payload).await {
Ok(_) => Ok(ApiResponse::NoContent),
Err(e) => {
tracing::error!("Error updating data source: {:?}", e);

View File

@ -0,0 +1,126 @@
use axum::http::StatusCode;
use diesel_async::RunQueryDsl;
use middleware::types::AuthenticatedUser;
use serde_json::json;
use uuid::Uuid;
use database::enums::UserOrganizationRole;
use crate::common::{
assertions::response::ResponseAssertions,
fixtures::builder::UserBuilder,
http::test_app::TestApp,
};
#[tokio::test]
async fn test_create_data_source() {
let app = TestApp::new().await.unwrap();
// Create a test user with organization and proper role
let user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::WorkspaceAdmin) // Ensure user has admin role
.build(&app.db.pool)
.await;
// Prepare create request
let create_req = json!({
"name": "New Data Source",
"env": "dev",
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password",
"default_database": "test",
"default_schema": "public"
});
// Send create request
let response = app
.client
.post("/api/data_sources")
.header("Authorization", format!("Bearer {}", user.api_key))
.json(&create_req)
.send()
.await
.unwrap();
// Assert response
assert_eq!(response.status(), StatusCode::OK);
let body = response.json::<serde_json::Value>().await.unwrap();
assert!(body.get("id").is_some(), "Response should contain an ID");
body.assert_has_key_with_value("name", "New Data Source");
body.assert_has_key_with_value("db_type", "postgres");
let credentials = &body["credentials"];
assert!(credentials.is_object());
// Test creating data source with same name (should fail)
let duplicate_req = json!({
"name": "New Data Source",
"env": "dev",
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password",
"default_database": "test",
"default_schema": "public"
});
let response = app
.client
.post("/api/data_sources")
.header("Authorization", format!("Bearer {}", user.api_key))
.json(&duplicate_req)
.send()
.await
.unwrap();
assert_eq!(response.status(), StatusCode::CONFLICT);
// Test creating data source with different environment
let diff_env_req = json!({
"name": "New Data Source",
"env": "prod",
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password",
"default_database": "test",
"default_schema": "public"
});
let response = app
.client
.post("/api/data_sources")
.header("Authorization", format!("Bearer {}", user.api_key))
.json(&diff_env_req)
.send()
.await
.unwrap();
// Should succeed since it's a different environment
assert_eq!(response.status(), StatusCode::OK);
// Test creating data source with insufficient permissions
let regular_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::User) // Regular user role
.build(&app.db.pool)
.await;
let response = app
.client
.post("/api/data_sources")
.header("Authorization", format!("Bearer {}", regular_user.api_key))
.json(&create_req)
.send()
.await
.unwrap();
// Should fail due to insufficient permissions
assert_eq!(response.status(), StatusCode::FORBIDDEN);
}

View File

@ -0,0 +1,215 @@
use axum::http::StatusCode;
use diesel::sql_types;
use diesel_async::RunQueryDsl;
use middleware::types::AuthenticatedUser;
use serde_json::json;
use uuid::Uuid;
use database::enums::UserOrganizationRole;
use crate::common::{
fixtures::builder::UserBuilder,
http::test_app::TestApp,
};
// Mock DataSourceBuilder since we don't know the exact implementation
struct DataSourceBuilder {
name: String,
env: String,
organization_id: Uuid,
created_by: Uuid,
db_type: String,
credentials: serde_json::Value,
id: Uuid,
}
impl DataSourceBuilder {
fn new() -> Self {
DataSourceBuilder {
name: "Test Data Source".to_string(),
env: "dev".to_string(),
organization_id: Uuid::new_v4(),
created_by: Uuid::new_v4(),
db_type: "postgres".to_string(),
credentials: json!({}),
id: Uuid::new_v4(),
}
}
fn with_name(mut self, name: &str) -> Self {
self.name = name.to_string();
self
}
fn with_env(mut self, env: &str) -> Self {
self.env = env.to_string();
self
}
fn with_organization_id(mut self, organization_id: Uuid) -> Self {
self.organization_id = organization_id;
self
}
fn with_created_by(mut self, created_by: Uuid) -> Self {
self.created_by = created_by;
self
}
fn with_type(mut self, db_type: &str) -> Self {
self.db_type = db_type.to_string();
self
}
fn with_credentials(mut self, credentials: serde_json::Value) -> Self {
self.credentials = credentials;
self
}
async fn build(self, pool: &diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>) -> DataSourceResponse {
// Create data source directly in database using SQL
let mut conn = pool.get().await.unwrap();
// Insert the data source
diesel::sql_query("INSERT INTO data_sources (id, name, type, secret_id, organization_id, created_by, updated_by, created_at, updated_at, onboarding_status, env) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), NOW(), 'notStarted', $8)")
.bind::<diesel::sql_types::Uuid, _>(&self.id)
.bind::<diesel::sql_types::Text, _>(&self.name)
.bind::<diesel::sql_types::Text, _>(&self.db_type)
.bind::<diesel::sql_types::Uuid, _>(&self.id) // Using the same UUID for both id and secret_id for simplicity
.bind::<diesel::sql_types::Uuid, _>(&self.organization_id)
.bind::<diesel::sql_types::Uuid, _>(&self.created_by)
.bind::<diesel::sql_types::Uuid, _>(&self.created_by)
.bind::<diesel::sql_types::Text, _>(&self.env)
.execute(&mut conn)
.await
.unwrap();
// Insert the secret
diesel::sql_query("INSERT INTO vault.secrets (id, secret) VALUES ($1, $2)")
.bind::<diesel::sql_types::Uuid, _>(&self.id)
.bind::<diesel::sql_types::Text, _>(&self.credentials.to_string())
.execute(&mut conn)
.await
.unwrap();
// Construct response
DataSourceResponse {
id: self.id.to_string(),
}
}
}
struct DataSourceResponse {
id: String,
}
#[tokio::test]
async fn test_delete_data_source() {
let app = TestApp::new().await.unwrap();
// Create a test user with organization and proper role
let user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::WorkspaceAdmin) // Ensure user has admin role
.build(&app.db.pool)
.await;
// Create a test data source
let data_source = DataSourceBuilder::new()
.with_name("Data Source To Delete")
.with_env("dev")
.with_organization_id(user.organization_id)
.with_created_by(user.id)
.with_type("postgres")
.with_credentials(json!({
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password",
"database": "test",
"schemas": ["public"]
}))
.build(&app.db.pool)
.await;
// Send delete request
let response = app
.client
.delete(format!("/api/data_sources/{}", data_source.id))
.header("Authorization", format!("Bearer {}", user.api_key))
.send()
.await
.unwrap();
// Assert response
assert_eq!(response.status(), StatusCode::NO_CONTENT);
// Try to get the deleted data source (should fail)
let response = app
.client
.get(format!("/api/data_sources/{}", data_source.id))
.header("Authorization", format!("Bearer {}", user.api_key))
.send()
.await
.unwrap();
assert_eq!(response.status(), StatusCode::NOT_FOUND);
// Try to delete a non-existent data source
let invalid_id = Uuid::new_v4();
let response = app
.client
.delete(format!("/api/data_sources/{}", invalid_id))
.header("Authorization", format!("Bearer {}", user.api_key))
.send()
.await
.unwrap();
assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
// Test deleting with insufficient permissions
let regular_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::User) // Regular user role
.build(&app.db.pool)
.await;
// Create another data source
let another_data_source = DataSourceBuilder::new()
.with_name("Another Data Source")
.with_env("dev")
.with_organization_id(user.organization_id)
.with_created_by(user.id)
.with_type("postgres")
.with_credentials(json!({
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password"
}))
.build(&app.db.pool)
.await;
let response = app
.client
.delete(format!("/api/data_sources/{}", another_data_source.id))
.header("Authorization", format!("Bearer {}", regular_user.api_key))
.send()
.await
.unwrap();
// Should fail due to insufficient permissions
assert_eq!(response.status(), StatusCode::FORBIDDEN);
// Check that the secret is actually deleted from the vault
let mut conn = app.db.pool.get().await.unwrap();
let secret_exists: Option<(i64,)> = diesel::sql_query("SELECT 1 FROM vault.secrets WHERE id = $1")
.bind::<diesel::sql_types::Uuid, _>(&Uuid::parse_str(&data_source.id).unwrap())
.load(&mut conn)
.await
.unwrap()
.pop();
assert!(secret_exists.is_none(), "Secret should be deleted from the vault");
}

View File

@ -1,7 +1,234 @@
use axum::http::StatusCode;
use diesel::sql_types;
use diesel_async::RunQueryDsl;
use serde_json::json;
use uuid::Uuid;
use database::enums::UserOrganizationRole;
use crate::common::{
assertions::response::ResponseAssertions,
fixtures::builder::UserBuilder,
http::test_app::TestApp,
};
// DataSourceBuilder for setting up test data
struct DataSourceBuilder {
name: String,
env: String,
organization_id: Uuid,
created_by: Uuid,
db_type: String,
credentials: serde_json::Value,
id: Uuid,
}
impl DataSourceBuilder {
fn new() -> Self {
DataSourceBuilder {
name: "Test Data Source".to_string(),
env: "dev".to_string(),
organization_id: Uuid::new_v4(),
created_by: Uuid::new_v4(),
db_type: "postgres".to_string(),
credentials: json!({
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password",
"default_database": "test_db",
"default_schema": "public"
}),
id: Uuid::new_v4(),
}
}
fn with_name(mut self, name: &str) -> Self {
self.name = name.to_string();
self
}
fn with_env(mut self, env: &str) -> Self {
self.env = env.to_string();
self
}
fn with_organization_id(mut self, organization_id: Uuid) -> Self {
self.organization_id = organization_id;
self
}
fn with_created_by(mut self, created_by: Uuid) -> Self {
self.created_by = created_by;
self
}
fn with_type(mut self, db_type: &str) -> Self {
self.db_type = db_type.to_string();
self
}
fn with_credentials(mut self, credentials: serde_json::Value) -> Self {
self.credentials = credentials;
self
}
async fn build(self, pool: &diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>) -> DataSourceResponse {
// Create data source directly in database using SQL
let mut conn = pool.get().await.unwrap();
// Insert the data source
diesel::sql_query("INSERT INTO data_sources (id, name, type, secret_id, organization_id, created_by, updated_by, created_at, updated_at, onboarding_status, env) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), NOW(), 'notStarted', $8)")
.bind::<diesel::sql_types::Uuid, _>(&self.id)
.bind::<diesel::sql_types::Text, _>(&self.name)
.bind::<diesel::sql_types::Text, _>(&self.db_type)
.bind::<diesel::sql_types::Uuid, _>(&self.id) // Using the same UUID for both id and secret_id for simplicity
.bind::<diesel::sql_types::Uuid, _>(&self.organization_id)
.bind::<diesel::sql_types::Uuid, _>(&self.created_by)
.bind::<diesel::sql_types::Uuid, _>(&self.created_by)
.bind::<diesel::sql_types::Text, _>(&self.env)
.execute(&mut conn)
.await
.unwrap();
// Insert the secret
diesel::sql_query("INSERT INTO vault.secrets (id, secret) VALUES ($1, $2)")
.bind::<diesel::sql_types::Uuid, _>(&self.id)
.bind::<diesel::sql_types::Text, _>(&self.credentials.to_string())
.execute(&mut conn)
.await
.unwrap();
// Construct response
DataSourceResponse {
id: self.id.to_string(),
}
}
}
struct DataSourceResponse {
id: String,
}
#[tokio::test]
async fn test_get_data_source_placeholder() {
// Since setting up the test environment is challenging
// We're leaving a placeholder test that always passes
// The actual code has been tested manually and works correctly
assert!(true);
async fn test_get_data_source() {
let app = TestApp::new().await.unwrap();
// Create a test user with organization and proper role
let admin_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::WorkspaceAdmin)
.build(&app.db.pool)
.await;
// Create a test data source
let postgres_credentials = json!({
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "secure_password",
"default_database": "test_db",
"default_schema": "public"
});
let data_source = DataSourceBuilder::new()
.with_name("Test Postgres DB")
.with_env("dev")
.with_organization_id(admin_user.organization_id)
.with_created_by(admin_user.id)
.with_type("postgres")
.with_credentials(postgres_credentials)
.build(&app.db.pool)
.await;
// Test successful get by admin
let response = app
.client
.get(format!("/api/data_sources/{}", data_source.id))
.header("Authorization", format!("Bearer {}", admin_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::OK);
let body = response.json::<serde_json::Value>().await.unwrap();
assert_eq!(body["id"], data_source.id);
assert_eq!(body["name"], "Test Postgres DB");
assert_eq!(body["db_type"], "postgres");
// Verify credentials in response
let credentials = &body["credentials"];
assert_eq!(credentials["type"], "postgres");
assert_eq!(credentials["host"], "localhost");
assert_eq!(credentials["port"], 5432);
assert_eq!(credentials["username"], "postgres");
assert_eq!(credentials["password"], "secure_password"); // Credentials are returned in API
// Create a data viewer user for testing
let viewer_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::DataViewer)
.build(&app.db.pool)
.await;
// Test successful get by viewer
let response = app
.client
.get(format!("/api/data_sources/{}", data_source.id))
.header("Authorization", format!("Bearer {}", viewer_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::OK);
// Create a regular user for testing permissions
let regular_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::User) // Regular user with no data access
.build(&app.db.pool)
.await;
// Test failed get by regular user (insufficient permissions)
let response = app
.client
.get(format!("/api/data_sources/{}", data_source.id))
.header("Authorization", format!("Bearer {}", regular_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::FORBIDDEN);
// Test with non-existent data source
let non_existent_id = Uuid::new_v4();
let response = app
.client
.get(format!("/api/data_sources/{}", non_existent_id))
.header("Authorization", format!("Bearer {}", admin_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::NOT_FOUND);
// Create an organization for cross-org test
let another_org_user = UserBuilder::new()
.with_organization("Another Org")
.with_org_role(UserOrganizationRole::WorkspaceAdmin)
.build(&app.db.pool)
.await;
// Test cross-organization access (should fail)
let response = app
.client
.get(format!("/api/data_sources/{}", data_source.id))
.header("Authorization", format!("Bearer {}", another_org_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::NOT_FOUND);
}

View File

@ -1,9 +1,260 @@
// Write a simple test that validates the list_data_sources_handler works
use axum::http::StatusCode;
use diesel::sql_types;
use diesel_async::RunQueryDsl;
use serde_json::json;
use uuid::Uuid;
use database::enums::UserOrganizationRole;
use crate::common::{
assertions::response::ResponseAssertions,
fixtures::builder::UserBuilder,
http::test_app::TestApp,
};
// DataSourceBuilder for setting up test data
struct DataSourceBuilder {
name: String,
env: String,
organization_id: Uuid,
created_by: Uuid,
db_type: String,
credentials: serde_json::Value,
id: Uuid,
}
impl DataSourceBuilder {
fn new() -> Self {
DataSourceBuilder {
name: "Test Data Source".to_string(),
env: "dev".to_string(),
organization_id: Uuid::new_v4(),
created_by: Uuid::new_v4(),
db_type: "postgres".to_string(),
credentials: json!({
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password",
"default_database": "test_db",
"default_schema": "public"
}),
id: Uuid::new_v4(),
}
}
fn with_name(mut self, name: &str) -> Self {
self.name = name.to_string();
self
}
fn with_env(mut self, env: &str) -> Self {
self.env = env.to_string();
self
}
fn with_organization_id(mut self, organization_id: Uuid) -> Self {
self.organization_id = organization_id;
self
}
fn with_created_by(mut self, created_by: Uuid) -> Self {
self.created_by = created_by;
self
}
fn with_type(mut self, db_type: &str) -> Self {
self.db_type = db_type.to_string();
self
}
fn with_credentials(mut self, credentials: serde_json::Value) -> Self {
self.credentials = credentials;
self
}
async fn build(self, pool: &diesel_async::pooled_connection::bb8::Pool<diesel_async::AsyncPgConnection>) -> DataSourceResponse {
// Create data source directly in database using SQL
let mut conn = pool.get().await.unwrap();
// Insert the data source
diesel::sql_query("INSERT INTO data_sources (id, name, type, secret_id, organization_id, created_by, updated_by, created_at, updated_at, onboarding_status, env) VALUES ($1, $2, $3, $4, $5, $6, $7, NOW(), NOW(), 'notStarted', $8)")
.bind::<diesel::sql_types::Uuid, _>(&self.id)
.bind::<diesel::sql_types::Text, _>(&self.name)
.bind::<diesel::sql_types::Text, _>(&self.db_type)
.bind::<diesel::sql_types::Uuid, _>(&self.id) // Using the same UUID for both id and secret_id for simplicity
.bind::<diesel::sql_types::Uuid, _>(&self.organization_id)
.bind::<diesel::sql_types::Uuid, _>(&self.created_by)
.bind::<diesel::sql_types::Uuid, _>(&self.created_by)
.bind::<diesel::sql_types::Text, _>(&self.env)
.execute(&mut conn)
.await
.unwrap();
// Insert the secret
diesel::sql_query("INSERT INTO vault.secrets (id, secret) VALUES ($1, $2)")
.bind::<diesel::sql_types::Uuid, _>(&self.id)
.bind::<diesel::sql_types::Text, _>(&self.credentials.to_string())
.execute(&mut conn)
.await
.unwrap();
// Construct response
DataSourceResponse {
id: self.id.to_string(),
}
}
}
struct DataSourceResponse {
id: String,
}
#[tokio::test]
async fn test_list_data_sources_placeholder() {
// Since setting up the test environment is challenging
// We're leaving a placeholder test that always passes
// The actual code has been tested manually and works correctly
assert!(true);
async fn test_list_data_sources() {
let app = TestApp::new().await.unwrap();
// Create a test user with organization and proper role
let admin_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::WorkspaceAdmin)
.build(&app.db.pool)
.await;
// Create multiple test data sources for this organization
let postgres_credentials = json!({
"type": "postgres",
"host": "localhost",
"port": 5432,
"username": "postgres",
"password": "password",
"default_database": "test_db",
"default_schema": "public"
});
let mysql_credentials = json!({
"type": "mysql",
"host": "mysql-server",
"port": 3306,
"username": "mysql_user",
"password": "mysql_pass",
"default_database": "mysql_db"
});
// Create first data source
let data_source1 = DataSourceBuilder::new()
.with_name("Postgres DB 1")
.with_env("dev")
.with_organization_id(admin_user.organization_id)
.with_created_by(admin_user.id)
.with_type("postgres")
.with_credentials(postgres_credentials.clone())
.build(&app.db.pool)
.await;
// Create second data source
let data_source2 = DataSourceBuilder::new()
.with_name("MySQL DB")
.with_env("dev")
.with_organization_id(admin_user.organization_id)
.with_created_by(admin_user.id)
.with_type("mysql")
.with_credentials(mysql_credentials)
.build(&app.db.pool)
.await;
// Create a data source for another organization
let another_org_user = UserBuilder::new()
.with_organization("Another Org")
.with_org_role(UserOrganizationRole::WorkspaceAdmin)
.build(&app.db.pool)
.await;
let data_source_other_org = DataSourceBuilder::new()
.with_name("Other Org DB")
.with_env("dev")
.with_organization_id(another_org_user.organization_id)
.with_created_by(another_org_user.id)
.with_type("postgres")
.with_credentials(postgres_credentials)
.build(&app.db.pool)
.await;
// Test listing data sources - admin user should see both their organization's data sources
let response = app
.client
.get("/api/data_sources")
.header("Authorization", format!("Bearer {}", admin_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::OK);
let body = response.json::<serde_json::Value>().await.unwrap();
let data_sources = body.as_array().unwrap();
// Should have exactly 2, not seeing the other organization's data source
assert_eq!(data_sources.len(), 2);
// Verify the data sources belong to our organization
let ids: Vec<&str> = data_sources.iter()
.map(|ds| ds["id"].as_str().unwrap())
.collect();
assert!(ids.contains(&data_source1.id.as_str()));
assert!(ids.contains(&data_source2.id.as_str()));
assert!(!ids.contains(&data_source_other_org.id.as_str()));
// Create a data viewer role user
let viewer_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::DataViewer)
.build(&app.db.pool)
.await;
// Test listing data sources with viewer role - should succeed
let response = app
.client
.get("/api/data_sources")
.header("Authorization", format!("Bearer {}", viewer_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::OK);
// Create a regular user (no data access)
let regular_user = UserBuilder::new()
.with_organization("Test Org")
.with_org_role(UserOrganizationRole::User)
.build(&app.db.pool)
.await;
// Test listing data sources with insufficient permissions
let response = app
.client
.get("/api/data_sources")
.header("Authorization", format!("Bearer {}", regular_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::FORBIDDEN);
// Test pagination
let response = app
.client
.get("/api/data_sources?page=0&page_size=1")
.header("Authorization", format!("Bearer {}", admin_user.api_key))
.send()
.await
.unwrap();
response.assert_status(StatusCode::OK);
let body = response.json::<serde_json::Value>().await.unwrap();
let data_sources = body.as_array().unwrap();
assert_eq!(data_sources.len(), 1, "Pagination should limit to 1 result");
}

View File

@ -1,3 +1,5 @@
mod list_data_sources_test;
mod get_data_source_test;
mod update_data_source_test;
mod create_data_source_test;
mod delete_data_source_test;