13 releases
Uses new Rust 2024
| new 1.0.0-rc.3 | Feb 2, 2026 |
|---|---|
| 1.0.0-rc.2 | Jan 10, 2026 |
| 1.0.0-rc.1 | Dec 25, 2025 |
| 1.0.0-beta.2 | Nov 17, 2025 |
| 1.0.0-alpha.3 | Oct 25, 2025 |
#502 in Asynchronous
254 downloads per month
Used in spring-apalis
125KB
4K
SLoC
apalis-sqlite
Background task processing for Rust using apalis and sqlite.
Features
- Reliable job queue using SQLite as the backend.
- Multiple storage types: standard polling and event-driven (hooked) storage.
- Custom codecs for serializing/deserializing job arguments as bytes.
- Heartbeat and orphaned job re-enqueueing for robust job processing.
- Integration with
apalisworkers and middleware.
Storage Types
SqliteStorage: Standard polling-based storage.SqliteStorageWithHook: Event-driven storage using SQLite update hooks for low-latency job fetching.SharedSqliteStorage: Shared storage for multiple job types.
The naming is designed to clearly indicate the storage mechanism and its capabilities, but under the hood the result is the SqliteStorage struct with different configurations.
Examples
Basic Worker Example
use std::time::Duration;
use apalis::prelude::*;
use apalis_sqlite::*;
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let pool = SqlitePool::connect(":memory:").await.unwrap();
SqliteStorage::setup(&pool).await.unwrap();
let mut backend = SqliteStorage::new(&pool);
let mut start = 0;
let mut items = stream::repeat_with(move || {
start += 1;
let task = Task::builder(start)
.run_after(Duration::from_secs(1))
.priority(1)
.max_attempts(5)
.build();
task
})
.take(10);
backend.push_all(&mut items).await.unwrap();
async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
Ok(())
}
let worker = WorkerBuilder::new("worker-1")
.backend(backend)
.build(send_reminder);
worker.run().await.unwrap();
}
Hooked Worker Example (Event-driven)
use std::time::Duration;
use apalis::prelude::*;
use apalis_sqlite::*;
use futures::stream::{self, StreamExt};
#[tokio::main]
async fn main() {
let lazy_strategy = StrategyBuilder::new()
.apply(IntervalStrategy::new(Duration::from_secs(5)))
.build();
let config = Config::new("queue")
.with_poll_interval(lazy_strategy)
.set_buffer_size(5);
let backend = SqliteStorage::new_with_callback(":memory:", &config);
let pool = backend.pool();
SqliteStorage::setup(&pool).await.unwrap();
tokio::spawn({
let pool = pool.clone();
let config = config.clone();
async move {
tokio::time::sleep(Duration::from_secs(2)).await;
let mut start = 0;
let items = stream::repeat_with(move || {
start += 1;
Task::builder(serde_json::to_vec(&start).unwrap())
.run_after(Duration::from_secs(1))
.priority(start)
.build()
})
.take(20)
.collect::<Vec<_>>()
.await;
/// Push with just a `Pool`
apalis_sqlite::sink::push_tasks(pool, config, items).await.unwrap();
}
});
async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
Ok(())
}
let worker = WorkerBuilder::new("worker-2")
.backend(backend)
.build(send_reminder);
worker.run().await.unwrap();
}
Workflow Example
use std::time::Duration;
use apalis::prelude::*;
use apalis_sqlite::*;
use apalis_workflow::*;
#[tokio::main]
async fn main() {
let workflow = Workflow::new("odd-numbers-workflow")
.and_then(|a: usize| async move {
Ok::<Vec<usize>, BoxDynError>((0..=a).collect::<Vec<usize>>())
})
.filter_map(|x: usize| async move {
if x % 2 != 0 { Some(x) } else { None }
})
.filter_map(|x: usize| async move {
if x % 3 != 0 { Some(x) } else { None }
})
.filter_map(|x: usize| async move {
if x % 5 != 0 { Some(x) } else { None }
})
.delay_for(Duration::from_millis(1000))
.and_then(|a: Vec<usize>| async move {
println!("Sum: {}", a.iter().sum::<usize>());
Ok::<(), BoxDynError>(())
});
let pool = SqlitePool::connect(":memory:").await.unwrap();
SqliteStorage::setup(&pool).await.unwrap();
let mut sqlite = SqliteStorage::new_in_queue(&pool, "test-workflow");
sqlite.push_start(100usize).await.unwrap();
let worker = WorkerBuilder::new("rango-tango")
.backend(sqlite)
.on_event(|ctx, ev| {
println!("On Event = {:?}", ev);
if matches!(ev, Event::Error(_)) {
ctx.stop().unwrap();
}
})
.build(workflow);
worker.run().await.unwrap();
}
Shared Example
Full support for sharing the same connection. This example shows how to run multiple types with one function
use std::{collections::HashMap, time::Duration};
use apalis::prelude::*;
use apalis_sqlite::{SharedSqliteStorage, SqliteStorage};
use futures::stream;
#[tokio::main]
async fn main() {
let mut store = SharedSqliteStorage::new(":memory:");
SqliteStorage::setup(store.pool()).await.unwrap();
let mut map_store = store.make_shared().unwrap();
let mut int_store = store.make_shared().unwrap();
map_store
.push_stream(&mut stream::iter(vec![HashMap::<String, String>::new()]))
.await
.unwrap();
int_store.push(99).await.unwrap();
async fn send_reminder<T, I>(
_: T,
_task_id: TaskId<I>,
wrk: WorkerContext,
) -> Result<(), BoxDynError> {
tokio::time::sleep(Duration::from_secs(2)).await;
wrk.stop().unwrap();
println!("Reminder sent!");
Ok(())
}
let int_worker = WorkerBuilder::new("rango-tango-2")
.backend(int_store)
.build(send_reminder);
let map_worker = WorkerBuilder::new("rango-tango-1")
.backend(map_store)
.build(send_reminder);
tokio::try_join!(int_worker.run(), map_worker.run()).unwrap();
}
Observability
You can track your jobs using apalis-board.

License
Licensed under either of Apache License, Version 2.0 or MIT license at your option.
Dependencies
~34–54MB
~772K SLoC