2 releases

0.0.1-beta Oct 18, 2024
0.0.0 Nov 27, 2017

#1220 in Concurrency

MIT license

175KB
4K SLoC

firefly

A collection of high performance concurrent channels.

// create a SPSC channel with a capacity of 2
let (mut tx, mut rx) = firefly::spsc::bounded(2);

task::spawn(async move {
    // send a message across asynchronously
    tx.send(42).await.unwrap();
});

// receive the message synchronously
assert_eq!(rx.recv_blocking().unwrap(), 42);

Channel Flavors

Firefly provides a variety of channel flavors, optimized for specific use cases:

In general, a channel flavor higher up on the list is likely to be more performant than a more generic one lower down.


Bounded channels are created with a bounded capacity; the maximum number of messages that can be held at a given time:

// create a channel that can hold at most 8 messages at a time
let (mut tx, mut rx) = firefly::spsc::bounded(8);

task::spawn(async move {
    for i in 0..100 {
        // send a message, potentially waiting until capacity frees up
        tx.send(i).await.unwrap();
    }
});

// block until messages are sent
while let Ok(i) = rx.recv_blocking() {
    println!("{i}");
}

Unbounded channels on the other hand are unlimited in their capacity, meaning that sending never blocks:

// create an unbounded channel
let (mut tx, mut rx) = firefly::spsc::unbounded();

task::spawn(async move {
    // send an arbitrary amount of messages
    for i in 0..10_000 {
        tx.send(i).unwrap();
    }
});

// block until all messages are sent
while let Ok(i) = rx.recv_blocking() {
    println!("{i}");
}

Blocking

Send and receive operations can be performed four different ways:

  • Non-blocking (returns immediately with success or failure).
  • Asynchronously (blocks the async task).
  • Blocking (blocks the thread until the operation succeeds or the channel disconnects).
  • Blocking with a timeout (blocks upto a maximum duration of time).
let (mut tx, mut rx) = firefly::spsc::bounded(4);

thread::spawn(move || {
    for _ in 0..3 {
        // this can never fail because we never exceed the capacity
        tx.try_send(42).unwrap();
    }
});

// attempt to receive the message without blocking
match rx.try_recv() {
    Ok(x) => assert_eq!(x, 42),
    Err(_) => println!("message has not been sent yet")
}

// block until the message is sent
assert_eq!(rx.recv_blocking(), Ok(42));

// block for at most 1 second
match rx.recv_blocking_timeout(Duration::from_secs(1)) {
    Ok(x) => assert_eq!(x, 42),
    Err(_) => println!("message took too long to send")
}

// spawn a task that receives the message asynchronously
task::spawn(async move {
    assert_eq!(rx.recv().await, Ok(42));
});

All channels can be used to "bridge" between async and sync code:

let (mut tx, mut rx) = firefly::spsc::bounded(8);

// send messages synchronously
thread::spawn(move || {
    for i in 0..16 {
        tx.send_blocking(i).unwrap()
    }
});

// receive asynchronously
task::spawn(async move {
    while let Ok(i) = rx.recv().await {
        println!("{i}");
    }
});

Disconnection

When all senders or receivers of a given channel are dropped, the channel is disconnected. Any attempts to send a message will fail. Any remaining messages in the channel can be received, but subsequent attempts to receive will also fail:

let (mut tx, mut rx) = firefly::spsc::unbounded();

tx.send(1).unwrap();
tx.send(2).unwrap();

// disconnect the sender
drop(tx);

// any remaining messages can be received
assert_eq!(rx.recv().await, Ok(1));
assert_eq!(rx.recv().await, Ok(2));

// subsequent attempts will error
assert_eq!(rx.recv().await, Err(firefly::RecvError));

Dependencies

~420KB