#retry #rabbitmq #processing #efficient

dbq

Job queueing and processing library with queues stored in Postgres 9.5+

1 unstable release

0.1.0 Apr 6, 2019

MIT license

38KB
741 lines

DBQ

Job queueing and processing library with queues stored in Postgres 9.5+

Documentation

Overview

Job queues and background processing often require additional technologies like Redis or RabbitMQ. While these are great solutions, they can be overkill for many job processing requirements. The goal of dbq is to make job queueing and background processing simple and efficient with the Postgres database that is already part of your stack. dbq provides strong durability and the common features found in queueing specific technologies: support for multiple queues, retry on failure, backoff on retry, and dead letters storage.

Example

extern crate dbq;
extern crate postgres;
extern crate serde_json;

use std::error::Error;
use std::result::Result;
use std::thread;
use std::time::Duration;

// A simple handler that prints "Hello!" for any job it runs
#[derive(Clone)]
struct HelloHandler {}

fn main() -> Result<(), Box<Error>> {
    let db_conn_params = "postgres://postgres:password@localhost/db";
    let conn = postgres::Connection::connect(db_conn_params, postgres::TlsMode::None)?;

    // Schema config allows for changing the database schema and table names
    // Defaults are no schema (default is used) and tables are prefixed with "dbq_"
    let schema_config = dbq::SchemaConfig::default();
    // Run the migrations on start. Migrations are idempotent and should be run
    // on startup
    dbq::run_migrations(&schema_config, &conn, Some(646_271)).unwrap();

    let queue = dbq::Queue::new(schema_config, "de_lancie_q".to_string());

    // Enqueue a job
    queue.enqueue("job_a", serde_json::Value::Null, 3, &conn)?;

    // Start a worker pool
    let workers_config =
        dbq::WorkerPoolConfig::new(queue, db_conn_params, HelloHandler {})?;
    let workers = dbq::WorkerPool::start(workers_config);

    // Give a worker time to find and start the job
    thread::sleep(Duration::new(1, 0));
    // Shutdown the worker pool waiting for all currently executing jobs to finish
    workers.join();
    Ok(())
}

impl dbq::Handler for HelloHandler {
    type Error = std::io::Error;

    fn handle(&self, _ctx: dbq::JobContext) -> Result<(), Self::Error> {
        println!("Hello!");
        Ok(())
    }
}

How it works

dbq creates two tables in Postgres that it uses for storing the queue and "dead letter" jobs. These name and schema of these tables can be configured using SchemaConfig. Enqueueing a job is as simple as inserting into the queue table. A WorkerPool is used for job processing. The WorkerPool spawns an OS thread for each worker, and each worker continuously polls the queue to see if any jobs are available. Each work claims a job by locking the row using FOR UPDATE and running the job as part of a transaction. Workers poll the queue using SKIP LOCKED to ignore any work that is currently being processed by another worker. If the job fails, the job is updated in the queue so that it will be retried or moved to the dead letters table if it has reached its max attempts. If the job succeeds, the job is deleted from the queue.

Each job is part of a queue and is parameterized by a string class and JSON arguments. The class identifies the type of the job and the arguments are anything that the job needs to run.

But database queues are a bad idea...

While database queues may still not be the best approach for very high throughput use cases, newer database features (particularly SKIP LOCKED) in postgres make a database queue a very reasonable choice for low to medium throughput applications. Here are some of the major tradeoffs between database queues and queueing-specific technologies:

Advantages of a database queue

  • Synchronize all database changes within a job to the success or failure of the job using transactions
  • Strong durability guarantees means your jobs are as safe as the rest of your data (durability is often available in other queueing technologies as an option)
  • No need to add another piece of technology to the stack. Use the Postgres database you already have!

Disadvantages of a database queue

  • Read and write scalability are limited by the database (almost always a single node for writes)
  • Durability may not be necessary for all use cases
  • Machine resources are shared with all other database queries and statements

Dependencies

~5.5MB
~125K SLoC