6 releases
0.3.0-alpha.4 | Oct 11, 2019 |
---|---|
0.3.0-alpha.3 | Jun 20, 2019 |
0.3.0-alpha.2 | Jun 14, 2019 |
0.0.0 | Jun 14, 2019 |
#1721 in Asynchronous
31 downloads per month
Used in openssl-async
12KB
111 lines
Adapter for using async read/write streams in std::io contexts
License
Licensed under either of
- Apache License, Version 2.0, (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any additional terms or conditions.
lib.rs:
Adapter for using async read/write streams in std::io contexts
Sometimes, you'll come across an interface that only takes std::io Read + Write
types, but also needs to be adapted for an async/await application. The
[AsStdIo] adapter allows an [AsyncRead] + [AsyncWrite] stream to be used
as its counterpart from std::io. Assuming that whatever is consuming the
wrapped stream will bubble up io::ErrorKind::WouldBlock errors and allow
operations to be resumed, this provides a way to both use an async stream
with the std::io-only interface, and to write an async wrapper around it.
Example
#
#
use async_stdio::*;
/// `std::io`-only reader that hands out the data of `reader` in chunks of up
/// to `chunk_size` bytes, staging bytes in `buffer` until a chunk is ready.
struct ChunkReader<R> {
// ...
# reader: R,
# chunk_size: usize,
# buffer: VecDeque<u8>,
}
impl<R: Read> ChunkReader<R> {
/// Creates a chunk reader over `reader` that yields chunks of up to
/// `chunk_size` bytes, starting with an empty internal buffer.
fn new(reader: R, chunk_size: usize) -> Self {
// ...
# ChunkReader {
# reader,
# chunk_size,
# buffer: Default::default(),
# }
}
/// Reads a chunk from the stream
///
/// If the stream ends before a full chunk is read, may return a smaller
/// chunk. Returns an empty chunk if there is no more to be read.
///
/// Data is pulled from the underlying reader into an internal buffer until
/// at least `chunk_size` bytes are buffered or a read returns zero bytes.
/// A read error is returned as-is; bytes buffered before the error stay
/// buffered, so a later call picks up where this one stopped.
fn read_chunk(&mut self) -> io::Result<Vec<u8>> {
// ...
# let mut tmp = vec![0u8; self.chunk_size];
// Seeded with `chunk_size` so the `bytes == 0` end-of-stream check below
// cannot fire before the first read (unless `chunk_size` itself is 0).
# let mut bytes = self.chunk_size;
# loop {
# if self.buffer.len() >= self.chunk_size || bytes == 0 {
# let end = self.buffer.len().min(self.chunk_size);
// Empty `tmp` so its allocation can be reused as the returned chunk.
# tmp.truncate(0);
# return Ok(self.buffer.drain(..end).fold(tmp, |mut out, b| {
# out.push(b);
# out
# }));
# }
# bytes = self.reader.read(&mut tmp)?;
# self.buffer.extend(&tmp[..bytes]);
# }
}
}
/// Async `Stream` front end for the std-only `ChunkReader`.
///
/// The reader runs on top of an `AsStdIo` adapter; the `WakerCtrl` that was
/// handed out together with that adapter is kept next to it so the current
/// task's waker can be registered before each read.
struct AsyncChunked<S> {
    /// Chunk reader operating on the adapted stream.
    inner: ChunkReader<AsStdIo<S>>,
    /// Control handle returned alongside the adapter by `AsStdIo::new`.
    waker_ctrl: WakerCtrl,
}
impl<S> AsyncChunked<S>
where
    S: AsyncRead + Unpin,
{
    /// Wraps `stream` so that it can be consumed as a stream of chunks of up
    /// to `chunk_size` bytes.
    fn new(stream: S, chunk_size: usize) -> Self {
        // NOTE(review): the `None` argument presumably means "no waker yet";
        // one is registered on every `poll_next` call instead. Confirm against
        // the `AsStdIo::new` documentation.
        let (adapter, waker_ctrl) = AsStdIo::new(stream, None);
        Self {
            inner: ChunkReader::new(adapter, chunk_size),
            waker_ctrl,
        }
    }
}
impl<S> Stream for AsyncChunked<S>
where
    S: AsyncRead + Unpin,
{
    type Item = io::Result<Vec<u8>>;

    /// Polls the wrapped `ChunkReader` for the next chunk.
    ///
    /// An empty chunk ends the stream (`None`); a non-empty chunk or an I/O
    /// error is yielded as the next item.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let me = self.get_mut();

        // The waker has to be in place before any `std::io::Read::read` call
        // is made through the adapter.
        me.waker_ctrl.register(cx.waker());

        // `into_poll` (from `ResultExt`) converts `WouldBlock` into `Pending`,
        // and `ready!` returns early in that case.
        let next = match ready!(me.inner.read_chunk().into_poll()) {
            Ok(chunk) if chunk.is_empty() => None,
            Ok(chunk) => Some(Ok(chunk)),
            Err(err) => Some(Err(err)),
        };
        Poll::Ready(next)
    }
}
// Pretend this doesn't already implement `io::Read`
let source = io::Cursor::new(vec![0, 1, 2, 3, 4, 5]);
// Six bytes read two at a time should come out as three full chunks.
let expected: Vec<Vec<u8>> = vec![vec![0, 1], vec![2, 3], vec![4, 5]];
let chunks: Vec<Vec<u8>> = block_on(
    AsyncChunked::new(source, 2)
        .map(|chunk| chunk.unwrap())
        .collect(),
);
assert_eq!(chunks, expected);
Dependencies
~1MB
~18K SLoC