[PLATFORM-573]: [esrs] POC different projectors on same table #106

Merged
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -17,6 +17,8 @@ members = [
"examples/simple_side_effect",
"examples/simple_saga",
"examples/aggregate_merging",
"examples/rebuilder",
"examples/rebuild_shared_projection",
]

[features]
@@ -34,7 +36,7 @@ chrono = {version = "0.4", features = ["serde"]}
async-trait = "0.1.50"

# Sql library for async impl
sqlx = {version = "0.6", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"]}
sqlx = {version = "0.6.1", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"]}
# To stream over sqlx results
futures = "0.3"

27 changes: 27 additions & 0 deletions examples/rebuild_shared_projection/Cargo.toml
@@ -0,0 +1,27 @@
[package]
authors = ["Oliver Browne <[email protected]"]
description = "Simple esrs rebuild example - a projection is deleted and reconstructed from an event stream"
edition = "2018"
license = "MIT OR Apache-2.0"
name = "rebuild_shared_projection"
version = "0.1.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

# async main
tokio = {version = "1.6", features = ["full"]}

esrs = {version = "*", path = "../../", features = ["postgres"]}
aggregate_merging = {version = "*", path = "../aggregate_merging/"}
# Sql library for async impl
sqlx = {version = "0.6", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"]}

async-trait = "0.1.50"
chrono = {version = "0.4", features = ["serde"]}
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
thiserror = "1.0"
uuid = {version = "1.0", features = ["serde", "v4"]}
futures = "0.3"
1 change: 1 addition & 0 deletions examples/rebuild_shared_projection/Readme.md
@@ -0,0 +1 @@
An example of rebuilding a projection which derives from two aggregates.
5 changes: 5 additions & 0 deletions examples/rebuild_shared_projection/migrations/…_counters.sql
@@ -0,0 +1,5 @@
CREATE TABLE counters (
"counter_id" UUID PRIMARY KEY NOT NULL,
"count_a" INTEGER NOT NULL,
"count_b" INTEGER NOT NULL
);
127 changes: 127 additions & 0 deletions examples/rebuild_shared_projection/src/main.rs
@@ -0,0 +1,127 @@
use aggregate_merging::aggregates::{AggregateA, AggregateB};
use aggregate_merging::projectors::{Counter, CounterProjector};
use aggregate_merging::structs::{CommandA, CommandB, CounterError, ProjectorEvent};
use esrs::aggregate::{AggregateManager, AggregateState};
use esrs::projector::PgProjector;

use futures::StreamExt;
use sqlx::migrate::MigrateDatabase;
use sqlx::{pool::PoolOptions, Pool, Postgres};
use uuid::Uuid;

// A simple example demonstrating rebuilding a read-side projection from an event
// stream
#[tokio::main]
async fn main() {
let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");

Postgres::drop_database(database_url.as_str()).await.unwrap();
Postgres::create_database(database_url.as_str()).await.unwrap();

let pool: Pool<Postgres> = PoolOptions::new()
.connect(database_url.as_str())
.await
.expect("Failed to create pool");
let count_id = Uuid::new_v4();

setup(&pool, count_id).await;
let agg_a = AggregateA::new(&pool).await.unwrap();
let agg_b = AggregateB::new(&pool).await.unwrap();
let store_a = agg_a.event_store();
let store_b = agg_b.event_store();

let mut events_a = store_a.get_all().map(|r| match r {
Ok(e) => Ok(e.map(ProjectorEvent::from)),
Err(e) => Err(e),
});
let mut events_b = store_b.get_all().map(|r| match r {
Ok(e) => Ok(e.map(ProjectorEvent::from)),
Err(e) => Err(e),
});

let projectors: Vec<Box<dyn PgProjector<ProjectorEvent, CounterError>>> = vec![Box::new(CounterProjector)];

let mut a = None;
let mut b = None;
loop {
if a.is_none() {
a = events_a.next().await;
}
if b.is_none() {
b = events_b.next().await;
}
if a.is_none() && b.is_none() {
break;
}
let mut transaction = pool.begin().await.unwrap();
for projector in &projectors {
// If one stream is exhausted, project the pending event from the other.
if a.is_none() {
projector
.project(b.as_ref().unwrap().as_ref().unwrap(), &mut transaction)
.await
.unwrap();
b = None;
continue;
}
if b.is_none() {
projector
.project(a.as_ref().unwrap().as_ref().unwrap(), &mut transaction)
.await
.unwrap();
a = None;
continue;
}
// Both streams have a pending event: project the older one first so the
// shared projection sees a single, chronologically ordered stream.
let a_inner = a.as_ref().unwrap().as_ref().unwrap();
let b_inner = b.as_ref().unwrap().as_ref().unwrap();
if a_inner.occurred_on > b_inner.occurred_on {
projector.project(b_inner, &mut transaction).await.unwrap();
b = None;
} else {
projector.project(a_inner, &mut transaction).await.unwrap();
a = None;
}
}
transaction.commit().await.unwrap();
}

let counter = Counter::by_id(count_id, &pool)
.await
.expect("Failed to retrieve counter")
.expect("Failed to find counter");
assert!(counter.count_a == 1 && counter.count_b == 1);
}

async fn setup(pool: &Pool<Postgres>, count_id: Uuid) {
sqlx::migrate!("./migrations")
.run(pool)
.await
.expect("Failed to run migrations");

// Construct the two aggregates
let agg_a = AggregateA::new(pool).await.expect("Failed to construct aggregate");
let a_state = AggregateState::new(count_id);

let agg_b = AggregateB::new(pool).await.expect("Failed to construct aggregate");
let b_state = AggregateState::new(count_id);

// Increment each count once
let _ = agg_a
.handle(a_state, CommandA::Inner)
.await
.expect("Failed to handle command a");

let _ = agg_b
.handle(b_state, CommandB::Inner)
.await
.expect("Failed to handle command b");

// Drop and rebuild the counters projection table
sqlx::query("DROP TABLE counters")
.execute(pool)
.await
.expect("Failed to drop table");
sqlx::query("CREATE TABLE counters (\"counter_id\" UUID PRIMARY KEY NOT NULL, \"count_a\" INTEGER NOT NULL, \"count_b\" INTEGER NOT NULL );")
.execute(pool)
.await
.expect("Failed to recreate counters table");
}
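
The loop above is, at heart, a two-way merge of event streams ordered by `occurred_on`. Below is a minimal, self-contained sketch of that merge step under illustrative names (`StoredEvent`, `merge_by_occurred_on`), with an `i64` standing in for the `DateTime<Utc>` timestamps used by the real store:

struct StoredEvent {
    occurred_on: i64,
    payload: &'static str,
}

// Interleave two event lists that are each already ordered by `occurred_on`,
// so a projector sees a single chronologically ordered stream.
fn merge_by_occurred_on(a: Vec<StoredEvent>, b: Vec<StoredEvent>) -> Vec<StoredEvent> {
    let mut merged = Vec::with_capacity(a.len() + b.len());
    let mut ia = a.into_iter().peekable();
    let mut ib = b.into_iter().peekable();
    loop {
        // Copy the timestamps out so the peeked borrows are released before `next()`.
        let ta = ia.peek().map(|e| e.occurred_on);
        let tb = ib.peek().map(|e| e.occurred_on);
        match (ta, tb) {
            // Ties go to `a`, mirroring the `occurred_on` comparison in main.rs above.
            (Some(ta), Some(tb)) if ta <= tb => merged.push(ia.next().unwrap()),
            (Some(_), Some(_)) => merged.push(ib.next().unwrap()),
            (Some(_), None) => merged.push(ia.next().unwrap()),
            (None, Some(_)) => merged.push(ib.next().unwrap()),
            (None, None) => break,
        }
    }
    merged
}

fn main() {
    let a = vec![
        StoredEvent { occurred_on: 1, payload: "a1" },
        StoredEvent { occurred_on: 4, payload: "a2" },
    ];
    let b = vec![
        StoredEvent { occurred_on: 2, payload: "b1" },
        StoredEvent { occurred_on: 3, payload: "b2" },
    ];
    let order: Vec<&str> = merge_by_occurred_on(a, b).iter().map(|e| e.payload).collect();
    assert_eq!(order, ["a1", "b1", "b2", "a2"]);
}

In the example itself the same comparison decides which pending event the projectors see next, and each pass of the loop commits its own transaction.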
26 changes: 26 additions & 0 deletions examples/rebuilder/Cargo.toml
@@ -0,0 +1,26 @@
[package]
authors = ["Oliver Browne <[email protected]"]
description = "Simple esrs rebuild example - a projection is deleted and reconstructed from an event stream"
edition = "2018"
license = "MIT OR Apache-2.0"
name = "rebuilder"
version = "0.1.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

# async main
tokio = {version = "1.6", features = ["full"]}

esrs = {version = "*", path = "../../", features = ["postgres"]}
simple_projection = {version = "*", path = "../simple_projection/"}
# Sql library for async impl
sqlx = {version = "0.6", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"]}

async-trait = "0.1.50"
chrono = {version = "0.4", features = ["serde"]}
serde = {version = "1.0", features = ["derive"]}
serde_json = "1.0"
thiserror = "1.0"
uuid = {version = "1.0", features = ["serde", "v4"]}
1 change: 1 addition & 0 deletions examples/rebuilder/Readme.md
@@ -0,0 +1 @@
An example of rebuilding a projection, given a set of aggregate ids and a projector. See `rebuild_all_at_once` in main.rs for the implementation of the rebuilding itself; the rest is setup for the demonstration.
4 changes: 4 additions & 0 deletions examples/rebuilder/migrations/20220824151818_counters.sql
@@ -0,0 +1,4 @@
CREATE TABLE counters (
"counter_id" UUID PRIMARY KEY NOT NULL,
"count" INTEGER NOT NULL
);
145 changes: 145 additions & 0 deletions examples/rebuilder/src/main.rs
@@ -0,0 +1,145 @@
use std::fmt::Debug;

use esrs::aggregate::{Aggregate, AggregateManager, AggregateState};
use esrs::projector::PgProjector;
use serde::de::DeserializeOwned;
use serde::Serialize;
use simple_projection::aggregate::CounterAggregate;
use simple_projection::projector::{Counter, CounterProjector};
use simple_projection::structs::*;
use sqlx::migrate::MigrateDatabase;
use sqlx::{pool::PoolOptions, Pool, Postgres};
use uuid::Uuid;

// Rebuild the projection for a single aggregate, given the aggregate, a list of aggregate ids,
// the projector to rebuild and a connection pool.
// This rebuilds the projection for all aggregate ids in a single transaction. An alternative
// (see _rebuild_per_id, below) is to rebuild on a per-id basis.
async fn rebuild_all_at_once<E, Err, A>(
aggregate: &A,
ids: Vec<Uuid>,
projector: &dyn PgProjector<E, Err>,
pool: &Pool<Postgres>,
) where
A: AggregateManager<Event = E, Error = Err>,
E: Serialize + DeserializeOwned + Send + Sync,
Err: Debug,
{
let mut events = Vec::new();
for id in ids {
events.append(
&mut aggregate
.event_store()
.by_aggregate_id(id)
.await
.expect("failed to retrieve events"),
);
}
let mut transaction = pool.begin().await.unwrap();
for event in events {
projector
.project(&event, &mut transaction)
.await
.expect("Failed to project event");
}
transaction.commit().await.unwrap();
}

// An alternative approach to rebuilding that rebuilds the projected table for a given projection one
// aggregate ID at a time, rather than committing the entire table all at once
async fn _rebuild_per_id<E, Err, A>(
aggregate: &A,
ids: Vec<Uuid>,
projector: &dyn PgProjector<E, Err>,
pool: &Pool<Postgres>,
) where
A: AggregateManager<Event = E, Error = Err>,
<A as Aggregate>::Error: Debug,
E: Serialize + DeserializeOwned + Send + Sync,
{
for id in ids {
rebuild_all_at_once(aggregate, vec![id], projector, pool).await;
}
}

// Rebuild a number of boxed projectors at once, for a single aggregate, for a number of aggregate ids
async fn _rebuild_multiple_projectors<'a, E, Err, A>(
aggregate: &'a A,
ids: Vec<Uuid>,
projectors: Vec<Box<dyn PgProjector<E, Err>>>,
pool: &'a Pool<Postgres>,
) where
A: AggregateManager<Event = E, Error = Err>,
<A as Aggregate>::Error: Debug,
E: Serialize + DeserializeOwned + Send + Sync,
{
for projector in projectors {
for id in &ids {
rebuild_all_at_once(aggregate, vec![*id], projector.as_ref(), pool).await;
}
}
}

// A simple example demonstrating rebuilding a read-side projection from an event
// stream
#[tokio::main]
async fn main() {
let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");

Postgres::drop_database(database_url.as_str()).await.unwrap();
Postgres::create_database(database_url.as_str()).await.unwrap();

let pool: Pool<Postgres> = PoolOptions::new()
.connect(database_url.as_str())
.await
.expect("Failed to create pool");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("Failed to run migrations");

let count_id = Uuid::new_v4();

// Construct the aggregate, and an initial (empty) state for it
let aggregate = CounterAggregate::new(&pool)
.await
.expect("Failed to construct aggregate");
let state = AggregateState::new(count_id);

// Increment counter three times
let state = aggregate
.handle(state, CounterCommand::Increment)
.await
.expect("Failed to handle increment command");
let state = aggregate
.handle(state, CounterCommand::Increment)
.await
.expect("Failed to handle increment command");
let _state = aggregate
.handle(state, CounterCommand::Increment)
.await
.expect("Failed to handle increment command");

// Drop and rebuild the counters table
sqlx::query("DROP TABLE counters")
.execute(&pool)
.await
.expect("Failed to drop table");
sqlx::query("CREATE TABLE counters (\"counter_id\" UUID PRIMARY KEY NOT NULL, \"count\" INTEGER NOT NULL );")
.execute(&pool)
.await
.expect("Failed to recreate counters table");

// Assert the counter doesn't exist
let res = Counter::by_id(count_id, &pool).await.expect("Query failed");
assert!(res.is_none());

let projector = CounterProjector {};
rebuild_all_at_once(&aggregate, vec![count_id], &projector, &pool).await;

// Assert the counter has been rebuilt
let res = Counter::by_id(count_id, &pool)
.await
.expect("Query failed")
.expect("counter not found");
assert!(res.counter_id == count_id && res.count == 3);
}
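
A design note on the sequence above: the counters table is dropped and recreated outside the rebuild transaction, so there is a brief window in which it does not exist. The sketch below is not part of this PR; `rebuild_in_place` and the hard-coded `counters` table name are illustrative. It shows a variant that truncates and reprojects inside one transaction, reusing the esrs and sqlx calls already used in this file:

use std::fmt::Debug;

use esrs::aggregate::AggregateManager;
use esrs::projector::PgProjector;
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::{Pool, Postgres};
use uuid::Uuid;

// Truncate the projection table and reproject every event in the same
// transaction, so readers never observe a half-rebuilt `counters` table.
async fn rebuild_in_place<E, Err, A>(
    aggregate: &A,
    ids: Vec<Uuid>,
    projector: &dyn PgProjector<E, Err>,
    pool: &Pool<Postgres>,
) where
    A: AggregateManager<Event = E, Error = Err>,
    E: Serialize + DeserializeOwned + Send + Sync,
    Err: Debug,
{
    let mut transaction = pool.begin().await.unwrap();

    // TRUNCATE is transactional in Postgres, so the old rows only disappear
    // if the whole rebuild commits.
    sqlx::query("TRUNCATE counters")
        .execute(&mut transaction)
        .await
        .expect("Failed to truncate counters table");

    for id in ids {
        let events = aggregate
            .event_store()
            .by_aggregate_id(id)
            .await
            .expect("failed to retrieve events");
        for event in events {
            projector
                .project(&event, &mut transaction)
                .await
                .expect("Failed to project event");
        }
    }

    transaction.commit().await.unwrap();
}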
16 changes: 15 additions & 1 deletion src/esrs/postgres/mod.rs
@@ -4,7 +4,7 @@ use std::marker::PhantomData;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::stream::BoxStream;
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use serde::de::DeserializeOwned;
use serde::Serialize;
use sqlx::types::Json;
@@ -182,6 +182,20 @@ impl<
async fn close(&self) {
self.pool.close().await
}

fn get_all(&self) -> BoxStream<Result<StoreEvent<Event>, Error>> {
Box::pin(
sqlx::query_as::<_, event::Event>(self.queries.select_all())
.fetch(&self.pool)
.map(|res| match res {
Ok(e) => match e.try_into() {
Ok(e) => Ok(e),
Err(e) => Err(Error::from(e)),
},
Err(e) => Err(e.into()),
}),
)
}
}
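
A small, self-contained sketch of how a caller consumes the `BoxStream` returned by the new `get_all` (the rebuild_shared_projection example above does this with real events); `count_ok` and the `u32` payload are illustrative stand-ins for `StoreEvent<Event>`:

use futures::executor::block_on;
use futures::stream::{self, BoxStream, StreamExt};

// Drain a `get_all`-style stream and count the events that decoded successfully.
async fn count_ok<T, E>(mut events: BoxStream<'_, Result<T, E>>) -> usize {
    let mut count = 0;
    while let Some(item) = events.next().await {
        if item.is_ok() {
            count += 1;
        }
    }
    count
}

fn main() {
    // A stand-in stream: two decodable events and one failure.
    let events: BoxStream<'static, Result<u32, ()>> =
        Box::pin(stream::iter(vec![Ok(1), Err(()), Ok(3)]));
    assert_eq!(block_on(count_ok(events)), 2);
}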

#[async_trait]