Apache Spark Connect Client for Rust
This project houses the experimental client for Spark Connect for Apache Spark written in Rust
Current State of the Project
Currently, the Spark Connect client for Rust is highly experimental and should not be used in any production setting. It is a "proof of concept" to identify the methods of interacting with a Spark cluster from Rust.
The spark-connect-rs crate aims to provide an entrypoint to Spark Connect and to provide similar DataFrame API interactions.
Project Layout
├── crates            <- crates for the implementation of the client side spark-connect bindings
│   └─ connect        <- crate for 'spark-connect-rs'
│      └─ protobuf    <- connect protobuf for apache/spark
├── examples          <- examples of using different aspects of the crate
├── datasets          <- sample files from the main spark repo
Future state would be to have additional crates that allow for easier creation of other language bindings.
Getting Started
This section explains how to run Spark Connect for Rust locally, starting from scratch.
Step 1: Install rust via rustup: https://www.rust-lang.org/tools/install
Step 2: Ensure you have cmake and protobuf installed on your machine
Step 3: Run the following commands to clone the repo and build the crate
git clone https://github.com/sjrusso8/spark-connect-rs.git
cd spark-connect-rs
cargo build
Step 4: Set up the Spark driver on localhost, either by downloading Spark or by using Docker.
With local Spark:

- Download a Spark distribution (3.5.1 recommended) and unzip the package.
- Set your SPARK_HOME environment variable to the location where Spark was extracted.
- Start the Spark Connect server with the following command (make sure to use a package version that matches your Spark distribution):
$ $SPARK_HOME/sbin/start-connect-server.sh --packages "org.apache.spark:spark-connect_2.12:3.5.1,io.delta:delta-spark_2.12:3.0.0" \
--conf "spark.driver.extraJavaOptions=-Divy.cache.dir=/tmp -Divy.home=/tmp" \
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
With Docker:

- Start the Spark Connect server by leveraging the docker-compose.yml in this repo. This will start a Spark Connect server running on port 15002.
$ docker compose up --build -d
Step 5: Run an example from the repo under /examples
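To sanity check the setup, a minimal connect-and-query program might look like the sketch below. It leans on the builder-style API listed in the tables further down (SparkSessionBuilder::remote, sql, show); the connection string format and the exact method signatures (notably the show arguments) are assumptions, so treat it as illustrative rather than canonical.

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the Spark Connect server started in Step 4 (default port 15002)
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Run a trivial SQL statement and print the result to stdout
    let df = spark.sql("SELECT 'hello' AS greeting, 42 AS answer").await?;

    // show's arguments (row limit, truncate, vertical) are assumed here
    df.show(Some(5), None, None).await?;

    Ok(())
}
```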
Features
The following section outlines some of the larger pieces of functionality that are not yet working with this Spark Connect implementation.
- TLS authentication & Databricks compatibility via the feature flag `tls`
- UDFs or any type of functionality that takes a closure (foreach, foreachBatch, etc.)
SparkSession
Spark Session type object and its implemented traits
SparkSession | API | Comment |
---|---|---|
active | ||
addArtifact(s) | ||
addTag | ||
clearTags | ||
copyFromLocalToFs | ||
createDataFrame | | Partial. Only works for RecordBatch (see the sketch after this table) |
getActiveSessions | ||
getTags | ||
interruptAll | ||
interruptOperation | ||
interruptTag | ||
newSession | ||
range | ||
removeTag | ||
sql | ||
stop | ||
table | ||
catalog | | Catalog |
client | | unstable developer api for testing only |
conf | | Conf |
read | | DataFrameReader |
readStream | | DataStreamReader |
streams | | Streams |
udf | | Udf - may not be possible |
udtf | | Udtf - may not be possible |
version | | |
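Because createDataFrame currently only works for an Arrow RecordBatch, a hedged sketch of that path follows. The arrow-array/arrow-schema calls are standard, but the createDataFrame signature on SparkSession (and whether the crate re-exports arrow) are assumptions based on the table above.

```rust
use std::sync::Arc;

use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Build an in-memory Arrow RecordBatch with two columns
    let schema = Arc::new(Schema::new(vec![
        Field::new("name", DataType::Utf8, false),
        Field::new("age", DataType::Int64, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![34_i64, 29])) as ArrayRef,
        ],
    )?;

    // Assumed signature: createDataFrame takes a reference to the RecordBatch
    let df = spark.createDataFrame(&batch)?;
    df.show(Some(2), None, None).await?;

    Ok(())
}
```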
SparkSessionBuilder
SparkSessionBuilder | API | Comment |
---|---|---|
appName | ||
config | ||
master | ||
remote | | Validate using a Spark connection string |
RuntimeConfig
RuntimeConfig | API | Comment |
---|---|---|
get | ||
isModifiable | ||
set | ||
unset |
Catalog
Catalog | API | Comment |
---|---|---|
cacheTable | ||
clearCache | ||
createExternalTable | ||
createTable | ||
currentCatalog | ||
currentDatabase | ||
databaseExists | ||
dropGlobalTempView | ||
dropTempView | ||
functionExists | ||
getDatabase | ||
getFunction | ||
getTable | ||
isCached | ||
listCatalogs | ||
listDatabases | ||
listFunctions | ||
listTables | ||
recoverPartitions | ||
refreshByPath | ||
refreshTable | ||
registerFunction | ||
setCurrentCatalog | ||
setCurrentDatabase | ||
tableExists | ||
uncacheTable |
DataFrameReader
DataFrameReader | API | Comment |
---|---|---|
csv | ||
format | ||
json | ||
load | ||
option | ||
options | ||
orc | ||
parquet | ||
schema | ||
table | ||
text |
DataFrameWriter
Spark Connect should respect the format as long as your cluster supports the specified type and has the required jars. A hedged write example follows the table below.
DataFrameWriter | API | Comment |
---|---|---|
bucketBy | ||
csv | ||
format | ||
insertInto | ||
jdbc | ||
json | ||
mode | ||
option | ||
options | ||
orc | ||
parquet | ||
partitionBy | ||
save | ||
saveAsTable | ||
sortBy | ||
text |
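As a rough illustration of the write path mentioned above, this sketch chains the format, option, and save builders from the table. The write() accessor, the await-based save, and the output path are assumptions about this crate rather than confirmed API.

```rust
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Create a small DataFrame to write out
    let df = spark
        .sql("SELECT id, id % 3 AS bucket FROM range(100)")
        .await?;

    // Write it as parquet to a path visible to the Spark driver
    df.write()
        .format("parquet")
        .option("compression", "snappy")
        .save("/tmp/spark-connect-rs/range_example")
        .await?;

    Ok(())
}
```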
DataFrameWriterV2
DataFrameWriterV2 | API | Comment |
---|---|---|
append | ||
create | ||
createOrReplace | ||
option | ||
options | ||
overwrite | ||
overwritePartitions | ||
partitionedBy | ||
replace | ||
tableProperty | ||
using |
DataStreamReader
DataStreamReader | API | Comment |
---|---|---|
csv | ||
format | ||
json | ||
load | ||
option | ||
options | ||
orc | ||
parquet | ||
schema | ||
table | ||
text |
DataStreamWriter
Start a streaming job and return a StreamingQuery object to handle the stream operations; a hedged end-to-end sketch follows the table below.
DataStreamWriter | API | Comment |
---|---|---|
foreach | ||
foreachBatch | ||
format | ||
option | ||
options | ||
outputMode | | Uses an Enum for OutputMode |
partitionBy | ||
queryName | ||
start | ||
toTable | ||
trigger | | Uses an Enum for TriggerMode |
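As referenced above, here is a hedged end-to-end streaming sketch using the readStream/writeStream builders plus the OutputMode and Trigger enums from the tables. The module paths, enum variant names, and the load/start signatures are assumptions, not confirmed API.

```rust
use std::time::Duration;

use spark_connect_rs::streaming::{OutputMode, Trigger};
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // Source: the built-in "rate" stream, handy for local testing
    let df = spark
        .readStream()
        .format("rate")
        .option("rowsPerSecond", "5")
        .load(None)?;

    // Sink: write the stream to the console every few seconds
    let query = df
        .writeStream()
        .format("console")
        .queryName("rate_to_console")
        .outputMode(OutputMode::Append)
        .trigger(Trigger::ProcessingTimeInterval("3 seconds".to_string()))
        .start(None)
        .await?;

    // Let the stream run briefly, then stop it
    tokio::time::sleep(Duration::from_secs(15)).await;
    query.stop().await?;

    Ok(())
}
```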
StreamingQuery
StreamingQuery | API | Comment |
---|---|---|
awaitTermination | ||
exception | ||
explain | ||
processAllAvailable | ||
stop | ||
id | ||
isActive | ||
lastProgress | ||
name | ||
recentProgress | ||
runId | ||
status |
StreamingQueryManager
StreamingQueryManager | API | Comment |
---|---|---|
awaitAnyTermination | ||
get | ||
resetTerminated | ||
active |
StreamingQueryListener
StreamingQueryListener | API | Comment |
---|---|---|
onQueryIdle | ||
onQueryProgress | ||
onQueryStarted | ||
onQueryTerminated |
DataFrame
Spark DataFrame type object and its implemented traits.
DataFrame | API | Comment |
---|---|---|
agg | ||
alias | ||
approxQuantile | ||
cache | ||
checkpoint | Not part of Spark Connect | |
coalesce | ||
colRegex | ||
collect | ||
columns | ||
corr | ||
count | ||
cov | ||
createGlobalTempView | ||
createOrReplaceGlobalTempView | ||
createOrReplaceTempView | ||
createTempView | ||
crossJoin | ||
crosstab | ||
cube | ||
describe | ||
distinct | ||
drop | ||
dropDuplicatesWithinWatermark | ||
drop_duplicates | ||
dropna | ||
dtypes | ||
exceptAll | ||
explain | ||
fillna | ||
filter | ||
first | ||
foreach | ||
foreachPartition | ||
freqItems | ||
groupBy | ||
head | ||
hint | ||
inputFiles | ||
intersect | ||
intersectAll | ||
isEmpty | ||
isLocal | ||
isStreaming | ||
join | ||
limit | ||
localCheckpoint | Not part of Spark Connect | |
mapInPandas | TBD on this exact implementation | |
mapInArrow | TBD on this exact implementation | |
melt | ||
na | ||
observe | ||
offset | ||
orderBy | ||
persist | ||
printSchema | ||
randomSplit | ||
registerTempTable | ||
repartition | ||
repartitionByRange | ||
replace | ||
rollup | ||
sameSemantics | ||
sample | ||
sampleBy | ||
schema | ||
select | ||
selectExpr | ||
semanticHash | ||
show | ||
sort | ||
sortWithinPartitions | ||
sparkSession | ||
stat | ||
storageLevel | ||
subtract | ||
summary | ||
tail | ||
take | ||
to | ||
toDF | ||
toJSON | | Does not return an RDD but a long JSON formatted String |
toLocalIterator | ||
new to_polars & toPolars | | Convert to a polars::frame::DataFrame |
new to_datafusion & toDataFusion | | Convert to a datafusion::dataframe::DataFrame |
transform | ||
union | ||
unionAll | ||
unionByName | ||
unpersist | ||
unpivot | ||
where | | use filter instead; `where` is a keyword in Rust |
withColumn | ||
withColumns | ||
withColumnRenamed | ||
withColumnsRenamed | ||
withMetadata | ||
withWatermark | ||
write | ||
writeStream | ||
writeTo |
Column
Spark Column type object and its implemented traits
Column | API | Comment |
---|---|---|
alias | ||
asc | ||
asc_nulls_first | ||
asc_nulls_last | ||
astype | ||
between | ||
cast | ||
contains | ||
desc | ||
desc_nulls_first | ||
desc_nulls_last | ||
dropFields | ||
endswith | ||
eqNullSafe | ||
getField | | This is deprecated but will need to be implemented |
getItem | | This is deprecated but will need to be implemented |
ilike | ||
isNotNull | ||
isNull | ||
isin | ||
like | ||
name | ||
otherwise | ||
over | Refer to Window for creating window specifications | |
rlike | ||
startswith | ||
substr | ||
when | ||
withField | ||
eq `==` | | Rust does not allow overloading `==` to return anything other than a `bool`. Column equality is currently implemented as `col("name").eq(col("id"))`. Not the best, but it works for now (see the sketch after this table). |
addition `+` | ||
subtraction `-` | ||
multiplication `*` | ||
division `/` | ||
OR `\|` | ||
AND `&` | ||
XOR `^` | ||
Negate `~` | ||
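A small sketch of the operator overloads and the eq workaround described above, using col and lit. The functions module path and the withColumn/filter argument types are assumptions.

```rust
use spark_connect_rs::functions::{col, lit};
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark.sql("SELECT id FROM range(10)").await?;

    // Arithmetic via the overloaded operators, equality via the eq method
    let df = df
        .withColumn("double_id", col("id") * lit(2))
        .withColumn("id_plus_one", col("id") + lit(1))
        .filter(col("double_id").eq(lit(8)));

    df.show(Some(10), None, None).await?;

    Ok(())
}
```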
Data Types
Data types are used for creating schemas and for casting columns to specific types
Data Type | API | Comment |
---|---|---|
ArrayType | ||
BinaryType | ||
BooleanType | ||
ByteType | ||
DateType | ||
DecimalType | ||
DoubleType | ||
FloatType | ||
IntegerType | ||
LongType | ||
MapType | ||
NullType | ||
ShortType | ||
StringType | ||
CharType | ||
VarcharType | ||
StructField | ||
StructType | ||
TimestampType | ||
TimestampNTZType | ||
DayTimeIntervalType | ||
YearMonthIntervalType |
Literal Types
Create Spark literal types from these Rust types. For example, `lit(1_i64)` would be a `LongType()` in the schema. An array can be created as well: `lit([1_i16, 2_i16, 3_i16])` results in an `ArrayType(Short)` since all the values of the slice can be translated into a literal type. A hedged usage sketch follows the table below.
Spark Literal Type | Rust Type | Status |
---|---|---|
Null | | |
Binary | &[u8] | |
Boolean | bool | |
Byte | | |
Short | i16 | |
Integer | i32 | |
Long | i64 | |
Float | f32 | |
Double | f64 | |
Decimal | | |
String | &str / String | |
Date | chrono::NaiveDate | |
Timestamp | chrono::DateTime<Tz> | |
TimestampNtz | chrono::NaiveDateTime | |
CalendarInterval | | |
YearMonthInterval | | |
DayTimeInterval | | |
Array | slice / Vec | |
Map | Create with the function create_map | |
Struct | Create with the function struct_col or named_struct | |
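A brief sketch of the lit behavior described above; the functions module path and the withColumn/printSchema signatures are assumptions, and the column names are made up for illustration.

```rust
use spark_connect_rs::functions::lit;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    // lit(1_i64) maps to LongType, lit("abc") to String,
    // and a slice of i16 values to ArrayType(Short)
    let df = spark
        .sql("SELECT 1 AS id")
        .await?
        .withColumn("long_col", lit(1_i64))
        .withColumn("string_col", lit("abc"))
        .withColumn("short_array", lit([1_i16, 2_i16, 3_i16]));

    // Inspect the resulting schema (printSchema arguments are assumed)
    df.printSchema(None).await?;

    Ok(())
}
```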
Window & WindowSpec
For ease of use, it's recommended to use Window to create the WindowSpec; a hedged example follows the table below.
Window | API | Comment |
---|---|---|
currentRow | ||
orderBy | ||
partitionBy | ||
rangeBetween | ||
rowsBetween | ||
unboundedFollowing | ||
unboundedPreceding | ||
WindowSpec.orderBy | ||
WindowSpec.partitionBy | ||
WindowSpec.rangeBetween | ||
WindowSpec.rowsBetween |
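A sketch of building a WindowSpec through Window and applying it with over, as recommended above. The Window constructor, the argument shapes for partitionBy/orderBy, and the row_number import path are assumptions about this crate's API.

```rust
use spark_connect_rs::functions::{col, row_number};
use spark_connect_rs::window::Window;
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .sql("SELECT * FROM VALUES ('a', 1), ('a', 2), ('b', 3) AS t(grp, val)")
        .await?;

    // Partition by grp, order by val, and number the rows within each partition
    let window = Window::new()
        .partitionBy([col("grp")])
        .orderBy([col("val")]);

    let df = df.withColumn("row_num", row_number().over(window));
    df.show(Some(10), None, None).await?;

    Ok(())
}
```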
Functions
Only a few of these functions are covered by unit tests. Functions involving closures or lambdas are not feasible. A brief usage sketch follows the table below.
Functions | API | Comments |
---|---|---|
abs | ||
acos | ||
acosh | ||
add_months | ||
aes_decrypt | ||
aes_encrypt | ||
aggregate | ||
any_value | ||
approx_count_distinct | ||
approx_percentile | ||
array | ||
array_agg | ||
array_append | ||
array_compact | ||
array_contains | ||
array_distinct | ||
array_except | ||
array_insert | ||
array_intersect | ||
array_join | ||
array_max | ||
array_min | ||
array_position | ||
array_prepend | ||
array_remove | ||
array_repeat | ||
array_size | ||
array_sort | ||
array_union | ||
arrays_overlap | ||
arrays_zip | ||
asc | ||
asc_nulls_first | ||
asc_nulls_last | ||
ascii | ||
asin | ||
asinh | ||
assert_true | ||
atan | ||
atan2 | ||
atanh | ||
avg | ||
base64 | ||
bin | ||
bit_and | ||
bit_count | ||
bit_get | ||
bit_length | ||
bit_or | ||
bit_xor | ||
bitmap_bit_position | ||
bitmap_bucket_number | ||
bitmap_construct_agg | ||
bitmap_count | ||
bitmap_or_agg | ||
bitwise_not | ||
bool_and | ||
bool_or | ||
broadcast | ||
bround | ||
btrim | ||
bucket | ||
call_function | ||
call_udf | ||
cardinality | ||
cbrt | ||
ceil | ||
ceiling | ||
char | ||
char_length | ||
character_length | ||
coalesce | ||
col | ||
collect_list | ||
collect_set | ||
column | ||
concat | ||
concat_ws | ||
contains | ||
conv | ||
convert_timezone | ||
corr | ||
cos | ||
cosh | ||
cot | ||
count | ||
count_distinct | ||
count_if | ||
count_min_sketch | ||
covar_pop | ||
covar_samp | ||
crc32 | ||
create_map | ||
csc | ||
cume_dist | ||
curdate | ||
current_catalog | ||
current_database | ||
current_date | ||
current_schema | ||
current_timestamp | ||
current_timezone | ||
current_user | ||
date_add | ||
date_diff | ||
date_format | ||
date_from_unix_date | ||
date_part | ||
date_sub | ||
date_trunc | ||
dateadd | ||
datediff | ||
datepart | ||
day | ||
dayofmonth | ||
dayofweek | ||
dayofyear | ||
days | ||
decode | ||
degrees | ||
dense_rank | ||
desc | ||
desc_nulls_first | ||
desc_nulls_last | ||
e | ||
element_at | ||
elt | ||
encode | ||
endswith | ||
equal_null | ||
every | ||
exists | ||
exp | ||
explode | ||
explode_outer | ||
expm1 | ||
expr | ||
extract | ||
factorial | ||
filter | ||
find_in_set | ||
first | ||
first_value | ||
flatten | ||
floor | ||
forall | ||
format_number | ||
format_string | ||
from_csv | ||
from_json | ||
from_unixtime | ||
from_utc_timestamp | ||
get | ||
get_json_object | ||
getbit | ||
greatest | ||
grouping | ||
grouping_id | ||
hash | ||
hex | ||
histogram_numeric | ||
hll_sketch_agg | ||
hll_sketch_estimate | ||
hll_union | ||
hll_union_agg | ||
hour | ||
hours | ||
hypot | ||
ifnull | ||
ilike | ||
initcap | ||
inline | ||
inline_outer | ||
input_file_block_length | ||
input_file_block_start | ||
input_file_name | ||
instr | ||
isnan | ||
isnotnull | ||
isnull | ||
java_method | ||
json_array_length | ||
json_object_keys | ||
json_tuple | ||
kurtosis | ||
lag | ||
last | ||
last_day | ||
last_value | ||
lcase | ||
lead | ||
least | ||
left | ||
length | ||
levenshtein | ||
like | ||
lit | ||
ln | ||
localtimestamp | ||
locate | ||
log | ||
log10 | ||
log1p | ||
log2 | ||
lower | ||
lpad | ||
ltrim | ||
make_date | ||
make_dt_interval | ||
make_interval | ||
make_timestamp | ||
make_timestamp_ltz | ||
make_timestamp_ntz | ||
make_ym_interval | ||
map_concat | ||
map_contains_key | ||
map_entries | ||
map_filter | ||
map_from_arrays | ||
map_from_entries | ||
map_keys | ||
map_values | ||
map_zip_with | ||
mask | ||
max | ||
max_by | ||
md5 | ||
mean | ||
median | ||
min | ||
min_by | ||
minute | ||
mode | ||
monotonically_increasing_id | ||
month | ||
months | ||
months_between | ||
named_struct | ||
nanvl | ||
negate | ||
negative | ||
next_day | ||
now | ||
nth_value | ||
ntile | ||
nullif | ||
nvl | ||
nvl2 | ||
octet_length | ||
overlay | ||
pandas_udf | ||
parse_url | ||
percent_rank | ||
percentile | ||
percentile_approx | ||
pi | ||
pmod | ||
posexplode | ||
posexplode_outer | ||
position | ||
positive | ||
pow | ||
power | ||
printf | ||
product | ||
quarter | ||
radians | ||
raise_error | ||
rand | ||
randn | ||
rank | ||
reduce | ||
reflect | ||
regexp | ||
regexp_count | ||
regexp_extract | ||
regexp_extract_all | ||
regexp_instr | ||
regexp_like | ||
regexp_replace | ||
regexp_substr | ||
regr_avgx | ||
regr_avgy | ||
regr_count | ||
regr_intercept | ||
regr_r2 | ||
regr_slope | ||
regr_sxx | ||
regr_sxy | ||
regr_syy | ||
repeat | ||
replace | ||
reverse | ||
right | ||
rint | ||
rlike | ||
round | ||
row_number | ||
rpad | ||
rtrim | ||
schema_of_csv | ||
schema_of_json | ||
sec | ||
second | ||
sentences | ||
sequence | ||
session_window | ||
sha | ||
sha1 | ||
sha2 | ||
shiftleft | ||
shiftright | ||
shiftrightunsigned | ||
shuffle | ||
sign | ||
signum | ||
sin | ||
sinh | ||
size | ||
skewness | ||
slice | ||
some | ||
sort_array | ||
soundex | ||
spark_partition_id | ||
split | ||
split_part | ||
sqrt | ||
stack | ||
startswith | ||
std | ||
stddev | ||
stddev_pop | ||
stddev_samp | ||
str_to_map | ||
struct | ||
substr | ||
substring | ||
substring_index | ||
sum | ||
sum_distinct | ||
tan | ||
tanh | ||
timestamp_micros | ||
timestamp_millis | ||
timestamp_seconds | ||
to_binary | ||
to_char | ||
to_csv | ||
to_date | ||
to_json | ||
to_number | ||
to_timestamp | ||
to_timestamp_ltz | ||
to_timestamp_ntz | ||
to_unix_timestamp | ||
to_utc_timestamp | ||
to_varchar | ||
to_degrees | ||
to_radians | ||
transform | ||
transform_keys | ||
transform_values | ||
translate | ||
trim | ||
trunc | ||
try_add | ||
try_aes_decrypt | ||
try_avg | ||
try_divide | ||
try_element_at | ||
try_multiply | ||
try_subtract | ||
try_sum | ||
try_to_binary | ||
try_to_number | ||
try_to_timestamp | ||
typeof | ||
ucase | ||
udf | ||
udtf | ||
unbase64 | ||
unhex | ||
unix_date | ||
unix_micros | ||
unix_millis | ||
unix_seconds | ||
unix_timestamp | ||
unwrap_udt | ||
upper | ||
url_decode | ||
url_encode | ||
user | ||
var_pop | ||
var_samp | ||
variance | ||
version | ||
weekday | ||
weekofyear | ||
when | ||
width_bucket | ||
window | ||
window_time | ||
xpath | ||
xpath_boolean | ||
xpath_double | ||
xpath_float | ||
xpath_int | ||
xpath_long | ||
xpath_number | ||
xpath_short | ||
xpath_string | ||
xxhash64 | ||
year | ||
years | ||
zip_with |
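As referenced above, a hedged sketch of calling a few of these functions; the functions module path and exact argument types (particularly for concat_ws) are assumptions.

```rust
use spark_connect_rs::functions::{col, concat_ws, upper};
use spark_connect_rs::{SparkSession, SparkSessionBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let spark: SparkSession = SparkSessionBuilder::remote("sc://127.0.0.1:15002/")
        .build()
        .await?;

    let df = spark
        .sql("SELECT * FROM VALUES ('alice', 34), ('bob', 29) AS t(name, age)")
        .await?;

    // Upper-case the name and build a label column from two other columns
    let df = df
        .withColumn("name_upper", upper(col("name")))
        .withColumn("label", concat_ws("-", [col("name"), col("age")]));

    df.show(Some(10), None, None).await?;

    Ok(())
}
```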
UdfRegistration (may not be possible)
UDFRegistration | API | Comment |
---|---|---|
register | ||
registerJavaFunction | ||
registerJavaUDAF |
UdtfRegistration (may not be possible)
UDTFRegistration | API | Comment |
---|---|---|
register |