Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-1079]: Update doc and examples #154

Merged
merged 3 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/esrs/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub trait Aggregate {
/// of their type, and their events!
const NAME: &'static str;

/// Internal aggregate state. This will be wrapped in `AggregateState` and could be used to validate
/// Internal aggregate state. This will be wrapped in [`AggregateState`] and could be used to validate
/// commands.
type State: Default + Clone + Send + Sync;

Expand Down
5 changes: 4 additions & 1 deletion src/esrs/event_bus.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use crate::{Aggregate, StoreEvent};
use async_trait::async_trait;

/// The `EventBus` trait is responsible of the publishing an event on a given bus implementation.
#[async_trait]
pub trait EventBus<A>: Sync
where
A: Aggregate,
{
/// Publish an Aggregate event on an Event bus defined by the user.
/// Publish an `Aggregate` event on an `EventBus` defined by the user.
///
/// All the errors should be handled from within the `EventBus` and shouldn't panic.
async fn publish(&self, store_event: &StoreEvent<A::Event>);
}
34 changes: 30 additions & 4 deletions src/esrs/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,20 @@ use uuid::Uuid;

use crate::{Aggregate, StoreEvent};

/// This trait is used to implement an `EventHandler`. An event handler is intended to be an entity
/// which can create, update and delete a read side and perform side effects.
///
/// The main purpose of an `EventHandler` is to have an eventually persistent processor.
#[async_trait]
pub trait EventHandler<A>: Sync
where
A: Aggregate,
{
// TODO: doc
/// Handle an event and perform an action. This action could be over a read model or a side-effect.
/// All the errors should be handled from within the `EventHandler` and shouldn't panic.
async fn handle(&self, event: &StoreEvent<A::Event>);

// TODO: doc
/// Perform a deletion of a resource using the given aggregate_id.
async fn delete(&self, _aggregate_id: Uuid) {}

/// The name of the event handler. By default, this is the type name of the event handler,
Expand All @@ -31,20 +36,29 @@ where
Q: EventHandler<A>,
T: Deref<Target = Q> + Send + Sync,
{
/// Deref call to [`EventHandler::handle`].
async fn handle(&self, event: &StoreEvent<A::Event>) {
self.deref().handle(event).await;
}
}

/// This trait is used to implement a `TransactionalEventHandler`. A transactional event handler is
/// intended to be an entity which can create, update and delete a read side. No side effects must be
/// performed inside of this kind on handler.
///
/// An `handle` operation will result in a _deadlock_ if the implementation of this trait is used to
/// apply an event on an [`Aggregate`].
#[async_trait]
pub trait TransactionalEventHandler<A, E>: Sync
where
A: Aggregate,
{
// TODO: doc
/// Handle an event in a transactional fashion and perform a read side crate, update or delete.
/// If an error is returned the transaction will be aborted and the handling of a command by an
/// aggregate will return an error.
async fn handle(&self, event: &StoreEvent<A::Event>, executor: &mut E) -> Result<(), A::Error>;

// TODO: doc
/// Perform a deletion of a read side projection using the given aggregate_id.
async fn delete(&self, _aggregate_id: Uuid, _executor: &mut E) -> Result<(), A::Error> {
Ok(())
}
Expand All @@ -66,11 +80,23 @@ where
Q: TransactionalEventHandler<A, E>,
T: Deref<Target = Q> + Send + Sync,
{
/// Deref call to [`TransactionalEventHandler::handle`].
async fn handle(&self, event: &StoreEvent<A::Event>, executor: &mut E) -> Result<(), A::Error> {
self.deref().handle(event, executor).await
}
}

/// The `ReplayableEventHandler` trait is used to add the `replay` behavior on an `EventHandler`.
///
/// Being replayable means that the operation performed by this EventHandler should be idempotent
/// and should be intended to be "eventually consistent".
/// In other words it means that they should not perform external API calls, generate random numbers
/// or do anything that relies on external state and might change the outcome of this function.
///
/// The most common use case for this is when rebuilding read models: `EventHandler`s that write on
cottinisimone marked this conversation as resolved.
Show resolved Hide resolved
/// the database should be marked as replayable.
///
/// Another use case could be if there's the need to implement a retry logic for this event handler.
pub trait ReplayableEventHandler<A>: Sync
where
Self: EventHandler<A>,
Expand Down
22 changes: 6 additions & 16 deletions src/esrs/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{Aggregate, AggregateState, EventStore, StoreEvent};
/// The AggregateManager is responsible for coupling the Aggregate with a Store, so that the events
/// can be persisted when handled, and the state can be reconstructed by loading and apply events sequentially.
///
/// It comes batteries-included, as you only need to implement the `event_store` getter. The basic API is:
/// The basic APIs are:
/// 1. handle_command
/// 2. load
/// 3. lock_and_load
Expand All @@ -21,18 +21,21 @@ impl<A> AggregateManager<A>
where
A: Aggregate,
{
/// Creates a new instance of an [`AggregateManager`].
pub fn new(event_store: Box<dyn EventStore<A> + Send + Sync>) -> Self {
Self { event_store }
}

/// Validates and handles the command onto the given state, and then passes the events to the store.
///
/// The store transactional persists the events - recording it in the aggregate instance's history.
pub async fn handle_command(
&self,
mut aggregate_state: AggregateState<A::State>,
command: A::Command,
) -> Result<(), A::Error> {
let events: Vec<A::Event> = A::handle_command(aggregate_state.inner(), command)?;
self.store_events(&mut aggregate_state, events).await?;
self.event_store.persist(&mut aggregate_state, events).await?;
Ok(())
}

Expand Down Expand Up @@ -77,21 +80,8 @@ where
}))
}

/// Transactional persists events in store - recording it in the aggregate instance's history.
/// The store will also handle the events creating read side projections. If an error occurs whilst
/// persisting the events, the whole transaction is rolled back and the error is returned.
pub async fn store_events(
&self,
aggregate_state: &mut AggregateState<A::State>,
events: Vec<A::Event>,
) -> Result<Vec<StoreEvent<A::Event>>, A::Error> {
self.event_store.persist(aggregate_state, events).await
}

/// `delete` should either complete the aggregate instance, along with all its associated events
/// and read side projections, or fail.
///
/// If the deletion succeeds only partially, it _must_ return an error.
/// and transactional read side projections, or fail.
pub async fn delete(&self, aggregate_id: impl Into<Uuid> + Send) -> Result<(), A::Error> {
self.event_store.delete(aggregate_id.into()).await
}
Expand Down
3 changes: 2 additions & 1 deletion src/esrs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ pub mod aggregate;
pub mod event_bus;
pub mod event_handler;
pub mod manager;
pub mod sql;
pub mod state;
pub mod store;

#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(any(feature = "postgres"))]
pub mod sql;

#[cfg(all(test, feature = "postgres"))]
mod tests;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use std::{sync::Arc, vec};
use sqlx::{PgConnection, Pool, Postgres};

use crate::esrs::sql::migrations::{Migrations, MigrationsHandler};
use crate::esrs::sql::statements::Statements;
use crate::esrs::sql::statements::{Statements, StatementsHandler};
use crate::Aggregate;

use super::{EventBus, EventHandler, InnerPgStore, PgStore, TransactionalEventHandler};

/// Struct used to build a brand new [`PgStore`].
pub struct PgStoreBuilder<A>
where
A: Aggregate,
Expand All @@ -24,6 +25,7 @@ impl<A> PgStoreBuilder<A>
where
A: Aggregate,
{
/// Creates a new instance of a [`PgStoreBuilder`].
pub fn new(pool: Pool<Postgres>) -> Self {
PgStoreBuilder {
pool,
Expand Down Expand Up @@ -84,17 +86,14 @@ where
self
}

/// This function sets up the database in a transaction and returns an instance of PgStore.
///
/// It will create the event store table (if it doesn't exist) and two indexes (if they don't exist).
/// The first one is over the `aggregate_id` field to speed up `by_aggregate_id` query.
/// The second one is a unique constraint over the tuple `(aggregate_id, sequence_number)` to avoid race conditions.
/// This function runs all the needed [`Migrations`], atomically setting up the database.
/// Eventually returns an instance of PgStore.
///
/// This function should be used only once at your application startup.
///
/// # Errors
///
/// Will return an `Err` if there's an error connecting with database or creating tables/indexes.
/// Will return an `Err` if there's an error running [`Migrations`].
pub async fn try_build(self) -> Result<PgStore<A>, sqlx::Error> {
if self.run_migrations {
Migrations::run::<A>(&self.pool).await?;
Expand Down
Loading