#nats #event-bus #rabbitmq #client #implemented #kafaka

feventbus

An event bus implemented on top of NATS, Kafka, and RabbitMQ clients.

7 releases

0.3.1 Dec 4, 2024
0.3.0 Nov 27, 2024
0.2.3 Nov 26, 2024
0.1.0 Nov 19, 2024

#887 in Encoding

Download history 302/week @ 2024-11-18 537/week @ 2024-11-25 161/week @ 2024-12-02 40/week @ 2024-12-09

1,040 downloads per month
Used in fleet_apiserver

Apache-2.0

40KB
470 lines

eventbus

介绍

使用 NATS、Kafka、RabbitMQ 实现的 eventbus

使用说明

/**
* Copyright(2024,)Institute of Software, Chinese Academy of Sciences
* author: jiangliuwei@iscas.ac.cn
* since: 0.1.0
*
**/

/**
* Copyright(2024,)Institute of Software, Chinese Academy of Sciences
* author: jiangliuwei@iscas.ac.cn
* since: 0.1.0
*
**/

mod err;
mod impls;
mod message;
mod tests;
mod traits;
mod config;
use crate::impls::nats::nats::NatsCli;
use crate::message::Message;
use crate::traits::consumer::Consumer;
use crate::traits::consumer::MessageHandler;
use crate::traits::controller::EventBus;
use crate::traits::producer::Producer;
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::sync::Arc;
use std::time;
use std::time::Duration;
use tokio::time::sleep;

/// Example payload carrying a single string in the `content` field.
///
/// Used by `main` as the body of the first request and the first published
/// message; the handlers try to decode incoming JSON into this type first.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct MyMessage {
    // Text carried by the message.
    content: String,
}
/// Second example payload, distinguished from `MyMessage` only by its field
/// name (`contentyou`).
///
/// The handlers in `main` fall back to this type when the incoming JSON does
/// not decode as `MyMessage`.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct YourMessage {
    // Text carried by the message.
    contentyou: String,
}

/// Trivial async helper that hands its argument straight back.
///
/// It exists so the example handlers have something to `.await`; it performs
/// no work on `val`.
async fn ceshi(val: String) -> String {
    val
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let nats_cli = Arc::new(NatsCli::new().await.unwrap());

    let reply_handler: MessageHandler<serde_json::Value> = Arc::new(move |msg: Message<serde_json::Value>| {
        Box::pin(async move {
            match msg.body {
                Some(body) => {
                    if let Ok(my_message) = serde_json::from_value::<MyMessage>(body.clone()) {
                        ceshi(my_message.content.clone()).await;
                        Ok(my_message.content)
                    } else if let Ok(your_message) = serde_json::from_value::<YourMessage>(body) {
                        // println!("Received YourMessage: {:?}", your_message);
                        Ok(your_message.contentyou)
                    } else {
                        Err(crate::err::Error::MessageHandling(
                            "Cargo creation failed: metadata is null".to_string(),
                        ))
                    }
                }
                None => {
                    Err(crate::err::Error::MessageHandling(
                        "Cargo creation failed: metadata is null".to_string(),
                    ))
                }
            }
        })
    });

    tokio::spawn(reply_to_topic(
        "my_topic2",
        Arc::clone(&nats_cli),
        reply_handler,
    ));

    // request消息
    let request_message = Message::new(
        "my_topic2".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(MyMessage {
            content: "request 123!".to_string(),
        }),
        None,
    );
    tokio::spawn(send_request(request_message, Arc::clone(&nats_cli)));

    // request消息
    let request_message2 = Message::new(
        "my_topic2".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(YourMessage {
            contentyou: "request 234!".to_string(),
        }),
        None,
    );

    tokio::spawn(send_request(request_message2, Arc::clone(&nats_cli)));


    let sub_handler: MessageHandler<serde_json::Value> = Arc::new(move |msg: Message<serde_json::Value>| {
        Box::pin(async move {
            match msg.body {
                Some(body) => {
                    if let Ok(my_message) = serde_json::from_value::<MyMessage>(body.clone()) {
                        ceshi(my_message.content.clone()).await;
                        println!("-------{}----------",my_message.content);
                        Ok("---sub Response from MyMessage---".to_string())
                    } else if let Ok(your_message) = serde_json::from_value::<YourMessage>(body) {
                        // println!("Received YourMessage: {:?}", your_message);
                        println!("-------{}------",your_message.contentyou);
                        Ok("---sub Response from YourMessage---".to_string())
                    } else {
                        Err(crate::err::Error::MessageHandling(
                            "---Cargo creation failed: metadata is null---".to_string(),
                        ))
                    }
                }
                None => {
                    Err(crate::err::Error::MessageHandling(
                        "---Cargo creation failed: metadata is null---".to_string(),
                    ))
                }
            }
        })
    });

    tokio::spawn(subscribe_to_topic(
        "my_topic3",
        Arc::clone(&nats_cli),
        sub_handler,
    ));

    // request消息
    let pub_message = Message::new(
        "my_topic3".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(MyMessage {
            content: "pub 123!".to_string(),
        }),
        None,
    );
    tokio::spawn(publish_message(pub_message, Arc::clone(&nats_cli)));

    // request消息
    let pub_message2 = Message::new(
        "my_topic3".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(YourMessage {
            contentyou: "pub 234!".to_string(),
        }),
        None,
    );

    tokio::spawn(publish_message(pub_message2, Arc::clone(&nats_cli)));

    sleep(Duration::from_secs(100)).await;
    Ok(())
}



/// Registers `handler` for `topic` by calling `nats_cli.reply(topic, handler)`.
///
/// # Errors
///
/// Any error returned by `reply` is boxed as `Box<dyn Error + Send>` and
/// passed back to the caller.
async fn reply_to_topic(
    topic: &str,
    nats_cli: Arc<NatsCli>,
    handler: MessageHandler<serde_json::Value>,
) -> Result<(), Box<dyn std::error::Error + Send>> {
    match nats_cli.reply(topic, handler).await {
        // Whatever `reply` yields on success is not needed here.
        Ok(_) => Ok(()),
        Err(cause) => Err(Box::new(cause) as Box<dyn std::error::Error + Send>),
    }
}


/// Subscribes `handler` to `topic` via `nats_cli.subscribe`, then prints a
/// confirmation line.
///
/// NOTE(review): the value returned by `subscribe` is discarded, so the
/// "Subscribed to topic" line is printed whether or not the call succeeded.
async fn subscribe_to_topic<
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
>(
    topic: &str,
    nats_cli: Arc<NatsCli>,
    handler: MessageHandler<T>,
) {
    let _ = nats_cli.subscribe(topic, handler).await;
    println!("Subscribed to topic: {}", topic);
}

/// Publishes `message` by calling `nats_cli.publish(message)`.
///
/// # Errors
///
/// When `publish` fails, the error is printed to stdout with `{:?}` and then
/// returned boxed as `Box<dyn Error + Send>`.
async fn publish_message<T>(
    message: Message<T>,
    nats_cli: Arc<NatsCli>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
    match nats_cli.publish(message).await {
        Ok(_) => Ok(()),
        Err(cause) => {
            println!("Failed to publish message: {:?}", cause);
            Err(Box::new(cause) as Box<dyn std::error::Error + Send>)
        }
    }
}

/// Sends `message` with `nats_cli.request`, passing a 100-second duration, and
/// prints the response with `{:?}`.
///
/// # Errors
///
/// When `request` fails, the error is printed to stdout with `{:?}` and then
/// returned boxed as `Box<dyn Error + Send>`.
async fn send_request<T>(
    message: Message<T>,
    nats_cli: Arc<NatsCli>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
    let wait_limit = time::Duration::from_secs(100);

    let reply = nats_cli
        .request(message, wait_limit)
        .await
        .map_err(|cause| {
            println!("Failed to get response: {:?}", cause);
            Box::new(cause) as Box<dyn std::error::Error + Send>
        })?;

    println!(
        "===============Received response: {:?}==================",
        reply
    );
    Ok(())
}

Dependencies

~22–35MB
~628K SLoC