feat: enhance column metadata retrieval across database sources

- Add support for capturing source type (table, view, materialized view)
- Improve column metadata queries for Postgres, MySQL, BigQuery, and Snowflake
- Include more comprehensive column information during dataset import
- Extend DatasetColumnRecord to include source_type field
This commit is contained in:
dal 2025-02-05 18:21:40 -07:00
parent 6e5c299389
commit fa480f6797
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
1 changed file with 154 additions and 51 deletions

View File

@ -26,6 +26,7 @@ pub struct DatasetColumnRecord {
pub type_: String,
pub nullable: bool,
pub comment: Option<String>,
pub source_type: String,
}
pub async fn import_dataset_columns(
@ -143,39 +144,76 @@ async fn get_postgres_columns(
schema_name: &String,
credentials: &PostgresCredentials,
) -> Result<Vec<DatasetColumnRecord>> {
let (postgres_conn, child_process, tempfile) = match get_postgres_connection(credentials).await
{
let (postgres_conn, child_process, tempfile) = match get_postgres_connection(credentials).await {
Ok(conn) => conn,
Err(e) => return Err(e),
};
let sql = format!("SELECT
c.column_name as name,
c.data_type as type_,
CASE WHEN c.is_nullable = 'YES' THEN true ELSE false END as nullable,
pgd.description AS comment
FROM
information_schema.columns c
LEFT JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema = st.schemaname and c.table_name = st.relname
LEFT JOIN
pg_catalog.pg_description pgd on pgd.objoid = st.relid and pgd.objsubid = c.ordinal_position
WHERE
c.table_name = '{dataset_name}'
AND c.table_schema = '{schema_name}'
ORDER BY
c.table_schema,
c.table_name,
c.ordinal_position;"
// Query for tables and views
let regular_sql = format!(
"SELECT
c.column_name as name,
c.data_type as type_,
CASE WHEN c.is_nullable = 'YES' THEN true ELSE false END as nullable,
pgd.description AS comment,
t.table_type as source_type
FROM
information_schema.columns c
JOIN
information_schema.tables t ON c.table_name = t.table_name AND c.table_schema = t.table_schema
LEFT JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema = st.schemaname and c.table_name = st.relname
LEFT JOIN
pg_catalog.pg_description pgd on pgd.objoid = st.relid and pgd.objsubid = c.ordinal_position
WHERE
c.table_name = '{dataset_name}'
AND c.table_schema = '{schema_name}'
AND t.table_type IN ('BASE TABLE', 'VIEW')
ORDER BY
c.ordinal_position;"
);
let cols = match sqlx::query_as::<_, DatasetColumnRecord>(&sql)
// Query for materialized views
let mv_sql = format!(
"SELECT
a.attname as name,
format_type(a.atttypid, a.atttypmod) as type_,
NOT a.attnotnull as nullable,
d.description as comment,
'MATERIALIZED_VIEW' as source_type
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
JOIN pg_attribute a ON a.attrelid = c.oid
LEFT JOIN pg_description d ON d.objoid = c.oid AND d.objsubid = a.attnum
WHERE c.relkind = 'm'
AND n.nspname = '{schema_name}'
AND c.relname = '{dataset_name}'
AND a.attnum > 0
AND NOT a.attisdropped
ORDER BY a.attnum;"
);
let mut cols = Vec::new();
// Get regular tables and views
let regular_cols = match sqlx::query_as::<_, DatasetColumnRecord>(&regular_sql)
.fetch_all(&postgres_conn)
.await
{
Ok(cols) => cols,
Err(e) => return Err(anyhow!("Error fetching columns: {:?}", e)),
Ok(c) => c,
Err(e) => return Err(anyhow!("Error fetching regular columns: {:?}", e)),
};
cols.extend(regular_cols);
// Get materialized view columns
let mv_cols = match sqlx::query_as::<_, DatasetColumnRecord>(&mv_sql)
.fetch_all(&postgres_conn)
.await
{
Ok(c) => c,
Err(e) => return Err(anyhow!("Error fetching materialized view columns: {:?}", e)),
};
cols.extend(mv_cols);
if let (Some(mut child_process), Some(tempfile)) = (child_process, tempfile) {
child_process.kill()?;
@ -198,16 +236,19 @@ async fn get_mysql_columns(
let sql = format!(
"SELECT
CAST(COLUMN_NAME AS CHAR) as name,
CAST(DATA_TYPE AS CHAR) as type_,
CASE WHEN IS_NULLABLE = 'YES' THEN true ELSE false END as nullable,
CAST(COLUMN_COMMENT AS CHAR) as comment
CAST(c.COLUMN_NAME AS CHAR) as name,
CAST(c.DATA_TYPE AS CHAR) as type_,
CASE WHEN c.IS_NULLABLE = 'YES' THEN true ELSE false END as nullable,
CAST(c.COLUMN_COMMENT AS CHAR) as comment,
CAST(t.TABLE_TYPE AS CHAR) as source_type
FROM
INFORMATION_SCHEMA.COLUMNS
INFORMATION_SCHEMA.COLUMNS c
JOIN
INFORMATION_SCHEMA.TABLES t ON c.TABLE_NAME = t.TABLE_NAME AND c.TABLE_SCHEMA = t.TABLE_SCHEMA
WHERE
TABLE_NAME = '{}'
c.TABLE_NAME = '{}'
ORDER BY
ORDINAL_POSITION;",
c.ORDINAL_POSITION;",
dataset_name
);
@ -234,14 +275,35 @@ async fn get_bigquery_columns(
let sql = format!(
r#"
SELECT
column_name AS name,
data_type AS type_,
is_nullable = 'YES' AS nullable
FROM `region-us`.INFORMATION_SCHEMA.COLUMNS
WHERE table_name = '{dataset_name}'
WITH all_columns AS (
-- Regular tables and views
SELECT
column_name AS name,
data_type AS type_,
is_nullable = 'YES' AS nullable,
NULL as comment,
table_type as source_type
FROM `region-us`.INFORMATION_SCHEMA.COLUMNS c
JOIN `region-us`.INFORMATION_SCHEMA.TABLES t
USING(table_name, table_schema)
WHERE table_name = '{dataset_name}'
UNION ALL
-- Materialized views specific metadata if needed
SELECT
column_name AS name,
data_type AS type_,
is_nullable = 'YES' AS nullable,
NULL as comment,
'MATERIALIZED_VIEW' as source_type
FROM `region-us`.INFORMATION_SCHEMA.MATERIALIZED_VIEWS mv
JOIN `region-us`.INFORMATION_SCHEMA.COLUMNS c
USING(table_name, table_schema)
WHERE mv.table_name = '{dataset_name}'
)
SELECT * FROM all_columns
"#,
dataset_name = dataset_name
);
let query_request = QueryRequest {
@ -256,7 +318,7 @@ async fn get_bigquery_columns(
.job()
.query(&project_id, query_request)
.await
.map_err(|e| anyhow!("Error fetching table and views records: {:?}", e))?;
.map_err(|e| anyhow!("Error fetching columns: {:?}", e))?;
let mut columns = Vec::new();
@ -284,11 +346,25 @@ async fn get_bigquery_columns(
.ok_or_else(|| anyhow!("Missing nullable value"))?
.parse::<bool>()?;
let comment = cols[3]
.value
.as_ref()
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let source_type = cols[4]
.value
.as_ref()
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow!("Missing source type"))?
.to_string();
columns.push(DatasetColumnRecord {
name,
type_,
nullable,
comment: None,
comment,
source_type,
});
}
}
@ -306,18 +382,44 @@ async fn get_snowflake_columns(
let uppercase_dataset_name = dataset_name.to_uppercase();
let sql = format!(
"SELECT
COLUMN_NAME AS name,
DATA_TYPE AS type_,
CASE WHEN IS_NULLABLE = 'YES' THEN true ELSE false END AS nullable,
COMMENT AS comment
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = '{}'
ORDER BY
ORDINAL_POSITION;",
uppercase_dataset_name
"WITH all_objects AS (
-- Regular tables and views
SELECT
c.COLUMN_NAME AS name,
c.DATA_TYPE AS type_,
CASE WHEN c.IS_NULLABLE = 'YES' THEN true ELSE false END AS nullable,
c.COMMENT AS comment,
t.TABLE_TYPE as source_type
FROM
INFORMATION_SCHEMA.COLUMNS c
JOIN
INFORMATION_SCHEMA.TABLES t
ON c.TABLE_NAME = t.TABLE_NAME
AND c.TABLE_SCHEMA = t.TABLE_SCHEMA
WHERE
c.TABLE_NAME = '{uppercase_dataset_name}'
UNION ALL
-- Materialized views
SELECT
c.COLUMN_NAME AS name,
c.DATA_TYPE AS type_,
CASE WHEN c.IS_NULLABLE = 'YES' THEN true ELSE false END AS nullable,
c.COMMENT AS comment,
'MATERIALIZED_VIEW' as source_type
FROM
INFORMATION_SCHEMA.COLUMNS c
JOIN
INFORMATION_SCHEMA.VIEWS v
ON c.TABLE_NAME = v.TABLE_NAME
AND c.TABLE_SCHEMA = v.TABLE_SCHEMA
WHERE
c.TABLE_NAME = '{uppercase_dataset_name}'
AND v.IS_MATERIALIZED = 'YES'
)
SELECT * FROM all_objects
ORDER BY name;",
);
// Execute the query using the Snowflake client
@ -385,6 +487,7 @@ async fn get_snowflake_columns(
type_,
nullable,
comment,
source_type: "TABLE".to_string(),
});
}
}