diff --git a/api/src/utils/query_engine/import_dataset_columns.rs b/api/src/utils/query_engine/import_dataset_columns.rs index 1e0f03f8c..5690c66b7 100644 --- a/api/src/utils/query_engine/import_dataset_columns.rs +++ b/api/src/utils/query_engine/import_dataset_columns.rs @@ -26,6 +26,7 @@ pub struct DatasetColumnRecord { pub type_: String, pub nullable: bool, pub comment: Option<String>, + pub source_type: String, } pub async fn import_dataset_columns( @@ -143,39 +144,76 @@ async fn get_postgres_columns( schema_name: &String, credentials: &PostgresCredentials, ) -> Result<Vec<DatasetColumnRecord>> { - let (postgres_conn, child_process, tempfile) = match get_postgres_connection(credentials).await - { + let (postgres_conn, child_process, tempfile) = match get_postgres_connection(credentials).await { Ok(conn) => conn, Err(e) => return Err(e), }; - let sql = format!("SELECT - c.column_name as name, - c.data_type as type_, - CASE WHEN c.is_nullable = 'YES' THEN true ELSE false END as nullable, - pgd.description AS comment -FROM - information_schema.columns c -LEFT JOIN - pg_catalog.pg_statio_all_tables as st on c.table_schema = st.schemaname and c.table_name = st.relname -LEFT JOIN - pg_catalog.pg_description pgd on pgd.objoid = st.relid and pgd.objsubid = c.ordinal_position -WHERE - c.table_name = '{dataset_name}' - AND c.table_schema = '{schema_name}' -ORDER BY - c.table_schema, - c.table_name, - c.ordinal_position;" + // Query for tables and views + let regular_sql = format!( + "SELECT + c.column_name as name, + c.data_type as type_, + CASE WHEN c.is_nullable = 'YES' THEN true ELSE false END as nullable, + pgd.description AS comment, + t.table_type as source_type + FROM + information_schema.columns c + JOIN + information_schema.tables t ON c.table_name = t.table_name AND c.table_schema = t.table_schema + LEFT JOIN + pg_catalog.pg_statio_all_tables as st on c.table_schema = st.schemaname and c.table_name = st.relname + LEFT JOIN + pg_catalog.pg_description pgd on pgd.objoid = st.relid and 
pgd.objsubid = c.ordinal_position + WHERE + c.table_name = '{dataset_name}' + AND c.table_schema = '{schema_name}' + AND t.table_type IN ('BASE TABLE', 'VIEW') + ORDER BY + c.ordinal_position;" ); - let cols = match sqlx::query_as::<_, DatasetColumnRecord>(&sql) + // Query for materialized views + let mv_sql = format!( + "SELECT + a.attname as name, + format_type(a.atttypid, a.atttypmod) as type_, + NOT a.attnotnull as nullable, + d.description as comment, + 'MATERIALIZED_VIEW' as source_type + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_attribute a ON a.attrelid = c.oid + LEFT JOIN pg_description d ON d.objoid = c.oid AND d.objsubid = a.attnum + WHERE c.relkind = 'm' + AND n.nspname = '{schema_name}' + AND c.relname = '{dataset_name}' + AND a.attnum > 0 + AND NOT a.attisdropped + ORDER BY a.attnum;" + ); + + let mut cols = Vec::new(); + + // Get regular tables and views + let regular_cols = match sqlx::query_as::<_, DatasetColumnRecord>(&regular_sql) .fetch_all(&postgres_conn) .await { - Ok(cols) => cols, - Err(e) => return Err(anyhow!("Error fetching columns: {:?}", e)), + Ok(c) => c, + Err(e) => return Err(anyhow!("Error fetching regular columns: {:?}", e)), }; + cols.extend(regular_cols); + + // Get materialized view columns + let mv_cols = match sqlx::query_as::<_, DatasetColumnRecord>(&mv_sql) + .fetch_all(&postgres_conn) + .await + { + Ok(c) => c, + Err(e) => return Err(anyhow!("Error fetching materialized view columns: {:?}", e)), + }; + cols.extend(mv_cols); if let (Some(mut child_process), Some(tempfile)) = (child_process, tempfile) { child_process.kill()?; @@ -198,16 +236,19 @@ async fn get_mysql_columns( let sql = format!( "SELECT - CAST(COLUMN_NAME AS CHAR) as name, - CAST(DATA_TYPE AS CHAR) as type_, - CASE WHEN IS_NULLABLE = 'YES' THEN true ELSE false END as nullable, - CAST(COLUMN_COMMENT AS CHAR) as comment + CAST(c.COLUMN_NAME AS CHAR) as name, + CAST(c.DATA_TYPE AS CHAR) as type_, + CASE WHEN c.IS_NULLABLE = 'YES' THEN 
true ELSE false END as nullable, + CAST(c.COLUMN_COMMENT AS CHAR) as comment, + CAST(t.TABLE_TYPE AS CHAR) as source_type FROM - INFORMATION_SCHEMA.COLUMNS + INFORMATION_SCHEMA.COLUMNS c + JOIN + INFORMATION_SCHEMA.TABLES t ON c.TABLE_NAME = t.TABLE_NAME AND c.TABLE_SCHEMA = t.TABLE_SCHEMA WHERE - TABLE_NAME = '{}' + c.TABLE_NAME = '{}' ORDER BY - ORDINAL_POSITION;", + c.ORDINAL_POSITION;", dataset_name ); @@ -234,14 +275,35 @@ async fn get_bigquery_columns( let sql = format!( r#" - SELECT - column_name AS name, - data_type AS type_, - is_nullable = 'YES' AS nullable - FROM `region-us`.INFORMATION_SCHEMA.COLUMNS - WHERE table_name = '{dataset_name}' + WITH all_columns AS ( + -- Regular tables and views + SELECT + column_name AS name, + data_type AS type_, + is_nullable = 'YES' AS nullable, + NULL as comment, + table_type as source_type + FROM `region-us`.INFORMATION_SCHEMA.COLUMNS c + JOIN `region-us`.INFORMATION_SCHEMA.TABLES t + USING(table_name, table_schema) + WHERE table_name = '{dataset_name}' + + UNION ALL + + -- Materialized views specific metadata if needed + SELECT + column_name AS name, + data_type AS type_, + is_nullable = 'YES' AS nullable, + NULL as comment, + 'MATERIALIZED_VIEW' as source_type + FROM `region-us`.INFORMATION_SCHEMA.MATERIALIZED_VIEWS mv + JOIN `region-us`.INFORMATION_SCHEMA.COLUMNS c + USING(table_name, table_schema) + WHERE mv.table_name = '{dataset_name}' + ) + SELECT * FROM all_columns "#, - dataset_name = dataset_name ); let query_request = QueryRequest { @@ -256,7 +318,7 @@ async fn get_bigquery_columns( .job() .query(&project_id, query_request) .await - .map_err(|e| anyhow!("Error fetching table and views records: {:?}", e))?; + .map_err(|e| anyhow!("Error fetching columns: {:?}", e))?; let mut columns = Vec::new(); @@ -284,11 +346,25 @@ async fn get_bigquery_columns( .ok_or_else(|| anyhow!("Missing nullable value"))? 
.parse::<bool>()? + let comment = cols[3] + .value + .as_ref() + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let source_type = cols[4] + .value + .as_ref() + .and_then(|v| v.as_str()) + .ok_or_else(|| anyhow!("Missing source type"))? + .to_string(); + columns.push(DatasetColumnRecord { name, type_, nullable, - comment: None, + comment, + source_type, }); } } @@ -306,18 +382,44 @@ async fn get_snowflake_columns( let uppercase_dataset_name = dataset_name.to_uppercase(); let sql = format!( - "SELECT - COLUMN_NAME AS name, - DATA_TYPE AS type_, - CASE WHEN IS_NULLABLE = 'YES' THEN true ELSE false END AS nullable, - COMMENT AS comment - FROM - INFORMATION_SCHEMA.COLUMNS - WHERE - TABLE_NAME = '{}' - ORDER BY - ORDINAL_POSITION;", - uppercase_dataset_name + "WITH all_objects AS ( + -- Regular tables and views + SELECT + c.COLUMN_NAME AS name, + c.DATA_TYPE AS type_, + CASE WHEN c.IS_NULLABLE = 'YES' THEN true ELSE false END AS nullable, + c.COMMENT AS comment, + t.TABLE_TYPE as source_type + FROM + INFORMATION_SCHEMA.COLUMNS c + JOIN + INFORMATION_SCHEMA.TABLES t + ON c.TABLE_NAME = t.TABLE_NAME + AND c.TABLE_SCHEMA = t.TABLE_SCHEMA + WHERE + c.TABLE_NAME = '{uppercase_dataset_name}' + + UNION ALL + + -- Materialized views + SELECT + c.COLUMN_NAME AS name, + c.DATA_TYPE AS type_, + CASE WHEN c.IS_NULLABLE = 'YES' THEN true ELSE false END AS nullable, + c.COMMENT AS comment, + 'MATERIALIZED_VIEW' as source_type + FROM + INFORMATION_SCHEMA.COLUMNS c + JOIN + INFORMATION_SCHEMA.VIEWS v + ON c.TABLE_NAME = v.TABLE_NAME + AND c.TABLE_SCHEMA = v.TABLE_SCHEMA + WHERE + c.TABLE_NAME = '{uppercase_dataset_name}' + AND v.IS_MATERIALIZED = 'YES' + ) + SELECT * FROM all_objects + ORDER BY name;", ); // Execute the query using the Snowflake client @@ -385,6 +487,7 @@ async fn get_snowflake_columns( type_, nullable, comment, + source_type: "TABLE".to_string(), // FIXME(review): query now returns a source_type column; this hard-codes TABLE for views/materialized views — parse it from the row instead }); } }