2 unstable releases
0.2.0 | Jun 27, 2020 |
---|---|
0.1.0 | Apr 27, 2020 |
#2822 in Database interfaces
Used in 2 crates
67KB
1.5K
SLoC
tang_rs
A light weight connection pool.
The main purpose is to keep a fair and balance usage of every connection no matter the workload.
Requirement:
rustc 1.42.0
- some example may require nightly in their dependencies.
Features:
tokio-postgres
redis
mongodb
(experimental)
lib.rs
:
An asynchronous connection pool.
feature
default
- multi thread pool whereSend
bound is needed for all futures.no-send
- single thread pool where!Send
futures are accepted.
Known Limitation:
can't be used in nested runtimes.
Example:
// This example shows how to implement the pool on async_std runtime.
// Most of the xxx-tang crates are implemented with tokio runtime so they can be seen as examples on that matter.
use std::fmt::{Debug, Formatter, Result as FmtResult};
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use async_std::task;
use smol::Timer;
use tang_rs::{Builder, Manager, ManagerFuture, ManagerTimeout};
// our test pool would just generate usize from 0 as connections.
struct TestPoolManager(AtomicUsize);
impl TestPoolManager {
fn new() -> Self {
TestPoolManager(AtomicUsize::new(0))
}
}
// dummy error type
struct TestPoolError;
impl Debug for TestPoolError {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
f.debug_struct("TestPoolError")
.field("source", &"Unknown")
.finish()
}
}
// convert instant as timeout error to our pool error.
impl From<Instant> for TestPoolError {
fn from(_: Instant) -> Self {
TestPoolError
}
}
impl Manager for TestPoolManager {
type Connection = usize;
type Error = TestPoolError;
type Timeout = Timer;
type TimeoutError = Instant;
fn connect(&self) -> ManagerFuture<'_, Result<Self::Connection, Self::Error>> {
// how we generate new connections and put them into pool.
Box::pin(async move { Ok(self.0.fetch_add(1, Ordering::SeqCst)) })
}
fn is_valid<'a>(
&self,
_conn: &'a mut Self::Connection,
) -> ManagerFuture<'a, Result<(), Self::Error>> {
Box::pin(async {
// when the connection is pulled from the pool we can check if it's valid.
Ok(())
})
}
fn is_closed(&self, _conn: &mut Self::Connection) -> bool {
// return true if you check the connection and want it to be dropped from the pool because it's closed.
false
}
fn spawn<Fut>(&self, fut: Fut)
where
Fut: Future<Output = ()> + Send + 'static,
{
// some pool inner functions would want to spawn on your executor.
// you can use the handler to further manage them if you want.
// normally we just spawn the task and forget about it.
let _handler = task::spawn(fut);
}
// Boilerplate implement for runtime specific timeout future.
fn timeout<Fut: Future>(&self,fut: Fut, dur: Duration) -> ManagerTimeout<Fut, Self::Timeout> {
ManagerTimeout::new(fut, Timer::after(dur))
}
}
#[async_std::main]
async fn main() {
let mgr = TestPoolManager::new();
let builder = Builder::new()
.always_check(false)
.idle_timeout(None)
.max_lifetime(None)
.min_idle(24)
.max_size(24)
.build(mgr);
let pool = builder.await.expect("fail to build pool");
// spawn 24 futures and pull connections from pool at the same time.
let (tx, rx) = async_std::sync::channel(100);
for _i in 0..24 {
let pool = pool.clone();
let tx = tx.clone();
task::spawn(async move {
let mut pool_ref = pool.get().await.expect("fail to get PoolRef");
let conn_ref = &*pool_ref;
println!("we have the reference of a connection : {:?}", conn_ref);
// we can also get a mut reference from pool_ref
let conn_ref = &mut *pool_ref;
let _ = tx.send(*conn_ref);
});
}
drop(tx);
while let Ok(_connection) = rx.recv().await {
// We just wait until all connections are pulled out once
}
}