
streaming-decompression

Fallible streaming iterator specialized for decompression

3 releases

0.1.2 Jul 30, 2022
0.1.1 Jul 30, 2022
0.1.0 Oct 15, 2021

55,019 downloads per month
Used in 4 crates (via parquet2)

Apache-2.0

11KB
159 lines

This crate contains a [FallibleStreamingIterator] optimized for decompression.

A typical problem that libraries working with compressed formats face is that they need to preserve an intermediary buffer to decompress multiple items. Specifically, implementations that use

fn decompress(compressed: Vec<u8>) -> Vec<u8> {
    unimplemented!("Decompress")
}

are inefficient because they need to allocate a new Vec<u8> for every decompression, and the size of each allocation scales with the average decompressed size of the items.

The typical solution for this problem is to offer

fn decompress(compressed: Vec<u8>, decompressed: &mut Vec<u8>) {
    decompressed.clear();
    unimplemented!("Decompress into `decompressed`, maybe re-allocing it.")
}

Such an API avoids one allocation per item, but requires the user to preserve decompressed between iterations. This pattern cannot currently be expressed with the [Iterator] API, because the items an [Iterator] yields cannot borrow from the iterator itself.
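As a minimal sketch of the buffer-reuse pattern described above, the following uses only the standard library. The names decompress_into and total_decompressed_len are hypothetical, and the "decompression" is a stand-in that simply repeats each byte twice; a real codec would write its output into the same buffer instead.

```rust
/// Hypothetical stand-in for a real codec: "decompresses" by emitting every
/// input byte twice into `decompressed`.
fn decompress_into(compressed: &[u8], decompressed: &mut Vec<u8>) {
    // `clear` drops the old contents but keeps the allocated capacity.
    decompressed.clear();
    for &byte in compressed {
        decompressed.push(byte);
        decompressed.push(byte);
    }
}

/// Decompresses every item through one shared buffer and returns the total
/// number of decompressed bytes. The caller owns the buffer and must keep it
/// alive across iterations.
fn total_decompressed_len(items: &[Vec<u8>]) -> usize {
    let mut buffer = Vec::new();
    let mut total = 0;
    for item in items {
        decompress_into(item, &mut buffer);
        total += buffer.len();
    }
    total
}
```

The buffer only grows when an item needs more space than any previous one, so a long run of similarly sized items settles into zero allocations per item.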

This crate offers [Decompressor], a [FallibleStreamingIterator] that consumes an [Iterator] of compressed items and yields decompressed items, maintaining an internal [Vec<u8>] that is automatically reused across items.
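To illustrate the idea without depending on the crate, here is a simplified, infallible sketch of a streaming iterator that owns its buffer. ReusingDecompressor is a hypothetical type, not the crate's [Decompressor], and byte reversal stands in for a real codec. The point is the signature of next: each item is a borrow of the internal buffer, so it must be released before the next item is requested.

```rust
/// Hypothetical, simplified streaming decompressor (not the crate's type).
struct ReusingDecompressor<I: Iterator<Item = Vec<u8>>> {
    compressed: I,
    buffer: Vec<u8>,
}

impl<I: Iterator<Item = Vec<u8>>> ReusingDecompressor<I> {
    fn new(compressed: I) -> Self {
        Self { compressed, buffer: Vec::new() }
    }

    /// Advances to the next compressed item and returns a view into the
    /// internal buffer holding its "decompressed" bytes.
    fn next(&mut self) -> Option<&[u8]> {
        let item = self.compressed.next()?;
        // Reuse the allocation from the previous item.
        self.buffer.clear();
        // Stand-in for a real codec: reverse the bytes.
        self.buffer.extend(item.iter().rev());
        Some(&self.buffer)
    }

    /// Hands the internal buffer back to the caller for further reuse.
    fn into_inner(self) -> Vec<u8> {
        self.buffer
    }
}
```

Because the returned slice borrows from self, the borrow checker rejects holding two items at once, which is exactly the restriction that lets a single buffer serve every item.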

Example

use streaming_decompression::{Decompressor, Compressed, Decompressed, FallibleStreamingIterator};

// An item that can be decompressed
#[derive(Debug, PartialEq)]
struct CompressedItem {
    pub metadata: String,
    pub data: Vec<u8>,
}
impl Compressed for CompressedItem {
    fn is_compressed(&self) -> bool {
        // whether the item is compressed depends on some metadata.
        self.metadata == "is_compressed"
    }
}

// A decompressed item
#[derive(Debug, PartialEq)]
struct DecompressedItem {
    pub metadata: String,
    pub data: Vec<u8>,
}

impl Decompressed for DecompressedItem {
    fn buffer_mut(&mut self) -> &mut Vec<u8> {
        &mut self.data
    }
}

// the decompression function. This could call LZ4, Snappy, etc.
fn decompress(
    mut i: CompressedItem,
    buffer: &mut Vec<u8>,
) -> Result<DecompressedItem, std::convert::Infallible> {
    if i.is_compressed() {
        // the actual decompression: here it just reverses the bytes,
        // but a real codec would do more complex work.
        buffer.clear();
        buffer.extend(i.data.iter().rev());
    } else {
        // not compressed: move the data into the buffer without copying.
        std::mem::swap(&mut i.data, buffer);
    }
    Ok(DecompressedItem {
        metadata: i.metadata,
        data: std::mem::take(buffer),
    })
}

fn main() -> Result<(), std::convert::Infallible> {
   // consider some compressed items
   let item1 = CompressedItem {
       metadata: "is_compressed".to_string(),
       data: vec![1, 2, 3],
   };
   let item2 = CompressedItem {
       metadata: "is_compressed".to_string(),
       data: vec![4, 5, 6],
   };
   let iter = vec![Ok(item1), Ok(item2)].into_iter();

   let buffer = vec![0; 4];  // the internal buffer: it could contain anything.
   let mut decompressor = Decompressor::new(iter, buffer, decompress);

   let item = decompressor.next()?.unwrap();
   // the item was decompressed
   assert_eq!(item.data, vec![3, 2, 1]);
   assert_eq!(item.metadata, "is_compressed".to_string());

   let item = decompressor.next()?.unwrap();
   // the item was decompressed
   assert_eq!(item.data, vec![6, 5, 4]);
   assert_eq!(item.metadata, "is_compressed".to_string());

   assert_eq!(decompressor.next()?, None);

   // we can re-use the internal buffer if we wish to
   let internal = decompressor.into_inner();
   assert_eq!(internal, vec![6, 5, 4]);
   Ok(())
}

License

Licensed under either of

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.

Dependencies

~23KB