18 releases (10 breaking)

0.12.0 Feb 7, 2024
0.11.0 Feb 28, 2023
0.10.0 Nov 21, 2022
0.7.0 May 16, 2022
0.1.2 Jul 9, 2019

#485 in Database interfaces

Download history 6/week @ 2024-02-02 21/week @ 2024-02-16 37/week @ 2024-02-23 10/week @ 2024-03-01 11/week @ 2024-03-08 17/week @ 2024-03-15 30/week @ 2024-03-29 12/week @ 2024-04-05

60 downloads per month
Used in 3 crates

Apache-2.0

620KB
13K SLoC

Ballista: Distributed Scheduler for Apache Arrow DataFusion

Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.

The foundational technologies in Ballista are:

Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.

Rust Version Compatibility

This crate is tested with the latest stable version of Rust. We do not currrently test against other, older versions of the Rust compiler.

Starting a cluster

There are numerous ways to start a Ballista cluster, including support for Docker and Kubernetes. For full documentation, refer to the deployment section of the Ballista User Guide

A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.

cargo install --locked ballista-scheduler
cargo install --locked ballista-executor

With these crates installed, it is now possible to start a scheduler process.

RUST_LOG=info ballista-scheduler

The scheduler will bind to port 50050 by default.

Next, start an executor processes in a new terminal session with the specified concurrency level.

RUST_LOG=info ballista-executor -c 4

The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:

RUST_LOG=info ballista-executor --bind-port 50052 -c 4

Executing a query

Ballista provides a BallistaContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.

To build a simple ballista example, add the following dependencies to your Cargo.toml file:

[dependencies]
ballista = "0.11"
datafusion = "28.0.0"
tokio = "1.0"

The following example runs a simple aggregate SQL query against a Parquet file (yellow_tripdata_2022-01.parquet) from the New York Taxi and Limousine Commission data set. Download the file and add it to the testdata folder before running the example.

use ballista::prelude::*;
use datafusion::prelude::{col, min, max, avg, sum, ParquetReadOptions};
use datafusion::arrow::util::pretty;
use datafusion::prelude::CsvReadOptions;

#[tokio::main]
async fn main() -> Result<()> {
    // create configuration
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // connect to Ballista scheduler
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    let filename = "testdata/yellow_tripdata_2022-01.parquet";

    // define the query using the DataFrame trait
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["passenger_count", "fare_amount"])?
        .aggregate(vec![col("passenger_count")], vec![min(col("fare_amount")), max(col("fare_amount")), avg(col("fare_amount")), sum(col("fare_amount"))])?
        .sort(vec![col("passenger_count").sort(true,true)])?;

    // this is equivalent to the following SQL
    // SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
    //     FROM tripdata
    //     GROUP BY passenger_count
    //     ORDER BY passenger_count

    // print the results
    df.show().await?;

    Ok(())
}

The output should look similar to the following table.

+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
|                 | -159.5                   | 285.2                    | 17.60577640099004        | 1258865.829999991        |
| 0               | -115                     | 500                      | 11.794859107585335       | 614052.1600000001        |
| 1               | -480                     | 401092.32                | 12.61028389876563        | 22623542.879999973       |
| 2               | -250                     | 640.5                    | 13.79501011585127        | 4732047.139999998        |
| 3               | -130                     | 480                      | 13.473184817311106       | 1139427.2400000002       |
| 4               | -250                     | 464                      | 14.232650547832726       | 502711.4499999997        |
| 5               | -52                      | 668                      | 12.160378472086954       | 624289.51                |
| 6               | -52                      | 252.5                    | 12.576583325529857       | 402916                   |
| 7               | 7                        | 79                       | 61.77777777777778        | 556                      |
| 8               | 8.3                      | 115                      | 79.9125                  | 639.3                    |
| 9               | 9.3                      | 96.5                     | 65.26666666666667        | 195.8                    |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+

More examples can be found in the arrow-ballista repository.

Dependencies

~71MB
~1.5M SLoC