1 unstable release
Uses new Rust 2024
| 0.1.0 | Oct 12, 2025 |
|---|
#1520 in Network programming
182 downloads per month
64KB
1K
SLoC
Convoy
A reliable MQTT bridge with SQLite message caching - for edge devices with patchy connectivity.
This is an opinionated implementation. Currently it has the following limitations:
- TLS support via
native-tlsand SQLite support via system library - to reduce binary size for edge environments - Topic mapping is not fully dynamic; currently always based on prefix on remote broker
- No MQTT v5 support (only v3.1.1)
Features
- Bidirectional MQTT bridging between local and remote brokers
- SQLite-backed message cache for local→remote messages when remote is unavailable
- Automatic cache replay when connection is restored (FIFO order)
- Topic mapping with wildcard support (
+,#) - TLS support for secure remote connections
- MQTT v3.1.1 support (default)
- Bridge state publishing with Last Will and Testament (LWT)
- Configurable cache policies (size limits, eviction strategies)
Use Cases
- Edge-to-Cloud: Bridge edge devices to cloud MQTT brokers with resilience to network outages
- IoT Gateways: Forward sensor data from local networks to remote servers
- Data Aggregation: Collect telemetry from local devices and batch forward to cloud
Quick Start
As a Standalone Application
1. Build
cargo build --release
2. Configure
Copy the example configuration and edit it:
cp config.example.toml config.toml
# Edit config.toml with your broker settings
3. Run
./target/release/convoy --config config.toml
Or with debug logging:
./target/release/convoy --config config.toml --log-level debug
As a Library
Add to your Cargo.toml (no default features suppresses default CLI dependencies):
[dependencies]
convoy = { version = "0.1", default-features = false }
Example usage (see examples/programmatic.rs for a complete example):
use convoy::{Bridge, BridgeConfig, BrokerConfig, CacheConfig, CacheManager, ForwardRule, TlsConfig};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure bridge programmatically
let bridge_config = BridgeConfig {
local: BrokerConfig {
addr: "127.0.0.1:1883".to_string(),
client_id: "convoy-local".to_string(),
keep_alive_secs: 30,
clean_session: false,
max_inflight: 100,
username: None,
password: None,
tls: None,
},
remote: BrokerConfig {
addr: "mqtt.example.com:8883".to_string(),
client_id: "convoy-remote".to_string(),
keep_alive_secs: 30,
clean_session: false,
max_inflight: 100,
username: Some("user".to_string()),
password: Some("pass".to_string()),
tls: Some(TlsConfig {
ca_file: Some("/etc/ssl/certs/ca-certificates.crt".into()),
client_cert: None,
client_password: None,
danger_accept_invalid_certs: false,
}),
},
state_topic: "bridge/state".to_string(),
state_online_payload: "1".to_string(),
state_offline_payload: "0".to_string(),
forward: vec![ForwardRule {
local_filter: "sensors/#".to_string(),
remote_prefix: "devices/edge1/".to_string(),
qos: 1,
}],
subscribe: vec![],
};
// Configure cache with defaults
let cache_config = CacheConfig {
sqlite_path: "/tmp/convoy-cache.db".into(),
..Default::default()
};
// Create cache and bridge
let cache = Arc::new(CacheManager::new(cache_config)?);
let bridge = Bridge::new(bridge_config, cache).await?;
// Run bridge
bridge.run().await?;
Ok(())
}
Run the example:
cargo run --example programmatic
Configuration
See config.example.toml for a fully documented configuration template. Key sections:
Bridge Settings
[bridge]
# Local broker (no auth/TLS)
local_addr = "127.0.0.1:1883"
local_client_id = "convoy-local"
# Remote broker (TLS + auth)
remote_addr = "mqtt.example.com:8883"
remote_client_id = "convoy-remote"
remote_username = "device01"
remote_password = "secret"
# Bridge state topic (published to remote)
state_topic = "bridge/convoy/state"
state_online_payload = "1"
state_offline_payload = "0"
Topic Forwarding (Local → Remote)
Messages are cached if remote is unavailable:
[[bridge.forward]]
local_filter = "sensors/#"
remote_prefix = "devices/edge1/"
qos = 1
Example: Local sensors/temp → Remote devices/edge1/sensors/temp
Topic Subscription (Remote → Local)
Messages are NOT cached (real-time only):
[[bridge.subscribe]]
remote_filter = "commands/edge1/#"
remote_prefix = "commands/edge1/"
qos = 1
Example: Remote commands/edge1/restart → Local restart
Cache Settings
[cache]
sqlite_path = "/var/lib/convoy/cache.sqlite"
max_rows = 500000 # Maximum cached messages
eviction = "drop_oldest" # or "reject_new"
flush_batch = 1000 # Messages per replay batch
flush_interval_ms = 100 # Replay interval
How It Works
Message Flow
-
Local → Remote (with caching):
- Bridge subscribes to configured topics on local broker
- When message arrives, applies topic mapping
- If remote connected: publishes immediately
- If remote down or publish fails: caches to SQLite
- On reconnect: replays cache in FIFO order
-
Remote → Local (no caching):
- Bridge subscribes to configured topics on remote broker
- When message arrives, applies topic mapping and publishes to local
- If local is down, message is lost (no caching)
Topic Mapping
-
Wildcards:
+matches single level:sensors/+/tempmatchessensors/room1/temp#matches multiple levels:data/#matchesdata/sensor/temp/value
-
Prefix mapping:
- Forward: prepends
remote_prefixto local topic - Subscribe: strips
remote_prefixfrom remote topic
- Forward: prepends
Bridge State
- On connect: publishes online state to
state_topic(retained) - On disconnect: LWT publishes offline state to
state_topic - Allows remote systems to monitor bridge health
Architecture
┌──────────────────┐
│ Local Broker │ (localhost:1883, no auth)
│ (Mosquitto) │
└────────┬─────────┘
│
┌────▼────┐
│ Bridge │ (rumqttc clients)
│ Client │
└────┬────┘
│
┌────▼────┐
│ SQLite │ (cache A→B only)
│ Cache │
└────┬────┘
│
┌────────▼─────────┐
│ Remote Broker │ (TLS, port 8883)
│ (e.g. AWS IoT) │
└──────────────────┘
Cargo Features
cli(default): Enables the command-line interface with TOML config file support- Required dependencies:
clap,toml,tracing-subscriber - Use this feature when building the standalone binary
- Library users don't need this feature (set
default-features = false)
- Required dependencies:
Implementation Notes
- MQTT Protocol: v3.1.1 (via rumqttc)
- Cache: SQLite with WAL mode for concurrency
- TLS: native-tls with support for:
- Custom CA certificates (PEM format)
- Client certificates for mTLS (PKCS12 format)
- System certificate store as fallback
- Async Runtime: Tokio
Testing
Run unit tests:
cargo test
For integration testing with actual MQTT brokers, see SPECS.md section 6.
CLI Options
convoy [OPTIONS]
Options:
-c, --config <CONFIG> Path to config file [default: config.toml]
-l, --log-level <LOG_LEVEL> Log level (trace|debug|info|warn|error) [default: info]
-h, --help Print help
License
See SPECS.md for design documentation and acceptance criteria.
Dependencies
~28–44MB
~691K SLoC