#json #serde-json #key-value #streaming-json #json-key #pair #processing

streaming_serde_json

This is a streaming alternative to serde_json for processing JSON data sources that are too large to fit in memory

3 releases

0.1.2 Nov 18, 2024
0.1.1 Nov 18, 2024
0.1.0 Nov 17, 2024

#315 in Encoding


396 downloads per month

MIT license

115KB
2.5K SLoC

Streaming Serde JSON

This project is still experimental and not intended for production use. It has not been optimized to use any hardware capabilities like SIMD. The main serde_json crate is a better fit for all non-streaming use cases.

Goal

This library aims to solve a simple problem: JSON files can be larger than the memory of the machine that is processing them. As an example, if I have a file containing 300 GB of JSON u64 values captured from a hardware device and am working on a machine with only 64 GB of RAM, that data will not fit into my machine's memory. Even if the JSON is well-formed and is simply one extremely large array or object, the standard serde_json crate will not suffice: it eagerly parses all 300 GB of values before returning anything. This crate provides functions and iterator implementations for lazily parsing JSON instead.
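
To make the difference concrete, here is a minimal sketch contrasting the two approaches (the file name and the use of plain u64 values are illustrative assumptions, not requirements of the crate). The eager line is commented out because it is exactly the allocation that will not fit:

use std::fs::File;
use std::io::BufReader;

use utf8_chars::BufReadCharsExt;
use streaming_serde_json::from_value_stream;

let mut reader = BufReader::new(File::open("./huge.json").unwrap());

// Eager: serde_json would materialize the entire 300 GB array before returning.
// let all_values: Vec<u64> = serde_json::from_reader(&mut reader).unwrap();

// Lazy: each u64 is parsed, folded into the sum, and dropped before the
// next value is read from the underlying file.
let sum: u64 = from_value_stream::<u64, _, _>(reader.chars())
    .map(|value| value.unwrap())
    .sum();
println!("total: {sum}");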

The Public API

This crate exposes only four functions. Each returns an iterator that performs the desired conversion: either a character stream into a stream of values (or key value pairs), or a stream of values (or key value pairs) into a stream of JSON string segments.

Functions:

  1. from_key_value_pair_stream - takes in a stream of characters and parses it as a single JSON object. The key value pairs will be returned one at a time from the returned iterator.
  2. from_value_stream - takes in a stream of characters and parses it as a single JSON array. The values will be returned one at a time from the returned iterator.
  3. key_value_pairs_to_json_stream - takes in an iterator of key value pairs (a String key paired with a serializable value) and returns an iterator of JSON string segments. These segments can then be incrementally written to an output target (probably a file or database in most cases).
  4. values_to_json_stream - takes in an iterator of serializable values and returns an iterator of JSON string segments. These segments can then be incrementally written to an output target (probably a file or database in most cases).
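
As a rough sketch of how these functions compose (the file names and the doubling transform are illustrative assumptions, not part of the crate), a parsing iterator can be fed directly into a serializing iterator so that only one value is held in memory at a time:

use std::fs::File;
use std::io::{BufReader, BufWriter, Write};

use utf8_chars::BufReadCharsExt;
use streaming_serde_json::{from_value_stream, values_to_json_stream};

let mut reader = BufReader::new(File::open("./input.json").unwrap());
let mut writer = BufWriter::new(File::create("./output.json").unwrap());

// Parse each u64 lazily, transform it, and re-serialize it lazily.
let doubled = from_value_stream::<u64, _, _>(reader.chars())
    .map(|value| value.unwrap() * 2);

// Each yielded segment is a small piece of the output JSON array.
for segment in values_to_json_stream(doubled) {
    writer.write_all(segment.unwrap().as_bytes()).unwrap();
}
writer.flush().unwrap();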

Examples

Values API

The simplest example is reading from a file, aggregating, and writing to a file.

Let's say I have a 300 GB stream of object values where each object holds the minimum, maximum, and average over a 5-minute window of data. This is placed in myFile.json, which might look something like this:

[
    {
        "avg": 100,
        "min": 2,
        "max": 178
    },
    {
        "avg": 102,
        "min": 5,
        "max": 198
    },
    "...etc"
]

If I want to find the global minimum and maximum values in this stream, I could do that with the following code:

use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Write};

use utf8_chars::BufReadCharsExt;
use serde_derive::{Serialize, Deserialize};
use streaming_serde_json::{from_value_stream, values_to_json_stream};

#[derive(Deserialize)]
struct InputData {
    avg: i32,
    min: i32,
    max: i32,
}

#[derive(Serialize)]
struct OutputData {
    min: i32,
    max: i32,
}

let mut reader = 
    BufReader::new(
        File::open("./myFile.json")
            .unwrap());

let chars = reader.chars();

// The value stream has a PassThroughError type parameter since most
// sources with data large enough to need this will be fallible to
// read from.
let values = from_value_stream::<InputData, _, _>(chars);

let mut global_min = i32::MAX;
let mut global_max = i32::MIN;
for result in values {
    // Your error handling goes here. Since this is a 
    // demonstration, I'm not worried about panics.
    let input_data_value: InputData = result.unwrap(); 

    if input_data_value.min < global_min {
        global_min = input_data_value.min;
    }
    if input_data_value.max > global_max {
        global_max = input_data_value.max;
    }
}

let output_data = OutputData {
    min: global_min,
    max: global_max,
};

// Since the output data will be only one item, using the 
// buffered writers and streaming JSON output functions 
// is overkill, but I am using them to demonstrate their 
// usage.
let output_file = OpenOptions::new()
            .read(false)
            .write(true)
            .create(true)
            .truncate(true)
            .open("./myResultJson.json")
            .unwrap();

let mut writer = BufWriter::new(output_file);

// I am using an array, but the values_to_json_stream
// function will accept any type that implements IntoIterator
// and has an Item type that implements Serialize.
let iter = [output_data];
for segment in values_to_json_stream(iter) {
    writer.write_all(segment.unwrap().as_bytes()).unwrap();
}
writer.flush().unwrap();

Key Value Pairs API

The simplest example is, again, reading from a file, aggregating, and writing the result to a file.

Let's say I have a 300 GB stream of key value pairs where each key is a city name and each value is that city's population. This is placed in worldPopulations.json, which might look something like this:

{
    "Seattle": 755078,
    "New York": 19469200,
    "...etc"
}

If I want to find the average population across all of these cities, I could do that with the following code:

use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Write};

use utf8_chars::BufReadCharsExt;
use serde_derive::{Serialize, Deserialize};
use streaming_serde_json::{from_key_value_pair_stream, key_value_pairs_to_json_stream};

let mut reader = 
    BufReader::new(
        File::open("./worldPopulations.json")
            .unwrap());

let chars = reader.chars();

// The value stream has a PassThroughError type parameter since most
// sources with data large enough to need this API will be fallible to
// read from.
let values = from_key_value_pair_stream::<u32, _, _>(chars);

let mut global_total = 0_u128;
let mut city_count = 0_u32;
for result in values {
    // Your error handling goes here. Since this is a 
    // demonstration, I'm not worried about panics.
    let input_data_value: (String, u32) = result.unwrap(); 

    global_total += input_data_value.1 as u128;
    city_count += 1;
}

let global_average = global_total / city_count as u128;

// Since the output data will be only one item, using the 
// buffered writers and streaming JSON output functions 
// is overkill, but I am using them to demonstrate their 
// usage.
let output_file = OpenOptions::new()
            .read(false)
            .write(true)
            .create(true)
            .truncate(true)
            .open("./globalAvgPop.json")
            .unwrap();

let mut writer = BufWriter::new(output_file);

// I am using an array, but the key_value_pairs_to_json_stream
// function will accept any type that implements IntoIterator
// and has an Item type that is a key value pair with a
// serializable value.
let iter = [("global total".to_string(), global_average)];
for str in key_value_pairs_to_json_stream(iter) {
    writer.write_all(str.unwrap().as_bytes()).unwrap();
}
writer.flush().unwrap();

Dependencies

~0.5–1MB
~20K SLoC