bevy_mqtt

A simple MQTT client for Bevy ECS

9 unstable releases (3 breaking)

0.4.2 Sep 26, 2024
0.4.1 Sep 26, 2024
0.3.3 Sep 24, 2024
0.2.0 Jul 5, 2024
0.1.1 Mar 29, 2024

#512 in Network programming

Download history 1/week @ 2024-06-13 3/week @ 2024-06-27 156/week @ 2024-07-04 3/week @ 2024-07-11 6/week @ 2024-07-25 2/week @ 2024-08-01 330/week @ 2024-09-12 148/week @ 2024-09-19 424/week @ 2024-09-26

902 downloads per month

MIT/Apache

25KB
270 lines

bevy_mqtt

crates.io MIT/Apache 2.0 crates.io CI Documentation

An MQTT client plugin for the Bevy game engine.

Example

First, run an MQTT broker such as Mosquitto.

Then run the example:

use bevy::{prelude::*, time::common_conditions::on_timer};
use bevy_log::LogPlugin;
use bevy_mqtt::{
    rumqttc::QoS, MqttClient, MqttClientConnected, MqttClientError, MqttConnectError, MqttEvent,
    MqttPlugin, MqttSetting, SubscribeTopic, TopicMessage,
};
use rumqttc::{MqttOptions, Transport};
use std::time::Duration;

fn main() {
    App::new()
        .add_plugins((MinimalPlugins, MqttPlugin, LogPlugin::default()))
        .add_systems(Startup, setup_clients)
        .add_systems(Update, (sub_topic, handle_message, handle_error))
        .add_systems(
            Update,
            publish_message.run_if(on_timer(std::time::Duration::from_secs(1))),
        )
        .run();
}

/// Startup system that spawns two entities carrying `MqttSetting`: one that
/// leaves the transport untouched, and one configured for WebSocket and
/// tagged with `WebsocketMqttClient`.
fn setup_clients(mut commands: Commands) {
    // First client: no transport is set explicitly.
    let plain_setting = MqttSetting {
        mqtt_options: MqttOptions::new("bevy-mqtt-client", "127.0.0.1", 1883),
        cap: 10,
    };
    commands.spawn(plain_setting);

    // Second client: WebSocket transport with a 5 second keep-alive.
    let ws_setting = {
        let mut options = MqttOptions::new("mqtt-ws-client", "ws://127.0.0.1:8080", 8080);
        options.set_transport(Transport::Ws);
        // Enable if the broker requires authentication:
        // options.set_credentials("username", "password");
        options.set_keep_alive(Duration::from_secs(5));
        MqttSetting {
            mqtt_options: options,
            cap: 10,
        }
    };
    commands.spawn((ws_setting, WebsocketMqttClient));
}

/// Marker component with no data. `setup_clients` attaches it to the entity
/// holding the WebSocket client's settings so that queries can filter on it.
#[derive(Component)]
struct WebsocketMqttClient;

/// System that sets up subscriptions for every client whose
/// `MqttClientConnected` component was just added.
///
/// For each such client it:
/// 1. subscribes directly to the `hello` topic through the client, and
/// 2. spawns a `SubscribeTopic` child entity for the `+/mqtt` filter with an
///    observer that prints every `TopicMessage` it receives.
///
/// A failed subscribe request is printed instead of panicking, so one failing
/// client cannot take down the whole app.
fn sub_topic(
    mqtt_client: Query<(Entity, &MqttClient, &MqttSetting), Added<MqttClientConnected>>,
    mut commands: Commands,
) {
    for (entity, client, setting) in mqtt_client.iter() {
        if let Err(err) = client.subscribe("hello".to_string(), QoS::AtMostOnce) {
            println!("subscribe Error: {:?}", err);
        }

        // The observer closure must own its captures. Only the broker address
        // is needed for the log line, so capture that instead of cloning the
        // whole `MqttSetting`.
        let broker_address = setting.mqtt_options.broker_address();
        let child_id = commands
            .spawn(SubscribeTopic::new("+/mqtt", QoS::AtMostOnce))
            .observe(move |topic_message: Trigger<TopicMessage>| {
                println!(
                    "{:?}: Topic: '+/mqtt' received : {:?}",
                    broker_address,
                    topic_message.event().payload
                );
            })
            .id();

        // Parent the topic entity to the client entity it belongs to.
        commands.entity(entity).add_child(child_id);
    }
}

/// Global handler that reads every `MqttEvent` and prints incoming packets.
///
/// Incoming publish packets are printed with their topic and payload, any
/// other incoming packet is printed in debug form, and outgoing events are
/// ignored.
fn handle_message(mut mqtt_event: EventReader<MqttEvent>) {
    for received in mqtt_event.read() {
        match &received.event {
            rumqttc::Event::Incoming(rumqttc::Incoming::Publish(message)) => {
                println!(
                    "Topic Component: {} Received: {:?}",
                    message.topic, message.payload
                );
            }
            rumqttc::Event::Incoming(other) => {
                println!("Incoming: {:?}", other);
            }
            rumqttc::Event::Outgoing(_) => {}
        }
    }
}

/// System that drains both error event streams and prints each error:
/// connection errors first, then client errors.
fn handle_error(
    mut connect_errors: EventReader<MqttConnectError>,
    mut client_errors: EventReader<MqttClientError>,
) {
    connect_errors
        .read()
        .for_each(|failure| println!("connect Error: {:?}", failure));

    client_errors
        .read()
        .for_each(|failure| println!("client Error: {:?}", failure));
}

/// System that publishes a batch of test messages through every connected
/// client (entities that have `MqttClientConnected`).
///
/// Each run publishes `mqtt` to the `hello` topic and `hello` to the topics
/// `0/mqtt`, `1/mqtt` and `2/mqtt`. A failed publish is printed instead of
/// panicking, so the remaining messages and clients are still attempted.
fn publish_message(mqtt_client: Query<&MqttClient, With<MqttClientConnected>>) {
    for client in mqtt_client.iter() {
        if let Err(err) = client.publish(
            "hello".to_string(),
            QoS::AtMostOnce,
            false,
            "mqtt".as_bytes(),
        ) {
            println!("publish Error: {:?}", err);
        }

        // These topics match the `+/mqtt` wildcard filter.
        for i in 0..3 {
            if let Err(err) =
                client.publish(format!("{}/mqtt", i), QoS::AtMostOnce, false, b"hello")
            {
                println!("publish Error: {:?}", err);
            }
        }
    }
}


Supported Versions

bevy bevy_mqtt
0.14 0.2, 0.3, 0.4
0.13 0.1

License

Dual-licensed under either

  • MIT
  • Apache 2.0

Dependencies

~17–31MB
~464K SLoC