#event-bus #nats #rabbitmq #client #implemented #kafaka

testeventbus

An event bus implemented using NATS, Kafka, and RabbitMQ clients.

1 unstable release

new 0.1.0 Nov 21, 2024

#17 in #event-bus

Apache-2.0

34KB
476 lines

eventbus

介绍

使用 NATS、Kafka、RabbitMQ 实现的 eventbus

使用说明



/// Example payload carried in the body of a `Message` throughout this demo.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MyMessage {
    /// Free-form text of the message.
    content: String,
}

/// Demo entry point: starts a subscriber, a publisher, a responder and a
/// requester as separate Tokio tasks, then keeps the process alive while
/// they run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Subscribe: print the body of every message delivered to the handler.
    let on_message: MessageHandler<MyMessage> = Arc::new(|incoming: Message<MyMessage>| {
        println!(
            "---------Received message in subscriber: {:?}--------------",
            incoming.body
        );
        Ok(String::new())
    });
    tokio::spawn(subscribe_to_topic("my_topic", on_message));

    // Publish: send one message to the topic subscribed to above.
    tokio::spawn(publish_message(Message::new(
        "my_topic".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(MyMessage {
            content: "Hello from publisher!".to_string(),
        }),
        None,
    )));

    // Reply: print each incoming request and answer it with a fixed string.
    let on_request: MessageHandler<MyMessage> = Arc::new(|incoming: Message<MyMessage>| {
        println!(
            "============Received request, preparing response: {:?}================",
            incoming
        );
        Ok("lw".to_string())
    });
    tokio::spawn(reply_to_topic("my_topic2", on_request));

    // Request: send one request to the topic the responder is registered on.
    tokio::spawn(send_request(Message::new(
        "my_topic2".to_string(),
        crate::message::NativeEventAction::Other,
        None,
        Some(MyMessage {
            content: "Hello from publisher!".to_string(),
        }),
        None,
    )));

    // The spawned tasks are detached, so keep main alive while they work.
    sleep(Duration::from_secs(1000)).await;

    println!("Disconnected from NATS!");

    Ok(())
}

/// Creates a NATS client, registers `handler` for `topic`, and then logs the
/// topic name.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn subscribe_to_topic<T>(topic: &str, handler: MessageHandler<T>)
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
    // Each helper in this example builds its own client instance.
    let mut client = NatsCli::new().await.unwrap();
    client.subscribe(topic, handler).await;
    println!("Subscribed to topic: {}", topic);
}

/// Publishes `message` through a newly created NATS client.
///
/// Waits five seconds before publishing — presumably to give the subscriber
/// task time to register; confirm against the intended demo flow.
///
/// # Errors
///
/// Returns the boxed error from `publish` if publishing fails.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn publish_message<T>(message: Message<T>) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
    // Create and initialise the NATS client.
    let mut nats_cli = NatsCli::new().await.unwrap();
    // The sleep future must be awaited: an un-awaited future is dropped
    // without running, so no delay would happen at all.
    sleep(Duration::from_secs(5)).await;
    if let Err(e) = nats_cli.publish(message).await {
        println!("Failed to publish message: {:?}", e);
        return Err(Box::new(e) as Box<dyn std::error::Error + Send>);
    }
    println!("---------Message published!---------------");
    Ok(())
}

/// Creates a NATS client and registers `handler` as the responder for
/// requests on `topic`.
///
/// # Errors
///
/// Returns the boxed error from `reply` if registration fails.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn reply_to_topic<T>(
    topic: &str,
    handler: MessageHandler<T>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
    let mut client = NatsCli::new().await.unwrap();
    // Discard any success value; only the boxed failure is passed on.
    match client.reply(topic, handler).await {
        Ok(_) => Ok(()),
        Err(err) => Err(Box::new(err) as Box<dyn std::error::Error + Send>),
    }
}

/// Sends `message` as a request through a newly created NATS client and
/// prints the response.
///
/// Waits fifteen seconds before sending — presumably to give the responder
/// task time to register; confirm against the intended demo flow. The
/// request itself is given a 100-second timeout.
///
/// # Errors
///
/// Returns the boxed error from `request` if the request fails.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn send_request<T>(message: Message<T>) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
    let mut nats_cli = NatsCli::new().await.unwrap();
    // The sleep future must be awaited: an un-awaited future is dropped
    // without running, so the request would be sent immediately.
    sleep(Duration::from_secs(15)).await;
    match nats_cli
        .request(message, time::Duration::from_secs(100))
        .await
    {
        Ok(response) => {
            println!(
                "===============Received response: {:?}==================",
                response
            );
            Ok(())
        }
        Err(e) => {
            println!("Failed to get response: {:?}", e);
            Err(Box::new(e) as Box<dyn std::error::Error + Send>)
        }
    }
}




Dependencies

~22–36MB
~622K SLoC