1 unstable release
0.2.0 | Jul 5, 2024 |
---|
#40 in #recipe
Used in pyspark-arrow-rs
3KB
Pyspark Arrow Rust (pyspark-arrow-rs)
Intention is to be used together with Pyo3 (maturin), arrow-rs, serde_arrow to generate python code that can be used in Spark for ETL jobs
Context
The naive way to try to use Spark would be to use Spark UDFs or Pandas UDFs. Spark is natively written in the JVM, and interops with Python through Py4J. The conversion overhead from a Java RDD into something that a Python UDF can process is quite high.
As such, the way around this is to use Apache Arrow. PySpark already uses Arrow for interop between JVM and Python, and by directly leveraging the Arrow format, we are able to skip any additional overhead and directly read the Arrow object in a Rust function.
RDD -> Spark (Java) -> Arrow -> Pyspark -> Rust
How to do this
We are able to directly load Arrow objects into a Rust function wrapped in Python (through Maturin) through the pyspark.sql.Dataframe.mapInArrow
API.
This API expects two arguments:
- A Python function that yields an Apache Arrow RecordBatch item
- The expected schema
We are able to write a Rust function that returns an arrow-rs RecordBatch item that can be yielded by the python function.
We are also able to pass in the expected schema by using serde_arrow (and a macro provided by this crate) to derive the schema.
TL;DR
This crate provides some macros that make life easier to be used together with mapInArrow
as well as provide some recipes on how to leverage Rust in Pyspark
Example Usage
- Refer to
tests
folder for a real example. But the basic idea is, given a struct, you can generate a spark DDL to be used inmap_in_arrow
on a pyspark dataframe
use pyspark_arrow_rs::HasArrowSparkSchema;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, HasArrowSparkSchema)]
pub(crate) struct TestStruct {
pub(crate) col: String,
pub(crate) int_col: i64,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_macro() {
let _a = TestStruct {
col: "bla".to_string(),
int_col: 1000,
};
let ddl = TestStruct::get_spark_ddl();
assert_eq!("`col` STRING, `int_col` BIGINT", ddl.unwrap());
}
}
Dependencies
~1.5MB
~37K SLoC