10 breaking releases

0.14.0 Oct 4, 2023
0.13.0 May 17, 2022
0.11.0 May 5, 2022
0.2.0 Jan 6, 2022
0.1.0 Dec 20, 2021

#377 in Encoding

Download history 19/week @ 2024-02-26 34/week @ 2024-03-04 8/week @ 2024-03-11 13/week @ 2024-03-18 8/week @ 2024-03-25 55/week @ 2024-04-01

87 downloads per month

Apache-2.0

26KB
431 lines

rl-core

Docs

The core logic for a token-bucket rate limiter.

This just implements the logic for the limiting and has no method for ensuring consistency or blocking. Those can either be added by the application or a wrapping library.

Local Example

Here is an example of applying an in-process rate limit. This should be wrapped up into a library and likely integrated with your favourite async runtime. However no one has done this yet.

// Send at most 1 message per second per domain with a burst of 4.
const DOMAIN_LIMIT: rl_core::Config = rl_core::Config::new(
	std::time::Duration::from_secs(1),
	4);

lazy_static::lazy_static! {
	// This uses a Mutex<HashMap<..>> as the outer structure. Since this lock is always short-lived (just for cloning or creating and storing the Arc) this is likely sufficent for almost all applications. If concurrency on the outer structure is a contention point a better concurrent map can be substituted.
	static ref DOMAIN_TO_RATE_LIMIT: std::sync::Mutex<
		std::collections::HashMap<
			String,
			std::rc::Arc<std::sync::Mutex<rl_core::Tracker>>> = Default::default();
}

fn send_mail(msg: Message) {
	// Calculate our rate-limit key.
	let domain = msg.to().email().domain();

	// Get (or create) the rate-limit tracker for this key.
	let tracker_arc = DOMAIN_TO_RATE_LIMIT.lock().unwrap()
		.entry(domain)
		.or_default()
		.clone();

	// Lock the tracker itself.
	let tracker = tracker_arc.lock().unwrap();

	// Acquire the required tokens.
	loop {
		match rate_limit.acquire(&DOMAIN_LIMIT, 1) {
			Ok(()) => break, // Granted.
			Err(rl_core::Denied::TooEarly(denial)) => {
				// Wait for required token grants.
				if let Ok(to_wait) = denial.available_at().duration_since(std::time::SystemTime::now()) {
					std::time::sleep(to_wait)
				}
				// Restart the loop. The next acquire shuold always succeed unless time has jumped backwards.
			}
			Err(e) => panic!("Error: {}", e),
		}
	}

	std::mem::drop(tracker); // Unlock rate limit tracker.

	// Actualy send the message...
	unimplemented!("Send {:?}", msg)
}

Distributed Example

Here is a simple example of applying a per-user rate limit to login attempts. In this example it is assumed that we can acquire row-level locks from our DB to ensure serialized rate-limit updates.

// Rate login attempts to to 1 per hour with a 10 login burst.
const LOGIN_RATE_LIMIT: rl_core::Config = rl_core::Config::new(
	std::time::Duration::from_secs(3600),
	10);

fn try_login(user: String, pass: String) {
	let User{password_hash, mut rate_limit} = fetch_and_lock_user(&user);

	if let Err(e) = rate_limit.acquire(&LOGIN_RATE_LIMIT, 1) {
		panic!("Login failed: {}", e)
	}

	store_rate_limit_and_unlock_user(&user, &rate_limit);

	// Verify password and issue token ...
}

If your DB doesn't support row-level updates you can do optimistic checking where after acquiring the rate limit you compare-and-set the new value. If the compare fails someone else acquired it in the meantime and you need to retry. For high-throughput use-cases you likely want to manage rate limits on a shared-service and utilize rl-core on that service. (This service has not yet been written.)

Features

  • Weighted request support.
  • Calculation of required wait time.
  • Efficient updates.
  • Low memory use.
  • Minimal dependencies.
  • Serialization of Tracker via serde. (Enable feature use_serde.)

Non-features

It is intended that the following could be implemented on top of rl-core.

  • Distributed rate-limiting support.
  • Waiting support.

Dependencies

~175KB