#rpc-client #client-server #tcp-socket #rpc-server #communication #unix-socket #tokio

krpc

An asynchronous RPC library (including both client and server) that is easy to use and communicates over tokio Unix/TCP sockets

5 releases

0.2.0 Oct 10, 2023
0.1.3 Jul 26, 2023
0.1.2 Jul 10, 2023
0.1.1 Dec 29, 2022
0.1.0 Dec 29, 2022

#17 in #rpc-server

MIT/Apache

50KB
1K SLoC

This is an RPC library that includes both a client and a server. It has the following characteristics:

  • supports multiple threads
  • uses asynchronous communication via tokio
  • supports Unix and TCP sockets
  • supports client subscribe and server publish
  • easy to use

Instructions for use


lib.rs:

krpc

This is an RPC library that includes both a client and a server. It has the following characteristics:

  1. supports multiple threads
  2. uses asynchronous communication via tokio
  3. supports Unix and TCP sockets
  4. supports client subscribe and server publish
  5. easy to use

server example

use tokio::time;

/// Server example.
///
/// Creates a `krpc::Server`, binds one callback per RPC name, spawns a task that
/// keeps publishing to the "sub" topic, and finally runs the server on
/// "/tmp/local/unix" (presumably a Unix socket path — confirm against the crate docs).
///
/// Returns whatever `service.run(..)` resolves to, as a `std::io::Result<()>`.
#[tokio::main(flavor = "current_thread")]
async fn main() -> std::io::Result<()> {
let mut service = krpc::Server::new();
// Callback taking no arguments and returning nothing.
service.bind(
"test_notargs_and_notret",
krpc::callback!(|| {
println!("test_notargs_and_notret called");
}),
);
// Callback taking one `String` argument and returning nothing.
service.bind(
"test_notret",
krpc::callback!(|s: String| {
println!("str is {}", s);
}),
);
// If the callback exits early with `return`, it must be marked with `@with_return`.
service.bind(
"test_notret2",
krpc::callback!(@with_return |s: String| {
if s.is_empty() {
return;
}
println!("str is {}", s);
}),
);
// Callback taking no arguments and returning a `String`.
service.bind(
"test_notargs",
krpc::callback!(|| -> String { String::from("test_notargs called!") }),
);
// Callback taking four arguments and returning them formatted, space-separated.
service.bind(
"test_has_args_and_ret",
krpc::callback!(|a0: i32, a1: i16, a2: i8, a3: bool| -> String {
format!("{} {} {} {}", a0, a1, a2, a3)
}),
);
let clone_string = String::from("abcd");
// `clone_string` is still used by the `println!` after this call, so `krpc::clone!`
// presumably gives the callback its own clone instead of moving the original — confirm.
service.bind(
"test_with_clone",
krpc::callback!(krpc::clone!(clone_string), || -> String {
clone_string
}),
);
println!("clone string is {}", clone_string);
let publisher = service.publisher();
// Background task: push ("hello", 100) to the "sub" topic, then wait for the next
// tick of a one-second interval, forever.
tokio::spawn(async move {
let mut interval = time::interval(time::Duration::from_secs(1));
loop {
publisher.push("sub", ("hello", 100));
interval.tick().await;
}
});
println!("server start");
// The future returned by `run` is the tail expression, so its result is `main`'s result.
service.run("/tmp/local/unix").await
}

client example1

Use the `define!` macro to define a new type with some methods, then call or subscribe through its methods.

use krpc::*;
use std::sync::Arc;
use tokio::time;

/// Unit struct used as a trait-based subscriber for `(String, i32)` payloads.
struct MySub;

impl SubcribeCallback<(String, i32)> for MySub {
    /// Checks that the received payload is `("hello", 100)` and returns `false`.
    ///
    /// NOTE(review): the meaning of the returned `bool` is defined by
    /// `SubcribeCallback`, which is not visible here — confirm what `false` signals.
    fn callback(&mut self, data: (String, i32)) -> bool {
        let (text, number) = data;
        assert_eq!(text.as_str(), "hello");
        assert_eq!(number, 100i32);
        false
    }
}

// Declares a client type named `RPC` through krpc's `define!` macro.
// Each `fn` entry lists a remote procedure by name with its argument and return types;
// each `sub` entry lists a subscription method taking a topic and a receiver
// (a closure for `sub1`, a `&mut MySub` for `sub2`).
// NOTE(review): the leading `unix` presumably selects the Unix-socket transport — confirm.
define!(unix, RPC,
fn test_notargs_and_notret(),
fn test_notret(s:&'static str),
fn test_notargs()->String,
fn test_has_args_and_ret(a:i32,b:i16,c:i8,d:bool)->String,
sub sub1(topic:&'static str, f:impl FnMut((String,i32))),
sub sub2(topic:&'static str, v:&mut MySub)
);

/// Client example 1.
///
/// Builds an `RPC` client for "/tmp/local/unix", calls each declared procedure and
/// checks the returned strings, then runs two subscriptions on the "sub" topic in a
/// spawned task for three seconds before aborting that task.
///
/// Any error from `RPC::new` or from the awaited calls is propagated with `?`.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
// Wrapped in `Arc` so the spawned task below can clone a handle per subscription.
let rpc = Arc::new(RPC::new("/tmp/local/unix").await?);
let _ = rpc.heartbeat().await?; // check whether the server is present
let _ = rpc.test_notargs_and_notret().await?;
let _ = rpc.test_notret("hello client").await?;
let ret = rpc.test_notargs().await?;
assert_eq!(&ret, "test_notargs called!");
let ret = rpc.test_has_args_and_ret(10000, 1000, 10, true).await?;
assert_eq!(&ret, "10000 1000 10 true");
let task = tokio::spawn(async move {
let mut v = MySub;
loop {
let c1 = rpc.clone();
let c2 = rpc.clone();
// Both subscriptions are polled together: when `sub1` finishes, the loop ends
// with its result; when `sub2` yields an `Err`, the loop ends with that error.
tokio::select! {
e = c1.sub1("sub", |(s, id)| {
assert_eq!(&s, "hello");
assert_eq!(id, 100i32);
println!("xxx");
}) => break e,
Err(e) = c2.sub2("sub", &mut v) => break Err(e),
}
}
});
// Let the subscriptions run for three seconds, then cancel the task.
time::sleep(time::Duration::new(3, 0)).await;
task.abort();
Ok(())
}

client example2

Use the `call!` macro to call and the `subcribe!` macro to subscribe directly.

use krpc::*;

/// Client example 2.
///
/// Calls the same remote procedures as a typed client would, but through the `call!`
/// macro, passing the transport (`unix`) and the path "/tmp/local/unix" on every call.
/// It then subscribes to the "sub" topic twice with `subcribe!`: once with a `MySub`
/// value and once with a closure inside a spawned task that is aborted after three seconds.
///
/// NOTE(review): `MySub` and `time` are used here but not declared in this snippet —
/// presumably they need the same `struct MySub` definition and `use tokio::time;`
/// shown in client example 1; confirm.
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
let _ = call!(unix, "/tmp/local/unix", test_notargs_and_notret()).await?;
let _ = call!(unix, "/tmp/local/unix", test_notret("hello client")).await?;
// A return type is written after `->` when the call yields a value.
let ret = call!(unix, "/tmp/local/unix", test_notargs() -> String).await?;
assert_eq!(&ret, "test_notargs called!");
let ret = call!(unix, "/tmp/local/unix", test_has_args_and_ret(10000, 1000, 10, true) -> String).await?;
assert_eq!(&ret, "10000 1000 10 true");
//test subcribe with trait
let mut sub = MySub;
subcribe!(unix, "/tmp/local/unix", "sub", &mut sub).await?;
let task = tokio::spawn(async move {
//test subcribe with lambda
subcribe!(unix, "/tmp/local/unix", "sub", |s: String, v: i32| {
assert_eq!(&s, "hello");
assert_eq!(v, 100i32);
})
.await
});
// Let the closure-based subscription run for three seconds, then cancel the task.
time::sleep(time::Duration::new(3, 0)).await;
task.abort();
Ok(())
}

Dependencies

~3–14MB
~144K SLoC