#arrow #query #sql

datafusion

DataFusion is an in-memory query engine that uses Apache Arrow as the memory model

39 releases (6 breaking)

✓ Uses Rust 2018 edition

0.13.0 Apr 1, 2019
0.6.0 Jan 20, 2019
0.5.2 Jan 5, 2019
0.5.0 Dec 15, 2018
0.2.2 Mar 26, 2018

#10 in Database interfaces

313 downloads per month

Apache-2.0

1.5MB
35K SLoC

DataFusion

DataFusion is an in-memory query engine that uses Apache Arrow as the memory model. It supports executing SQL queries against CSV and Parquet files as well as querying directly against in-memory data.

Usage

Add this to your Cargo.toml:

[dependencies]
datafusion = "0.13.0"

Status

General

  • SQL Parser
  • SQL Query Planner
  • Query Optimizer
  • Projection push down
  • Predicate push down
  • Type coercion
  • Parallel query execution
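
As a rough illustration of what projection push down means, the sketch below rewrites a toy logical plan so that a table scan reads only the columns required by the projection above it. It uses only the standard library; the `Plan` enum and the `push_down_projection` function are hypothetical and do not reflect DataFusion's actual plan types or optimizer.

```rust
/// Hypothetical, heavily simplified logical plan (not DataFusion's own type).
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    /// Scan a table; `columns: None` means "read every column".
    Scan { table: String, columns: Option<Vec<String>> },
    /// Keep only the named columns of the input.
    Projection { columns: Vec<String>, input: Box<Plan> },
}

/// If a projection sits directly on an unrestricted scan, restrict the scan
/// to the projected columns. A projection over anything else recurses into
/// its input; every other plan is returned unchanged.
fn push_down_projection(plan: Plan) -> Plan {
    match plan {
        Plan::Projection { columns, input } => match *input {
            Plan::Scan { table, columns: None } => Plan::Projection {
                columns: columns.clone(),
                input: Box::new(Plan::Scan { table, columns: Some(columns) }),
            },
            other => Plan::Projection {
                columns,
                input: Box::new(push_down_projection(other)),
            },
        },
        other => other,
    }
}

fn main() {
    let plan = Plan::Projection {
        columns: vec!["c1".to_string(), "c12".to_string()],
        input: Box::new(Plan::Scan {
            table: "aggregate_test_100".to_string(),
            columns: None,
        }),
    };
    // After the rewrite, the scan lists only c1 and c12.
    println!("{:#?}", push_down_projection(plan));
}
```

The benefit in a real engine is that columns nobody asks for are never read or decoded, which matters most for columnar sources such as Parquet.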

SQL Support

  • Projection
  • Selection
  • Aggregate
  • Sorting
  • Limit
  • Nested types and dot notation
  • Lists
  • UDFs
  • Subqueries
  • Joins

Data Sources

  • CSV
  • Parquet primitive types
  • Parquet nested types

Example

Here is a brief example of running a SQL query against a CSV file. See the examples directory for complete, runnable programs.

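// Import paths follow the crate's bundled examples for this release line;
// verify them against the docs for the version you depend on.
// The `arrow` crate must also be listed as a dependency.
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;

use arrow::array::{BinaryArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::execution::context::ExecutionContext;
use datafusion::execution::datasource::CsvDataSource;

fn main() {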
    // create local execution context
    let mut ctx = ExecutionContext::new();

    // define schema for data source (csv file)
    let schema = Arc::new(Schema::new(vec![
        Field::new("c1", DataType::Utf8, false),
        Field::new("c2", DataType::UInt32, false),
        Field::new("c3", DataType::Int8, false),
        Field::new("c4", DataType::Int16, false),
        Field::new("c5", DataType::Int32, false),
        Field::new("c6", DataType::Int64, false),
        Field::new("c7", DataType::UInt8, false),
        Field::new("c8", DataType::UInt16, false),
        Field::new("c9", DataType::UInt32, false),
        Field::new("c10", DataType::UInt64, false),
        Field::new("c11", DataType::Float32, false),
        Field::new("c12", DataType::Float64, false),
        Field::new("c13", DataType::Utf8, false),
    ]));

    // register csv file with the execution context
    let csv_datasource = CsvDataSource::new(
        "../../testing/data/csv/aggregate_test_100.csv",
        schema.clone(),
        1024,
    );
    ctx.register_datasource("aggregate_test_100", Rc::new(RefCell::new(csv_datasource)));

    // execute the query
    let sql = "SELECT c1, MIN(c12), MAX(c12) FROM aggregate_test_100 WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1";
    let relation = ctx.sql(&sql).unwrap();
    let mut results = relation.borrow_mut();

    // iterate over result batches
    while let Some(batch) = results.next().unwrap() {
        println!(
            "RecordBatch has {} rows and {} columns",
            batch.num_rows(),
            batch.num_columns()
        );

        let c1 = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        let min = batch
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        let max = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let c1_value: String = String::from_utf8(c1.value(i).to_vec()).unwrap();

            println!("{}, Min: {}, Max: {}", c1_value, min.value(i), max.value(i));
        }
    }
}
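
To make the query's semantics concrete, here is a minimal standard-library sketch of what the SQL above computes: keep rows where c11 lies strictly between 0.1 and 0.9, group them by c1, and track the minimum and maximum c12 per group. It does not use DataFusion or Arrow; the `Row` struct, the `min_max_by_group` function, and the sample values are hypothetical and exist only for illustration.

```rust
use std::collections::BTreeMap;

/// Hypothetical row holding only the three columns the query touches.
struct Row {
    c1: &'static str,
    c11: f32,
    c12: f64,
}

/// Mirrors `SELECT c1, MIN(c12), MAX(c12) ... WHERE c11 > 0.1 AND c11 < 0.9 GROUP BY c1`.
/// Returns the (min, max) of c12 for each c1 value, ordered by c1.
fn min_max_by_group(rows: &[Row]) -> BTreeMap<&'static str, (f64, f64)> {
    let mut groups: BTreeMap<&'static str, (f64, f64)> = BTreeMap::new();
    // WHERE clause: both bounds are exclusive.
    for row in rows.iter().filter(|r| r.c11 > 0.1 && r.c11 < 0.9) {
        groups
            .entry(row.c1)
            .and_modify(|(min, max)| {
                *min = f64::min(*min, row.c12);
                *max = f64::max(*max, row.c12);
            })
            .or_insert((row.c12, row.c12));
    }
    groups
}

fn main() {
    // Made-up sample data, not taken from aggregate_test_100.csv.
    let rows = vec![
        Row { c1: "a", c11: 0.5, c12: 0.25 },
        Row { c1: "a", c11: 0.2, c12: 0.75 },
        Row { c1: "b", c11: 0.95, c12: 0.5 }, // dropped by the filter
        Row { c1: "b", c11: 0.3, c12: 0.125 },
    ];
    for (c1, (min, max)) in min_max_by_group(&rows) {
        println!("{}, Min: {}, Max: {}", c1, min, max);
    }
}
```

A query engine does the same work batch by batch over columnar arrays rather than row by row, but the filter-then-aggregate order is the same.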

Dependencies

~22MB
~415K SLoC