#bridge #mqtt #iot #cache

bin+lib convoy

A reliable MQTT bridge with SQLite message caching - for edge devices with patchy connectivity

Rust 2024 edition

0.1.0 Oct 12, 2025

Apache-2.0

64KB
1K SLoC

Convoy

A reliable MQTT bridge with SQLite message caching - for edge devices with patchy connectivity.

This is an opinionated implementation. Currently it has the following limitations:

  • TLS is provided by native-tls and SQLite is linked from the system library, to keep binary size down for edge environments
  • Topic mapping is not fully dynamic; it is always prefix-based on the remote broker side
  • No MQTT v5 support (v3.1.1 only)

Features

  • Bidirectional MQTT bridging between local and remote brokers
  • SQLite-backed message cache for local→remote messages when remote is unavailable
  • Automatic cache replay when connection is restored (FIFO order)
  • Topic mapping with wildcard support (+, #)
  • TLS support for secure remote connections
  • MQTT v3.1.1 support (default)
  • Bridge state publishing with Last Will and Testament (LWT)
  • Configurable cache policies (size limits, eviction strategies)

Use Cases

  • Edge-to-Cloud: Bridge edge devices to cloud MQTT brokers with resilience to network outages
  • IoT Gateways: Forward sensor data from local networks to remote servers
  • Data Aggregation: Collect telemetry from local devices and batch forward to cloud

Quick Start

As a Standalone Application

1. Build

cargo build --release

2. Configure

Copy the example configuration and edit it:

cp config.example.toml config.toml
# Edit config.toml with your broker settings

3. Run

./target/release/convoy --config config.toml

Or with debug logging:

./target/release/convoy --config config.toml --log-level debug

As a Library

Add to your Cargo.toml (disabling default features drops the CLI-only dependencies):

[dependencies]
convoy = { version = "0.1", default-features = false }

Example usage (see examples/programmatic.rs for a complete example):

use convoy::{Bridge, BridgeConfig, BrokerConfig, CacheConfig, CacheManager, ForwardRule, TlsConfig};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure bridge programmatically
    let bridge_config = BridgeConfig {
        local: BrokerConfig {
            addr: "127.0.0.1:1883".to_string(),
            client_id: "convoy-local".to_string(),
            keep_alive_secs: 30,
            clean_session: false,
            max_inflight: 100,
            username: None,
            password: None,
            tls: None,
        },
        remote: BrokerConfig {
            addr: "mqtt.example.com:8883".to_string(),
            client_id: "convoy-remote".to_string(),
            keep_alive_secs: 30,
            clean_session: false,
            max_inflight: 100,
            username: Some("user".to_string()),
            password: Some("pass".to_string()),
            tls: Some(TlsConfig {
                ca_file: Some("/etc/ssl/certs/ca-certificates.crt".into()),
                client_cert: None,
                client_password: None,
                danger_accept_invalid_certs: false,
            }),
        },
        state_topic: "bridge/state".to_string(),
        state_online_payload: "1".to_string(),
        state_offline_payload: "0".to_string(),
        forward: vec![ForwardRule {
            local_filter: "sensors/#".to_string(),
            remote_prefix: "devices/edge1/".to_string(),
            qos: 1,
        }],
        subscribe: vec![],
    };

    // Configure cache with defaults
    let cache_config = CacheConfig {
        sqlite_path: "/tmp/convoy-cache.db".into(),
        ..Default::default()
    };

    // Create cache and bridge
    let cache = Arc::new(CacheManager::new(cache_config)?);
    let bridge = Bridge::new(bridge_config, cache).await?;

    // Run bridge
    bridge.run().await?;
    Ok(())
}

Run the example:

cargo run --example programmatic

Configuration

See config.example.toml for a fully documented configuration template. Key sections:

Bridge Settings

[bridge]
# Local broker (no auth/TLS)
local_addr = "127.0.0.1:1883"
local_client_id = "convoy-local"

# Remote broker (TLS + auth)
remote_addr = "mqtt.example.com:8883"
remote_client_id = "convoy-remote"
remote_username = "device01"
remote_password = "secret"

# Bridge state topic (published to remote)
state_topic = "bridge/convoy/state"
state_online_payload = "1"
state_offline_payload = "0"

Topic Forwarding (Local → Remote)

Messages are cached if remote is unavailable:

[[bridge.forward]]
local_filter = "sensors/#"
remote_prefix = "devices/edge1/"
qos = 1

Example: Local sensors/temp → Remote devices/edge1/sensors/temp

Topic Subscription (Remote → Local)

Messages are NOT cached (real-time only):

[[bridge.subscribe]]
remote_filter = "commands/edge1/#"
remote_prefix = "commands/edge1/"
qos = 1

Example: Remote commands/edge1/restart → Local restart

Cache Settings

[cache]
sqlite_path = "/var/lib/convoy/cache.sqlite"
max_rows = 500000              # Maximum cached messages
eviction = "drop_oldest"       # or "reject_new"
flush_batch = 1000             # Messages per replay batch
flush_interval_ms = 100        # Replay interval
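The two `eviction` strategies can be sketched as a single admission decision. This is an illustrative sketch, not Convoy's internal code; `admit` is a hypothetical helper introduced here for clarity:

```rust
// Sketch of the cache admission decision for the two eviction strategies.
// Returns (accept_new_message, evict_oldest_row).
fn admit(rows: usize, max_rows: usize, drop_oldest: bool) -> (bool, bool) {
    if rows < max_rows {
        (true, false) // under the limit: just insert
    } else if drop_oldest {
        (true, true) // "drop_oldest": evict the oldest row, then insert
    } else {
        (false, false) // "reject_new": refuse the new message, keep old rows
    }
}
```

With `max_rows = 500000`, a full cache either drops its oldest row to make room ("drop_oldest") or rejects the incoming message ("reject_new").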

How It Works

Message Flow

  1. Local → Remote (with caching):

    • Bridge subscribes to configured topics on local broker
    • When message arrives, applies topic mapping
    • If remote connected: publishes immediately
    • If remote down or publish fails: caches to SQLite
    • On reconnect: replays cache in FIFO order
  2. Remote → Local (no caching):

    • Bridge subscribes to configured topics on remote broker
    • When message arrives, applies topic mapping and publishes to local
    • If local is down, message is lost (no caching)

Topic Mapping

  • Wildcards:

    • + matches single level: sensors/+/temp matches sensors/room1/temp
    • # matches multiple levels: data/# matches data/sensor/temp/value
  • Prefix mapping:

    • Forward: prepends remote_prefix to local topic
    • Subscribe: strips remote_prefix from remote topic
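The two prefix-mapping rules above can be written as standalone functions. These are illustrative helpers, not Convoy's actual API:

```rust
// Forward (local→remote): prepend remote_prefix to the local topic.
fn map_forward(local_topic: &str, remote_prefix: &str) -> String {
    format!("{remote_prefix}{local_topic}")
}

// Subscribe (remote→local): strip remote_prefix from the remote topic;
// topics outside the prefix yield None and are not republished.
fn map_subscribe(remote_topic: &str, remote_prefix: &str) -> Option<String> {
    remote_topic.strip_prefix(remote_prefix).map(str::to_string)
}
```

This reproduces the examples from the configuration sections: `sensors/temp` forwards as `devices/edge1/sensors/temp`, and remote `commands/edge1/restart` arrives locally as `restart`.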

Bridge State

  • On connect: publishes online state to state_topic (retained)
  • On disconnect: LWT publishes offline state to state_topic
  • Allows remote systems to monitor bridge health

Architecture

┌──────────────────┐
│  Local Broker    │  (localhost:1883, no auth, e.g. Mosquitto)
└────────┬─────────┘
         │
    ┌────▼────┐
    │ Bridge  │  (rumqttc clients)
    │ Client  │
    └────┬────┘
         │
    ┌────▼────┐
    │ SQLite  │  (cache, local→remote only)
    │  Cache  │
    └────┬────┘
         │
┌────────▼─────────┐
│  Remote Broker   │  (TLS, port 8883, e.g. AWS IoT)
└──────────────────┘

Cargo Features

  • cli (default): Enables the command-line interface with TOML config file support
    • Required dependencies: clap, toml, tracing-subscriber
    • Use this feature when building the standalone binary
    • Library users don't need this feature (set default-features = false)

Implementation Notes

  • MQTT Protocol: v3.1.1 (via rumqttc)
  • Cache: SQLite with WAL mode for concurrency
  • TLS: native-tls with support for:
    • Custom CA certificates (PEM format)
    • Client certificates for mTLS (PKCS12 format)
    • System certificate store as fallback
  • Async Runtime: Tokio
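The TLS options correspond to the fields of the library's TlsConfig (ca_file, client_cert, client_password, danger_accept_invalid_certs). A hypothetical TOML rendering is shown below; the actual key names are documented in config.example.toml, so verify against that file:

```toml
[bridge]
# Hypothetical key names for illustration only — check config.example.toml.
remote_tls_ca_file = "/etc/ssl/certs/ca-certificates.crt"  # custom CA (PEM)
remote_tls_client_cert = "/etc/convoy/client.p12"          # mTLS bundle (PKCS12)
remote_tls_client_password = "changeit"
```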

Testing

Run unit tests:

cargo test

For integration testing with actual MQTT brokers, see SPECS.md section 6.

CLI Options

convoy [OPTIONS]

Options:
  -c, --config <CONFIG>        Path to config file [default: config.toml]
  -l, --log-level <LOG_LEVEL>  Log level (trace|debug|info|warn|error) [default: info]
  -h, --help                   Print help

License

Apache-2.0

See SPECS.md for design documentation and acceptance criteria.
