5 releases
Version | Released |
---|---|
0.0.1-beta.3 | Apr 2, 2024 |
0.0.1-beta.2 | Mar 24, 2024 |
0.0.1-beta.1 | Mar 4, 2024 |
0.0.1-beta | Sep 20, 2023 |
0.0.1-alpha | Sep 18, 2023 |
Apache Spark Connect Client for Rust
This project houses the experimental client for Spark Connect for Apache Spark, written in Rust.
Current State of the Project
The Spark Connect client for Rust is highly experimental and should not be used in any production setting. It is currently a "proof of concept" to identify the methods of interacting with a Spark cluster from Rust.
Quick Start
The spark-connect-rs crate aims to provide an entrypoint to Spark Connect and to offer DataFrame API interactions similar to those in Spark's existing clients.
docker compose up --build -d
use spark_connect_rs;

use spark_connect_rs::{SparkSession, SparkSessionBuilder};

use spark_connect_rs::functions as F;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .sql("SELECT * FROM json.`/opt/spark/examples/src/main/resources/employees.json`")
        .await?;

    df.filter("salary >= 3500")
        .select(F::col("name"))
        .show(Some(5), None, None)
        .await?;

    // +-------------+
    // | show_string |
    // +-------------+
    // | +------+    |
    // | |name  |    |
    // | +------+    |
    // | |Andy  |    |
    // | |Justin|    |
    // | |Berta |    |
    // | +------+    |
    // |             |
    // +-------------+

    Ok(())
}
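The remote address passed to SparkSessionBuilder::remote follows the Spark Connect connection string shape sc://host:port/;key=value. As a rough sketch of how such a string breaks down, the hypothetical parse_remote helper below splits it using only the standard library. It is not the crate's own parser; it assumes the default port 15002 when none is given and does not handle IPv6 hosts.

```rust
use std::collections::HashMap;

/// Parsed pieces of a connection string (hypothetical type, not part of the crate).
#[derive(Debug, PartialEq)]
struct Remote {
    host: String,
    port: u16,
    params: HashMap<String, String>,
}

/// Hypothetical helper: split `sc://host:port/;key=value;...` into its parts.
fn parse_remote(url: &str) -> Result<Remote, String> {
    let rest = url
        .strip_prefix("sc://")
        .ok_or_else(|| format!("expected an sc:// scheme in {:?}", url))?;

    // Everything before the first '/' is host[:port]; the rest holds parameters.
    let (authority, tail) = match rest.split_once('/') {
        Some((authority, tail)) => (authority, tail),
        None => (rest, ""),
    };

    let (host, port) = match authority.rsplit_once(':') {
        Some((host, port)) => {
            let port = port
                .parse::<u16>()
                .map_err(|e| format!("bad port {:?}: {}", port, e))?;
            (host, port)
        }
        // Assumption: fall back to the default Spark Connect port.
        None => (authority, 15002),
    };
    if host.is_empty() {
        return Err("missing host".to_string());
    }

    // Parameters are `key=value` pairs separated by ';'.
    let params = tail
        .split(';')
        .filter_map(|pair| pair.split_once('='))
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect();

    Ok(Remote { host: host.to_string(), port, params })
}

fn main() {
    let remote = parse_remote("sc://127.0.0.1:15002/").unwrap();
    println!("{}:{}", remote.host, remote.port); // prints 127.0.0.1:15002
}
```

Returning a Result keeps malformed addresses from turning into confusing connection errors later on.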
Getting Started
git clone https://github.com/sjrusso8/spark-connect-rs.git
cd spark-connect-rs
git submodule update --init --recursive
docker compose up --build -d
cargo build && cargo test
Features
The following section outlines some of the larger pieces of functionality that are not yet working with this Spark Connect implementation.
- TLS authentication & Databricks compatibility
- StreamingQueryManager
- Window and Pivot functions
- UDFs or any type of functionality that takes a closure (foreach, foreachBatch, etc.)
SparkSession
Spark Session type object and its implemented traits.
SparkSession | API | Comment |
---|---|---|
active | ||
appName | ||
catalog | | Partial. Only Get/List traits are implemented |
createDataFrame | | Partial. Only works for RecordBatch |
range | ||
read | ||
readStream | | Creates a DataStreamReader object |
sql | ||
stop | ||
streams | | Stream Manager is not yet implemented |
table | ||
version | ||
addArtifact(s) | ||
interruptAll | ||
interruptTag | ||
interruptOperation | ||
addTag | ||
removeTag | ||
getTags | ||
clearTags | ||
DataFrame
Spark DataFrame type object and its implemented traits.
DataFrame | API | Comment |
---|---|---|
agg | ||
alias | ||
approxQuantile | ||
cache | ||
checkpoint | ||
coalesce | ||
colRegex | ||
collect | ||
columns | ||
corr | ||
count | ||
cov | ||
createGlobalTempView | ||
createOrReplaceGlobalTempView | ||
createOrReplaceTempView | ||
createTempView | ||
crossJoin | ||
crosstab | ||
cube | ||
describe | ||
distinct | ||
drop | ||
dropDuplicates | ||
dropDuplicatesWithinWatermark | | Windowing functions are currently in progress |
drop_duplicates | ||
dropna | ||
dtypes | ||
exceptAll | ||
explain | ||
fillna | ||
filter | ||
first | ||
foreach | ||
foreachPartition | ||
freqItems | ||
groupBy | ||
head | ||
hint | ||
inputFiles | ||
intersect | ||
intersectAll | ||
isEmpty | ||
isLocal | ||
isStreaming | ||
join | ||
limit | ||
localCheckpoint | ||
mapInPandas | | TBD on this exact implementation |
mapInArrow | | TBD on this exact implementation |
melt | | groupBy and aggregate functions are currently in progress |
na | ||
observe | ||
offset | ||
orderBy | ||
persist | ||
printSchema | ||
randomSplit | ||
registerTempTable | ||
repartition | ||
repartitionByRange | ||
replace | ||
rollup | ||
sameSemantics | ||
sample | ||
sampleBy | ||
schema | ||
select | ||
selectExpr | ||
semanticHash | ||
show | ||
sort | ||
sortWithinPartitions | ||
sparkSession | ||
stat | ||
storageLevel | ||
subtract | ||
summary | ||
tail | ||
take | ||
to | ||
toDF | ||
toJSON | ||
toLocalIterator | ||
toPandas | | TBD on this exact implementation. Might be toPolars |
transform | ||
union | ||
unionAll | ||
unionByName | ||
unpersist | ||
unpivot | ||
where | | Use filter instead; where is a reserved keyword in Rust |
withColumn | ||
withColumns | ||
withColumnRenamed | ||
withColumnsRenamed | ||
withMetadata | ||
withWatermark | ||
write | ||
writeStream | ||
writeTo | ||
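The comment on where reflects a Rust constraint: where is a reserved keyword, so a method with that exact name can only be declared and called through the raw identifier syntax r#where. The sketch below illustrates this with a hypothetical Frame type over plain integers. It is not the crate's DataFrame, and the closure-based filter is only a local stand-in for a Spark filter expression.

```rust
/// Hypothetical stand-in for a DataFrame; holds plain integers instead of Spark rows.
#[derive(Debug, PartialEq)]
struct Frame {
    rows: Vec<i64>,
}

impl Frame {
    /// Keep only the rows for which `pred` returns true.
    fn filter<P: Fn(&i64) -> bool>(self, pred: P) -> Frame {
        Frame {
            rows: self.rows.into_iter().filter(|row| pred(row)).collect(),
        }
    }

    /// `where` is a reserved keyword, so an alias with that name needs the `r#` prefix.
    fn r#where<P: Fn(&i64) -> bool>(self, pred: P) -> Frame {
        self.filter(pred)
    }
}

fn main() {
    let salaries = Frame { rows: vec![3000, 3500, 4500] };
    // The call site needs the raw identifier too, which is why `filter` reads better.
    let kept = salaries.r#where(|salary| *salary >= 3500);
    println!("{:?}", kept.rows); // prints [3500, 4500]
}
```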
DataFrameWriter
Spark Connect should respect the format as long as your cluster supports the specified type and has the required jars.
DataFrameWriter | API | Comment |
---|---|---|
format | ||
option | ||
options | ||
mode | ||
bucketBy | ||
sortBy | ||
partitionBy | ||
save | ||
saveAsTable | ||
insertInto | ||
DataStreamWriter
Start a streaming job and return a StreamingQuery object to handle the stream operations.
DataStreamWriter | API | Comment |
---|---|---|
format | ||
foreach | ||
foreachBatch | ||
option | ||
options | ||
outputMode | | Uses an Enum for OutputMode |
partitionBy | ||
queryName | ||
trigger | | Uses an Enum for TriggerMode |
start | ||
toTable | ||
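Using enums for the output mode and trigger means an invalid mode string cannot be passed at all; the compiler rejects it. The sketch below shows the general idea with hypothetical OutputMode, Trigger and WriterSketch types. The variant names mirror Spark's "append", "complete" and "update" modes, but none of these definitions are taken from the crate, and the actual enums may look different.

```rust
/// Hypothetical output modes, mirroring Spark's "append", "complete" and "update".
#[derive(Debug, Clone, Copy, PartialEq)]
enum OutputMode {
    Append,
    Complete,
    Update,
}

impl OutputMode {
    /// The string form that Spark uses for each mode.
    fn as_str(self) -> &'static str {
        match self {
            OutputMode::Append => "append",
            OutputMode::Complete => "complete",
            OutputMode::Update => "update",
        }
    }
}

/// Hypothetical trigger settings; the interval is kept as text such as "5 seconds".
#[derive(Debug, Clone, PartialEq)]
enum Trigger {
    Once,
    AvailableNow,
    ProcessingTime(String),
}

/// Hypothetical writer configuration built up by chained calls.
#[derive(Debug, Clone, PartialEq)]
struct WriterSketch {
    format: String,
    output_mode: OutputMode,
    trigger: Option<Trigger>,
}

impl WriterSketch {
    fn new(format: &str) -> Self {
        WriterSketch {
            format: format.to_string(),
            output_mode: OutputMode::Append,
            trigger: None,
        }
    }

    fn output_mode(mut self, mode: OutputMode) -> Self {
        self.output_mode = mode;
        self
    }

    fn trigger(mut self, trigger: Trigger) -> Self {
        self.trigger = Some(trigger);
        self
    }
}

fn main() {
    let writer = WriterSketch::new("console")
        .output_mode(OutputMode::Update)
        .trigger(Trigger::ProcessingTime("5 seconds".to_string()));
    println!("{} / {}", writer.format, writer.output_mode.as_str()); // prints console / update
}
```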
StreamingQuery
A handle to a query that is executing continuously in the background as new data arrives.
StreamingQuery | API | Comment |
---|---|---|
awaitTermination | ||
exception | ||
explain | ||
processAllAvailable | ||
stop | ||
id | ||
isActive | ||
lastProgress | ||
name | ||
recentProgress | ||
runId | ||
status | ||
Column
Spark Column type object and its implemented traits.
Column | API | Comment |
---|---|---|
alias | ||
asc | ||
asc_nulls_first | ||
asc_nulls_last | ||
astype | ||
between | ||
cast | ||
contains | ||
desc | ||
desc_nulls_first | ||
desc_nulls_last | ||
dropFields | ||
endswith | ||
ilike | ||
isNotNull | ||
isNull | ||
isin | ||
like | ||
name | ||
otherwise | ||
rlike | ||
startswith | ||
substr | ||
when | ||
eq == | | Rust does not allow overloading == to return anything other than a bool, so column equality is currently implemented as a method, e.g. col("name").eq(col("id")). Not the best, but it works for now |
addition + | ||
subtraction - | ||
multiplication * | ||
division / | ||
OR \| | ||
AND & | ||
XOR ^ | ||
Negate ~ | ||
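The == limitation comes from the standard library traits: PartialEq::eq must return bool, while arithmetic and bitwise traits such as Add and BitOr let the implementor choose the Output type. The sketch below shows the difference with a hypothetical Expr type and col helper; it is not the crate's Column type, and the render method exists only to make the built expression visible.

```rust
use std::ops::{Add, BitOr};

/// Hypothetical stand-in for a column expression; not the crate's `Column`.
#[derive(Debug, Clone)]
enum Expr {
    Col(String),
    Binary(Box<Expr>, &'static str, Box<Expr>),
}

fn col(name: &str) -> Expr {
    Expr::Col(name.to_string())
}

impl Expr {
    /// `==` cannot build an expression because `PartialEq::eq` returns `bool`,
    /// so equality is exposed as an ordinary method instead.
    fn eq(self, rhs: Expr) -> Expr {
        Expr::Binary(Box::new(self), "=", Box::new(rhs))
    }

    /// Render the expression tree as text.
    fn render(&self) -> String {
        match self {
            Expr::Col(name) => name.clone(),
            Expr::Binary(lhs, op, rhs) => format!("({} {} {})", lhs.render(), op, rhs.render()),
        }
    }
}

// `Add` lets the implementor pick `Output`, so `a + b` can return an `Expr`.
impl Add for Expr {
    type Output = Expr;
    fn add(self, rhs: Expr) -> Expr {
        Expr::Binary(Box::new(self), "+", Box::new(rhs))
    }
}

// The same holds for `|`, which is why OR can be an operator.
impl BitOr for Expr {
    type Output = Expr;
    fn bitor(self, rhs: Expr) -> Expr {
        Expr::Binary(Box::new(self), "OR", Box::new(rhs))
    }
}

fn main() {
    let sum = col("a") + col("b");
    let same = col("name").eq(col("id"));
    println!("{}", sum.render()); // prints (a + b)
    println!("{}", same.render()); // prints (name = id)
}
```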
Functions
Only a few of the functions are covered by unit tests.
Functions | API | Comment |
---|---|---|
abs | ||
acos | ||
acosh | ||
add_months | ||
aggregate | ||
approxCountDistinct | ||
approx_count_distinct | ||
array | ||
array_append | ||
array_compact | ||
array_contains | ||
array_distinct | ||
array_except | ||
array_insert | ||
array_intersect | ||
array_join | ||
array_max | ||
array_min | ||
array_position | ||
array_remove | ||
array_repeat | ||
array_sort | ||
array_union | ||
arrays_overlap | ||
arrays_zip | ||
asc | ||
asc_nulls_first | ||
asc_nulls_last | ||
ascii | ||
asin | ||
asinh | ||
assert_true | ||
atan | ||
atan2 | ||
atanh | ||
avg | ||
base64 | ||
bin | ||
bit_length | ||
bitwiseNOT | ||
bitwise_not | ||
broadcast | ||
bround | ||
bucket | ||
call_udf | ||
cbrt | ||
ceil | ||
coalesce | ||
col | ||
collect_list | ||
collect_set | ||
column | ||
concat | ||
concat_ws | ||
conv | ||
corr | ||
cos | ||
cosh | ||
cot | ||
count | ||
countDistinct | ||
count_distinct | ||
covar_pop | ||
covar_samp | ||
crc32 | ||
create_map | ||
csc | ||
cume_dist | ||
current_date | ||
current_timestamp | ||
date_add | ||
date_format | ||
date_sub | ||
date_trunc | ||
datediff | ||
dayofmonth | ||
dayofweek | ||
dayofyear | ||
days | ||
decode | ||
degrees | ||
dense_rank | ||
desc | ||
desc_nulls_first | ||
desc_nulls_last | ||
element_at | ||
encode | ||
exists | ||
exp | ||
explode | ||
explode_outer | ||
expm1 | ||
expr | ||
factorial | ||
filter | ||
first | ||
flatten | ||
floor | ||
forall | ||
format_number | ||
format_string | ||
from_csv | ||
from_json | ||
from_unixtime | ||
from_utc_timestamp | ||
get | ||
get_active_spark_context | ||
get_json_object | ||
greatest | ||
grouping | ||
grouping_id | ||
hash | ||
hex | ||
hour | ||
hours | ||
hypot | ||
initcap | ||
inline | ||
inline_outer | ||
input_file_name | ||
instr | ||
isnan | ||
isnull | ||
json_tuple | ||
kurtosis | ||
lag | ||
last | ||
last_day | ||
lead | ||
least | ||
length | ||
levenshtein | ||
lit | ||
localtimestamp | ||
locate | ||
log | ||
log10 | ||
log1p | ||
log2 | ||
lower | ||
lpad | ||
ltrim | ||
make_date | ||
map_concat | ||
map_contains_key | ||
map_entries | ||
map_filter | ||
map_from_arrays | ||
map_from_entries | ||
map_keys | ||
map_values | ||
map_zip_with | ||
max | ||
max_by | ||
md5 | ||
mean | ||
median | ||
min | ||
min_by | ||
minute | ||
mode | ||
monotonically_increasing_id | ||
month | ||
months | ||
months_between | ||
nanvl | ||
next_day | ||
nth_value | ||
ntile | ||
octet_length | ||
overlay | ||
pandas_udf | ||
percent_rank | ||
percentile_approx | ||
pmod | ||
posexplode | ||
posexplode_outer | ||
pow | ||
product | ||
quarter | ||
radians | ||
raise_error | ||
rand | ||
randn | ||
rank | ||
regexp_extract | ||
regexp_replace | ||
repeat | ||
reverse | ||
rint | ||
round | ||
row_number | ||
rpad | ||
rtrim | ||
schema_of_csv | ||
schema_of_json | ||
sec | ||
second | ||
sentences | ||
sequence | ||
session_window | ||
sha1 | ||
sha2 | ||
shiftLeft | ||
shiftRight | ||
shiftRightUnsigned | ||
shiftleft | ||
shiftright | ||
shiftrightunsigned | ||
shuffle | ||
signum | ||
sin | ||
sinh | ||
size | ||
skewness | ||
slice | ||
sort_array | ||
soundex | ||
spark_partition_id | ||
split | ||
sqrt | ||
stddev | ||
stddev_pop | ||
stddev_samp | ||
struct | ||
substring | ||
substring_index | ||
sum | ||
sumDistinct | ||
sum_distinct | ||
tan | ||
tanh | ||
timestamp_seconds | ||
toDegrees | ||
toRadians | ||
to_csv | ||
to_date | ||
to_json | ||
to_str | ||
to_timestamp | ||
to_utc_timestamp | ||
transform | ||
transform_keys | ||
transform_values | ||
translate | ||
trim | ||
trunc | ||
try_remote_functions | ||
udf | ||
unbase64 | ||
unhex | ||
unix_timestamp | ||
unwrap_udt | ||
upper | ||
var_pop | ||
var_samp | ||
variance | ||
weekofyear | ||
when | ||
window | ||
window_time | ||
xxhash64 | ||
year | ||
years | ||
zip_with | ||
Schema
Spark schema objects have not yet been translated into Rust objects.
Literal Types
Create Spark literal types from these Rust types. E.g. lit(1_i64) would be a LongType() in the schema.
An array can be made like lit([1_i16, 2_i16, 3_i16]), which would result in an ArrayType(Short) since all the values of the slice can be translated into a literal type.
Spark Literal Type | Rust Type | Status |
---|---|---|
Null | ||
Binary | ||
Boolean | bool | |
Byte | ||
Short | i16 | |
Integer | i32 | |
Long | i64 | |
Float | f32 | |
Double | f64 | |
Decimal | ||
String | &str / String | |
Date | chrono::NaiveDate | |
Timestamp | ||
TimestampNtz | chrono::TimeZone | |
CalendarInterval | ||
YearMonthInterval | ||
DayTimeInterval | ||
Array | slice / Vec | |
Map | ||
Struct | ||
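One way to picture the mapping in the table is a conversion trait implemented for each supported Rust type. The sketch below is only an illustration under that assumption: the Literal enum, the ToLiteral trait and this lit function are hypothetical and are not the crate's definitions. It covers a handful of the types listed above and reports the Spark type name for each value.

```rust
/// Hypothetical literal values, named after the Spark literal types in the table.
#[derive(Debug, Clone, PartialEq)]
enum Literal {
    Boolean(bool),
    Short(i16),
    Integer(i32),
    Long(i64),
    Double(f64),
    Str(String),
    Array(Vec<Literal>),
}

/// Hypothetical conversion trait from a Rust value to a literal.
trait ToLiteral {
    fn to_literal(self) -> Literal;
}

// Generate the one-line impls for the scalar types.
macro_rules! impl_to_literal {
    ($($ty:ty => $variant:ident),*) => {
        $(impl ToLiteral for $ty {
            fn to_literal(self) -> Literal { Literal::$variant(self) }
        })*
    };
}
impl_to_literal!(bool => Boolean, i16 => Short, i32 => Integer, i64 => Long, f64 => Double, String => Str);

impl ToLiteral for &str {
    fn to_literal(self) -> Literal { Literal::Str(self.to_string()) }
}

// Vectors and fixed-size arrays convert element by element.
impl<T: ToLiteral> ToLiteral for Vec<T> {
    fn to_literal(self) -> Literal {
        Literal::Array(self.into_iter().map(ToLiteral::to_literal).collect())
    }
}

impl<T: ToLiteral, const N: usize> ToLiteral for [T; N] {
    fn to_literal(self) -> Literal { Vec::from(self).to_literal() }
}

impl Literal {
    /// Name of the Spark literal type this value would map to.
    fn spark_type(&self) -> String {
        match self {
            Literal::Boolean(_) => "Boolean".to_string(),
            Literal::Short(_) => "Short".to_string(),
            Literal::Integer(_) => "Integer".to_string(),
            Literal::Long(_) => "Long".to_string(),
            Literal::Double(_) => "Double".to_string(),
            Literal::Str(_) => "String".to_string(),
            // All elements share one Rust type, so the first one decides.
            Literal::Array(items) => match items.first() {
                Some(first) => format!("Array({})", first.spark_type()),
                None => "Array(Unknown)".to_string(), // placeholder for an empty array
            },
        }
    }
}

fn lit<T: ToLiteral>(value: T) -> Literal {
    value.to_literal()
}

fn main() {
    println!("{}", lit(1_i64).spark_type()); // prints Long
    println!("{}", lit([1_i16, 2_i16, 3_i16]).spark_type()); // prints Array(Short)
}
```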