28 releases
new 0.12.2 | Dec 11, 2024 |
---|---|
0.12.1 | Oct 27, 2024 |
0.11.0 | May 8, 2024 |
0.9.0 | Feb 20, 2024 |
0.1.7 | Oct 4, 2022 |
#287 in Network programming
574 downloads per month
94KB
2K
SLoC
SHORS
Shors - a library for creating transport layer for distributed systems built with tarantool-module. Shors contains four components:
- http router (open api integration)
- rpc router
- rpc client
- builtin components like:
- middlewares (opentelemetry, metrics, etc)
- logger
HTTP
Create http route
Use a route::Builder for create http routes. After route created just register it with http Server.
Example:
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request};
use shors::transport::Context;
use std::error::Error;
fn make_http_endpoint() {
let endpoint = Builder::new()
.with_method("GET")
.with_path("/concat/a/:a/b/:b")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let a = request
.stash
.get("a")
.map(|s| s.as_str())
.unwrap_or_default();
let b = request
.stash
.get("b")
.map(|s| s.as_str())
.unwrap_or_default();
Ok(a.to_string() + b)
},
);
let s = server::Server::new();
s.register(Box::new(endpoint));
}
A more complex example (with groups, error handling, custom and builtin middlewares):
use once_cell::sync::Lazy;
use opentelemetry::sdk::export::trace::stdout;
use opentelemetry::sdk::trace::Tracer;
use shors::transport::http::route::Builder;
use shors::transport::http::{server, Request, Response};
use shors::transport::Context;
use shors::{middleware, shors_error};
use std::error::Error;
static OPENTELEMETRY_TRACER: Lazy<Tracer> = Lazy::new(|| stdout::new_pipeline().install_simple());
pub fn make_http_endpoints() {
let route_group = Builder::new()
.with_path("/v1")
.with_middleware(|route| {
println!("got new http request!");
route
})
.with_middleware(middleware::http::otel(&OPENTELEMETRY_TRACER))
.group();
#[derive(serde::Deserialize)]
struct EchoRequest {
text: String,
}
let echo = route_group
.builder()
.with_method("POST")
.with_path("/echo")
.build(
|_ctx: &mut Context, request: Request| -> Result<_, Box<dyn Error>> {
let req: EchoRequest = request.parse()?;
Ok(req.text)
},
);
let ping = route_group
.builder()
.with_method("GET")
.with_path("/ping")
.build(|_ctx: &mut Context, _request: Request| -> Result<_, Box<dyn Error>> { Ok("pong") });
let s = server::Server::new();
s.register(Box::new(echo));
s.register(Box::new(ping));
}
Add OpenAPI docs
First, add shors = { ..., features = ["open-api"]}
to Cargo.toml of your project.
Use .define_open_api
method on route builder and define OpenAPI operation. Underline
shors using utoipa crate for create OpenAPI schema.
For user convenience this crate reexported as shors::utoipa
.
!Important: if you use derive macros from shors::utoipa
please add this line into .rs file:
use shors::utoipa as utoipa;
for correct work of a derive macros.
To access the resulting OpenAPI document use a shors::transport::http::openapi::with_open_api
function.
See test application routes for familiarize with examples of usage.
For usage of swagger see shors::transport::http::openapi::swagger_ui_route
function.
RPC
Prepare
Rpc transport required exported stored procedure - rpc_handler.
Create stored procedure. Example (where RPC_SERVER is the Server instance):
#[no_mangle]
pub extern "C" fn rpc_handler(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
RPC_SERVER.with(|srv| srv.handle(ctx, args))
}
And export it from cartridge role. Example:
box.schema.func.create('mylib.rpc_handler', { language = 'C', if_not_exists = true })
rawset(_G, 'rpc_handler', function(path, ctx, mp_request)
return box.func['mylib.rpc_handler']:call({ path, ctx, mp_request })
end)
Create rpc routes
Working with rpc routes same as http: use route::Builder for creating rpc routes. After route created register it with rpc Server.
Complex example:
use once_cell::unsync::Lazy;
use shors::log::RequestIdOwner;
use shors::transport::rpc::server::Server;
use shors::transport::{rpc, Context};
use std::error::Error;
use shors::{middleware, shors_error};
thread_local! {
pub static RPC_SERVER: Lazy<Server> = Lazy::new(Server::new);
}
#[tarantool::proc]
fn init_rpc() -> tarantool::Result<()> {
let routes = rpc::route::Builder::new()
.with_error_handler(|ctx, err| {
shors_error!(ctx: ctx, "rpc error {}", err);
})
.with_middleware(|route| {
println!("got new rpc request!");
route
})
.with_middleware(middleware::rpc::otel(&OPENTELEMETRY_TRACER))
.group();
let sum_route = routes.builder().with_path("/sum").build(
|_ctx: &mut Context, req: rpc::Request| -> Result<_, Box<dyn Error>> {
let numbers = req.parse::<Vec<u64>>()?;
Ok(numbers.into_iter().sum::<u64>())
},
);
RPC_SERVER.with(|srv| {
srv.register(Box::new(sum_route));
});
Ok(())
}
RPC client
There is a special component for interaction with remote rpc endpoints. Currently, client can call rpc endpoint in four modes:
- by bucket_id (vshard)
- by bucket_id (vshard) async (without waiting for an response)
- by replicaset id (call current master)
- by cartridge role (call current master)
- by cartridge role with uri (call instance by uri that can be not current master)
Prepare
The RPC client requires some lua code to be registered whether in the luaopen_
function or an init function.
Note that a luaopen-function is called when a related library is properly loaded e.g. from the init.lua
file or the RPC client is intentionally initialized in the init function, as shown in the following examples
Examples:
Initialization of rpc-client directly from the init method
#[proc]
fn init_rpc_client() {
init_logger();
let lua = unsafe { tlua::StaticLua::from_static(tarantool::lua_state().as_lua()) };
shors::init_lua_functions(&lua)}
Defining a luaopen-function
#[no_mangle]
pub unsafe extern "C" fn luaopen_libstub(l: *mut ffi_lua::lua_State) -> c_int {
let lua = tlua::StaticLua::from_static(l);
shors::init_lua_functions(&lua).unwrap();
1
}
Call rpc endpoint by bucket_id
let lua = tarantool::lua_state();
let params = vec![2, 3, 4];
let resp = rpc::client::Builder::new(&lua)
.shard_endpoint("/add")
.call(&mut Context::background(), bucket_id, ¶ms)?;
Call rpc endpoint by bucket_id async
let lua = tarantool::lua_state();
rpc::client::Builder::new(&lua)
.async_shard_endpoint("/ping")
.call(&mut Context::background(), bucket_id, &())?;
Call rpc endpoint by replicaset uuid
let lua = tarantool::lua_state();
let params = vec![2, 3, 4];
let resp = rpc::client::Builder::new(&lua)
.replicaset_endpoint("/add")
.prefer_replica()
.call(&mut Context::background(), rs_uuid, ¶ms)?;
Call rpc endpoint by cartridge role
NOTE: calling rpc endpoint by role require register exported rpc_handler stored procedure as exported role method. For example:
return {
role_name = 'app.roles.stub',
...
rpc_handler = function(path, ctx, mp_request)
return box.func['libstub.rpc_handle']:call({ path, ctx, mp_request })
end,
}
Call example:
let lua = tarantool::lua_state();
let resp = rpc::client::Builder::new(&lua)
.role_endpoint("app.roles.stub", "/ping")
.call(&mut Context::background(), &())?;
Call rpc endpoint by cartridge role
NOTE: depend on rpc_handler (detail in Call rpc endpoint by cartridge role item)
let lua = tarantool::lua_state();
let resp = rpc::client::Builder::new(&lua)
.role_endpoint("app.roles.stub", "/ping")
.with_uri("localhost:3031")
.call(&mut Context::background(), &())?;
Builtin middlewares
- http server
- debug - print debug information in debug logs
- otel - opentelemetry tracing
- otel_conditional - opentelemetry tracing, disabled if http-header
with-trace
not set - access_logs - nginx like access logs
- rpc server
- debug - print debug information in debug logs
- otel - opentelemetry tracing
- record_latency - record route latency as prometheus metric
- rpc client
- otel - opentelemetry tracing
- retry - retry call on server side errors
- record_latency - record call latency as prometheus metric
Testing
Unit
make unit-test
Integration
(cd tests/testapplication/ && cartridge build)
make int-test
Request pipeline (actual for 0.1.x version)
!NOTE text bellow not actual for shors v 0.2.0+
Shors use vshard/cartridge API underline for make remote requests. Both cartridge and vshard api is a LUA api. So, look at pipeline of shors rpc request:
Client side
- Rust side:
- sender create an instance of rpc::client::Client
- sender use rpc::client::Client::call method with rust structure that represent request payload
- rust structure serialize into LUA table (using tarantool::tlua - Push trait)
- LUA side:
- calling vshard or cartridge method with LUA table derived from previous step
- vshard or cartridge api calling iproto
- iproto serialize LUA table into msgpack and send it into server side
Server side
- LUA side:
- received message from iproto, message representation is a tarantool tuple
- call function-handler in rust using internal routes table
- Rust side:
- Call .decode method on rpc::Request to restore the request
So serialization/deserialization scheme looks like this: rust structure -> lua table -> msgpack representation -> rust structure
There is a problem in this scheme: what if we use enum in fields of initial rust structure? For example
#[derive(Debug, Deserialize, Serialize, tlua::Push)]
enum Value {
String(String),
Code(String),
}
#[derive(Debug, Deserialize, Serialize, tlua::Push)]
struct Foo {
bar: Value,
}
let example = Foo{ bar: Value::Code("abc".to_string()) };
tarantool::tlua will serialize this struct in lua table like this:
{
bar = "abc"
}
So as you see, information about which enum variant using is lost. In the future, when we serialize this LUA table into msgpack and then deserialize it into a rust structure, we will get a serde error. Serde expect some type information for deserializing, but there is no. In this example if LUA table looks like:
{
bar = {"Code" = "abc"}
}
Deserialization will be success and no errors occurred.
How we can fix this? Currently most generic approach is an implement tlua::Push trait for enum. For this example (note, this is example implementation, don't create hashmap at production code):
impl<L> tlua::Push<L> for Value
where
L: tlua::AsLua,
{
type Err = TuplePushError<tlua::Void, tlua::Void>;
fn push_to_lua(&self, lua: L) -> Result<PushGuard<L>, (Self::Err, L)> {
let mut hm = HashMap::<String, String>::new();
match self {
Value::String(s) => hm.insert("String".to_string(), s.to_string()),
Value::Code(s) => hm.insert("Code".to_string(), s.to_string()),
};
hm.push_to_lua(lua)
}
}
impl<L> tlua::PushOne<L> for Value where L: tlua::AsLua {}
Dependencies
~9–23MB
~336K SLoC