1 unstable release
new 0.1.0 | Feb 20, 2025 |
---|
#897 in Network programming
280KB
6.5K
SLoC
mountain-mqtt
A no_std
compatible MQTT v5 client.
Note that this is in very early development. It is functional but not yet stable or feature complete. The API will almost certainly change over time.
Features
- Compatible with
embedded-hal
. Provides adapters to useembedded-hal-async
andembedded-io-async
traits (Read
,Write
andReadReady
) for network connection, e.g. usingembassy-net
for TCP. - Compatible with
tokio
. Provides adapters to usetokio::net::TcpStream
TcpStream
. - Layered design to allow reuse in different environments.
- Fairly thorough tests for
data
,codec
andpacket
modules against the MQTT v5 specification. - Provides a basic client trait/implementation for connecting, disconnecting, subscribing and unsubscribing, publishing messages and receiving pubished messages from the server. Supports Quality of Service levels 0 and 1.
- Can run without allocation, using only
core
onno_std
.
Adding to your project
There is not yet a published crate, so check out the project sources (alongside the project where you want to use mountain-mqtt) and then reference via path from your Cargo.toml
.
-
For embedded-hal applications:
[dependencies] mountain-mqtt = { path = "../mountain-mqtt", default-features = false, features = [ "embedded-io-async", "embedded-hal-async", ] }
-
For tokio applications:
[dependencies] mountain-mqtt = { path = "../mountain-mqtt", default-features = false, features = [ "tokio", ] }
Todo
- Support for Quality of Service level 2 in
Client
. The relevant MQTT v5 packets are implemented, but not the state management for handling them in the client. - More sophisticated client implementation(s) - the current
Client
implementationClientNoQueue
only supports a single pending acknowledgement at a time, and waits for this before returning when sending packets, by polling for data ready. The concurrency model is not ideal, but allows support for embedded and tokio networking with the same relatively simple code and no allocation. A better model should be achievable, maybe using different approaches for tokio (where we can use std) and embedded/no_std. - Improve and add integration tests for
packet_client
andclient
modules. - Publish as a crate.
Non-goals
The following goals are not planned, but may be considered later:
- MQTT v3 support.
- Server support. Note that the
data
andcodec
modules support the packets needed for this if anyone wants to implement one :)
Layers
data
module - provides basic data types used in MQTT packets.codec
module - provides simple reader and writer traits, and implementations using abuf: &'a [u8]
and position.Read
andWrite
traits for data items.packets
module - provides traits for describing MQTT v5 packets, and a struct for each packet type, withRead
andWrite
implementations.packet_client
module - provides a basic low-level client for reading and writing packets directly, using aConnection
trait with implementations for tokioTcpStream
and embedded-hal-asyncRead + Write + ReadyReady
.client
module - provides a higher-level basic client that manages connection state, waiting for acknowledgement etc.
Example code
See the examples
directory for a simple example of using the basic client - try it out with cargo run --example client_example
:
use mountain_mqtt::{
client::{Client, ClientError},
data::quality_of_service::QualityOfService,
packets::connect::Connect,
tokio::client_tcp,
};
use tokio::sync::mpsc;
/// Connect to an MQTT server on 127.0.0.1:1883,
/// server must accept connections with no username or password.
/// Subscribe to a topic, send a message, check we receive it
/// back, then unsubscribe and disconnect.
#[tokio::main]
async fn main() -> Result<(), ClientError> {
let ip = core::net::Ipv4Addr::new(127, 0, 0, 1);
let port = 1883;
let timeout_millis = 5000;
let mut buf = [0; 1024];
// We'll use a channel to handle incoming messages, this would allow us to receive
// them in another task, here we'll just read them back at the end of the example
let (message_tx, mut message_rx) = mpsc::channel(32);
// Create a client.
// The message_handler closure is called whenever a published message is received.
// This sends copies of the message contents to our channel for later processing.
let mut client = client_tcp(ip, port, timeout_millis, &mut buf, |message| {
message_tx
.try_send((message.topic_name.to_owned(), message.payload.to_vec()))
.map_err(|_| ClientError::MessageHandlerError)
})
.await;
// Send a Connect packet to connect to the server.
// `unauthenticated` uses default settings and no username/password, see `Connect::new` for
// available options (keep alive, will, authentication, additional properties etc.)
client
.connect(Connect::unauthenticated("mountain-mqtt-example-client-id"))
.await?;
let topic_name = "mountain-mqtt-example-topic";
let retain = false;
client.subscribe(topic_name, QualityOfService::QoS0).await?;
client
.publish(
topic_name,
"Hello MQTT!".as_bytes(),
QualityOfService::QoS0,
retain,
)
.await?;
// We are expecting one packet from the server, so just poll once with wait = true.
// The normal way to use this would be to poll in a loop with wait = false, calling
// any other required method between polling (e.g. to publish messages, send pings etc.)
client.poll(true).await?;
// Check we got the message back
let (topic, payload) = message_rx.try_recv().unwrap();
println!(
"Received from '{}': '{}'",
topic,
String::from_utf8_lossy(&payload)
);
client.unsubscribe(topic_name).await?;
client.disconnect().await?;
Ok(())
}
Dependencies
~2.9–9MB
~68K SLoC