#outbox #http-gateway #outbox-pattern #outbox-table #outbox-event

outbox-pattern-processor

Library to make easier to dispatch your outbox-pattern data from database to SQS, SNS and/or HTTP(S) gateways

12 releases

new 0.3.6 Dec 6, 2024
0.3.5 Nov 13, 2024
0.3.3 Oct 31, 2024
0.2.3 Oct 3, 2024
0.1.1 Sep 30, 2024

#396 in Web programming

Download history 126/week @ 2024-08-31 11/week @ 2024-09-07 22/week @ 2024-09-14 11/week @ 2024-09-21 634/week @ 2024-09-28 129/week @ 2024-10-05 90/week @ 2024-10-12 368/week @ 2024-10-19 248/week @ 2024-10-26 168/week @ 2024-11-02 221/week @ 2024-11-09 74/week @ 2024-11-16 169/week @ 2024-11-23 120/week @ 2024-11-30

604 downloads per month

MIT license

72KB
1.5K SLoC

Outbox Pattern Processor

Library to make easier to dispatch your outbox-pattern data from database to SQS, SNS and/or HTTP(S) gateways.

  • Simple: Your application only need to write into outbox table.
  • Scalable: It's possible to run more than one instance to increase performance without lose order.

MIT licensed

Pre-requisites

Database configuration

How to use

To use as docker image, see more here

[!TIP] All events, HTTP, SNS or SQS, always include the header or message-attribute called x-idempotent-key. It can be used to avoid consume duplicated events.

Persisting outbox event data

Rust
HTTP
let partition_key = Uuid::now_v7(); // or your own domain unique uuid
let url = "https://your-detination-url.com/some-path";
let headers = None;
let payload = json!({
    "foo": "bar",
};

let outbox = Outbox::http_post_json(partition_key, url, headers, &payload); // can also choose for post, put or patch
SNS
let partition_key = Uuid::now_v7(); // or your own domain unique uuid
let topic_arn = "arn:aws:sns:us-east-1:000000000000:topic";
let headers = None;
let payload = "any data"; // can also be a JSON stringified

let outbox = Outbox::sns(partition_key, topic_arn, headers, &payload);
SQS
let partition_key = Uuid::now_v7(); // or your own domain unique uuid
let queue_url = "http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000/queue";
let headers = None;
let payload = "any data"; // can also be a JSON stringified

let outbox = Outbox::sqs(partition_key, url, headers, &payload);
Any outbox kind with delay
let outbox = Outbox::http_post_json(partition_key, url, headers, &payload) // or any other, like sqs and sns
    .delay(Utc::now() + Duration::from_secs(10));
Persisting
let stored_outbox = OutboxRepository::insert(&mut transaction, outbox).await?;
let idempotent_key = stored_outbox.idempotent_key;
Manually

[!NOTE]
More details and examples about columns data in database configuration

insert into outbox
    (idempotent_key, partition_key, destinations, headers, payload)
values
    ($1, $2, $3, $4, $5)
returning *

Initialize

Simple
let custom_resources = OutboxProcessorResources::new(
    ctx.resources.postgres_pool.clone(),
    ctx.resources.sqs_client.clone(),
    ctx.resources.sns_client.clone(),
)
    // each optional with default value
    .with_outbox_query_limit(50)
    .with_http_timeout_in_millis(3000)
    .with_max_in_flight_interval_in_seconds(30)
    .with_outbox_execution_interval_in_seconds(5)
    .with_delete_after_process_successfully(false)
    .with_delay_for_failure_attempt_in_seconds(0)
    .with_outbox_failure_limit(10)
    .with_scheduled_clear_locked_partition(false)
    .with_outbox_cleaner_execution_interval_in_seconds(60);

let _ = OutboxProcessor::new(outbox_processor_resources)
        .init()
        .await;
Tokio + Axum example
use outbox_pattern_processor::environment::Environment;
use outbox_pattern_processor::outbox_processor::{OutboxProcessor, OutboxProcessorResources};
use outbox_pattern_processor::shutdown::Shutdown;
use outbox_pattern_processor_worker::routes::Routes;
use outbox_pattern_processor_worker::state::AppState;
use std::env;
use std::net::SocketAddr;
use tokio::net::TcpListener;
use tracing::log::info;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use wg::WaitGroup;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());

    let rust_log = Environment::string("RUST_LOG", "INFO,sqlx::postgres::notice=WARN,sqlx::query=WARN");
    env::set_var("RUST_LOG", rust_log);

    tracing_subscriber::registry()
        .with(EnvFilter::from_default_env())
        .with(Box::new(tracing_subscriber::fmt::layer().with_writer(non_blocking)))
        .init();

    info!("Starting...");

    let wait_group = WaitGroup::new();

    let app_state = AppState::new().await?;

    tokio::spawn(init_http_server(app_state.clone(), wait_group.add(1)));
    tokio::spawn(init_outbox(app_state.clone(), wait_group.add(1)));

    wait_group.wait();

    info!("Stopped!");

    Ok(())
}

async fn init_http_server(
    app_state: AppState,
    wait_group: WaitGroup,
) {
    info!("Starting http server...");
    let routes = Routes::routes(&app_state).await;

    let addr = SocketAddr::from(([0, 0, 0, 0], 9095));

    if let Ok(listener) = TcpListener::bind(addr).await {
        info!("Running http server...");
        let _ = axum::serve(listener, routes).with_graceful_shutdown(Shutdown::signal("Stopping http server...")).await;
    }

    wait_group.done();

    info!("Http server stopped!");
}

async fn init_outbox(
    app_state: AppState,
    wait_group: WaitGroup,
) {
    let custom_resources = OutboxProcessorResources::new(
        ctx.resources.postgres_pool.clone(),
        ctx.resources.sqs_client.clone(),
        ctx.resources.sns_client.clone(),
    )
        // each optional with default value
        .with_outbox_query_limit(50)
        .with_http_timeout_in_millis(3000)
        .with_max_in_flight_interval_in_seconds(5)
        .with_outbox_execution_interval_in_seconds(30)
        .with_delete_after_process_successfully(false);

    let _ = OutboxProcessor::new(outbox_processor_resources)
        .with_graceful_shutdown(Shutdown::signal("Stopping outbox processor..."))
        .init()
        .await;

    wait_group.done();
}

License

This project is licensed under the MIT license.

Dependencies

~71MB
~1M SLoC