#timely #dataflow

timely_communication

Communication layer for timely dataflow

20 releases

0.12.0 Mar 10, 2021
0.11.1 Nov 18, 2019
0.10.0 Jul 9, 2019
0.9.0 Mar 31, 2019
0.1.2 Nov 2, 2015

#3 in #timely

Download history 673/week @ 2022-12-05 688/week @ 2022-12-12 787/week @ 2022-12-19 436/week @ 2022-12-26 573/week @ 2023-01-02 517/week @ 2023-01-09 370/week @ 2023-01-16 604/week @ 2023-01-23 988/week @ 2023-01-30 725/week @ 2023-02-06 939/week @ 2023-02-13 1034/week @ 2023-02-20 892/week @ 2023-02-27 1705/week @ 2023-03-06 1520/week @ 2023-03-13 1310/week @ 2023-03-20

5,507 downloads per month
Used in 14 crates (via timely)

MIT license

125KB
2K SLoC

A simple communication infrastructure providing typed exchange channels.

This crate is part of the timely dataflow system, used primarily for its inter-worker communication. It may be independently useful, but it is separated out mostly to make clear boundaries in the project.

Threads are spawned with an allocator::Generic, whose allocate method returns a pair of several send endpoints and one receive endpoint. Messages sent into a send endpoint will eventually be received by the corresponding worker, if it receives often enough. The point-to-point channels are each FIFO, but with no fairness guarantees.

To be communicated, a type must implement the Serialize trait when using the bincode feature or the Abomonation trait when not.

Channel endpoints also implement a lower-level push and pull interface (through the Push and Pull traits), which is used for more precise control of resources.

Examples

use timely_communication::Allocate;

// configure for two threads, just one process.
let config = timely_communication::Config::Process(2);

// initializes communication, spawns workers
let guards = timely_communication::initialize(config, |mut allocator| {
    println!("worker {} started", allocator.index());

    // allocates a pair of senders list and one receiver.
    let (mut senders, mut receiver) = allocator.allocate(0);

    // send typed data along each channel
    use timely_communication::Message;
    senders[0].send(Message::from_typed(format!("hello, {}", 0)));
    senders[1].send(Message::from_typed(format!("hello, {}", 1)));

    // no support for termination notification,
    // we have to count down ourselves.
    let mut expecting = 2;
    while expecting > 0 {

        allocator.receive();
        if let Some(message) = receiver.recv() {
            use std::ops::Deref;
            println!("worker {}: received: <{}>", allocator.index(), message.deref());
            expecting -= 1;
        }
        allocator.release();
    }

    // optionally, return something
    allocator.index()
});

// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
    for guard in guards.join() {
        println!("result: {:?}", guard);
    }
}
else { println!("error in computation"); }

The should produce output like:

worker 0 started
worker 1 started
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
worker 0: received: <hello, 0>
worker 1: received: <hello, 1>
result: Ok(0)
result: Ok(1)

Dependencies

~1.3–2MB
~43K SLoC