6 releases (stable)
Version | Released |
---|---|
1.0.4 | Dec 2, 2023 |
1.0.3 | Dec 1, 2023 |
1.0.0 | Nov 29, 2023 |
0.1.0 | Nov 19, 2023 |
TCP message I/O
A simple TCP client/server implementation for tokio.
The library contains two abstraction levels:
- High-level interface: allows exchanging Rust types using serde.
- Low-level interface: allows exchanging Vec<u8> messages.
This page describes the high-level abstraction; for the low-level one, check the raw submodule.
Goals & non-goals
- Hide complexity of listening and accepting TCP connections, and turning a TCP stream into a request/response stream.
- Serialize and deserialize messages into Rust types.
- Customizable serialization & compression.
- Transparent compression using zstd.
- Flexibility: use convenience features or the raw interface.
This library intentionally leaves most error handling to the user.
Cargo features
By default, no feature is enabled. Available features:
- postcard: enable automatic serialization using postcard (a fast and efficient serialization format).
- zstd: enable transparent compression of messages using zstd.
We recommend enabling both for maximum simplicity.
Note that both client and server must enable the same features; otherwise they won't be able to understand each other's messages.
Client
Client usage is straightforward:
use tcp_message_io::TCPClient;
use tokio;
use serde::{Deserialize, Serialize};

// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
    Hello,
}

// This type represents the responses from the server.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
enum Response {
    World,
}

#[tokio::main]
async fn main() {
    // We need to specify the response type so that tcp_message_io
    // knows what object to use for response deserialization.
    let client = TCPClient::<_, Response>::connect("127.0.0.1", 12345).await.unwrap();
    let response = client.send(Request::Hello).await.unwrap();
    assert_eq!(response, Some(Response::World));
}
Server
Creating a server is very straightforward as well:
use anyhow::Result;
use tcp_message_io::{TCPServer, TCPResponse};
use tokio;
use serde::{Deserialize, Serialize};

// This type represents the requests to the server.
#[derive(Serialize, Deserialize)]
enum Request {
    Hello,
}

// This type represents the responses from the server.
#[derive(Serialize, Deserialize)]
enum Response {
    World,
}

// We need a request handler: in this case we implement a simple
// "Hello, World" handler.
async fn hello_world_handler(request: Request) -> Result<TCPResponse<Response>> {
    match request {
        Request::Hello => Ok(TCPResponse::Message(Response::World)),
        // Handle additional request types here!
    }
}

#[tokio::main]
async fn main() {
    TCPServer::new("127.0.0.1", 12345, hello_world_handler).listen().await;
}
TCPResponse can be one of the following:
- TCPResponse::Message(response) to send a response message.
- TCPResponse::CloseConnection to close the connection with the client. This will also send an empty response to the client.
- TCPResponse::StopServer to shut the server down. This will also send an empty response to the client and close the connection.
The library relies on anyhow for error handling, enabling the handler to return errors of any type.
If the handler returns an error, the client receives an empty message, a tracing error message is logged, and the server keeps listening for new messages from the same client. This mechanism is meant for unhandled errors and avoids leaving the client waiting for a response that never arrives.
It is the user's responsibility to build an error reporting mechanism on top of the transport if required. For example, this can be achieved by ensuring the handler always returns Ok(...) and sending errors back as an enum variant.
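The enum-variant approach described above can be sketched as follows. This is a minimal illustration, not part of the crate: the Divide request, the Error variant and the respond function are hypothetical names, and the types omit the serde derives so that the sketch compiles with the standard library alone. A real handler would wrap the returned value in Ok(TCPResponse::Message(...)).

```rust
// Hypothetical message types for illustration only. In a real project they
// would also derive serde's Serialize and Deserialize.
#[derive(Debug, PartialEq)]
enum Request {
    Divide { dividend: i64, divisor: i64 },
}

#[derive(Debug, PartialEq)]
enum Response {
    Quotient(i64),
    // Application-level errors travel to the client as an ordinary variant.
    Error(String),
}

// Fallible business logic, kept separate from the transport.
fn divide(dividend: i64, divisor: i64) -> Result<i64, String> {
    // checked_div returns None for a zero divisor and for i64 overflow.
    dividend
        .checked_div(divisor)
        .ok_or_else(|| "division by zero or overflow".to_string())
}

// Converts every outcome into a Response, so a handler built on top of this
// function never needs to return Err.
fn respond(request: Request) -> Response {
    match request {
        Request::Divide { dividend, divisor } => match divide(dividend, divisor) {
            Ok(quotient) => Response::Quotient(quotient),
            Err(message) => Response::Error(message),
        },
    }
}
```

With this shape, the client can tell a failed request (Response::Error) apart from the empty message that signals an unhandled error on the server.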
Stopping the TCP server after a timeout
Additionally, this crate supports stopping the server after a certain amount of inactivity (inactivity timeout):
TCPServer::new("127.0.0.1", 12345, echo_handler)
    .with_inactivity_timeout(60) // Seconds
    .listen()
    .await;
This feature is useful when building something like a worker node: a node might be orphaned for many reasons (network issues, master crashing, etc). With this feature you can implement a clean-up mechanism causing the worker to shut down automatically.
Choosing what to do in case of bad requests
This crate assumes the client and server share a request/response type and use the same serialization format and compression setting (both enabled or both disabled). Versioning of payloads is left to the user.
If a client uses a different type or compression setting, the server cannot deserialize the request (a bad request). By default, this crate returns an empty message to the client and logs a tracing error.
You can customize this behavior with the TCPServer::with_bad_request_handler method, which sets a handler for that case:
#[derive(Serialize, Deserialize)]
enum Response {
    World,
    BadRequest,
}

TCPServer::new("127.0.0.1", 12345, echo_handler)
    // This will be called when a bad request happens.
    // In this example we return a BadRequest message to the client.
    .with_bad_request_handler(|| TCPResponse::Message(Response::BadRequest))
    .listen()
    .await;
Customizing serialization
tcp_message_io tries to make it as easy as possible to get started by using sane defaults: enable the postcard feature and any Serialize and Deserialize type will work as a request or response; enable zstd for transparent compression of the payload.
If you want to customize the serialization method, you can disable the postcard feature and implement the SerializeMessage trait for your message types:
#[derive(Serialize, Deserialize)]
enum Response {
    World,
}

impl SerializeMessage for Response {
    fn serialize(&self) -> Result<Vec<u8>> {
        // Implement serialization to bytes
        todo!()
    }

    fn deserialize(message: &[u8]) -> Result<Self> {
        // Implement deserialization from bytes
        todo!()
    }
}
If you want to use another compression method, disable zstd and implement your compression method in the serialize and deserialize methods above.
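As an illustration of a hand-written implementation, here is a minimal sketch that encodes each variant as a single tag byte. To keep it compilable without the crate, it declares a local stand-in trait with the same method names as SerializeMessage; the String error type and the one-byte encoding are assumptions made for this example, not the crate's actual definitions.

```rust
// Local stand-in for the crate's SerializeMessage trait, so the sketch
// compiles on its own. The String error type is an assumption.
trait SerializeMessage: Sized {
    fn serialize(&self) -> Result<Vec<u8>, String>;
    fn deserialize(message: &[u8]) -> Result<Self, String>;
}

#[derive(Debug, PartialEq)]
enum Response {
    World,
    BadRequest,
}

impl SerializeMessage for Response {
    // Encodes each variant as a single tag byte (a made-up format).
    fn serialize(&self) -> Result<Vec<u8>, String> {
        let tag = match self {
            Response::World => 0u8,
            Response::BadRequest => 1u8,
        };
        Ok(vec![tag])
    }

    // Accepts exactly one known tag byte and rejects everything else.
    fn deserialize(message: &[u8]) -> Result<Self, String> {
        match message {
            [0] => Ok(Response::World),
            [1] => Ok(Response::BadRequest),
            other => Err(format!("unexpected payload: {:?}", other)),
        }
    }
}
```

A custom compression step would fit in the same two methods: compress the bytes at the end of serialize and decompress them at the start of deserialize.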
Wire format
On the wire, each message is sent together with an internal 8-byte header that encodes the length of that message.
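Length-prefixed framing of this kind can be sketched with the standard library alone. The snippet below is not the crate's implementation: the function names are hypothetical, and both the little-endian byte order and the placement of the header before the payload are assumptions, since the text above only states that the header is 8 bytes long and encodes the message length.

```rust
use std::io::{self, Read, Write};

// Size of the length header in bytes, as stated in the text above.
const HEADER_LEN: usize = 8;

// Writes one frame: an 8-byte length followed by the payload.
// Little-endian byte order is an assumption made for this sketch.
fn write_frame<W: Write>(writer: &mut W, payload: &[u8]) -> io::Result<()> {
    let len = payload.len() as u64;
    writer.write_all(&len.to_le_bytes())?;
    writer.write_all(payload)
}

// Reads one frame: first the header, then exactly as many payload bytes
// as the header announces. A production reader would also cap the length
// before allocating.
fn read_frame<R: Read>(reader: &mut R) -> io::Result<Vec<u8>> {
    let mut header = [0u8; HEADER_LEN];
    reader.read_exact(&mut header)?;
    let len = u64::from_le_bytes(header) as usize;
    let mut payload = vec![0u8; len];
    reader.read_exact(&mut payload)?;
    Ok(payload)
}
```

Because every frame announces its own length, a reader can split a continuous TCP byte stream back into discrete messages, including empty ones.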