Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-573]: [esrs] POC different projectors on same table #100

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ version = "0.6.2"
members = [
"examples/simple_projection",
"examples/simple_side_effect",
"examples/simple_saga"
"examples/simple_saga",
"examples/aggregate_merging",
]

[features]
Expand Down
25 changes: 25 additions & 0 deletions examples/aggregate_merging/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
authors = ["Oliver Browne <[email protected]"]
description = "example of using a pair of projectors to project a pair of aggregations to a single read-side table"
edition = "2018"
license = "MIT OR Apache-2.0"
name = "aggregate_merging"
version = "0.1.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

# async main
tokio = {version = "1.6", features = ["full"]}

esrs = {version = "*", path = "../../", features = ["sqlite"]}
# Sql library for async impl
sqlx = {version = "0.6", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"]}

async-trait = "0.1.50"
chrono = {version = "0.4", features = ["serde"]}
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
thiserror = "1.0"
uuid = {version = "1.0", features = ["serde", "v4"]}
3 changes: 3 additions & 0 deletions examples/aggregate_merging/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This example demonstrates a `Projector` which consumes events from two different `Aggregates`, and projects those events to a single projection (in this case, a count of how many of each command the aggregates have received)

When either `AggregateA` or `AggregateB` in this example receive a command, they translate that into an (empty) `Event` specific to them (`EventA` or `EventB`). Both of these implement `Into<ProjectorEvent>`, and as such, both `AggregateA` and `AggregateB` can have an instantiation of `CounterProjector` in their projectors vec. This aggregate specific event is passed, by the underlying `EventStore`, to the aggregates instantiation of the generic `CounterProjector` (either `CounterProjector<EventA>` or `CounterProjector<EventB>`), and this projector updates the event counts in the projection. NOTE: this is race prone. See projector.rs for more details about how to make this sound.
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE counters (
"counter_id" UUID PRIMARY KEY NOT NULL,
"count_a" INTEGER NOT NULL,
"count_b" INTEGER NOT NULL
);
133 changes: 133 additions & 0 deletions examples/aggregate_merging/src/aggregates.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
use async_trait::async_trait;
use esrs::projector::SqliteProjector;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::{Pool, Sqlite};
use uuid::Uuid;

use esrs::aggregate::{AggregateManager, AggregateState, Identifier};
use esrs::store::{EventStore, SqliteStore, StoreEvent};

use crate::projectors::CounterProjector;
use crate::structs::{CommandA, CommandB, CounterError, EventA, EventB, ProjectorEvent};

const A_EVENTS: &str = "A";
const B_EVENTS: &str = "B";

// A store of events
type Store<E> = SqliteStore<E, CounterError>;

// We use a template here to make instantiating the near-identical
// AggregateA and AggregateB easier.
pub struct Aggregate<E: Send + Sync + Serialize + DeserializeOwned> {
event_store: Store<E>,
}

impl<
E: Send
+ Sync
+ Serialize
+ DeserializeOwned
+ // EventStore bounds
Into<ProjectorEvent>
+ Clone, // for the CounterProjector
> Aggregate<E>
where
Aggregate<E>: Identifier,
{
pub async fn new(pool: &Pool<Sqlite>) -> Result<Self, CounterError> {
Ok(Self {
event_store: Self::new_store(pool).await?,
})
}

pub async fn new_store(pool: &Pool<Sqlite>) -> Result<Store<E>, CounterError> {
// Any aggregate based off this template will project to the CounterProjector
let projectors: Vec<Box<dyn SqliteProjector<E, CounterError> + Send + Sync>> = vec![Box::new(CounterProjector)];

SqliteStore::new::<Self>(pool, projectors, vec![]).await
}
}

pub type AggregateA = Aggregate<EventA>;
pub type AggregateB = Aggregate<EventB>;

impl Identifier for AggregateA {
fn name() -> &'static str
where
Self: Sized,
{
A_EVENTS
}
}

impl Identifier for AggregateB {
fn name() -> &'static str
where
Self: Sized,
{
B_EVENTS
}
}

#[async_trait]
impl AggregateManager for AggregateA {
type State = ();
type Command = CommandA;
type Event = EventA;
type Error = CounterError;

fn event_store(&self) -> &(dyn EventStore<Self::Event, Self::Error> + Send + Sync) {
&self.event_store
}

fn apply_event(_id: &Uuid, state: Self::State, _: &StoreEvent<Self::Event>) -> Self::State {
// Take no action as this aggregate has no in memory state - only the projection is stateful
state
}

fn validate_command(_: &AggregateState<Self::State>, _: &Self::Command) -> Result<(), Self::Error> {
Ok(()) // No validation done on commands received in this aggregate
}

async fn do_handle_command(
&self,
aggregate_state: AggregateState<Self::State>,
command: Self::Command,
) -> Result<AggregateState<Self::State>, Self::Error> {
match command {
CommandA::Inner => self.persist(aggregate_state, vec![EventA::Inner]).await,
}
}
}

#[async_trait]
impl AggregateManager for AggregateB {
type State = ();
type Command = CommandB;
type Event = EventB;
type Error = CounterError;

fn event_store(&self) -> &(dyn EventStore<Self::Event, Self::Error> + Send + Sync) {
&self.event_store
}

fn apply_event(_id: &Uuid, state: Self::State, _: &StoreEvent<Self::Event>) -> Self::State {
// Take no action as this aggregate has no in memory state - only the projection is stateful
state
}

fn validate_command(_: &AggregateState<Self::State>, _: &Self::Command) -> Result<(), Self::Error> {
Ok(()) // No validation done on commands received in this aggregate
}

async fn do_handle_command(
&self,
aggregate_state: AggregateState<Self::State>,
command: Self::Command,
) -> Result<AggregateState<Self::State>, Self::Error> {
match command {
CommandB::Inner => self.persist(aggregate_state, vec![EventB::Inner]).await,
}
}
}
3 changes: 3 additions & 0 deletions examples/aggregate_merging/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod aggregates;
pub mod projectors;
pub mod structs;
54 changes: 54 additions & 0 deletions examples/aggregate_merging/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use esrs::aggregate::{AggregateManager, AggregateState};
use sqlx::{pool::PoolOptions, Pool, Sqlite};
use uuid::Uuid;

use aggregate_merging::{
aggregates::AggregateA,
aggregates::AggregateB,
projectors::Counter,
structs::{CommandA, CommandB},
};

#[tokio::main]
async fn main() {
println!("Starting pool");
let pool: Pool<Sqlite> = PoolOptions::new()
.connect("sqlite::memory:")
.await
.expect("Failed to create pool");

println!("Running migrations");
let () = sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("Failed to run migrations");

println!("Migrations run");

let count_id = Uuid::new_v4();

// Construct the two aggregates
let agg_a = AggregateA::new(&pool).await.expect("Failed to construct aggregate");
let a_state = AggregateState::new(count_id);

let agg_b = AggregateB::new(&pool).await.expect("Failed to construct aggregate");
let b_state = AggregateState::new(count_id);

// Increment each count once
let _ = agg_a
.handle_command(a_state, CommandA::Inner)
.await
.expect("Failed to handle command a");

let _ = agg_b
.handle_command(b_state, CommandB::Inner)
.await
.expect("Failed to handle command b");

// Retrieve counter projection from sqlite and print
let counter = Counter::by_id(count_id, &pool)
.await
.expect("Failed to retrieve counter")
.expect("Failed to find counter");
assert!(counter.count_a == 1 && counter.count_b == 1);
}
Loading