DataFusion Quality (DFQ)
A data quality framework for DataFusion, inspired by Great Expectations and Spark Expectations.
Features
- Schema-level rules
- Column-level rules
- Table-level aggregate rules
- Custom rule support
- Rich expression support
- Async processing
- Fluent API for rule creation
Getting Started
Add the following to your Cargo.toml:
[dependencies]
datafusion-quality = "0.1.0"
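
The examples below also call DataFusion and Tokio directly, so a typical Cargo.toml will include them as well. A sketch (the version numbers here are illustrative, not pinned by this crate):

[dependencies]
datafusion-quality = "0.1.0"
# Match the DataFusion version that datafusion-quality 0.1 is built against.
datafusion = "47"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }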
Basic Usage
You can use either the traditional API or the new fluent API for creating rules. For a complete example, see the basic example.
Using the Fluent API
use datafusion_quality::{rules::{column::{dfq_in_range, dfq_not_null}, dfq_gt}, RuleSet};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // Create a sample DataFrame
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;

    // Create a new RuleSet instance
    let mut rule_set = RuleSet::new();

    // Add rules using the fluent API
    rule_set.with_column_rule("name", dfq_not_null())
        .with_column_rule("age", dfq_in_range(18.0, 100.0))
        .with_column_rule("score", dfq_gt(lit(50.0)));

    // Apply rules
    let result_df = rule_set.apply(&df).await?;

    // Show the results
    result_df.show().await?;

    // Partition data into good and bad records
    let (good_data, bad_data) = rule_set.partition(&df).await?;
    good_data.show().await?;
    bad_data.show().await?;

    Ok(())
}
Using the Traditional API
use datafusion_quality::{RuleSet, rules::column::{CustomRule, NotNullRule, PatternRule, RangeRule}};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a new DataFusion context
    let ctx = SessionContext::new();

    // Create a sample DataFrame with an explicit schema
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int32, false),
        Field::new("email", DataType::Utf8, false),
    ]);
    let df = ctx.read_csv("data.csv", CsvReadOptions::new().schema(&schema)).await?;

    // Create a new RuleSet instance
    let mut rule_set = RuleSet::new();

    // Add rules
    rule_set.add_column_rule(Arc::new(NotNullRule::new("name")));
    rule_set.add_column_rule(Arc::new(RangeRule::new("age", 18.0, 100.0)));
    rule_set.add_column_rule(Arc::new(PatternRule::new("email", "%@%.%")));
    rule_set.add_column_rule(Arc::new(CustomRule::new("age", "age > 25")));

    // Apply rules
    let result_df = rule_set.apply(&df).await?;

    // Show the results
    result_df.show().await?;

    Ok(())
}
Available Rules
Column Rules
- dfq_not_null(): Checks if values in a column are not null
- dfq_null(): Checks if values in a column are null
- dfq_in_range(min, max): Checks if values in a column fall within a specified range
- dfq_not_in_range(min, max): Checks if values in a column fall outside a specified range
- dfq_like(pattern): Checks if string values match a case-sensitive pattern
- dfq_not_like(pattern): Checks if string values do not match a case-sensitive pattern
- dfq_ilike(pattern): Checks if string values match a case-insensitive pattern
- dfq_not_ilike(pattern): Checks if string values do not match a case-insensitive pattern
- dfq_lt(value): Checks if values are less than a specified value
- dfq_lte(value): Checks if values are less than or equal to a specified value
- dfq_not_lt(value): Checks if values are not less than a specified value
- dfq_not_lte(value): Checks if values are not less than or equal to a specified value
- dfq_gt(value): Checks if values are greater than a specified value
- dfq_gte(value): Checks if values are greater than or equal to a specified value
- dfq_not_gt(value): Checks if values are not greater than a specified value
- dfq_not_gte(value): Checks if values are not greater than or equal to a specified value
- dfq_eq(value): Checks if values are equal to a specified value
- dfq_not_eq(value): Checks if values are not equal to a specified value
- dfq_str_length(min, max): Checks if string length is within specified bounds
- dfq_str_min_length(min): Checks if string length is at least the specified minimum
- dfq_str_max_length(max): Checks if string length is at most the specified maximum
- dfq_str_empty(): Checks if strings are empty
- dfq_str_not_empty(): Checks if strings are not empty
- dfq_custom(rule_name, expression): Applies a custom SQL expression to a column
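These compose through the same fluent API shown above. A minimal sketch combining a few of them (the exact module paths and argument types for dfq_like and dfq_str_min_length are assumptions inferred from this list, so check the crate docs):

use datafusion::prelude::*;
use datafusion_quality::{rules::column::{dfq_in_range, dfq_like, dfq_not_null, dfq_str_min_length}, RuleSet};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;

    // Each rule appends one boolean result column (see "Rule Results" below).
    let mut rule_set = RuleSet::new();
    rule_set
        .with_column_rule("email", dfq_not_null())
        .with_column_rule("email", dfq_like("%@%.%"))
        .with_column_rule("age", dfq_in_range(0.0, 120.0))
        .with_column_rule("name", dfq_str_min_length(1));

    rule_set.apply(&df).await?.show().await?;
    Ok(())
}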
Table Rules
- dfq_null_count(): Counts the number of null values in a column
- dfq_not_null_count(): Counts the number of non-null values in a column
- dfq_count(): Counts the total number of rows in a column
- dfq_count_distinct(): Counts the number of distinct values in a column
- dfq_avg(): Calculates the average value of a column
- dfq_stddev(): Calculates the standard deviation of a column
- dfq_max(): Finds the maximum value in a column
- dfq_min(): Finds the minimum value in a column
- dfq_sum(): Calculates the sum of values in a column
- dfq_median(): Calculates the median value of a column
- dfq_last_value(): Gets the last value in a column
- dfq_stddev_pop(): Calculates the population standard deviation of a column
- dfq_var_pop(): Calculates the population variance of a column
- dfq_var_samp(): Calculates the sample variance of a column
- dfq_covar_pop(x, y): Calculates the population covariance between two columns
- dfq_covar_samp(x, y): Calculates the sample covariance between two columns
- dfq_regr_avgx(x, y): Calculates the average of x values in a linear regression
- dfq_regr_avgy(x, y): Calculates the average of y values in a linear regression
- dfq_regr_count(x, y): Counts the number of rows used in a linear regression
- dfq_regr_intercept(x, y): Calculates the intercept of a linear regression
- dfq_regr_r2(x, y): Calculates the R-squared value of a linear regression
- dfq_regr_slope(x, y): Calculates the slope of a linear regression
- dfq_regr_sxx(x, y): Calculates the sum of squared deviations from the mean for x values
- dfq_regr_sxy(x, y): Calculates the sum of products of deviations from the mean for x and y values
- dfq_regr_syy(x, y): Calculates the sum of squared deviations from the mean for y values
- dfq_nth_value(n, sort_exprs): Gets the nth value in a column with optional sorting
- dfq_first_value(sort_exprs): Gets the first value in a column with optional sorting
- dfq_custom_agg(aggregation, rule_name): Creates a custom aggregation rule with a specified expression and name
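For intuition, these correspond to DataFusion's standard aggregate functions. The sketch below computes a few of the same quantities with plain DataFusion; it does not use DFQ's table-rule registration API, and the functions_aggregate import path can vary between DataFusion versions:

use datafusion::functions_aggregate::expr_fn::{avg, count, stddev};
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;

    // The aggregates behind dfq_avg, dfq_stddev, and dfq_count for the "age" column.
    df.aggregate(
        vec![],
        vec![
            avg(col("age")).alias("age_avg"),
            stddev(col("age")).alias("age_stddev"),
            count(col("age")).alias("age_count"),
        ],
    )?
    .show()
    .await?;

    Ok(())
}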
Schema Rules
- ColumnExistsRule: Checks if a column exists in the schema
- ColumnTypeRule: Checks if a column has a specific data type
- ColumnNullableRule: Checks if a column is nullable
Creating Custom Rules
You can create custom rules by implementing the appropriate trait (ColumnRule, TableRule, or SchemaRule):
use datafusion_quality::{ColumnRule, ValidationError};
use datafusion::prelude::*;

pub struct CustomColumnRule {
    column_name: String,
    expression: String,
}

impl CustomColumnRule {
    pub fn new(column_name: &str, expression: &str) -> Self {
        Self {
            column_name: column_name.to_string(),
            expression: expression.to_string(),
        }
    }
}

impl ColumnRule for CustomColumnRule {
    fn apply(&self, df: &DataFrame) -> Result<DataFrame, ValidationError> {
        // Add a boolean result column named "<column_name>_custom"
        let result_col = format!("{}_custom", self.column_name);
        df.select(vec![
            col("*"),
            sql(&self.expression).alias(&result_col),
        ])
    }

    fn name(&self) -> &str {
        "custom"
    }

    fn description(&self) -> &str {
        "Applies a custom SQL expression to a column"
    }

    fn column_name(&self) -> &str {
        &self.column_name
    }
}
Rule Results
Each rule adds a new column to the DataFrame with a name in the format <column_name>_<rule_name>. The value in these columns is a boolean indicating whether the rule passed for that row. A final column called dq_pass is also added; it is the boolean AND of all of the rule columns.
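
After apply, those result columns can be queried with ordinary DataFusion expressions. A minimal sketch (the name_not_null column name assumes the not_null rule's suffix follows the convention described above):

use datafusion::prelude::*;
use datafusion_quality::{rules::column::dfq_not_null, RuleSet};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("data.csv", CsvReadOptions::new()).await?;

    let mut rule_set = RuleSet::new();
    rule_set.with_column_rule("name", dfq_not_null());

    // apply() returns the input plus the per-rule boolean columns and dq_pass.
    let checked = rule_set.apply(&df).await?;

    // Rows that failed at least one rule.
    checked
        .clone()
        .filter(col("dq_pass").eq(lit(false)))?
        .show()
        .await?;

    // Inspect a single rule's result column (name assumed per the convention above).
    checked.select(vec![col("name"), col("name_not_null")])?.show().await?;

    Ok(())
}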
Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
License
This project is licensed under the Apache License - see the LICENSE file for details.