0.1.0 |
|
---|
#69 in #byte-string
93KB
1K
SLoC
futures-codec2
Utilities for encoding and decoding frames forked from tokio-util.
License
This project is licensed under the MIT license.
Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in Tokio by you, shall be licensed as MIT, without any additional terms or conditions.
lib.rs
:
Adaptors from AsyncRead/AsyncWrite to Stream/Sink
Raw I/O objects work with byte sequences, but higher-level code usually wants to batch these into meaningful chunks, called "frames".
This module contains adapters to go from streams of bytes, AsyncRead
and
AsyncWrite
, to framed streams implementing Sink
and Stream
.
Framed streams are also known as transports.
The Decoder trait
A Decoder
is used together with FramedRead
or Framed
to turn an
AsyncRead
into a Stream
. The job of the decoder trait is to specify
how sequences of bytes are turned into a sequence of frames, and to
determine where the boundaries between frames are. The job of the
FramedRead
is to repeatedly switch between reading more data from the IO
resource, and asking the decoder whether we have received enough data to
decode another frame of data.
The main method on the Decoder
trait is the decode
method. This method
takes as argument the data that has been read so far, and when it is called,
it will be in one of the following situations:
- The buffer contains less than a full frame.
- The buffer contains exactly a full frame.
- The buffer contains more than a full frame.
In the first situation, the decoder should return Ok(None)
.
In the second situation, the decoder should clear the provided buffer and
return Ok(Some(the_decoded_frame))
.
In the third situation, the decoder should use a method such as split_to
or advance
to modify the buffer such that the frame is removed from the
buffer, but any data in the buffer after that frame should still remain in
the buffer. The decoder should also return Ok(Some(the_decoded_frame))
in
this case.
Finally the decoder may return an error if the data is invalid in some way. The decoder should not return an error just because it has yet to receive a full frame.
It is guaranteed that, from one call to decode
to another, the provided
buffer will contain the exact same data as before, except that if more data
has arrived through the IO resource, that data will have been appended to
the buffer. This means that reading frames from a FramedRead
is
essentially equivalent to the following loop:
use futures::io::AsyncBufReadExt;
let mut buf = bytes::BytesMut::new();
loop {
// The read_buf call will append to buf rather than overwrite existing data.
let len = {
let rbuf = io_resource.fill_buf().await?;
buf.extend_from_slice(rbuf);
rbuf.len()
};
io_resource.consume_unpin(len);
if len == 0 {
while let Some(frame) = decoder.decode_eof(&mut buf)? {
yield frame;
}
break;
}
while let Some(frame) = decoder.decode(&mut buf)? {
yield frame;
}
}
The example above uses yield
whenever the Stream
produces an item.
Example decoder
As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The decoder fails with an error if the string data is not valid utf-8 or too long.
Such a decoder can be written like this:
use futures_codec2::Decoder;
use bytes::{BytesMut, Buf};
struct MyStringDecoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Decoder for MyStringDecoder {
type Item = String;
type Error = std::io::Error;
fn decode(
&mut self,
src: &mut BytesMut
) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
// Not enough data to read length marker.
return Ok(None);
}
// Read length marker.
let mut length_bytes = [0u8; 4];
length_bytes.copy_from_slice(&src[..4]);
let length = u32::from_le_bytes(length_bytes) as usize;
// Check that the length is not too large to avoid a denial of
// service attack where the server runs out of memory.
if length > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", length)
));
}
if src.len() < 4 + length {
// The full string has not yet arrived.
//
// We reserve more space in the buffer. This is not strictly
// necessary, but is a good idea performance-wise.
src.reserve(4 + length - src.len());
// We inform the Framed that we need more bytes to form the next
// frame.
return Ok(None);
}
// Use advance to modify src such that it no longer contains
// this frame.
let data = src[4..4 + length].to_vec();
src.advance(4 + length);
// Convert the data to a string, or fail if it is not valid utf-8.
match String::from_utf8(data) {
Ok(string) => Ok(Some(string)),
Err(utf8_error) => {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
utf8_error.utf8_error(),
))
},
}
}
}
The Encoder trait
An Encoder
is used together with FramedWrite
or Framed
to turn
an AsyncWrite
into a Sink
. The job of the encoder trait is to
specify how frames are turned into a sequences of bytes. The job of the
FramedWrite
is to take the resulting sequence of bytes and write it to the
IO resource.
The main method on the Encoder
trait is the encode
method. This method
takes an item that is being written, and a buffer to write the item to. The
buffer may already contain data, and in this case, the encoder should append
the new frame the to buffer rather than overwrite the existing data.
It is guaranteed that, from one call to encode
to another, the provided
buffer will contain the exact same data as before, except that some of the
data may have been removed from the front of the buffer. Writing to a
FramedWrite
is essentially equivalent to the following loop:
use futures::future::FutureExt;
use futures::io::AsyncWriteExt;
use bytes::Buf; // for advance
const MAX: usize = 8192;
let mut buf = bytes::BytesMut::new();
loop {
futures::select! {
num_written = io_resource.write(&buf).fuse() => {
if !buf.is_empty() {
buf.advance(num_written?);
}
},
frame = next_frame().fuse() => {
if buf.len() < MAX {
encoder.encode(frame, &mut buf)?;
}
},
_ = no_more_frames().fuse() => {
io_resource.write_all(&buf).await?;
io_resource.close().await?;
return Ok(());
},
}
}
Here the next_frame
method corresponds to any frames you write to the
FramedWrite
. The no_more_frames
method corresponds to closing the
FramedWrite
with SinkExt::close
.
Example encoder
As an example, consider a protocol that can be used to send strings where each frame is a four byte integer that contains the length of the frame, followed by that many bytes of string data. The encoder will fail if the string is too long.
Such an encoder can be written like this:
use futures_codec2::Encoder;
use bytes::BytesMut;
struct MyStringEncoder {}
const MAX: usize = 8 * 1024 * 1024;
impl Encoder<String> for MyStringEncoder {
type Error = std::io::Error;
fn encode(&mut self, item: String, dst: &mut BytesMut) -> Result<(), Self::Error> {
// Don't send a string if it is longer than the other end will
// accept.
if item.len() > MAX {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", item.len())
));
}
// Convert the length into a byte array.
// The cast to u32 cannot overflow due to the length check above.
let len_slice = u32::to_le_bytes(item.len() as u32);
// Reserve space in the buffer.
dst.reserve(4 + item.len());
// Write the length and string to the buffer.
dst.extend_from_slice(&len_slice);
dst.extend_from_slice(item.as_bytes());
Ok(())
}
}
Dependencies
~0.7–1.4MB
~26K SLoC