56 releases
0.18.3 | Oct 23, 2024 |
---|---|
0.18.2 | Aug 24, 2023 |
0.18.1 | Apr 21, 2023 |
0.18.0 | Feb 4, 2023 |
0.3.4 | Dec 19, 2021 |
#6 in #pallas
406 downloads per month
Used in 9 crates
(8 directly)
230KB
2.5K
SLoC
Pallas Multiplexer
This is an implementation of the Ouroboros multiplexer logic as defined in the The Shelley Networking Protocol specs.
Architectural Decisions
The following architectural decisions were made for this particular Rust implementation:
- each mini-protocol state machine should be able to work in its own thread
- a bounded queue should serve as buffer to decouple mini-protocol logic from multiplexer work
- the implementation should pipelining-friendly, even if we don't have a current use-case
- the multiplexer should be agnostic of the mini-protocols implementation details.
Implementation Details
Given the above definitions, Rust's mpsc channels seem like the correct artifact to orchestrate the communication between the different threads in the multiplexer process.
The following diagram provides an overview of the components involved:
Usage
The following code provides a very rough example of how to setup a client that connects to a node and spawns two concurrent threads running independently, both communication over the same bearer using Pallas multiplexer.
// Setup a new bearer. In this case, we use a unix socket to connect
// to a node running on the local machine.
let bearer = UnixStream::connect("/tmp/pallas").unwrap();
// Setup a new multiplexer using the created bearer and a specification
// of the mini-protocol IDs that we'll be using for our session. In this case, we
// pass id #0 (handshake) and #2 (chainsync).
let muxer = Multiplexer::setup(tcp, &[0, 2])
// Ask the multiplexer to provide us with the channel for the miniprotocol #0.
let mut channel_0 = muxer.use_channel(0);
// Spawn a thread and pass the ownership of the channel.
thread::spawn(move || {
// Deconstruct the channel to get a handle for sending data into the muxer
// ingress and a handle to receive data from the demuxer egress.
let Channel(mux_tx, demux_rx) = channel_0;
// Do something with the channel. In this case, we just keep sending
// dumb data every 50 millis.
loop {
let payload = vec![1; 65545];
tx.send(payload).unwrap();
thread::sleep(Duration::from_millis(50));
}
});
// Ask the multiplexer to provide us with the channel for the miniprotocol #2.
let mut channel_2 = muxer.use_channel(2);
// Spawn a different thread and pass the ownership of the 2nd channel.
thread::spawn(move || {
// Deconstruct the channel to get a handle for sending data into the muxer
// ingress and a handle to receive data from the demuxer egress.
let Channel(mux_tx, demux_rx) = channel_2;
// Do something with the channel. In this case, we just print in stdout
// whatever get received for this mini-protocol.
loop {
let payload = rx.recv().unwrap();
println!("id:{protocol}, length:{}", payload.len());
}
});
Run Examples
For a working example of a two peers communicating (a sender and a listener), check the examples folder. To run the examples, open two different terminals and run a different peer in each one:
# on terminal 1, start the listener
RUST_LOG=info cargo run --example listener
# on terminal 2, start the sender
RUST_LOG=info cargo run --example sender
Real World Usage
For a more complex, real-world example, check the Oura repo, it provides a full-blown client tool designed to live-stream block data from a local or remote node.
Dependencies
~1.5–2.3MB
~47K SLoC