#producer-consumer #rocket-mq #client #protocols #remote #message #body

rocketmq-client-v4

rocket mq rust client for remote protocol. works on rocket mq V4

18 unstable releases (3 breaking)

0.4.2 Oct 31, 2024
0.4.1 Oct 29, 2024
0.3.11 Oct 23, 2024
0.2.2 Sep 26, 2024
0.1.0 Sep 19, 2024

#737 in Asynchronous

MIT license

135KB
3K SLoC

desc

This is a rust rocket mq client for rocket mq version 4.
if you use rocket mq 5. you should use rocketmq-client-rs

how to use

consumer


use log::{info, LevelFilter};
use rocketmq_client_v4::consumer::message_handler::MessageHandler;
use rocketmq_client_v4::consumer::pull_consumer::MqConsumer;
use rocketmq_client_v4::protocols::body::message_body::MessageBody;
use std::sync::Arc;
use std::time::Duration;
use time::UtcOffset;
use tokio::sync::RwLock;
use rocketmq_client_v4::consumer::pull_consumer_v2::PullConsumer;

struct Handler {}
impl MessageHandler for Handler {
    async fn handle(&self, message: &MessageBody) {
        info!("read message:{:?}", String::from_utf8(message.body.clone()))
    }
}

unsafe impl Send for Handler {
}

unsafe impl Sync for Handler {}
#[tokio::main]
pub async fn main() {
    let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
    simple_logger::SimpleLogger::new()
        .with_utc_offset(offset)
        .with_level(LevelFilter::Debug)
        .env()
        .init()
        .unwrap();

    let name_addr = "192.168.3.49:9876".to_string();
    let topic = "pushNoticeMessage_To".to_string();
    let consume_group = "consume_pushNoticeMessage_test_2".to_string();
    let consumer = PullConsumer::new(name_addr, consume_group, topic);

    let handle = Arc::new(Handler {});
    let lock = Arc::new(RwLock::new(true));
    let run = lock.clone();
    tokio::spawn(async move {
        consumer.start_consume(handle, run).await;
    });
    tokio::time::sleep(Duration::from_secs(40)).await;
    {
        let mut run = lock.write().await;
        *run = false;
    }
    tokio::time::sleep(Duration::from_secs(2)).await;
    info!("quit the test")
}


broadcast consumer

use std::sync::Arc;
use std::time::Duration;
use log::{info, LevelFilter};
use time::UtcOffset;
use tokio::sync::RwLock;
use rocketmq_client_v4::consumer::message_handler::MessageHandler;
use rocketmq_client_v4::consumer::pull_consumer_v2::PullConsumer;
use rocketmq_client_v4::protocols::body::message_body::MessageBody;


struct Handler {}
impl MessageHandler for Handler {
    async fn handle(&self, message: &MessageBody) {
        info!("read message:{:?}", String::from_utf8(message.body.clone()))
    }
}

unsafe impl Send for Handler {}

unsafe impl Sync for Handler {}
#[tokio::main]
pub async fn main() {
    let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
    simple_logger::SimpleLogger::new()
        .with_utc_offset(offset)
        .with_level(LevelFilter::Debug)
        .env()
        .init()
        .unwrap();

    let name_addr = "192.168.3.49:9876".to_string();

    let topic = "MessageCluster_To".to_string();
    let consume_group = "Message_messageClusterInput_group".to_string();
    let consumer = PullConsumer::new_broadcast_consumer(name_addr.clone(), consume_group, topic);


    let handle = Arc::new(Handler {});
    let lock = Arc::new(RwLock::new(true));
    let run = lock.clone();

    tokio::spawn(async move {
        consumer.start_consume(handle.clone(), run.clone()).await;
    });
    tokio::time::sleep(Duration::from_secs(180)).await;
    {
        let mut run = lock.write().await;
        *run = false;
    }
    tokio::time::sleep(Duration::from_secs(2)).await;
    info!("quit the test")
}



producer

#[tokio::test]
    async fn send_message_test() {

        let offset = UtcOffset::from_hms(8, 0, 0).unwrap();
        simple_logger::SimpleLogger::new().with_utc_offset(offset).with_level(LevelFilter::Debug).env().init().unwrap();

        let message_body = r#"{"id":"3910000000000056508"}"#;
        let body = message_body.as_bytes().to_vec();

        let name_addr = "192.168.3.49:9876".to_string();
        let topic = "topic_test_007".to_string();

        let mut producer = Producer::new("rust_send_group_1".to_string(), name_addr.clone()).await;
        for i in 0..10 {
            producer.send_message(topic.clone(), body.clone(), format!("{i}")).await.unwrap();
            tokio::time::sleep(Duration::from_secs(5)).await;
        }
    }


todo *

  • tag supports
  • connect me

    zyy20101289@outlook.com

    Dependencies

    ~10–20MB
    ~267K SLoC