#event-sourcing #cqrs #ddd

event-store-adapter-rs

crate to make DynamoDB an Event Store

95 releases (46 stable)

new 1.1.30 Apr 27, 2024
1.1.19 Mar 29, 2024
0.2.45 Jan 18, 2024
0.2.39 Dec 31, 2023
0.2.28 Nov 25, 2023

#2 in #cqrs

Download history 73/week @ 2024-01-05 633/week @ 2024-01-12 80/week @ 2024-01-19 521/week @ 2024-01-26 234/week @ 2024-02-02 474/week @ 2024-02-09 817/week @ 2024-02-16 888/week @ 2024-02-23 812/week @ 2024-03-01 884/week @ 2024-03-08 914/week @ 2024-03-15 834/week @ 2024-03-22 414/week @ 2024-03-29 488/week @ 2024-04-05 723/week @ 2024-04-12 671/week @ 2024-04-19

2,428 downloads per month

MIT/Apache

49KB
1K SLoC

event-store-adapter-rs

Workflow Status crates.io docs.rs Renovate License

This library is designed to turn DynamoDB into an Event Store for CQRS/Event Sourcing.

日本語

Usage

You can easily implement an Event Sourcing-enabled repository using EventStore.

pub struct UserAccountRepository {
  event_store: EventStore<UserAccount, UserAccountEvent>,
}

impl UserAccountRepository {
  pub async fn store_event(&mut self, event: &UserAccountEvent, version: usize) -> Result<(), RepositoryError> {
    let result = self.event_store.persist_event(event, version).await;
    match result {
      Ok(_) => Ok(()),
      Err(err) => Err(Self::handle_event_store_write_error(err)),
    }
  }

  pub async fn store_event_and_snapshot(
    &mut self,
    event: &UserAccountEvent,
    snapshot: &UserAccount,
  ) -> Result<(), RepositoryError> {
    let result = self.event_store.persist_event_and_snapshot(event, snapshot).await;
    match result {
      Ok(_) => Ok(()),
      Err(err) => Err(Self::handle_event_store_write_error(err)),
    }
  }

  pub async fn find_by_id(&self, id: &UserAccountId) -> Result<Option<UserAccount>, RepositoryError> {
    let snapshot_result = self.event_store.get_latest_snapshot_by_id(id).await;
    match snapshot_result {
      Ok(snapshot_opt) => match snapshot_opt {
        Some(snapshot) => {
          let events = self
            .event_store
            .get_events_by_id_since_seq_nr(id, snapshot.seq_nr() + 1)
            .await;
          match events {
            Ok(events) => Ok(Some(UserAccount::replay(events, snapshot))),
            Err(err) => Err(Self::handle_event_store_read_error(err)),
          }
        }
        None => Ok(None),
      },
      Err(err) => Err(Self::handle_event_store_read_error(err)),
    }
  }
    
}

The following is an example of the repository usage.

let event_store = EventStore::new(
  aws_dynamodb_client.clone(),
  journal_table_name.to_string(),
  journal_aid_index_name.to_string(),
  snapshot_table_name.to_string(),
  snapshot_aid_index_name.to_string(),
  64,
);
 
let mut repository = UserAccountRepository::new(event_store);
 
// Replay the aggregate from the event store
let mut user_account = repository.find_by_id(user_account_id).await.unwrap();

// Execute a command on the aggregate
let user_account_event = user_account.rename(name).unwrap();
 
// Store the new event without a snapshot
repository
  .store_event(&user_account_event, user_account.version())
  .await
// Store the new event with a snapshot
//  repository
//  .store_event_and_snapshot(&user_account_event, &user_account)
//  .await

Table Specifications

See docs/DATABASE_SCHEMA.md.

CQRS/Event Sourcing Example

See j5ik2o/cqrs-es-example-rs.

License.

MIT License. See LICENSE for details.

Dependencies

~22MB
~391K SLoC