#async-stream #io #context #adapter #read-write #io-read #error

async-stdio

Adapter for using async read/write streams in std::io contexts

6 releases

0.3.0-alpha.4 Oct 11, 2019
0.3.0-alpha.3 Jun 20, 2019
0.3.0-alpha.2 Jun 14, 2019
0.0.0 Jun 14, 2019

#1793 in Asynchronous


Used in openssl-async

MIT/Apache

12KB
111 lines

Adapter for using async read/write streams in std::io contexts

version documentation license

License

Licensed under either of

- Apache License, Version 2.0
- MIT license

at your option.

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.


lib.rs:

Adapter for using async read/write streams in std::io contexts

Sometimes, you'll come across an interface that only takes std::io Read + Write types, but that also needs to be used from an async/await application. The [AsStdIo] adapter allows an [AsyncRead] + [AsyncWrite] stream to be used as its counterpart from std::io. Provided that whatever consumes the wrapped stream bubbles up io::ErrorKind::WouldBlock errors and allows operations to be resumed, this provides a way both to use an async stream with the std::io-only interface and to write an async wrapper around it.

Example

#
#
use async_stdio::*;

/// Blocking reader that splits a byte stream into chunks of at most
/// `chunk_size` bytes.
struct ChunkReader<R> {
    /// Underlying `std::io` byte source.
    reader: R,
    /// Maximum number of bytes returned by a single `read_chunk` call.
    chunk_size: usize,
    /// Bytes already read from `reader` but not yet handed out. They are kept
    /// across calls, so a read that fails part-way through a chunk (for
    /// example with `WouldBlock`) loses nothing and can simply be retried.
    buffer: VecDeque<u8>,
}

impl<R: Read> ChunkReader<R> {
    /// Creates a chunk reader over `reader` that yields chunks of up to
    /// `chunk_size` bytes.
    fn new(reader: R, chunk_size: usize) -> Self {
        ChunkReader {
            reader,
            chunk_size,
            buffer: VecDeque::new(),
        }
    }

    /// Reads a chunk from the stream
    ///
    /// If the stream ends before a full chunk is read, may return a smaller
    /// chunk. Returns an empty chunk if there is no more to be read.
    ///
    /// A `chunk_size` of zero always yields an empty chunk without touching
    /// the reader.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the underlying reader, except
    /// `io::ErrorKind::Interrupted`, which is retried. Bytes gathered before
    /// the error stay buffered, so calling `read_chunk` again resumes where
    /// the failed call left off.
    fn read_chunk(&mut self) -> io::Result<Vec<u8>> {
        let mut tmp = vec![0u8; self.chunk_size];

        // Top up the buffer until a full chunk is available or the reader
        // reports end of stream.
        while self.buffer.len() < self.chunk_size {
            match self.reader.read(&mut tmp) {
                Ok(0) => break,
                Ok(bytes) => self.buffer.extend(&tmp[..bytes]),
                // `Interrupted` is non-fatal per the `Read` contract: retry.
                Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => return Err(err),
            }
        }

        // Reuse the scratch allocation as the returned chunk.
        let end = self.buffer.len().min(self.chunk_size);
        tmp.clear();
        tmp.extend(self.buffer.drain(..end));
        Ok(tmp)
    }
}

/// Wrapper around the std-only `ChunkReader` to turn it
/// into an async `Stream`
struct AsyncChunked<S> {
    /// The blocking chunk reader, reading from `S` through the `AsStdIo`
    /// adapter
    inner: ChunkReader<AsStdIo<S>>,
    /// Control handle returned by `AsStdIo::new` together with the adapter;
    /// `poll_next` registers the current task's waker through it before
    /// reading
    waker_ctrl: WakerCtrl,
}

impl<S> AsyncChunked<S>
where
    S: AsyncRead + Unpin,
{
    /// Wraps the async `stream` so that it is read in chunks of up to
    /// `chunk_size` bytes.
    fn new(stream: S, chunk_size: usize) -> AsyncChunked<S> {
        // `AsStdIo::new` hands back the std::io adapter together with the
        // waker control that belongs to it; no initial waker is supplied.
        let (adapter, waker_ctrl) = AsStdIo::new(stream, None);
        Self {
            inner: ChunkReader::new(adapter, chunk_size),
            waker_ctrl,
        }
    }
}

impl<S> Stream for AsyncChunked<S>
where
    S: AsyncRead + Unpin,
{
    type Item = io::Result<Vec<u8>>;

    /// Polls for the next chunk.
    ///
    /// Yields `Some(Ok(chunk))` for every non-empty chunk, `Some(Err(_))` for
    /// an I/O error, and `None` once `read_chunk` returns an empty chunk.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = self.get_mut();
        // The waker has to be registered before `read_chunk` reaches
        // `std::io::Read::read`
        me.waker_ctrl.register(cx.waker());
        // `into_poll` (from `ResultExt`) converts `WouldBlock` into `Pending`,
        // which `ready!` then returns early
        let outcome = ready!(me.inner.read_chunk().into_poll());

        Poll::Ready(match outcome {
            // An empty chunk marks the end of the stream
            Ok(chunk) if chunk.is_empty() => None,
            Ok(chunk) => Some(Ok(chunk)),
            Err(err) => Some(Err(err)),
        })
    }
}

// Pretend this doesn't already implement `io::Read`
let stream = io::Cursor::new(vec![0, 1, 2, 3, 4, 5]);
// Six bytes, read two at a time. No `mut` needed: `map` below takes the
// stream by value.
let async_chunked = AsyncChunked::new(stream, 2);

// Collect every chunk, panicking on the first I/O error
let chunks: Vec<Vec<u8>> = block_on(async_chunked.map(|chunk| chunk.unwrap()).collect());

let expected: Vec<Vec<u8>> = vec![vec![0, 1], vec![2, 3], vec![4, 5]];

assert_eq!(chunks, expected);

Dependencies

~1MB
~18K SLoC