1 unstable release

0.1.0 Jun 5, 2024

MIT/Apache


Rexecutor

A robust job execution library for Rust, built on the Tokio runtime.

For example usage, see the postgres example.

Setting up Rexecutor

To create an instance of Rexecutor you will need to have an implementation of Backend.

The rexecutor library only provides an in-memory implementation, backend::memory::InMemoryBackend, which is primarily intended for testing. For anything else, a separate crate implementing Backend should be used, for example rexecutor-sqlx.

Creating executors

Jobs are defined by creating a struct/enum and implementing Executor for it.

Example defining an executor

You can define and enqueue a job as follows:

use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();
struct EmailJob;

#[async_trait::async_trait]
impl Executor for EmailJob {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_job";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {}", Self::NAME, job.data);
        // Do something important with an email
        ExecutionResult::Done
    }
}

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let _ = EmailJob::builder()
        .with_data("bob.shuruncle@example.com".to_owned())
        .schedule_in(TimeDelta::hours(3))
        .enqueue()
        .await;

    assert_enqueued!(
        with_data: "bob.shuruncle@example.com".to_owned(),
        scheduled_after: Utc::now() + TimeDelta::minutes(170),
        scheduled_before: Utc::now() + TimeDelta::minutes(190),
        for_executor: EmailJob
    );
});

Unique jobs

It is possible to ensure uniqueness of jobs based on certain criteria. This can be defined as part of the implementation of Executor via Executor::UNIQUENESS_CRITERIA or when inserting the job via job::builder::JobBuilder::unique.

For example, to ensure that only one unique job is run every five minutes, the following uniqueness criteria can be used.

use rexecutor::prelude::*;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
let backend = InMemoryBackend::new().paused();
Rexecutor::new(backend).set_global_backend().unwrap();
struct UniqueJob;

#[async_trait::async_trait]
impl Executor for UniqueJob {
    type Data = ();
    type Metadata = ();
    const NAME: &'static str = "unique_job";
    const MAX_ATTEMPTS: u16 = 1;
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = Some(
        UniquenessCriteria::by_executor()
            .and_within(TimeDelta::seconds(300))
            .on_conflict(Replace::priority().for_statuses(&JobStatus::ALL)),
    );
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        println!("{} running, with args: {:?}", Self::NAME, job.data);
        // Do something important
        ExecutionResult::Done
    }
}

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let _ = UniqueJob::builder().enqueue().await;
    let _ = UniqueJob::builder().enqueue().await;

    // Only one of the jobs was enqueued
    assert_enqueued!(
        1 job,
        scheduled_before: Utc::now(),
        for_executor: UniqueJob
    );
});

Additionally, it is possible to specify what action should be taken when there is a conflicting job. In the example above, the priority is overridden. For more details on how to use uniqueness, see job::uniqueness_criteria::UniquenessCriteria.
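To make the idea of a time-window uniqueness check concrete, here is a minimal standalone sketch. It is not rexecutor's implementation: StoredJob and conflicts are hypothetical names, timestamps are plain seconds, and only the "same executor within a window" rule is modelled (no conflict action).

```rust
use std::time::Duration;

/// Hypothetical record of a job that is already stored in a backend.
struct StoredJob {
    executor: &'static str,
    /// Seconds since some fixed epoch at which the job was inserted.
    inserted_at: u64,
}

/// Returns true when `existing` already holds a job for `executor` that was
/// inserted less than `window` before `now`, i.e. a new insert would conflict.
fn conflicts(existing: &[StoredJob], executor: &str, now: u64, window: Duration) -> bool {
    existing.iter().any(|job| {
        job.executor == executor && now.saturating_sub(job.inserted_at) < window.as_secs()
    })
}
```

With a 300 second window, a second insert for the same executor 100 seconds later counts as a conflict in this sketch, while one made 300 seconds later does not.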

Overriding Executor default values

When defining an Executor you specify the maximum number of attempts via Executor::MAX_ATTEMPTS. However, when inserting a job it is possible to override this value by calling job::builder::JobBuilder::with_max_attempts (if not called the max attempts will be equal to Executor::MAX_ATTEMPTS).

Similarly, the executor can define a job uniqueness criteria via Executor::UNIQUENESS_CRITERIA. However, using job::builder::JobBuilder::unique it is possible to override this value for a specific job.
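The fallback from a per-job override to the executor-wide default can be pictured with a small standalone sketch. HasDefaults, SketchBuilder and effective_max_attempts are hypothetical names used for illustration only; they are not part of rexecutor, and the real builder may store the value differently.

```rust
use std::marker::PhantomData;

/// Stand-in for the part of an executor definition that carries defaults.
trait HasDefaults {
    const MAX_ATTEMPTS: u16;
}

struct EmailJob;

impl HasDefaults for EmailJob {
    const MAX_ATTEMPTS: u16 = 2;
}

/// Hypothetical builder that remembers an optional per-job override.
struct SketchBuilder<E: HasDefaults> {
    max_attempts: Option<u16>,
    _executor: PhantomData<E>,
}

impl<E: HasDefaults> SketchBuilder<E> {
    fn new() -> Self {
        Self { max_attempts: None, _executor: PhantomData }
    }

    /// Override the executor-wide default for this one job.
    fn with_max_attempts(mut self, max_attempts: u16) -> Self {
        self.max_attempts = Some(max_attempts);
        self
    }

    /// The value that would be stored with the job: the override if one was
    /// given, otherwise the executor's associated constant.
    fn effective_max_attempts(&self) -> u16 {
        self.max_attempts.unwrap_or(E::MAX_ATTEMPTS)
    }
}
```

Keeping the override as an Option means "not specified" stays distinguishable from "explicitly set to the default".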

Setting up executors to run

For each executor you would like to run, Rexecutor::with_executor should be called. Being explicit about this makes it possible to have specific nodes in a cluster act as worker nodes for certain enqueued jobs, while other nodes are not responsible for their execution.

Example setting up executors

use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>();
});
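The effect of explicit registration on a multi-node deployment can be sketched without the library. In the following standalone illustration, Named and NodeSketch are hypothetical stand-ins, not rexecutor types: a node only picks up jobs for executors it registered.

```rust
use std::collections::HashSet;

/// Stand-in for the executor name constant.
trait Named {
    const NAME: &'static str;
}

struct RefreshWorker;
struct EmailScheduler;

impl Named for RefreshWorker {
    const NAME: &'static str = "refresh_worker";
}

impl Named for EmailScheduler {
    const NAME: &'static str = "email_scheduler";
}

/// Hypothetical model of one node in the cluster.
#[derive(Default)]
struct NodeSketch {
    registered: HashSet<&'static str>,
}

impl NodeSketch {
    /// Register an executor type on this node.
    fn with_executor<E: Named>(mut self) -> Self {
        self.registered.insert(E::NAME);
        self
    }

    /// Whether this node would run jobs for the named executor.
    fn will_run(&self, executor_name: &str) -> bool {
        self.registered.contains(executor_name)
    }
}
```

A node built with only with_executor::<RefreshWorker>() would run refresh jobs and leave email scheduling to other nodes, while a node with no registrations could still enqueue jobs without running any.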

Enqueuing jobs

Generally jobs will be enqueued using the job::builder::JobBuilder returned by Executor::builder.

When enqueuing jobs, the data and metadata of the job can be specified. Additionally, the default values of the Executor can be overridden.

Example enqueuing a job

use rexecutor::prelude::*;
use std::sync::Arc;
use chrono::{Utc, TimeDelta};
use rexecutor::backend::memory::InMemoryBackend;
use rexecutor::assert_enqueued;
pub(crate) struct ExampleExecutor;

#[async_trait::async_trait]
impl Executor for ExampleExecutor {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "simple_executor";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = Arc::new(InMemoryBackend::new().paused());
    Rexecutor::new(backend.clone()).set_global_backend().unwrap();

    ExampleExecutor::builder()
        .with_max_attempts(2)
        .with_tags(vec!["initial_job", "delayed"])
        .with_data("First job".into())
        .schedule_in(TimeDelta::hours(2))
        .enqueue_to_backend(&backend)
        .await
        .unwrap();

    assert_enqueued!(
        to: backend,
        with_data: "First job".to_owned(),
        tagged_with: ["initial_job", "delayed"],
        scheduled_after: Utc::now() + TimeDelta::minutes(110),
        scheduled_before: Utc::now() + TimeDelta::minutes(130),
        for_executor: ExampleExecutor
    );
});

Compile time scheduling of cron jobs

It can be useful to have jobs that run on a given schedule. Jobs like this can be set up using either Rexecutor::with_cron_executor or Rexecutor::with_cron_executor_for_timezone. The latter is used to specify the timezone in which the jobs should be scheduled to run.

Example setting up a UTC cron job

To set up a cron job to run every day at midnight, you can use the following code.

use rexecutor::prelude::*;
use rexecutor::backend::{Backend, memory::InMemoryBackend};
struct CronJob;
#[async_trait::async_trait]
impl Executor for CronJob {
    type Data = String;
    type Metadata = ();
    const NAME: &'static str = "cron_job";
    const MAX_ATTEMPTS: u16 = 1;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        // Do something important
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let schedule = cron::Schedule::try_from("0 0 0 * * *").unwrap();

    let backend = InMemoryBackend::new();
    Rexecutor::new(backend).with_cron_executor::<CronJob>(schedule, "important data".to_owned());
});
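The schedule string above has six fields because the cron crate's format starts with a seconds field (an optional seventh field for the year is not covered here). The following standalone sketch only labels the fields of such a string; label_fields and FIELD_NAMES are hypothetical helpers, and no validation of the values is attempted.

```rust
/// Names of the six fields, in order, for a seconds-first cron expression.
const FIELD_NAMES: [&str; 6] = [
    "second",
    "minute",
    "hour",
    "day of month",
    "month",
    "day of week",
];

/// Pairs each whitespace-separated field with its name.
/// Returns None unless the expression has exactly six fields.
fn label_fields(expression: &str) -> Option<Vec<(&'static str, &str)>> {
    let parts: Vec<&str> = expression.split_whitespace().collect();
    if parts.len() != FIELD_NAMES.len() {
        return None;
    }
    Some(FIELD_NAMES.iter().copied().zip(parts).collect())
}
```

Read this way, "0 0 0 * * *" means second 0, minute 0, hour 0 on every day, i.e. midnight daily, whereas "0 0 * * * *" would fire at the top of every hour.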

Pruning jobs

After jobs have been completed, cancelled, or discarded, it is useful to be able to clean them up. To set up the job pruner, Rexecutor::with_job_pruner should be called, passing in the given PrunerConfig.

Given the different ways in which jobs can finish, it is often useful to have fine-grained control over how old jobs are cleaned up. PrunerConfig enables such control.

When constructing PrunerConfig a cron::Schedule is provided to specify when the pruner should run.

Depending on the load/throughput of the system the pruner can be scheduled to run anywhere from once a year through to multiple times per hour.

Example configuring the job pruner

use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
let config = PrunerConfig::new(cron::Schedule::from_str("0 0 * * * *").unwrap())
    .with_max_concurrency(Some(2))
    .with_pruner(
        Pruner::max_age(TimeDelta::days(31), JobStatus::Complete)
            .only::<RefreshWorker>()
            .and::<EmailScheduler>(),
    )
    .with_pruner(
        Pruner::max_length(200, JobStatus::Discarded)
            .except::<RefreshWorker>()
            .and::<EmailScheduler>(),
    );

tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .with_job_pruner(config);
});
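The two pruner kinds used above can be pictured with a standalone sketch. This is an assumption about their intent rather than rexecutor's code: max_age is read here as "remove jobs in the given status that are older than the limit" and max_length as "keep only the newest N jobs in the given status". Status, FinishedJob and both functions are hypothetical names.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Status {
    Complete,
    Discarded,
}

#[derive(Clone, Debug, PartialEq)]
struct FinishedJob {
    id: u32,
    status: Status,
    age_days: u32,
}

/// Keep everything except jobs in `status` that are older than `max_age_days`.
fn prune_by_max_age(jobs: &[FinishedJob], status: Status, max_age_days: u32) -> Vec<FinishedJob> {
    jobs.iter()
        .filter(|job| job.status != status || job.age_days <= max_age_days)
        .cloned()
        .collect()
}

/// Keep everything except jobs in `status` beyond the newest `max_len` of them.
fn prune_by_max_length(jobs: &[FinishedJob], status: Status, max_len: usize) -> Vec<FinishedJob> {
    // Order the matching jobs newest first, then mark the overflow for removal.
    let mut matching: Vec<&FinishedJob> = jobs.iter().filter(|job| job.status == status).collect();
    matching.sort_by_key(|job| job.age_days);
    let doomed: Vec<u32> = matching.iter().skip(max_len).map(|job| job.id).collect();
    jobs.iter()
        .filter(|job| !doomed.contains(&job.id))
        .cloned()
        .collect()
}
```

In both functions, jobs in other statuses are left untouched, which mirrors how each pruner in the configuration above is tied to a single JobStatus.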

Shutting rexecutor down

To avoid jobs being killed midway through execution, it is important to make use of graceful shutdown. This can be done either by explicitly calling Rexecutor::graceful_shutdown, or by using the DropGuard obtained via Rexecutor::drop_guard.

Using Rexecutor::graceful_shutdown or Rexecutor::drop_guard will ensure that all currently executing jobs will be allowed time to complete before shutting rexecutor down.

Example using the DropGuard

use rexecutor::prelude::*;
use std::str::FromStr;
use chrono::TimeDelta;
use rexecutor::backend::memory::InMemoryBackend;
pub(crate) struct RefreshWorker;
pub(crate) struct EmailScheduler;
pub(crate) struct RegistrationWorker;

#[async_trait::async_trait]
impl Executor for RefreshWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "refresh_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for EmailScheduler {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "email_scheduler";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
#[async_trait::async_trait]
impl Executor for RegistrationWorker {
    type Data = String;
    type Metadata = String;
    const NAME: &'static str = "registration_worker";
    const MAX_ATTEMPTS: u16 = 2;
    async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
        ExecutionResult::Done
    }
}
tokio::runtime::Builder::new_current_thread().build().unwrap().block_on(async {
    let backend = InMemoryBackend::new();
    // Note this must be given a name to ensure it is dropped at the end of the scope.
    // See https://doc.rust-lang.org/book/ch18-03-pattern-syntax.html#ignoring-an-unused-variable-by-starting-its-name-with-_
    let _guard = Rexecutor::new(backend)
        .with_executor::<RefreshWorker>()
        .with_executor::<EmailScheduler>()
        .with_executor::<RegistrationWorker>()
        .drop_guard();
});
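The comment in the example above about naming the guard reflects a general Rust rule: a value bound to `_` is dropped at the end of that statement, while a value bound to a name such as `_guard` lives until the end of the scope. The following standalone sketch demonstrates the difference with a hypothetical NoisyGuard type that records when it is dropped; it does not use rexecutor's DropGuard.

```rust
use std::cell::RefCell;

/// Hypothetical guard that logs its own drop.
struct NoisyGuard<'a> {
    name: &'static str,
    log: &'a RefCell<Vec<String>>,
}

impl Drop for NoisyGuard<'_> {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("dropped {}", self.name));
    }
}

fn run_scope(log: &RefCell<Vec<String>>) {
    // Bound to `_`: the value is dropped immediately, before the next line runs.
    let _ = NoisyGuard { name: "underscore", log };
    // Bound to a name: the value lives until the end of the enclosing scope.
    let _guard = NoisyGuard { name: "named", log };
    log.borrow_mut().push("work finished".to_owned());
}
```

After run_scope returns, the log reads "dropped underscore", "work finished", "dropped named", so only the named binding keeps the guard alive while the work runs.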

Global backend

Rexecutor can be run either with or without a global backend. Using a global backend enables the convenience method job::builder::JobBuilder::enqueue, which does not require a reference to the backend to be passed down to the code that needs to enqueue a job.

The global backend can be set using Rexecutor::set_global_backend. This should only be called once; otherwise it will return an error.

In fact, for a single Rexecutor instance it is impossible to call this twice; the following code snippet will fail to compile:

use rexecutor::prelude::*;
let backend = rexecutor::backend::memory::InMemoryBackend::new();
Rexecutor::new(backend).set_global_backend().set_global_backend();

Note that using a global backend has many of the same drawbacks as any global variable; in particular, it can make unit testing more difficult.
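The "set once, error on the second call" behaviour can be illustrated with the standard library's OnceLock. This is only a sketch of the pattern: how rexecutor stores its global backend is not stated here, and GLOBAL_BACKEND, AlreadySet, set_global and global are hypothetical names, with a String standing in for a real backend.

```rust
use std::sync::OnceLock;

/// Process-wide slot that can be initialised at most once.
static GLOBAL_BACKEND: OnceLock<String> = OnceLock::new();

/// Hypothetical error returned when the slot is already occupied.
#[derive(Debug, PartialEq)]
struct AlreadySet;

/// Stores the backend name, failing if one was stored earlier.
fn set_global(name: &str) -> Result<(), AlreadySet> {
    GLOBAL_BACKEND.set(name.to_owned()).map_err(|_| AlreadySet)
}

/// Reads the stored backend name, if any.
fn global() -> Option<&'static str> {
    GLOBAL_BACKEND.get().map(String::as_str)
}
```

Because the slot is shared by the whole process, tests that each want their own backend cannot simply reset it, which is one concrete form of the unit-testing drawback mentioned above.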

Code of conduct

We follow the Rust code of conduct.

Currently the moderation team consists of John Bell only. We would welcome more members: if you would like to join the moderation team, please contact John Bell.

Licence

The project is licensed under the MIT license.
