1 unstable release
new 0.1.0 | Nov 21, 2024 |
---|
#17 in #event-bus
34KB
476 lines
eventbus
介绍
使用 NATS、Kafka、RabbitMQ 实现的 eventbus
使用说明
/// Example payload carried in the body of the messages used by this demo.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct MyMessage {
    /// Free-form text of the message.
    content: String,
}
/// Demo entry point: spawns a subscriber and a publisher on `my_topic`, a
/// replier and a requester on `my_topic2`, then sleeps so the detached tasks
/// have time to run.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Subscriber: print the body of every message that arrives.
    let on_message: MessageHandler<MyMessage> = Arc::new(|incoming: Message<MyMessage>| {
        println!(
            "---------Received message in subscriber: {:?}--------------",
            incoming.body
        );
        Ok(String::new())
    });
    tokio::spawn(subscribe_to_topic("my_topic", on_message));

    // Publisher: send a single message to the subscribed topic.
    let publish_payload = MyMessage {
        content: String::from("Hello from publisher!"),
    };
    let outgoing = Message::new(
        String::from("my_topic"),
        crate::message::NativeEventAction::Other,
        None,
        Some(publish_payload),
        None,
    );
    tokio::spawn(publish_message(outgoing));

    // Replier: print each incoming request and answer with a fixed string.
    let on_request: MessageHandler<MyMessage> = Arc::new(|incoming: Message<MyMessage>| {
        println!(
            "============Received request, preparing response: {:?}================",
            incoming
        );
        Ok(String::from("lw"))
    });
    tokio::spawn(reply_to_topic("my_topic2", on_request));

    // Requester: send a single request to the topic served by the replier.
    let request_payload = MyMessage {
        content: String::from("Hello from publisher!"),
    };
    let outgoing_request = Message::new(
        String::from("my_topic2"),
        crate::message::NativeEventAction::Other,
        None,
        Some(request_payload),
        None,
    );
    tokio::spawn(send_request(outgoing_request));

    // The spawned tasks are detached (their handles are dropped), so keep the
    // runtime alive long enough for them to do their work.
    sleep(Duration::from_secs(1000)).await;
    println!("Disconnected from NATS!");
    Ok(())
}
/// Subscribes `handler` to `topic` on a freshly created NATS client and logs
/// the topic once the subscribe call has returned.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn subscribe_to_topic<T>(topic: &str, handler: MessageHandler<T>)
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
    // Create and initialise the NATS client.
    let mut client = NatsCli::new().await.unwrap();

    client.subscribe(topic, handler).await;
    println!("Subscribed to topic: {}", topic);
}
/// Publishes `message` through a freshly created NATS client.
///
/// Waits five seconds before publishing, presumably so the subscriber task
/// spawned by `main` has time to register first.
///
/// # Errors
///
/// Returns the boxed publish error if `NatsCli::publish` fails.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn publish_message<T>(message: Message<T>) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
    // Create and initialise the NATS client.
    let mut nats_cli = NatsCli::new().await.unwrap();

    // Futures are lazy: without `.await` the sleep future is dropped at once
    // and no delay takes place.
    sleep(Duration::from_secs(5)).await;

    if let Err(e) = nats_cli.publish(message).await {
        println!("Failed to publish message: {:?}", e);
        return Err(Box::new(e) as Box<dyn std::error::Error + Send>);
    }
    println!("---------Message published!---------------");
    Ok(())
}
/// Registers `handler` as the responder for requests sent to `topic`, using a
/// freshly created NATS client.
///
/// # Errors
///
/// Returns the boxed error if `NatsCli::reply` fails.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn reply_to_topic<T>(
    topic: &str,
    handler: MessageHandler<T>,
) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Send + Clone + Sync + 'static,
{
    // Create and initialise the NATS client.
    let mut client = NatsCli::new().await.unwrap();

    if let Err(err) = client.reply(topic, handler).await {
        return Err(Box::new(err) as Box<dyn std::error::Error + Send>);
    }
    Ok(())
}
/// Sends `message` as a request through a freshly created NATS client and
/// prints the response.
///
/// Waits fifteen seconds before sending, presumably so the replier task
/// spawned by `main` has time to register first. The 100-second duration
/// passed to `NatsCli::request` looks like a response timeout (confirm against
/// the `NatsCli` API).
///
/// # Errors
///
/// Returns the boxed error if `NatsCli::request` fails.
///
/// # Panics
///
/// Panics if `NatsCli::new()` fails, because its result is unwrapped.
async fn send_request<T>(message: Message<T>) -> Result<(), Box<dyn std::error::Error + Send>>
where
    T: Serialize + for<'de> Deserialize<'de> + Debug + Clone + Send + Sync + 'static,
{
    // Create and initialise the NATS client.
    let mut nats_cli = NatsCli::new().await.unwrap();

    // Futures are lazy: without `.await` the sleep future is dropped at once
    // and the request would be sent immediately.
    sleep(Duration::from_secs(15)).await;

    match nats_cli
        .request(message, time::Duration::from_secs(100))
        .await
    {
        Ok(response) => {
            println!(
                "===============Received response: {:?}==================",
                response
            );
            Ok(())
        }
        Err(e) => {
            println!("Failed to get response: {:?}", e);
            Err(Box::new(e) as Box<dyn std::error::Error + Send>)
        }
    }
}
Dependencies
~22–36MB
~622K SLoC