20 releases
0.6.4 | Oct 25, 2021 |
---|---|
0.6.1 | Jul 8, 2021 |
0.5.0 | Jan 23, 2021 |
0.4.5 | Dec 16, 2020 |
0.4.3 | Jul 24, 2020 |
#501 in Compression
15KB
252 lines
Compress prost! messages with zstd, async streams support
Simple compress/decompress
/// Round-trips `dummies` through `ProstEncoder` and `ProstDecoder` at the given
/// compression `level`.
///
/// Every message is written to an in-memory buffer, the buffer is decoded
/// again, and each decoded message is compared with the input at the same
/// position. Panics if any step fails, if a message differs, or if the number
/// of decoded messages does not match `dummies.len()`.
fn do_roundtrip_coders(level: i32, dummies: Vec<proto::Dummy>) {
    // The result of `try_init` is deliberately discarded with `.ok()`
    // (presumably because a subscriber may already be installed — confirm).
    tracing_subscriber::fmt::try_init().ok();

    // Encode all messages into an in-memory byte buffer.
    let mut encoder = ProstEncoder::new(vec![], level).unwrap();
    for message in &dummies {
        encoder.write(message).unwrap();
    }
    let compressed = encoder.finish().unwrap();

    // Decode and compare against the inputs, in order.
    let mut decoder = ProstDecoder::<proto::Dummy>::new_decompressed(&compressed[..]).unwrap();
    let mut expected = dummies.iter();
    let mut decoded_count: usize = 0;
    while let Some(item) = decoder.next() {
        let actual = item.unwrap();
        // Running out of expected messages means the decoder produced too many.
        assert_eq!(&actual, expected.next().unwrap());
        decoded_count += 1;
    }

    // Guards against the decoder producing fewer messages than were encoded.
    assert_eq!(dummies.len(), decoded_count);
}
Async streams support
The `enable-async` Cargo feature (enabled by default) exposes the `Compressor` and `Decompressor` structs:
- `Compressor::build_stream` converts a stream of prost! messages to a stream of bytes;
- `Decompressor::stream` converts a stream of compressed bytes to a stream of prost! messages.
Although this example uses tokio channels, this crate does not depend on tokio; it is only used in tests.
/// Round-trips `dummies` through the async `Compressor` and `Decompressor`
/// streams, wired together with bounded channels.
///
/// Pipeline: source channel ~> `Compressor::build_stream` ~> compressed-bytes
/// channel ~> `Decompressor::stream` ~> sink channel. The messages received
/// from the sink are compared, in order, with `dummies`.
///
/// * `chunk_size` - forwarded to `Compressor::build_stream`.
/// * `level` - compression level forwarded to `Compressor::build_stream`.
/// * `dummies` - messages to send through the pipeline; may be empty.
///
/// Panics if any stage fails, if a message differs, or if the number of
/// received messages does not match `dummies.len()`.
fn do_roundtrip_channels(chunk_size: usize, level: i32, dummies: Vec<proto::Dummy>) {
    // The result of `try_init` is deliberately discarded with `.ok()`.
    tracing_subscriber::fmt::try_init().ok();
    let mut rt = Runtime::new().unwrap();

    // A bounded tokio channel panics when created with capacity 0, so clamp
    // the capacity to at least 1 to keep the empty-input case working.
    let capacity = dummies.len().max(1);

    // Dummy source ~> Compressor
    let (mut source, dummy_rx) = mpsc::channel::<proto::Dummy>(capacity);
    // Compressor ~> Decompressor
    let (compressed_tx, compressed_rx) = mpsc::channel::<Vec<u8>>(capacity);
    // Decompressor ~> Dummy sink
    let (dummy_tx, mut sink) = mpsc::channel::<proto::Dummy>(capacity);

    let compressor = Compressor::build_stream(dummy_rx, level, chunk_size).unwrap();
    let decompressor = Decompressor::stream(compressed_rx);

    rt.block_on(async move {
        // Forward every compressed chunk into the compressed-bytes channel.
        // The sender is threaded through the fold and dropped when it ends,
        // which closes the channel for the decompressor.
        let compress_task = tokio::task::spawn(
            compressor
                .map_err(anyhow::Error::new)
                .try_fold(compressed_tx, |mut ctx, compressed| async {
                    ctx.send(compressed)
                        .await
                        .map_err(|_| anyhow!("Failed to send compressed"))?;
                    Ok(ctx)
                })
                .map_ok(|_| ()),
        );

        // Forward every decoded message into the sink channel.
        let decompress_task = tokio::task::spawn(
            decompressor
                .map_err(anyhow::Error::new)
                .try_fold(dummy_tx, |mut utx, message| async {
                    utx.send(message)
                        .await
                        .map_err(|_| anyhow!("Failed to send decompressed"))?;
                    Ok(utx)
                })
                .map_ok(|_| ()),
        );

        for dummy in &dummies {
            source
                .send(dummy.clone())
                .await
                .map_err(|_| anyhow!("Failed to send to source"))
                .unwrap();
        }
        // Close the source so the pipeline can observe end-of-stream.
        std::mem::drop(source);

        let mut i: usize = 0;
        while let Some(dummy) = sink.recv().await {
            assert_eq!(&dummy, dummies.get(i).unwrap());
            i += 1;
        }

        // The outer `unwrap` surfaces task join failures; the inner ones
        // surface errors returned by the streams themselves.
        let (compress, decompress) =
            futures::try_join!(compress_task, decompress_task).unwrap();
        compress.unwrap();
        decompress.unwrap();

        assert_eq!(dummies.len(), i);
    });
}
Dependencies
~5.5MB
~97K SLoC