18 releases (10 breaking)
0.12.0 | Feb 7, 2024 |
---|---|
0.11.0 | Feb 28, 2023 |
0.10.0 | Nov 21, 2022 |
0.7.0 | May 16, 2022 |
0.1.2 | Jul 9, 2019 |
Ballista: Distributed Scheduler for Apache Arrow DataFusion
Ballista is a distributed compute platform primarily implemented in Rust, and powered by Apache Arrow and DataFusion. It is built on an architecture that allows other programming languages (such as Python, C++, and Java) to be supported as first-class citizens without paying a penalty for serialization costs.
The foundational technologies in Ballista are:
- Apache Arrow memory model and compute kernels for efficient processing of data.
- Apache Arrow Flight Protocol for efficient data transfer between processes.
- Google Protocol Buffers for serializing query plans.
- Docker for packaging up executors along with user-defined code.
Ballista can be deployed as a standalone cluster and also supports Kubernetes. In either case, the scheduler can be configured to use etcd as a backing store to (eventually) provide redundancy in the case of a scheduler failing.
Rust Version Compatibility
This crate is tested with the latest stable version of Rust. We do not currently test against older versions of the Rust compiler.
Starting a cluster
There are several ways to start a Ballista cluster, including with Docker and Kubernetes. For full documentation, refer to the deployment section of the Ballista User Guide.
A simple way to start a local cluster for testing purposes is to use cargo to install the scheduler and executor crates.
cargo install --locked ballista-scheduler
cargo install --locked ballista-executor
With these crates installed, it is now possible to start a scheduler process.
RUST_LOG=info ballista-scheduler
The scheduler will bind to port 50050 by default.
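Before starting executors, it can help to confirm that something is actually listening on the scheduler port. The following is a minimal sketch using only the Rust standard library; the helper name port_is_open is hypothetical and not part of Ballista, and a successful connection only shows that the port is open, not that the listener is a Ballista scheduler.

```rust
use std::net::{TcpStream, ToSocketAddrs};
use std::time::Duration;

/// Returns true if a TCP connection to `host:port` succeeds within `timeout`.
/// Hypothetical helper: it only checks that something is listening and does
/// not verify that the listener is a Ballista scheduler.
fn port_is_open(host: &str, port: u16, timeout: Duration) -> bool {
    // Resolve the host name; a resolution failure counts as "not reachable".
    let addrs = match (host, port).to_socket_addrs() {
        Ok(addrs) => addrs,
        Err(_) => return false,
    };
    // Try every resolved address (for example IPv4 and IPv6 for "localhost").
    for addr in addrs {
        if TcpStream::connect_timeout(&addr, timeout).is_ok() {
            return true;
        }
    }
    false
}

fn main() {
    // 50050 is the default scheduler port mentioned above.
    let up = port_is_open("localhost", 50050, Duration::from_secs(1));
    println!("scheduler reachable on localhost:50050: {up}");
}
```

The same check can be pointed at an executor port once executors are running.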
Next, start an executor process in a new terminal session with the specified concurrency level.
RUST_LOG=info ballista-executor -c 4
The executor will bind to port 50051 by default. Additional executors can be started by manually specifying a bind port. For example:
RUST_LOG=info ballista-executor --bind-port 50052 -c 4
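When starting several executors on one machine, each needs its own bind port. As a rough sketch of one possible numbering scheme, the hypothetical helper below builds the command-line arguments for the n-th executor, assuming ports are assigned sequentially from the default 50051. Only the --bind-port and -c flags come from the commands above; the helper and constant names are illustrative.

```rust
/// Default executor bind port, as stated in the text above.
const DEFAULT_EXECUTOR_PORT: u16 = 50051;

/// Builds the argument list for the `index`-th executor (0-based).
/// Executor 0 keeps the default port; later ones get 50052, 50053, ...
/// Hypothetical helper, not part of Ballista.
fn executor_args(index: u16, concurrency: u32) -> Vec<String> {
    let mut args = Vec::new();
    if index > 0 {
        // Sequential port assignment is an assumption of this sketch.
        let port = DEFAULT_EXECUTOR_PORT
            .checked_add(index)
            .expect("executor index too large for a valid port number");
        args.push("--bind-port".to_string());
        args.push(port.to_string());
    }
    args.push("-c".to_string());
    args.push(concurrency.to_string());
    args
}

fn main() {
    // Print the commands for three executors with a concurrency level of 4.
    for i in 0..3 {
        println!("RUST_LOG=info ballista-executor {}", executor_args(i, 4).join(" "));
    }
}
```

For the first two executors this prints the same two commands shown above; the third uses port 50053.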
Executing a query
Ballista provides a BallistaContext as a starting point for creating queries. DataFrames can be created by invoking the read_csv, read_parquet, and sql methods.
To build a simple Ballista example, add the following dependencies to your Cargo.toml file:
[dependencies]
ballista = "0.11"
datafusion = "28.0.0"
tokio = "1.0"
The following example runs a simple aggregate query against a Parquet file (yellow_tripdata_2022-01.parquet) from the New York Taxi and Limousine Commission data set. The query is built with the DataFrame API, and the equivalent SQL is shown in a comment. Download the file and add it to the testdata folder before running the example.
use ballista::prelude::*;
use datafusion::prelude::{avg, col, max, min, sum, ParquetReadOptions};

#[tokio::main]
async fn main() -> Result<()> {
    // create configuration
    let config = BallistaConfig::builder()
        .set("ballista.shuffle.partitions", "4")
        .build()?;

    // connect to Ballista scheduler
    let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

    let filename = "testdata/yellow_tripdata_2022-01.parquet";

    // define the query using the DataFrame API
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?
        .select_columns(&["passenger_count", "fare_amount"])?
        .aggregate(
            // group by passenger_count
            vec![col("passenger_count")],
            // compute four aggregates over fare_amount per group
            vec![
                min(col("fare_amount")),
                max(col("fare_amount")),
                avg(col("fare_amount")),
                sum(col("fare_amount")),
            ],
        )?
        // sort ascending, with nulls first
        .sort(vec![col("passenger_count").sort(true, true)])?;

    // this is equivalent to the following SQL
    // SELECT passenger_count, MIN(fare_amount), MAX(fare_amount), AVG(fare_amount), SUM(fare_amount)
    // FROM tripdata
    // GROUP BY passenger_count
    // ORDER BY passenger_count

    // print the results
    df.show().await?;

    Ok(())
}
The output should look similar to the following table.
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| passenger_count | MIN(?table?.fare_amount) | MAX(?table?.fare_amount) | AVG(?table?.fare_amount) | SUM(?table?.fare_amount) |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
| | -159.5 | 285.2 | 17.60577640099004 | 1258865.829999991 |
| 0 | -115 | 500 | 11.794859107585335 | 614052.1600000001 |
| 1 | -480 | 401092.32 | 12.61028389876563 | 22623542.879999973 |
| 2 | -250 | 640.5 | 13.79501011585127 | 4732047.139999998 |
| 3 | -130 | 480 | 13.473184817311106 | 1139427.2400000002 |
| 4 | -250 | 464 | 14.232650547832726 | 502711.4499999997 |
| 5 | -52 | 668 | 12.160378472086954 | 624289.51 |
| 6 | -52 | 252.5 | 12.576583325529857 | 402916 |
| 7 | 7 | 79 | 61.77777777777778 | 556 |
| 8 | 8.3 | 115 | 79.9125 | 639.3 |
| 9 | 9.3 | 96.5 | 65.26666666666667 | 195.8 |
+-----------------+--------------------------+--------------------------+--------------------------+--------------------------+
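To make the semantics of the aggregation concrete, here is a small standard-library sketch that computes the same per-group MIN, MAX, AVG, and SUM over in-memory rows. It does not use Ballista or DataFusion: the FareStats type and the aggregate function are hypothetical, the sample rows are made up, and fare values are assumed to be non-null. A None key models a NULL passenger_count, which forms its own group and sorts first, like the blank row in the table above.

```rust
use std::collections::BTreeMap;

/// Running aggregates for one passenger_count group (hypothetical type).
#[derive(Debug, Clone, PartialEq)]
struct FareStats {
    min: f64,
    max: f64,
    sum: f64,
    count: u64,
}

impl FareStats {
    /// AVG is derived from SUM and COUNT.
    fn avg(&self) -> f64 {
        self.sum / self.count as f64
    }
}

/// Groups `(passenger_count, fare_amount)` rows by passenger count.
/// `None` models a NULL passenger_count. BTreeMap iterates keys in
/// ascending order with `None` first, mirroring ORDER BY with nulls first.
fn aggregate(rows: &[(Option<u32>, f64)]) -> BTreeMap<Option<u32>, FareStats> {
    let mut groups: BTreeMap<Option<u32>, FareStats> = BTreeMap::new();
    for &(key, fare) in rows {
        groups
            .entry(key)
            .and_modify(|s| {
                s.min = s.min.min(fare);
                s.max = s.max.max(fare);
                s.sum += fare;
                s.count += 1;
            })
            .or_insert(FareStats { min: fare, max: fare, sum: fare, count: 1 });
    }
    groups
}

fn main() {
    // Made-up sample rows, not taken from the taxi data set.
    let rows = [
        (Some(1), 10.0),
        (Some(1), 20.0),
        (None, 5.0),
        (Some(2), -2.5),
        (Some(2), 7.5),
    ];
    for (key, stats) in aggregate(&rows) {
        println!(
            "{:?}: min={} max={} avg={} sum={}",
            key, stats.min, stats.max, stats.avg(), stats.sum
        );
    }
}
```

In a distributed engine such as Ballista, partial aggregates of this kind are typically computed per partition and then merged, which is why the sketch keeps SUM and COUNT rather than a running average.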
More examples can be found in the arrow-ballista repository.