11 releases (5 breaking)

0.6.0 Mar 28, 2024
0.5.1 Mar 26, 2024
0.4.1 Dec 27, 2023
0.3.0 Oct 20, 2023
0.1.5 Dec 9, 2022

#179 in Asynchronous


358 downloads per month

MIT/Apache

255KB
6K SLoC

Effectum

A Rust job queue library, based on SQLite so it doesn't depend on any other services.

Currently Effectum is only a library that can be embedded into Rust applications. Future goals include bindings for other languages and the ability to run as a standalone server, accessible over HTTP and gRPC. The design goal is that a product can start with the embedded version to keep infrastructure minimal, and then move to the server version with few changes when the time comes to scale out.

use std::path::PathBuf;
use std::sync::Arc;

use effectum::{Error, Job, JobState, JobRunner, RunningJob, Queue, Worker};
use serde::{Deserialize, Serialize};

#[derive(Debug)]
pub struct JobContext {
  // database pool or other things here
}

#[derive(Serialize, Deserialize)]
struct RemindMePayload {
  email: String,
  message: String,
}

async fn remind_me_job(job: RunningJob, context: Arc<JobContext>) -> Result<(), Error> {
    let payload: RemindMePayload = job.json_payload()?;
    // do something with the job
    Ok(())
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Error> {
  // Create a queue
  let queue = Queue::new(&PathBuf::from("effectum.db")).await?;

  // Define a job type for the queue.
  let a_job = JobRunner::builder("remind_me", remind_me_job).build();

  let context = Arc::new(JobContext{
    // database pool or other things here
  });

  // Create a worker to run jobs.
  let worker = Worker::builder(&queue, context)
    .max_concurrency(10)
    .jobs([a_job])
    .build();

  // Submit a job to the queue.
  let job_id = Job::builder("remind_me")
    .run_at(time::OffsetDateTime::now_utc() + std::time::Duration::from_secs(3600))
    .json_payload(&RemindMePayload {
        email: "me@example.com".to_string(),
        message: "Time to go!".to_string()
    })?
    .add_to(&queue)
    .await?;

  // See what's happening with the job.
  let status = queue.get_job_status(job_id).await?;
  assert_eq!(status.state, JobState::Pending);

  // Do other stuff...

  Ok(())
}

Changelog

Full Development Notes

Roadmap

Released

  • Multiple job types
  • Jobs can be added with higher priority to "skip the line"
  • Workers can run multiple jobs concurrently
  • Schedule jobs in the future
  • Automatically retry failed jobs, with exponential backoff
  • Checkpoints to allow smart resumption of a job if it fails midway through
  • Immediately schedule a retry for jobs that were running when the process restarts unexpectedly
  • Cancel or modify pending jobs
  • Support for recurring jobs
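
To illustrate the "exponential backoff" item above, here is a minimal sketch of how a retry delay can be computed. It is not taken from Effectum's source: the function name `backoff_delay`, its parameters, and the fixed doubling factor are all assumptions made for this example, and the library's actual retry settings may differ.

```rust
use std::time::Duration;

/// Hypothetical helper, NOT part of Effectum's API.
/// Returns the delay to wait before retry number `retry` (0-based),
/// doubling `initial` on every retry and never exceeding `max`.
fn backoff_delay(retry: u32, initial: Duration, max: Duration) -> Duration {
    // 2^retry, saturating at u32::MAX if the power overflows.
    let factor = 2u32.checked_pow(retry).unwrap_or(u32::MAX);

    // initial * 2^retry, falling back to `max` if the Duration overflows,
    // then capped at `max`.
    initial.checked_mul(factor).unwrap_or(max).min(max)
}
```

With an initial delay of 10 seconds and a cap of one hour, this sketch yields 10s, 20s, 40s, 80s for the first four retries and stays at one hour once the doubled value exceeds the cap. Real queues often add random jitter on top so that many failed jobs do not all retry at the same instant.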

Soon

  • Optional sweeper to prevent "done" job data from building up indefinitely

Later

  • Node.js bindings
  • Run as a standalone server over gRPC
  • Helpers for communicating with the queue via the outbox pattern

Dependencies

~40MB
~643K SLoC