#job-queue #postgresql #task-scheduling #async-task #job-scheduling #background-task #worker

graphile_worker

High performance Rust/PostgreSQL job queue (also suitable for getting jobs generated by PostgreSQL triggers/functions out into a different work queue)

9 releases (4 breaking)

0.8.0 May 29, 2024
0.7.2 May 28, 2024
0.7.0 Feb 28, 2024
0.6.2 Feb 14, 2024
0.4.0 Jan 31, 2024

#404 in Database interfaces


228 downloads per month

Custom license

280KB
5.5K SLoC

Graphile Worker RS


A rewrite of Graphile Worker in Rust. If you like this library, go sponsor Benjie's project: all the research was done by him, and this library is only a rewrite in Rust 🦀. The port should be mostly compatible with graphile-worker (meaning you can run it side by side with the Node.js version).

The following differs from Graphile Worker:

  • No support for batch jobs (I don't need them personally; if you do, create an issue and I will see what I can do)
  • In Graphile Worker, each process has its own worker_id. In Rust there is only one worker_id, and jobs are then processed on your async runtime.

Job queue for PostgreSQL running on Rust - allows you to run jobs (e.g. sending emails, performing calculations, generating PDFs, etc) "in the background" so that your HTTP response/application code is not held up. Can be used with any PostgreSQL-backed application.

Add the worker to your project:

cargo add graphile_worker

Create tasks and run the worker

The definition of a task consists simply of an async function and a task identifier:

use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler};

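// The payload field is called `name` so that it matches the JSON sent by the
// scheduling examples further down ({ "name": "Bobby Tables" }).
#[derive(Deserialize, Serialize)]
struct SayHello {
    name: String,
}

impl TaskHandler for SayHello {
    // Identifier used when scheduling the job (from SQL or from Rust)
    const IDENTIFIER: &'static str = "say_hello";

    async fn run(self, _ctx: WorkerContext) -> Result<(), ()> {
        println!("Hello {} !", self.name);
        Ok(())
    }
}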

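#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumption: the pool is an sqlx PgPool and DATABASE_URL points at your database.
    let pg_pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;

    graphile_worker::WorkerOptions::default()
        .concurrency(2)
        .schema("example_simple_worker")
        .define_job::<SayHello>()
        .pg_pool(pg_pool)
        .init()
        .await?
        .run()
        .await?;

    Ok(())
}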

Schedule a job via SQL

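Connect to your database and run the following SQL (add_job lives in the worker's schema, so replace graphile_worker with the name you passed to .schema(...) if you set one):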

SELECT graphile_worker.add_job('say_hello', json_build_object('name', 'Bobby Tables'));
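
The first argument to add_job is the task identifier, and it is what ties a queued row to a Rust handler. As a rough illustration of that lookup, here is a minimal std-only sketch. The `Registry` type, the `Handler` signature and the `say_hello` function are hypothetical and synchronous for brevity; they are not the crate's API, whose handlers are async and receive a deserialized payload.

```rust
use std::collections::HashMap;

/// Hypothetical handler signature: receives the raw JSON payload as text.
type Handler = Box<dyn Fn(&str) -> Result<String, String>>;

/// Hypothetical registry mapping task identifiers to handlers.
#[derive(Default)]
struct Registry {
    handlers: HashMap<&'static str, Handler>,
}

impl Registry {
    /// Registers a handler under its task identifier.
    fn define_job(&mut self, identifier: &'static str, handler: Handler) {
        self.handlers.insert(identifier, handler);
    }

    /// Looks up the handler for a queued job and runs it with the payload.
    fn dispatch(&self, identifier: &str, payload: &str) -> Result<String, String> {
        match self.handlers.get(identifier) {
            Some(handler) => handler(payload),
            None => Err(format!("no handler registered for task `{identifier}`")),
        }
    }
}

/// Stand-in for the `say_hello` task: just echoes the payload it was given.
fn say_hello(payload: &str) -> Result<String, String> {
    Ok(format!("Hello from payload {payload}"))
}

/// Builds a registry containing only the `say_hello` handler.
fn example_registry() -> Registry {
    let mut registry = Registry::default();
    registry.define_job("say_hello", Box::new(say_hello));
    registry
}
```

Calling `example_registry().dispatch("say_hello", "{}")` runs the handler, while an identifier that was never registered yields an error instead of silently dropping the job.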

Schedule a job via Rust

#[tokio::main]
async fn main() -> Result<(), ()> {
    // ...
    // `worker` is assumed to be the initialized worker, i.e. the value
    // returned by `WorkerOptions::default()...init().await?`
    let utils = worker.create_utils();

    // Using add_job with the typed task
    utils.add_job(
        SayHello { name: "Bobby Tables".to_string() },
        Default::default(),
    ).await.unwrap();

    // You can also use `add_raw_job` if you don't have access to the task type,
    // or don't need end-to-end type safety
    utils.add_raw_job(
        "say_hello",
        serde_json::json!({ "name": "Bobby Tables" }),
        Default::default(),
    ).await.unwrap();

    Ok(())
}

App state

You can provide app state through extensions:

use serde::{Deserialize, Serialize};
use graphile_worker::{WorkerContext, TaskHandler};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;

#[derive(Clone, Debug)]
struct AppState {
    run_count: Arc<AtomicUsize>,
}

#[derive(Deserialize, Serialize)]
pub struct CounterTask;

impl TaskHandler for CounterTask {
    const IDENTIFIER: &'static str = "counter_task";

    async fn run(self, ctx: WorkerContext) {
        let app_state = ctx.extensions().get::<AppState>().unwrap();
        let run_count = app_state.run_count.fetch_add(1, SeqCst);
        println!("Run count: {run_count}");
    }
}

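#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumption: the pool is an sqlx PgPool and DATABASE_URL points at your database.
    let pg_pool = sqlx::PgPool::connect(&std::env::var("DATABASE_URL")?).await?;

    graphile_worker::WorkerOptions::default()
        .concurrency(2)
        .schema("example_simple_worker")
        // Registered once here, then available to every task through ctx.extensions()
        .add_extension(AppState {
            run_count: Arc::new(AtomicUsize::new(0)),
        })
        .define_job::<CounterTask>()
        .pg_pool(pg_pool)
        .init()
        .await?
        .run()
        .await?;

    Ok(())
}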
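
To make the lookup by type less magical, here is a minimal std-only sketch of how a type-keyed extension store can work. The `Extensions` struct and `run_counter_task` function below are hypothetical stand-ins written for illustration; the crate's real storage may be implemented differently.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the worker's extension storage:
/// at most one value per concrete type, keyed by that type's TypeId.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl Extensions {
    /// Stores `value`, replacing any previous value of the same type.
    fn insert<T: Any + Send + Sync>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }

    /// Returns the stored value of type `T`, if one was registered.
    fn get<T: Any + Send + Sync>(&self) -> Option<&T> {
        self.map
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

#[derive(Clone, Debug)]
struct AppState {
    run_count: Arc<AtomicUsize>,
}

/// Simulates one task run: fetches the shared state and bumps the counter.
/// Returns the counter value before the increment, or None if no AppState
/// was registered (the README example would panic on `unwrap()` instead).
fn run_counter_task(extensions: &Extensions) -> Option<usize> {
    let state = extensions.get::<AppState>()?;
    Some(state.run_count.fetch_add(1, Ordering::SeqCst))
}
```

Because the state is looked up by type, registering two values of the same type would overwrite the first; wrapping each piece of state in its own struct avoids that. Note also that `fetch_add` returns the value before the increment, so the first run reports 0.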

Features

  • Standalone and embedded modes
  • Designed to be used both from Rust and directly in the database
  • Easy to test (recommended: run_once util)
  • Low latency (typically under 3ms from task schedule to execution, uses LISTEN/NOTIFY to be informed of jobs as they're inserted)
  • High performance (uses SKIP LOCKED to find jobs to execute, resulting in faster fetches)
  • Small tasks (uses explicit task names / payloads resulting in minimal serialisation/deserialisation overhead)
  • Parallel by default
  • Adding jobs to same named queue runs them in series
  • Automatically re-attempts failed jobs with exponential back-off
  • Customisable retry count (default: 25 attempts over ~3 days)
  • Crontab-like scheduling feature for recurring tasks (with optional backfill)
  • Task de-duplication via unique job_key
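  • Append data to already enqueued jobs with "batch jobs" (an upstream Graphile Worker feature; not supported by this Rust port, see the differences listed above)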
  • Open source; liberal MIT license
  • Executes tasks written in Rust (these can call out to any other language or networked service)
  • Written natively in Rust
  • If you're running really lean, you can run Graphile Worker in the same Rust process as your server to keep costs and devops complexity down.
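
To give a feel for the retry schedule mentioned in the list above, here is a small sketch of the back-off arithmetic. It assumes the Rust port keeps the delay formula documented for the Node.js Graphile Worker, exp(least(10, attempts)) seconds; the function names are made up for this example and are not part of the crate.

```rust
/// Delay in seconds before the next attempt, after `attempts` failed attempts.
/// Assumption: same formula as upstream Graphile Worker, exp(min(attempts, 10)).
fn retry_delay_secs(attempts: u32) -> f64 {
    f64::from(attempts.min(10)).exp()
}

/// Total time spent waiting between attempts for a job that is allowed
/// `max_attempts` attempts. No delay follows the final attempt, so only
/// `max_attempts - 1` delays are summed.
fn total_retry_window_secs(max_attempts: u32) -> f64 {
    (1..max_attempts).map(retry_delay_secs).sum()
}
```

Under that assumption the delay grows from a few seconds to about six hours, where it is capped, and `total_retry_window_secs(25) / 86_400.0` comes out at a few days, which is consistent in magnitude with the default quoted in the list.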

Status

Production ready but the API may be rough around the edges and might change.

Requirements

PostgreSQL 12+. It might work with older versions, but this has not been tested.

Note: Postgres 12 is required for the generated always as (expression) feature

Installation

cargo add graphile_worker

Running

graphile_worker manages its own database schema (graphile_worker by default). Just point it at your database and it handles its own migrations.
