1 unstable release

Uses new Rust 2024

0.1.0 Dec 26, 2025

#1783 in Web programming

Apache-2.0

67KB
1.5K SLoC

armature-cqrs

CQRS (Command Query Responsibility Segregation) for the Armature framework.

Features

  • Commands - Write operations
  • Queries - Read operations
  • Handlers - Command/query processing
  • Mediator - Request dispatching

Installation

[dependencies]
armature-cqrs = "0.1"

Quick Start

use armature_cqrs::{Command, Query, Mediator};

#[derive(Command)]
struct CreateUser { name: String }

#[derive(Query)]
struct GetUser { id: String }

let mediator = Mediator::new()
    .register_command::<CreateUser, _>(create_user_handler)
    .register_query::<GetUser, _>(get_user_handler);

// Send command
mediator.send(CreateUser { name: "Alice".into() }).await?;

// Execute query
let user = mediator.query(GetUser { id: "123".into() }).await?;

License

MIT OR Apache-2.0


lib.rs:

CQRS (Command Query Responsibility Segregation) for Armature

This crate provides CQRS pattern implementation with command/query separation.

Features

  • Command Bus - Execute commands (writes)
  • Query Bus - Execute queries (reads)
  • Projections - Build read models from events
  • Type-safe - Strong typing with compile-time safety
  • Async - Full async/await support

Quick Start

use armature_cqrs::*;
use async_trait::async_trait;

// Define a command
struct CreateUserCommand {
    email: String,
}

impl Command for CreateUserCommand {
    type Result = String; // User ID
}

// Define command handler
struct CreateUserHandler;

#[async_trait]
impl CommandHandler<CreateUserCommand> for CreateUserHandler {
    async fn handle(&self, command: CreateUserCommand) -> Result<String, CommandError> {
        // Business logic here
        Ok(format!("user-{}", uuid::Uuid::new_v4()))
    }
}

// Define a query
struct GetUserQuery {
    user_id: String,
}

impl Query for GetUserQuery {
    type Result = User;
}

// Define query handler
struct GetUserHandler;

#[async_trait]
impl QueryHandler<GetUserQuery> for GetUserHandler {
    async fn handle(&self, query: GetUserQuery) -> Result<User, QueryError> {
        // Fetch from read model
        Ok(User {
            id: query.user_id,
            email: "alice@example.com".to_string(),
        })
    }
}

// Use the buses
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create buses
    let command_bus = CommandBus::new();
    let query_bus = QueryBus::new();

    // Register handlers
    command_bus.register::<CreateUserCommand, _>(CreateUserHandler);
    query_bus.register::<GetUserQuery, _>(GetUserHandler);

    // Execute command
    let user_id = command_bus.execute(CreateUserCommand {
        email: "alice@example.com".to_string(),
    }).await?;

    // Execute query
    let user = query_bus.execute(GetUserQuery { user_id }).await?;
    println!("User: {:?}", user);

    Ok(())
}

Projections

use armature_events::Event;
use async_trait::async_trait;

struct UserListProjection {
    // Read model storage
}

#[async_trait]
impl Projection for UserListProjection {
    async fn project(&self, event: &dyn Event) -> Result<(), ProjectionError> {
        match event.event_name() {
            "user_created" => {
                // Update read model
            }
            "user_deleted" => {
                // Update read model
            }
            _ => {}
        }
        Ok(())
    }
}

Integration with Event Sourcing

use armature_eventsourcing::*;
use armature_events::EventBus;

// Command handler uses aggregate repository
#[async_trait]
impl CommandHandler<CreateUserCommand> for CreateUserHandler {
    async fn handle(&self, command: CreateUserCommand) -> Result<String, CommandError> {
        let mut user = UserAggregate::new_instance(uuid::Uuid::new_v4().to_string());

        // Add domain event
        user.create(command.email)?;

        // Save aggregate (persists events)
        self.repository.save(&mut user).await?;

        // Publish events to event bus for projections
        for event in user.uncommitted_events() {
            self.event_bus.publish(event.clone()).await?;
        }

        Ok(user.aggregate_id().to_string())
    }
}

Dependencies

~10–14MB
~173K SLoC