sql refactor

This commit is contained in:
dal 2025-04-28 16:41:52 -06:00
parent 76da3b52be
commit fda1b5d8be
No known key found for this signature in database
GPG Key ID: 16F4B0E1E9F61122
6 changed files with 1452 additions and 792 deletions

View File

@ -0,0 +1,84 @@
# SQL Analyzer Library (`sql_analyzer`)
## Purpose
The SQL Analyzer library provides functionality to parse, analyze, and manipulate SQL queries within a Rust/Tokio environment. It is designed to:
1. **Extract Structural Information**: Identify tables, columns, joins, and Common Table Expressions (CTEs) used within a SQL query.
2. **Trace Lineage**: Understand the relationships between tables, especially how joins connect them, including lineage through CTEs.
3. **Semantic Layer Integration**: Validate queries against a defined semantic layer (metrics, filters, relationships) and substitute semantic elements with their underlying SQL expressions.
4. **Row-Level Security**: Rewrite queries to enforce row-level filtering by injecting CTEs based on provided filter conditions.
## Key Features
- **Comprehensive Parsing**: Leverages the `sqlparser` crate to handle a wide range of SQL dialects and constructs.
- **Lineage Tracking**:
  - Extracts base tables, including schema/database qualifiers and aliases.
  - Identifies joins and their conditions, linking them back to the original tables involved.
  - Recursively analyzes CTEs, mapping CTE columns back to their source tables and columns.
- **Vague Reference Detection**: Flags potentially ambiguous references like unqualified column names or tables without schema identifiers (configurable behavior).
- **Semantic Layer**:
  - **Validation**: Checks if a query adheres to predefined metrics, filters, and allowed join paths (`validate_semantic_query`).
  - **Substitution**: Replaces metric and filter placeholders in the SQL with their actual SQL expressions (`substitute_semantic_query`).
  - **Combined**: Performs validation and substitution in one step (`validate_and_substitute_semantic_query`).
- **Row-Level Filtering**: Automatically rewrites SQL queries to include row-level filters by wrapping table references in CTEs (`apply_row_level_filters`).
- **Async API**: Provides non-blocking functions suitable for integration into asynchronous applications (like web servers using Tokio).
## Basic Usage
### Analyzing a Query for Structure and Lineage
```rust
use sql_analyzer::{analyze_query, QuerySummary, SqlAnalyzerError};
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
let sql = r#"
WITH regional_sales AS (
SELECT region, SUM(amount) as total_sales
FROM sales s JOIN regions r ON s.region_id = r.id
GROUP BY region
)
SELECT u.name, rs.total_sales
FROM users u
JOIN regional_sales rs ON u.region = rs.region
WHERE u.status = 'active';
"#.to_string();
let result: Result<QuerySummary, SqlAnalyzerError> = analyze_query(sql).await;
match result {
Ok(summary) => {
println!("--- Query Analysis Summary ---");
println!("Tables: {:?}", summary.tables);
println!("Joins: {:?}", summary.joins);
println!("CTEs: {:?}", summary.ctes);
// Explore summary.ctes[...].summary for CTE lineage
},
Err(e) => {
eprintln!("SQL Analysis Error: {}", e);
}
}
Ok(())
}
```
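*(For semantic layer and row-level filtering usage, see the doc examples on `validate_semantic_query`, `substitute_semantic_query`, `validate_and_substitute_semantic_query`, and `apply_row_level_filters`; they live in the `semantic` and `row_filtering` modules and are re-exported from `src/lib.rs`.)*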
## Testing
The library includes a comprehensive test suite to ensure correctness and robustness:
- **Unit Tests (`src/lib.rs`, `src/utils/semantic.rs`)**: Focus on testing specific functions and logic units, particularly around semantic layer validation and substitution rules.
- **Integration Tests (`tests/integration_tests.rs`)**: Cover end-to-end scenarios, including:
  - Parsing various SQL constructs (joins, CTEs, subqueries, unions).
  - Verifying lineage tracking accuracy.
  - Testing semantic layer features (validation, substitution, parameters).
  - Testing row-level filter rewriting logic under different conditions (existing CTEs, subqueries, schema qualification).
  - Handling edge cases and potential errors (invalid SQL, vague references, complex queries).
- **Doc Tests**: Examples embedded in the documentation are compiled as part of `cargo test` to ensure they remain valid. They are marked `no_run`, so they are type-checked but not executed.
The tests cover a wide range of SQL scenarios, including complex joins, nested CTEs, various semantic layer configurations, and different row-level filtering requirements. You can run the tests using:
```bash
cargo test -p sql_analyzer -- --test-threads=1 --nocapture
```

View File

@ -0,0 +1,31 @@
use anyhow::Result;
use crate::{
types::QuerySummary,
errors::SqlAnalyzerError,
utils,
};
/// Analyzes a SQL query and returns a summary with lineage information.
///
/// The analysis itself is performed by `utils::analyze_sql`, which is executed
/// through `tokio::task::spawn_blocking`; this function awaits that task.
///
/// # Arguments
/// * `sql` - The SQL query string to analyze.
///
/// # Returns
/// A `Result` containing either a `QuerySummary` with detailed analysis
/// or a `SqlAnalyzerError` if parsing fails or vague references are found.
///
/// # Errors
/// * `SqlAnalyzerError::Internal` (message prefixed with `Task join error:`)
///   when awaiting the blocking task fails.
/// * Any error returned by `utils::analyze_sql`, propagated with `?`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::analyze_query;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "WITH cte AS (SELECT u.id FROM schema.users u) SELECT * FROM cte JOIN schema.orders o ON cte.id = o.user_id";
///     let summary = analyze_query(sql.to_string()).await?;
///     println!("{:?}", summary);
///     Ok(())
/// }
/// ```
pub async fn analyze_query(sql: String) -> Result<QuerySummary, SqlAnalyzerError> {
    // First `?` surfaces a failed join (mapped to `Internal`); second `?`
    // surfaces the analyzer's own error.
    let summary = tokio::task::spawn_blocking(move || utils::analyze_sql(&sql))
        .await
        .map_err(|e| SqlAnalyzerError::Internal(anyhow::anyhow!("Task join error: {}", e)))??;
    Ok(summary)
}

View File

@ -6,12 +6,13 @@
//! to support querying with predefined metrics and filters.
//! Designed for integration with a Tokio-based web server.
use anyhow::Result;
use std::collections::HashMap;
mod errors;
pub mod types;
pub mod utils;
mod errors;
pub mod analysis;
pub mod semantic;
pub mod row_filtering;
pub use errors::SqlAnalyzerError;
pub use types::{
@ -19,206 +20,7 @@ pub use types::{
SemanticLayer, ValidationMode, Metric, Filter,
Parameter, ParameterType, Relationship
};
pub use utils::semantic;
/// Produces a lineage-aware summary for the given SQL query.
///
/// The work is handed to `utils::analyze_sql` inside
/// `tokio::task::spawn_blocking`, and the outcome is awaited here.
///
/// # Arguments
/// * `sql` - The SQL text to analyze.
///
/// # Returns
/// `Ok(QuerySummary)` with the detailed analysis, or a `SqlAnalyzerError`
/// if parsing fails or vague references are found. A failure to join the
/// blocking task is reported as `SqlAnalyzerError::Internal`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::analyze_query;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "WITH cte AS (SELECT u.id FROM schema.users u) SELECT * FROM cte JOIN schema.orders o ON cte.id = o.user_id";
///     let summary = analyze_query(sql.to_string()).await?;
///     println!("{:?}", summary);
///     Ok(())
/// }
/// ```
pub async fn analyze_query(sql: String) -> Result<QuerySummary, SqlAnalyzerError> {
    let worker = tokio::task::spawn_blocking(move || utils::analyze_sql(&sql));
    match worker.await {
        // The task completed: forward the analyzer's own success or failure.
        Ok(outcome) => Ok(outcome?),
        // The task itself could not be joined.
        Err(join_err) => Err(SqlAnalyzerError::Internal(anyhow::anyhow!(
            "Task join error: {}",
            join_err
        ))),
    }
}
/// Checks a SQL query against the rules of a semantic layer.
///
/// Delegates to `semantic::validate_query`, run inside
/// `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string to validate.
/// * `semantic_layer` - Semantic layer metadata containing tables, metrics, filters, and relationships.
/// * `mode` - The validation mode (Strict or Flexible).
///
/// # Returns
/// `Ok(())` when validation passes; otherwise an error describing the
/// validation issues. A failure to join the blocking task is reported as
/// `SqlAnalyzerError::Internal`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::{validate_semantic_query, SemanticLayer, ValidationMode};
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, metric_UserSpending FROM users u JOIN orders o ON u.id = o.user_id";
///     let semantic_layer = SemanticLayer::new();
///     // Add tables, metrics, filters, and relationships to semantic_layer...
///
///     let result = validate_semantic_query(sql.to_string(), semantic_layer, ValidationMode::Strict).await;
///     match result {
///         Ok(_) => println!("Query is valid according to semantic layer rules"),
///         Err(e) => println!("Validation failed: {}", e),
///     }
///     Ok(())
/// }
/// ```
pub async fn validate_semantic_query(
    sql: String,
    semantic_layer: SemanticLayer,
    mode: ValidationMode,
) -> Result<(), SqlAnalyzerError> {
    let worker = tokio::task::spawn_blocking(move || {
        semantic::validate_query(&sql, &semantic_layer, mode)
    });
    match worker.await {
        Ok(verdict) => {
            // Only the success/failure matters; the Ok payload is discarded.
            verdict?;
            Ok(())
        }
        Err(join_err) => Err(SqlAnalyzerError::Internal(anyhow::anyhow!(
            "Task join error: {}",
            join_err
        ))),
    }
}
/// Replaces metrics and filters in a SQL query with their expressions.
///
/// Delegates to `semantic::substitute_query`, run inside
/// `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string containing metrics and filters to substitute.
/// * `semantic_layer` - Semantic layer metadata holding the metric and filter definitions.
///
/// # Returns
/// The substituted SQL query, or an error. A failure to join the blocking
/// task is reported as `SqlAnalyzerError::Internal`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::{substitute_semantic_query, SemanticLayer};
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, metric_UserSpending FROM users u JOIN orders o ON u.id = o.user_id";
///     let semantic_layer = SemanticLayer::new();
///     // Add tables, metrics, filters, and relationships to semantic_layer...
///
///     let substituted_sql = substitute_semantic_query(sql.to_string(), semantic_layer).await?;
///     println!("Substituted SQL: {}", substituted_sql);
///     Ok(())
/// }
/// ```
pub async fn substitute_semantic_query(
    sql: String,
    semantic_layer: SemanticLayer,
) -> Result<String, SqlAnalyzerError> {
    let worker = tokio::task::spawn_blocking(move || {
        semantic::substitute_query(&sql, &semantic_layer)
    });
    match worker.await {
        Ok(outcome) => Ok(outcome?),
        Err(join_err) => Err(SqlAnalyzerError::Internal(anyhow::anyhow!(
            "Task join error: {}",
            join_err
        ))),
    }
}
/// Validates a SQL query against the semantic layer and substitutes its
/// metrics and filters in a single call.
///
/// The query is first validated against the semantic layer rules and then
/// its metrics and filters are replaced with their expressions. The work is
/// delegated to `semantic::validate_and_substitute`, run inside
/// `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string to validate and substitute.
/// * `semantic_layer` - The semantic layer metadata.
/// * `mode` - The validation mode (Strict or Flexible).
///
/// # Returns
/// The substituted SQL query, or an error. A failure to join the blocking
/// task is reported as `SqlAnalyzerError::Internal`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::{validate_and_substitute_semantic_query, SemanticLayer, ValidationMode};
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, metric_UserSpending FROM users u JOIN orders o ON u.id = o.user_id";
///     let semantic_layer = SemanticLayer::new();
///     // Add tables, metrics, filters, and relationships to semantic_layer...
///
///     let result = validate_and_substitute_semantic_query(
///         sql.to_string(),
///         semantic_layer,
///         ValidationMode::Flexible
///     ).await;
///
///     match result {
///         Ok(query) => println!("Substituted SQL: {}", query),
///         Err(e) => println!("Validation or substitution failed: {}", e),
///     }
///     Ok(())
/// }
/// ```
pub async fn validate_and_substitute_semantic_query(
    sql: String,
    semantic_layer: SemanticLayer,
    mode: ValidationMode,
) -> Result<String, SqlAnalyzerError> {
    let worker = tokio::task::spawn_blocking(move || {
        semantic::validate_and_substitute(&sql, &semantic_layer, mode)
    });
    match worker.await {
        Ok(outcome) => Ok(outcome?),
        Err(join_err) => Err(SqlAnalyzerError::Internal(anyhow::anyhow!(
            "Task join error: {}",
            join_err
        ))),
    }
}
/// Rewrites a SQL query so that table references are replaced with filtered CTEs.
///
/// Given a query and a map from table names to filter expressions, the query
/// is rewritten to apply those filters at the table level using CTEs. The
/// rewrite is delegated to `semantic::apply_row_level_filters`, run inside
/// `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string to rewrite.
/// * `table_filters` - Map whose keys are table names and whose values are filter expressions (WHERE clauses).
///
/// # Returns
/// The rewritten SQL query, or an error. A failure to join the blocking
/// task is reported as `SqlAnalyzerError::Internal`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::apply_row_level_filters;
/// use std::collections::HashMap;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, o.amount FROM users u JOIN orders o ON u.id = o.user_id";
///     let mut filters = HashMap::new();
///     filters.insert("users".to_string(), "tenant_id = 123".to_string());
///     filters.insert("orders".to_string(), "created_at > '2023-01-01'".to_string());
///
///     let filtered_sql = apply_row_level_filters(sql.to_string(), filters).await?;
///     println!("Filtered SQL: {}", filtered_sql);
///     Ok(())
/// }
/// ```
pub async fn apply_row_level_filters(
    sql: String,
    table_filters: HashMap<String, String>,
) -> Result<String, SqlAnalyzerError> {
    let worker = tokio::task::spawn_blocking(move || {
        semantic::apply_row_level_filters(&sql, table_filters)
    });
    match worker.await {
        Ok(outcome) => Ok(outcome?),
        Err(join_err) => Err(SqlAnalyzerError::Internal(anyhow::anyhow!(
            "Task join error: {}",
            join_err
        ))),
    }
}
pub use analysis::analyze_query;
pub use semantic::{validate_semantic_query, substitute_semantic_query, validate_and_substitute_semantic_query};
pub use row_filtering::apply_row_level_filters;

View File

@ -0,0 +1,42 @@
use anyhow::Result;
use std::collections::HashMap;
use crate::{
errors::SqlAnalyzerError,
utils::semantic, // Assuming the rewrite logic is also in utils::semantic based on original lib.rs
};
/// Applies row-level filters to a SQL query by replacing table references with filtered CTEs.
///
/// This function takes a SQL query and a map of table names to filter expressions,
/// and rewrites the query to apply the filters at the table level using CTEs.
/// The rewrite is performed by `utils::semantic::apply_row_level_filters`, which
/// is executed through `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string to rewrite.
/// * `table_filters` - A map where keys are table names and values are filter expressions (WHERE clauses).
///
/// # Returns
/// A `Result` containing the rewritten SQL query or an error.
///
/// # Errors
/// * `SqlAnalyzerError::Internal` (message prefixed with `Task join error:`)
///   when awaiting the blocking task fails.
/// * Any error returned by the underlying rewrite, propagated with `?`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::apply_row_level_filters;
/// use std::collections::HashMap;
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, o.amount FROM users u JOIN orders o ON u.id = o.user_id";
///     let mut filters = HashMap::new();
///     filters.insert("users".to_string(), "tenant_id = 123".to_string());
///     filters.insert("orders".to_string(), "created_at > '2023-01-01'".to_string());
///
///     let filtered_sql = apply_row_level_filters(sql.to_string(), filters).await?;
///     println!("Filtered SQL: {}", filtered_sql);
///     Ok(())
/// }
/// ```
pub async fn apply_row_level_filters(
    sql: String,
    table_filters: HashMap<String, String>,
) -> Result<String, SqlAnalyzerError> {
    let result = tokio::task::spawn_blocking(move || {
        // The rewrite implementation lives in `utils::semantic`, imported here as `semantic`.
        semantic::apply_row_level_filters(&sql, table_filters)
    })
    .await
    .map_err(|e| SqlAnalyzerError::Internal(anyhow::anyhow!("Task join error: {}", e)))??;
    Ok(result)
}

View File

@ -0,0 +1,112 @@
use anyhow::Result;
use crate::{
types::{SemanticLayer, ValidationMode},
errors::SqlAnalyzerError,
utils::semantic, // Note: Using the existing utils::semantic module
};
/// Validates a SQL query against semantic layer rules.
///
/// The check is performed by `utils::semantic::validate_query`, which is
/// executed through `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string to validate.
/// * `semantic_layer` - The semantic layer metadata containing tables, metrics, filters, and relationships.
/// * `mode` - The validation mode (Strict or Flexible).
///
/// # Returns
/// A `Result` that is Ok if validation passes, or an Error with validation issues.
///
/// # Errors
/// * `SqlAnalyzerError::Internal` (message prefixed with `Task join error:`)
///   when awaiting the blocking task fails.
/// * Any error returned by the underlying validation, propagated with `?`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::{validate_semantic_query, SemanticLayer, ValidationMode};
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, metric_UserSpending FROM users u JOIN orders o ON u.id = o.user_id";
///     let semantic_layer = SemanticLayer::new();
///     // Add tables, metrics, filters, and relationships to semantic_layer...
///
///     let result = validate_semantic_query(sql.to_string(), semantic_layer, ValidationMode::Strict).await;
///     match result {
///         Ok(_) => println!("Query is valid according to semantic layer rules"),
///         Err(e) => println!("Validation failed: {}", e),
///     }
///     Ok(())
/// }
/// ```
pub async fn validate_semantic_query(
    sql: String,
    semantic_layer: SemanticLayer,
    mode: ValidationMode,
) -> Result<(), SqlAnalyzerError> {
    // Only success/failure is reported; the Ok payload of the validation is discarded.
    tokio::task::spawn_blocking(move || semantic::validate_query(&sql, &semantic_layer, mode))
        .await
        .map_err(|e| SqlAnalyzerError::Internal(anyhow::anyhow!("Task join error: {}", e)))??;
    Ok(())
}
/// Substitutes metrics and filters in a SQL query with their expressions.
///
/// The substitution is performed by `utils::semantic::substitute_query`, which
/// is executed through `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string with metrics and filters to substitute.
/// * `semantic_layer` - The semantic layer metadata containing metric and filter definitions.
///
/// # Returns
/// A `Result` containing the substituted SQL query or an error.
///
/// # Errors
/// * `SqlAnalyzerError::Internal` (message prefixed with `Task join error:`)
///   when awaiting the blocking task fails.
/// * Any error returned by the underlying substitution, propagated with `?`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::{substitute_semantic_query, SemanticLayer};
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, metric_UserSpending FROM users u JOIN orders o ON u.id = o.user_id";
///     let semantic_layer = SemanticLayer::new();
///     // Add tables, metrics, filters, and relationships to semantic_layer...
///
///     let substituted_sql = substitute_semantic_query(sql.to_string(), semantic_layer).await?;
///     println!("Substituted SQL: {}", substituted_sql);
///     Ok(())
/// }
/// ```
pub async fn substitute_semantic_query(
    sql: String,
    semantic_layer: SemanticLayer,
) -> Result<String, SqlAnalyzerError> {
    let substituted =
        tokio::task::spawn_blocking(move || semantic::substitute_query(&sql, &semantic_layer))
            .await
            .map_err(|e| SqlAnalyzerError::Internal(anyhow::anyhow!("Task join error: {}", e)))??;
    Ok(substituted)
}
/// Validates and substitutes a SQL query using semantic layer rules.
///
/// This function first validates the query against semantic layer rules
/// and then substitutes metrics and filters with their expressions. The work
/// is performed by `utils::semantic::validate_and_substitute`, which is
/// executed through `tokio::task::spawn_blocking`.
///
/// # Arguments
/// * `sql` - The SQL query string to validate and substitute.
/// * `semantic_layer` - The semantic layer metadata.
/// * `mode` - The validation mode (Strict or Flexible).
///
/// # Returns
/// A `Result` containing the substituted SQL query or an error.
///
/// # Errors
/// * `SqlAnalyzerError::Internal` (message prefixed with `Task join error:`)
///   when awaiting the blocking task fails.
/// * Any error returned by the underlying validation/substitution, propagated with `?`.
///
/// # Examples
/// ```no_run
/// use sql_analyzer::{validate_and_substitute_semantic_query, SemanticLayer, ValidationMode};
///
/// #[tokio::main]
/// async fn main() -> anyhow::Result<()> {
///     let sql = "SELECT u.id, metric_UserSpending FROM users u JOIN orders o ON u.id = o.user_id";
///     let semantic_layer = SemanticLayer::new();
///     // Add tables, metrics, filters, and relationships to semantic_layer...
///
///     let result = validate_and_substitute_semantic_query(
///         sql.to_string(),
///         semantic_layer,
///         ValidationMode::Flexible
///     ).await;
///
///     match result {
///         Ok(query) => println!("Substituted SQL: {}", query),
///         Err(e) => println!("Validation or substitution failed: {}", e),
///     }
///     Ok(())
/// }
/// ```
pub async fn validate_and_substitute_semantic_query(
    sql: String,
    semantic_layer: SemanticLayer,
    mode: ValidationMode,
) -> Result<String, SqlAnalyzerError> {
    let result = tokio::task::spawn_blocking(move || {
        semantic::validate_and_substitute(&sql, &semantic_layer, mode)
    })
    .await
    .map_err(|e| SqlAnalyzerError::Internal(anyhow::anyhow!("Task join error: {}", e)))??;
    Ok(result)
}

File diff suppressed because it is too large Load Diff