#protocol-buffers #protobuf #prost #zstd #streams

prosto

Compress prost! messages with zstd, optional tokio channels support

13 unstable releases (3 breaking)

0.4.4 Aug 22, 2020
0.4.3 Jul 24, 2020
0.4.0 Jun 18, 2020
0.3.0 Jun 18, 2020
0.1.4 Jun 16, 2020

#44 in Compression

Download history 67/week @ 2020-06-11 79/week @ 2020-06-18 4/week @ 2020-06-25 18/week @ 2020-07-02 39/week @ 2020-07-09 46/week @ 2020-07-16 59/week @ 2020-07-23 65/week @ 2020-07-30 55/week @ 2020-08-06 64/week @ 2020-08-13 31/week @ 2020-08-20 21/week @ 2020-08-27 45/week @ 2020-09-03 63/week @ 2020-09-10

170 downloads per month

Custom license

20KB
398 lines

Compress prost! messages with zstd, async streams support

docs

Simple compress/decompress

/// Round-trip check for the synchronous coders: every message in `dummies` is written
/// through a `ProstEncoder` created with compression `level`, the finished output is fed
/// to a `ProstDecoder`, and the decoded messages are compared with the input one by one.
///
/// Panics (through `unwrap` / `assert_eq!`) on any encoder or decoder error, on a
/// mismatching message, or when the number of decoded messages differs from the input.
fn do_roundtrip_coders(level: i32, dummies: Vec<proto::Dummy>) {
    // The init result is discarded, so a failed initialisation does not abort the run.
    tracing_subscriber::fmt::try_init().ok();

    // Encode: the encoder takes ownership of an empty in-memory buffer.
    let buffer = Vec::new();
    let mut encoder = ProstEncoder::new(buffer, level).unwrap();
    for message in dummies.iter() {
        encoder.write(message).unwrap();
    }
    let compressed = encoder.finish().unwrap();

    // Decode from the bytes produced above.
    let mut decoder = ProstDecoder::<proto::Dummy>::new_decompressed(&compressed[..]).unwrap();

    // Count decoded messages while checking each one against the input at the same index.
    let mut decoded_count: usize = 0;
    while let Some(decoded) = decoder.next() {
        let decoded = decoded.unwrap();
        assert_eq!(&decoded, dummies.get(decoded_count).unwrap());
        decoded_count += 1;
    }

    // Nothing may be lost: the decoder has to yield exactly as many messages as were written.
    assert_eq!(dummies.len(), decoded_count);
}

Async streams support

The enable-async Cargo feature (enabled by default) exposes the Compressor and Decompressor structs:

  • Compressor::build_stream converts a stream of prost! messages to a stream of bytes;
  • Decompressor::stream converts a stream of compressed bytes to a stream of prost! messages.

Although this example uses tokio channels, this crate does not depend on tokio; it is only used in tests.

/// Round-trip check for the async streams: `dummies` are pushed through a `Compressor`
/// stream and a `Decompressor` stream that are wired together with mpsc channels, and the
/// messages arriving at the far end are compared with the input one by one.
///
/// `level` and `chunk_size` are forwarded to `Compressor::build_stream`.
///
/// Panics (through `unwrap` / `assert_eq!`) on any send, stream or task error, on a
/// mismatching message, or when the number of received messages differs from the input.
fn do_roundtrip_channels(chunk_size: usize, level: i32, dummies: Vec<proto::Dummy>) {
    // The init result is discarded, so a failed initialisation does not abort the run.
    tracing_subscriber::fmt::try_init().ok();

    let mut runtime = Runtime::new().unwrap();

    // Every channel gets a capacity equal to the number of input messages.
    let capacity = dummies.len();

    // Dummy source ~> Compressor
    let (mut source_tx, compressor_input) = mpsc::channel::<proto::Dummy>(capacity);
    // Compressor ~> Decompressor
    let (compressed_tx, decompressor_input) = mpsc::channel::<Vec<u8>>(capacity);
    // Decompressor ~> Dummy sink
    let (decoded_tx, mut sink_rx) = mpsc::channel::<proto::Dummy>(capacity);

    let compressor = Compressor::build_stream(compressor_input, level, chunk_size).unwrap();
    let decompressor = Decompressor::stream(decompressor_input);

    runtime.block_on(async move {
        // Forward every compressed chunk into the Compressor ~> Decompressor channel.
        // The sender is threaded through the fold as its accumulator.
        let forward_compressed = compressor
            .map_err(anyhow::Error::new)
            .try_fold(compressed_tx, |mut tx, chunk| async {
                tx.send(chunk)
                    .await
                    .map_err(|_| anyhow!("Failed to send compressed"))?;
                Ok(tx)
            })
            .map_ok(|_| ());
        let compress_task = tokio::task::spawn(forward_compressed);

        // Forward every decoded message into the Decompressor ~> Dummy sink channel,
        // using the same fold-over-sender shape as above.
        let forward_decoded = decompressor
            .map_err(anyhow::Error::new)
            .try_fold(decoded_tx, |mut tx, decoded| async {
                tx.send(decoded)
                    .await
                    .map_err(|_| anyhow!("Failed to send decompressed"))?;
                Ok(tx)
            })
            .map_ok(|_| ());
        let decompress_task = tokio::task::spawn(forward_decoded);

        // Feed clones of the input into the head of the pipeline.
        for dummy in dummies.iter() {
            source_tx
                .send(dummy.clone())
                .await
                .map_err(|_| anyhow!("Failed to send to source"))
                .unwrap();
        }

        // Release the source sender before draining the sink; presumably this is what
        // signals end-of-input to the pipeline so the loop below can terminate.
        drop(source_tx);

        // Drain the sink, checking each message against the input at the same index.
        let mut received: usize = 0;
        while let Some(dummy) = sink_rx.recv().await {
            assert_eq!(&dummy, dummies.get(received).unwrap());
            received += 1;
        }

        // Both spawned tasks must have joined and finished without error.
        let (compress_result, decompress_result) =
            futures::try_join!(compress_task, decompress_task).unwrap();
        compress_result.unwrap();
        decompress_result.unwrap();

        // Nothing may be lost along the way.
        assert_eq!(dummies.len(), received);
    });
}

Dependencies

~8MB
~148K SLoC