Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-573]: [esrs] POC different projectors on same table #100

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ version = "0.6.2"
members = [
"examples/simple_projection",
"examples/simple_side_effect",
"examples/simple_saga"
"examples/simple_saga",
"examples/aggregate_merging",
]

[features]
Expand Down
25 changes: 25 additions & 0 deletions examples/aggregate_merging/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
authors = ["Oliver Browne <[email protected]"]
description = "example of using a pair of projectors to project a pair of aggregations to a single read-side table"
edition = "2018"
license = "MIT OR Apache-2.0"
name = "aggregate_merging"
version = "0.1.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

# async main
tokio = {version = "1.6", features = ["full"]}

esrs = {version = "*", path = "../../", features = ["sqlite"]}
# Sql library for async impl
sqlx = {version = "0.6", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"]}

async-trait = "0.1.50"
chrono = {version = "0.4", features = ["serde"]}
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
thiserror = "1.0"
uuid = {version = "1.0", features = ["serde", "v4"]}
3 changes: 3 additions & 0 deletions examples/aggregate_merging/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This example demonstrates a `Projector` which consumes events from two different `Aggregates`, and projects those events to a single projection (in this case, a count of how many of each command the aggregates have received)

When either `AggregateA` or `AggregateB` in this example receive a command, they translate that into an (empty) `Event` specific to them (`EventA` or `EventB`). Both of these implement `Into<ProjectorEvent>`, and as such, both `AggregateA` and `AggregateB` can have an instantiation of `CounterProjector` in their porjectors vec. These aggregate specific event is passed, by the underlying `EventStore`, to the aggregates instantiation of the generic `CounterProjector` (either `CounterProjector<EventA>` or `CounterProjector<EventB>`), and this projector updates the event counts in the projection. NOTE: this is race prone. See projector.rs for more details about how to make this sound.
oliverbrowneprima marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE counters (
"counter_id" UUID PRIMARY KEY NOT NULL,
"count_a" INTEGER NOT NULL,
"count_b" INTEGER NOT NULL
);
133 changes: 133 additions & 0 deletions examples/aggregate_merging/src/aggregates.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use async_trait::async_trait;
use esrs::projector::SqliteProjector;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::{Pool, Sqlite};
use uuid::Uuid;

use esrs::aggregate::{AggregateManager, AggregateState, Identifier};
use esrs::store::{EventStore, SqliteStore, StoreEvent};

use crate::projectors::CounterProjector;
use crate::structs::{CommandA, CommandB, CounterError, EventA, EventB, ProjectorEvent};

const A_EVENTS: &str = "A";
const B_EVENTS: &str = "B";

// A store of events
type Store<E> = SqliteStore<E, CounterError>;

// We use a template here to make instantiating the near-identical
// AggregateA and AggregateB easier.
pub struct Aggregate<E: Send + Sync + Serialize + DeserializeOwned> {
event_store: Store<E>,
}

impl<
E: Send
+ Sync
+ Serialize
+ DeserializeOwned
+ // EventStore bounds
Into<ProjectorEvent>
+ Clone, // for the CounterProjector
> Aggregate<E>
where
Aggregate<E>: Identifier,
{
pub async fn new(pool: &Pool<Sqlite>) -> Result<Self, CounterError> {
Ok(Self {
event_store: Self::new_store(pool).await?,
})
}

pub async fn new_store(pool: &Pool<Sqlite>) -> Result<Store<E>, CounterError> {
// Any aggregate based off this template will project to the CounterProjector
let projectors: Vec<Box<dyn SqliteProjector<E, CounterError> + Send + Sync>> = vec![Box::new(CounterProjector)];

SqliteStore::new::<Self>(pool, projectors, vec![]).await
}
}

pub type AggregateA = Aggregate<EventA>;
pub type AggregateB = Aggregate<EventB>;

impl Identifier for AggregateA {
fn name() -> &'static str
where
Self: Sized,
{
A_EVENTS
}
}

impl Identifier for AggregateB {
fn name() -> &'static str
where
Self: Sized,
{
B_EVENTS
}
}

#[async_trait]
impl AggregateManager for AggregateA {
type State = ();
type Command = CommandA;
type Event = EventA;
type Error = CounterError;

fn event_store(&self) -> &(dyn EventStore<Self::Event, Self::Error> + Send + Sync) {
&self.event_store
}

fn apply_event(_id: &Uuid, state: Self::State, _: &StoreEvent<Self::Event>) -> Self::State {
// Take no action as this aggregate has no in memory state - only the projection is stateful
state
}

fn validate_command(_: &AggregateState<Self::State>, _: &Self::Command) -> Result<(), Self::Error> {
Ok(()) // No validation done on commands received in this aggregate
}

async fn do_handle_command(
&self,
aggregate_state: AggregateState<Self::State>,
command: Self::Command,
) -> Result<AggregateState<Self::State>, Self::Error> {
match command {
CommandA::Inner => self.persist(aggregate_state, vec![EventA::Inner]).await,
}
}
}

#[async_trait]
impl AggregateManager for AggregateB {
type State = ();
type Command = CommandB;
type Event = EventB;
type Error = CounterError;

fn event_store(&self) -> &(dyn EventStore<Self::Event, Self::Error> + Send + Sync) {
&self.event_store
}

fn apply_event(_id: &Uuid, state: Self::State, _: &StoreEvent<Self::Event>) -> Self::State {
// Take no action as this aggregate has no in memory state - only the projection is stateful
state
}

fn validate_command(_: &AggregateState<Self::State>, _: &Self::Command) -> Result<(), Self::Error> {
Ok(()) // No validation done on commands received in this aggregate
}

async fn do_handle_command(
&self,
aggregate_state: AggregateState<Self::State>,
command: Self::Command,
) -> Result<AggregateState<Self::State>, Self::Error> {
match command {
CommandB::Inner => self.persist(aggregate_state, vec![EventB::Inner]).await,
}
}
}
3 changes: 3 additions & 0 deletions examples/aggregate_merging/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod aggregates;
pub mod projectors;
pub mod structs;
54 changes: 54 additions & 0 deletions examples/aggregate_merging/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use esrs::aggregate::{AggregateManager, AggregateState};
use sqlx::{pool::PoolOptions, Pool, Sqlite};
use uuid::Uuid;

use aggregate_merging::{
aggregates::AggregateA,
aggregates::AggregateB,
projectors::Counter,
structs::{CommandA, CommandB},
};

#[tokio::main]
async fn main() {
println!("Starting pool");
let pool: Pool<Sqlite> = PoolOptions::new()
.connect("sqlite::memory:")
.await
.expect("Failed to create pool");

println!("Running migrations");
let () = sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("Failed to run migrations");

println!("Migrations run");

let count_id = Uuid::new_v4();

// Construct the two aggregates
let agg_a = AggregateA::new(&pool).await.expect("Failed to construct aggregate");
let a_state = AggregateState::new(count_id);

let agg_b = AggregateB::new(&pool).await.expect("Failed to construct aggregate");
let b_state = AggregateState::new(count_id);

// Increment each count once
let _ = agg_a
.handle_command(a_state, CommandA::Inner)
.await
.expect("Failed to handle command a");

let _ = agg_b
.handle_command(b_state, CommandB::Inner)
.await
.expect("Failed to handle command b");

// Retrieve counter projection from sqlite and print
let counter = Counter::by_id(count_id, &pool)
.await
.expect("Failed to retrieve counter")
.expect("Failed to find counter");
assert!(counter.count_a == 1 && counter.count_b == 1);
}
153 changes: 153 additions & 0 deletions examples/aggregate_merging/src/projectors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
use std::sync::Arc;

use async_trait::async_trait;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::{Executor, Sqlite};
use tokio::sync::Mutex;
use uuid::Uuid;

use esrs::projector::SqliteProjector;
use esrs::store::StoreEvent;

use sqlx::pool::PoolConnection;

use crate::structs::{CounterError, ProjectorEvent};

pub struct CounterProjector;

// This is a projector template that will project any events that implement Into<ProjectorEvent>
// into a shared projection (DB table). This behaviour - consuming events from more than one aggregate,
// and projecting them to a shared table, is RACE PRONE - if the projector writes to the same column
// when consuming more than one kind of event (as it does in this example, writing to both count_a and
// count_b when updating the counts, regardless of if it received and EventA or an EventB), then it is
// possible for two simultaneous transactions, updating the same projection, to occur. If the projector
// also relies on previous state to calculate next state (as this one does), this is a data race. The fix
// (not implemented here, for demonstration purposes) is dependant on your database transaction isolation
// model - in this case, using queries which only update count_a when EventA is received, and count_b
// when EventB is received, would be sufficient to guarantee soundness.
#[async_trait]
impl<T: Clone + Into<ProjectorEvent> + Send + Sync + Serialize + DeserializeOwned> SqliteProjector<T, CounterError>
for CounterProjector
{
async fn project(
&self,
event: &StoreEvent<T>,
connection: &mut PoolConnection<Sqlite>,
) -> Result<(), CounterError> {
let existing: Option<Counter> = Counter::by_id(event.aggregate_id, &mut *connection)
.await?
.map(|existing| existing);
let payload: ProjectorEvent = event.payload.clone().into();
match payload {
ProjectorEvent::A => match existing {
Some(counter) => Ok(Counter::update(
event.aggregate_id,
counter.count_a + 1,
counter.count_b,
&mut *connection,
)
.await?),
None => Ok(Counter::insert(event.aggregate_id, 1, 0, &mut *connection).await?),
},
ProjectorEvent::B => match existing {
Some(counter) => Ok(Counter::update(
event.aggregate_id,
counter.count_a,
counter.count_b + 1,
&mut *connection,
)
.await?),
None => Ok(Counter::insert(event.aggregate_id, 0, 1, &mut *connection).await?),
},
}
}
}

#[derive(sqlx::FromRow, Debug)]
pub struct Counter {
pub counter_id: Uuid,
pub count_a: i32,
pub count_b: i32,
}

impl Counter {
pub async fn by_id(id: Uuid, executor: impl Executor<'_, Database = Sqlite>) -> Result<Option<Self>, sqlx::Error> {
sqlx::query_as::<_, Self>("SELECT * FROM counters WHERE counter_id = $1")
.bind(id)
.fetch_optional(executor)
.await
}

pub async fn insert(
id: Uuid,
count_a: i32,
count_b: i32,
executor: impl Executor<'_, Database = Sqlite>,
) -> Result<(), sqlx::Error> {
sqlx::query_as::<_, Self>("INSERT INTO counters (counter_id, count_a, count_b) VALUES ($1, $2, $3)")
.bind(id)
.bind(count_a)
.bind(count_b)
.fetch_optional(executor)
.await
.map(|_| ())
}

pub async fn update(
id: Uuid,
count_a: i32,
count_b: i32,
executor: impl Executor<'_, Database = Sqlite>,
) -> Result<(), sqlx::Error> {
sqlx::query_as::<_, Self>("UPDATE counters SET count_a = $2, count_b = $3 WHERE counter_id = $1")
.bind(id)
.bind(count_a)
.bind(count_b)
.fetch_optional(executor)
.await
.map(|_| ())
}
matteosister marked this conversation as resolved.
Show resolved Hide resolved

pub async fn delete(id: Uuid, executor: impl Executor<'_, Database = Sqlite>) -> Result<(), sqlx::Error> {
sqlx::query_as::<_, Self>("DELETE FROM counter WHERE counter_id = $1")
.bind(id)
.fetch_optional(executor)
.await
.map(|_| ())
}
}

// Here's an example of how one might implement a projector that is shared between two aggregates, in a
// way that guarantees soundness when both aggregate event types can lead to modifying a shared
// column in the projection. Note that this requires a different Aggregate setup, where a shared
// (inner) projector is constructed, two SharedProjectors are constructed to wrap the inner, and then
// an EventStore for each event type is constructed and passed the right SharedProjector for it's Event type,
// before being passed into some Aggregate::new. It also, requires cloning every event
pub struct SharedProjector<InnerEvent, InnerError> {
inner: Arc<Mutex<dyn SqliteProjector<InnerEvent, InnerError> + Send + Sync>>,
}

impl<InnerEvent, InnerError> SharedProjector<InnerEvent, InnerError> {
pub fn new(inner: Arc<Mutex<dyn SqliteProjector<InnerEvent, InnerError> + Send + Sync>>) -> Self {
SharedProjector { inner: inner }
}
}

#[async_trait]
impl<Event, Error, InnerEvent, InnerError> SqliteProjector<Event, Error> for SharedProjector<InnerEvent, InnerError>
where
Event: Into<InnerEvent> + Clone + Send + Sync + Serialize + DeserializeOwned,
InnerEvent: Send + Sync + Serialize + DeserializeOwned,
InnerError: Into<Error>,
{
async fn project(&self, event: &StoreEvent<Event>, connection: &mut PoolConnection<Sqlite>) -> Result<(), Error> {
let event: StoreEvent<InnerEvent> = event.clone().map(Event::into);
let inner = self.inner.lock().await;
let result = inner.project(&event, connection).await;
match result {
Ok(()) => Ok(()),
Err(e) => Err(e.into()),
}
}
}
Loading