diff --git a/CHANGELOG.md b/CHANGELOG.md
index c239234c..8dd3a3a1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,7 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - `AggregateState::new` second parameter from `Uuid` to `impl Into<Uuid>`.
 - `AggregateManager::load` first parameter from `Uuid` to `impl Into<Uuid>`.
 - `AggregateState::delete` first parameter from `Uuid` to `impl Into<Uuid>`.
-
+- [[#118]]: Merged rebuild examples into one; removed mains and migrations from examples.
 ---
 
 ## [0.7.0]
diff --git a/Cargo.toml b/Cargo.toml
index b1171ebf..2237e00a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,8 +17,7 @@ members = [
     "examples/customize_persistence_flow",
     "examples/delete_aggregate",
     "examples/dyn_event_store_in_aggregate",
-    "examples/rebuild_shared_projection",
-    "examples/rebuilder",
+    "examples/rebuild_strategies",
     "examples/simple_projection",
     "examples/simple_saga",
     "examples/simple_side_effect",
diff --git a/examples/aggregate_merging/migrations/20220824151818_counters.sql b/examples/aggregate_merging/migrations/20220824151818_counters.sql
deleted file mode 100644
index 85125bd3..00000000
--- a/examples/aggregate_merging/migrations/20220824151818_counters.sql
+++ /dev/null
@@ -1,7 +0,0 @@
-CREATE TABLE counters (
-    "counter_id" UUID PRIMARY KEY NOT NULL,
-    "counter_a_id" UUID,
-    "counter_b_id" UUID,
-    "count_a" INTEGER NOT NULL,
-    "count_b" INTEGER NOT NULL
-);
diff --git a/examples/aggregate_merging/src/main.rs b/examples/aggregate_merging/src/main.rs
deleted file mode 100644
index 6ea9646b..00000000
--- a/examples/aggregate_merging/src/main.rs
+++ /dev/null
@@ -1,68 +0,0 @@
-use sqlx::migrate::MigrateDatabase;
-use sqlx::{pool::PoolOptions, Pool, Postgres};
-use uuid::Uuid;
-
-use aggregate_merging::aggregates::AggregateA;
-use aggregate_merging::aggregates::AggregateB;
-use aggregate_merging::projectors::Counter;
-use aggregate_merging::structs::CommandA;
-use aggregate_merging::structs::CommandB;
-use esrs::AggregateManager;
-
-#[tokio::main]
-async fn main() {
-    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");
-
-    Postgres::drop_database(database_url.as_str()).await.unwrap();
-    Postgres::create_database(database_url.as_str()).await.unwrap();
-
-    let pool: Pool<Postgres> = PoolOptions::new()
-        .connect(database_url.as_str())
-        .await
-        .expect("Failed to create pool");
-
-    sqlx::migrate!("./migrations")
-        .run(&pool)
-        .await
-        .expect("Failed to run migrations");
-
-    let shared_id: Uuid = Uuid::new_v4();
-
-    // Construct the two aggregates
-    let agg_a = AggregateA::new(&pool).await.expect("Failed to construct aggregate");
-    let agg_b = AggregateB::new(&pool).await.expect("Failed to construct aggregate");
-
-    let counter = Counter::by_id(shared_id, &pool)
-        .await
-        .expect("Failed to retrieve counter");
-
-    assert!(counter.is_none());
-
-    // Increment each count once
-    let _ = agg_a
-        .handle_command(Default::default(), CommandA::Inner { shared_id })
-        .await
-        .expect("Failed to handle command a");
-
-    let counter = Counter::by_id(shared_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-
-    println!("Count A is {} and count B is {}", counter.count_a, counter.count_b);
-    assert!(counter.count_a == 1 && counter.count_b == 0);
-
-    let _ = agg_b
-        .handle_command(Default::default(), CommandB::Inner { shared_id })
-        .await
-        .expect("Failed to handle command b");
-
-    // Retrieve counter projection from database and print
-    let counter = Counter::by_id(shared_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-
-    println!("Count A is {} and count B is {}", counter.count_a, counter.count_b);
-    assert!(counter.count_a == 1 && counter.count_b == 1);
-}
diff --git a/examples/customize_persistence_flow/migrations/20220824151818_counters.sql b/examples/customize_persistence_flow/migrations/20220824151818_counters.sql
deleted file mode 100644
index 49c29709..00000000
--- a/examples/customize_persistence_flow/migrations/20220824151818_counters.sql
+++ /dev/null
@@ -1,4 +0,0 @@
-CREATE TABLE counters (
-    "counter_id" UUID PRIMARY KEY NOT NULL,
-    "count" INTEGER NOT NULL
-);
diff --git a/examples/customize_persistence_flow/src/aggregate.rs b/examples/customize_persistence_flow/src/aggregate.rs
index 8c989f34..41682e8e 100644
--- a/examples/customize_persistence_flow/src/aggregate.rs
+++ b/examples/customize_persistence_flow/src/aggregate.rs
@@ -62,6 +62,7 @@ impl AggregateManager for CounterAggregate {
         aggregate_state: &AggregateState<Self::State>,
         events: Vec<Self::Event>,
     ) -> Result<Vec<StoreEvent<Self::Event>>, Self::Error> {
+        // Here is where the persistence flow is customized.
         self.event_store
             .persist(|pool| async move {
                 let mut connection: PoolConnection<Postgres> = pool.acquire().await?;
@@ -91,7 +92,8 @@ impl AggregateManager for CounterAggregate {
 
         for store_event in store_events.iter() {
             for policy in self.event_store.policies().iter() {
-                // We want to just log errors instead of return them
+                // We want to just log errors instead of returning them. This is the
+                // customization we wanted.
                 match policy.handle_event(store_event).await {
                     Ok(_) => (),
                     Err(error) => println!("{:?}", error),
diff --git a/examples/customize_persistence_flow/src/main.rs b/examples/customize_persistence_flow/src/main.rs
deleted file mode 100644
index a68b7c0f..00000000
--- a/examples/customize_persistence_flow/src/main.rs
+++ /dev/null
@@ -1,78 +0,0 @@
-use sqlx::migrate::MigrateDatabase;
-use sqlx::{pool::PoolOptions, Pool, Postgres};
-use uuid::Uuid;
-
-use customize_persistence_flow::{aggregate::CounterAggregate, projector::Counter, structs::CounterCommand};
-use esrs::{AggregateManager, AggregateState};
-
-#[tokio::main]
-async fn main() {
-    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");
-
-    Postgres::drop_database(database_url.as_str()).await.unwrap();
-    Postgres::create_database(database_url.as_str()).await.unwrap();
-
-    let pool: Pool<Postgres> = PoolOptions::new()
-        .connect(database_url.as_str())
-        .await
-        .expect("Failed to create pool");
-
-    sqlx::migrate!("./migrations")
-        .run(&pool)
-        .await
-        .expect("Failed to run migrations");
-
-    let count_id = Uuid::new_v4();
-
-    // Construct the aggregation, and some nil state for it
-    let aggregate = CounterAggregate::new(&pool)
-        .await
-        .expect("Failed to construct aggregate");
-    let state = AggregateState::new(count_id);
-
-    // Increment counter once
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
-        .expect("Failed to handle increment command");
-
-    // Retrieve counter projection from database and print
-    let counter = Counter::by_id(count_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-    println!("Count is: {}", counter.count);
-    assert_eq!(counter.count, 1);
-
-    // Increment counter twice
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
-        .expect("Failed to handle increment command");
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
-        .expect("Failed to handle increment command");
-
-    // Retrieve counter projection from database and print
-    let counter = Counter::by_id(count_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-    println!("Count is: {}", counter.count);
-    assert_eq!(counter.count, 3);
-
-    // Decrement counter once
-    let _state = aggregate
-        .handle_command(state, CounterCommand::Decrement)
-        .await
-        .expect("Failed to handle increment command");
-
-    // Retrieve counter projection from database and print
-    let counter = Counter::by_id(count_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-    println!("Count is: {}", counter.count);
-    assert_eq!(counter.count, 2);
-}
diff --git a/examples/delete_aggregate/migrations/20220824151818_counters.sql b/examples/delete_aggregate/migrations/20220824151818_counters.sql
deleted file mode 100644
index 49c29709..00000000
--- a/examples/delete_aggregate/migrations/20220824151818_counters.sql
+++ /dev/null
@@ -1,4 +0,0 @@
-CREATE TABLE counters (
-    "counter_id" UUID PRIMARY KEY NOT NULL,
-    "count" INTEGER NOT NULL
-);
diff --git a/examples/delete_aggregate/src/main.rs b/examples/delete_aggregate/src/main.rs
deleted file mode 100644
index 49e6c801..00000000
--- a/examples/delete_aggregate/src/main.rs
+++ /dev/null
@@ -1,89 +0,0 @@
-use sqlx::migrate::MigrateDatabase;
-use sqlx::{pool::PoolOptions, Pool, Postgres};
-use uuid::Uuid;
-
-use delete_aggregate::{aggregate::CounterAggregate, projector::Counter, structs::CounterCommand};
-use esrs::{AggregateManager, AggregateState};
-
-#[tokio::main]
-async fn main() {
-    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");
-
-    Postgres::drop_database(database_url.as_str()).await.unwrap();
-    Postgres::create_database(database_url.as_str()).await.unwrap();
-
-    let pool: Pool<Postgres> = PoolOptions::new()
-        .connect(database_url.as_str())
-        .await
-        .expect("Failed to create pool");
-
-    sqlx::migrate!("./migrations")
-        .run(&pool)
-        .await
-        .expect("Failed to run migrations");
-
-    let count_id = Uuid::new_v4();
-
-    // Construct the aggregation, and some nil state for it
-    let aggregate = CounterAggregate::new(&pool)
-        .await
-        .expect("Failed to construct aggregate");
-    let state = AggregateState::new(count_id);
-
-    // Increment counter once
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
-        .expect("Failed to handle increment command");
-
-    // Retrieve counter projection from database and print
-    let counter = Counter::by_id(count_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-    println!("Count is: {}", counter.count);
-    assert_eq!(counter.count, 1);
-
-    // Increment counter twice
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
-        .expect("Failed to handle increment command");
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
-        .expect("Failed to handle increment command");
-
-    // Retrieve counter projection from database and print
-    let counter = Counter::by_id(count_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-    println!("Count is: {}", counter.count);
-    assert_eq!(counter.count, 3);
-
-    // Decrement counter once
-    let state = aggregate
-        .handle_command(state, CounterCommand::Decrement)
-        .await
-        .expect("Failed to handle increment command");
-
-    // Retrieve counter projection from database and print
-    let counter = Counter::by_id(count_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-
-    println!("Count is: {}", counter.count);
-    assert_eq!(counter.count, 2);
-
-    aggregate.delete(*state.id()).await.expect("Failed to delete aggregate");
-
-    let counter_opt: Option<Counter> = Counter::by_id(count_id, &pool)
-        .await
-        .expect("Failed to retrieve counter");
-
-    assert!(counter_opt.is_none());
-
-    println!("Counter has been deleted successfully");
-}
diff --git a/examples/rebuild_shared_projection/Cargo.toml b/examples/rebuild_shared_projection/Cargo.toml
deleted file mode 100644
index 256a92a7..00000000
--- a/examples/rebuild_shared_projection/Cargo.toml
+++ /dev/null
@@ -1,27 +0,0 @@
-[package]
-authors = ["Oliver Browne
[…]
-    let pool: Pool<Postgres> = PoolOptions::new()
-        .connect(database_url.as_str())
-        .await
-        .expect("Failed to create pool");
-    let counter_id = Uuid::new_v4();
-
-    setup(&pool, counter_id).await;
-
-    let store_a = AggregateA::new(&pool).await.unwrap().event_store;
-    let store_b = AggregateB::new(&pool).await.unwrap().event_store;
-
-    let mut events_a = store_a.stream_events(&pool);
-    let mut events_b = store_b.stream_events(&pool);
-
-    let mut event_a_opt: Option<Result<StoreEvent<EventA>, CounterError>> = events_a.next().await;
-    let mut event_b_opt: Option<Result<StoreEvent<EventB>, CounterError>> = events_b.next().await;
-
-    let mut transaction: Transaction<'_, Postgres> = pool.begin().await.expect("Failed to create transaction");
-
-    let _ = sqlx::query("TRUNCATE TABLE counters")
-        .execute(&mut *transaction)
-        .await
-        .unwrap();
-
-    loop {
-        let a_opt: Option<&StoreEvent<EventA>> = event_a_opt.as_ref().map(|v| v.as_ref().unwrap());
-        let b_opt: Option<&StoreEvent<EventB>> = event_b_opt.as_ref().map(|v| v.as_ref().unwrap());
-
-        match (a_opt, b_opt) {
-            (Some(a), Some(b)) if a.occurred_on <= b.occurred_on => {
-                for projector in store_a.projectors().iter() {
-                    projector.project(a, &mut *transaction).await.unwrap();
-                }
-
-                event_a_opt = events_a.next().await;
-            }
-            (Some(a), None) => {
-                for projector in store_a.projectors().iter() {
-                    projector.project(a, &mut *transaction).await.unwrap();
-                }
-
-                event_a_opt = events_a.next().await;
-            }
-            (Some(_), Some(b)) | (None, Some(b)) => {
-                for projector in store_b.projectors().iter() {
-                    projector.project(b, &mut *transaction).await.unwrap();
-                }
-
-                event_b_opt = events_b.next().await;
-            }
-            (None, None) => break,
-        };
-    }
-
-    transaction.commit().await.unwrap();
-
-    let counter = Counter::by_id(counter_id, &pool)
-        .await
-        .expect("Failed to retrieve counter")
-        .expect("Failed to find counter");
-
-    assert_eq!(counter.count_b, 1);
-    assert_eq!(counter.count_a, 1);
-}
-
-async fn setup(pool: &Pool<Postgres>, shared_id: Uuid) {
-    sqlx::migrate!("./migrations")
-        .run(pool)
-        .await
-        .expect("Failed to run migrations");
-
-    // Construct the two aggregates
-    let agg_a = AggregateA::new(pool).await.expect("Failed to construct aggregate");
-    let agg_b = AggregateB::new(pool).await.expect("Failed to construct aggregate");
-
-    // Increment each count once
-    let _ = agg_a
-        .handle_command(Default::default(), CommandA::Inner { shared_id })
-        .await
-        .expect("Failed to handle command a");
-
-    let _ = agg_b
-        .handle_command(Default::default(), CommandB::Inner { shared_id })
-        .await
-        .expect("Failed to handle command b");
-}
diff --git a/examples/rebuilder/Cargo.toml b/examples/rebuild_strategies/Cargo.toml
similarity index 76%
rename from examples/rebuilder/Cargo.toml
rename to examples/rebuild_strategies/Cargo.toml
index 733a678c..b48380e4 100644
--- a/examples/rebuilder/Cargo.toml
+++ b/examples/rebuild_strategies/Cargo.toml
@@ -1,15 +1,17 @@
 [package]
-authors = ["Oliver Browne "]
+description = "An example of different strategies to rebuild projections"
 edition = "2018"
 license = "MIT OR Apache-2.0"
-name = "rebuilder"
+name = "rebuild_strategies"
 version = "0.1.0"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+aggregate_merging = { version = "*", path = "../aggregate_merging/" }
+
 # async main
 tokio = { version = "1.6", features = ["full"] }
 futures-util = "0.3"
diff --git a/examples/rebuild_strategies/README.md b/examples/rebuild_strategies/README.md
new file mode 100644
index 00000000..e47fd665
--- /dev/null
+++ b/examples/rebuild_strategies/README.md
@@ -0,0 +1,22 @@
+# Rebuild strategies
+
+This example collects some strategies you may need when one or more (possibly shared) projections
+have to be rebuilt.
+
+## Rebuild single projection all at once
+
+Here the user wants to wipe the entire projection table and rebuild all of its entries. This is
+done in a single transaction: the table content is truncated, and then all the events retrieved at
+the moment the transaction is opened are projected again.
+
+## Rebuild single projection per aggregate id
+
+Here the table is not locked for the whole rebuild. Instead, a transaction is opened for every
+aggregate id fetched by a pre-made query. Within that transaction, every row with a matching
+aggregate id is deleted and then reprojected.
+
+## Rebuild shared projection streaming
+
+Here the user needs to rebuild two (or more) projections that write to the same table. This can be
+achieved by opening a transaction, streaming the events from both stores, and truncating the shared
+projection table. The streamed events are then replayed in order of occurrence, and the projectors
+of both aggregates run event by event, reconstructing the two projections.
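+
+As a concrete starting point, a minimal sketch of a binary driving the first strategy might look
+like the following (this is an illustration, not part of the library; it assumes a reachable
+Postgres pointed at by `DATABASE_URL`, mirroring the setup the removed example mains used):
+
+```rust
+use sqlx::{pool::PoolOptions, Pool, Postgres};
+
+use rebuild_strategies::rebuild_single_projection_all_at_once;
+
+#[tokio::main]
+async fn main() {
+    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");
+
+    let pool: Pool<Postgres> = PoolOptions::new()
+        .connect(database_url.as_str())
+        .await
+        .expect("Failed to create pool");
+
+    // Truncates the `counters` table and replays every stored event through the projectors.
+    rebuild_single_projection_all_at_once(pool).await;
+}
+```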
diff --git a/examples/rebuild_strategies/src/lib.rs b/examples/rebuild_strategies/src/lib.rs
new file mode 100644
index 00000000..e3e8c338
--- /dev/null
+++ b/examples/rebuild_strategies/src/lib.rs
@@ -0,0 +1,163 @@
+use futures_util::stream::StreamExt;
+use sqlx::{Pool, Postgres, Transaction};
+use uuid::Uuid;
+
+use aggregate_merging::aggregates::{AggregateA, AggregateB};
+use aggregate_merging::structs::{CounterError, EventA, EventB};
+use esrs::postgres::{PgStore, Projector};
+use esrs::{AggregateManager, EventStore, StoreEvent};
+
+/// A simple example demonstrating rebuilding a single projection table from an aggregate.
+pub async fn rebuild_single_projection_all_at_once(pool: Pool<Postgres>) {
+    let aggregate: AggregateA = AggregateA::new(&pool).await.unwrap();
+
+    // Put here all the projectors you want to rebuild, and remember to add a truncate statement
+    // for every table the projectors in this vec write to.
+    let projectors: Vec<Box<dyn Projector<AggregateA>>> = vec![];
+
+    // Start the transaction
+    let mut transaction: Transaction<'_, Postgres> = pool.begin().await.unwrap();
+
+    // Get all events from the event store
+    let events: Vec<StoreEvent<EventA>> = aggregate
+        .event_store
+        .stream_events(&mut transaction)
+        .collect::<Vec<Result<StoreEvent<EventA>, CounterError>>>()
+        .await
+        .into_iter()
+        .collect::<Result<Vec<StoreEvent<EventA>>, CounterError>>()
+        .expect("Failed to get all events from event_table");
+
+    // From within the transaction, truncate the projection table you are going to rebuild
+    sqlx::query("TRUNCATE TABLE counters")
+        .execute(&mut transaction)
+        .await
+        .expect("Failed to truncate table");
+
+    // Then fully rebuild the projection table
+    for event in events {
+        for projector in projectors.iter() {
+            projector
+                .project(&event, &mut transaction)
+                .await
+                .expect("Failed to project event");
+        }
+    }
+
+    // And finally commit your transaction
+    transaction.commit().await.unwrap();
+}
+
+/// An alternative approach to rebuilding that rebuilds the projection table for a given projection
+/// one aggregate id at a time, rather than rewriting the entire table all at once.
+pub async fn rebuild_single_projection_per_aggregate_id(pool: Pool<Postgres>) {
+    let aggregate: AggregateA = AggregateA::new(&pool).await.unwrap();
+
+    // Put here all the projectors for all the projections you want to rebuild
+    let projectors: Vec<Box<dyn Projector<AggregateA>>> = vec![];
+
+    // Get all unique aggregate ids from the event store table. In a real rebuild this would be a
+    // `sqlx::query` statement; a random id stands in for it here.
+    let aggregate_ids: Vec<Uuid> = vec![Uuid::new_v4()];
+
+    // For every aggregate_id..
+    for aggregate_id in aggregate_ids {
+        // .. open a transaction..
+        let mut transaction: Transaction<'_, Postgres> = pool.begin().await.unwrap();
+
+        // .. and for every projector..
+        for projector in projectors.iter() {
+            // .. delete all the records in the projection that have that aggregate_id as key. To
+            // achieve this, remember to override the default `delete` implementation in the
+            // projector.
+            projector.delete(aggregate_id, &mut *transaction).await.unwrap();
+
+            // Then query for all the events in the event store table..
+            let events = aggregate.event_store().by_aggregate_id(aggregate_id).await.unwrap();
+
+            // .. and reproject all those events.
+            for event in events {
+                projector
+                    .project(&event, &mut transaction)
+                    .await
+                    .expect("Failed to project event");
+            }
+        }
+
+        // And finally commit your transaction
+        transaction.commit().await.unwrap();
+    }
+}
+
+/// A simple example demonstrating rebuilding a shared projection by streaming over two different
+/// event stores.
+pub async fn rebuild_shared_projection_streaming(pool: Pool<Postgres>) {
+    // Build both stores
+    let store_a: PgStore<AggregateA> = PgStore::new(pool.clone());
+    let store_b: PgStore<AggregateB> = PgStore::new(pool.clone());
+
+    // Put here all the projectors from AggregateA you want to rebuild, and remember to add a
+    // truncate statement for every table the projectors in this vec write to.
+    let projectors_a: Vec<Box<dyn Projector<AggregateA>>> = vec![];
+    // Put here all the projectors from AggregateB you want to rebuild, and remember to add a
+    // truncate statement for every table the projectors in this vec write to.
+    let projectors_b: Vec<Box<dyn Projector<AggregateB>>> = vec![];
+
+    // Open a stream over each of the two event tables
+    let mut events_a = store_a.stream_events(&pool);
+    let mut events_b = store_b.stream_events(&pool);
+
+    // Fetch the first element of each stream
+    let mut event_a_opt: Option<Result<StoreEvent<EventA>, CounterError>> = events_a.next().await;
+    let mut event_b_opt: Option<Result<StoreEvent<EventB>, CounterError>> = events_b.next().await;
+
+    // At this point it is possible to open a transaction
+    let mut transaction: Transaction<'_, Postgres> = pool.begin().await.unwrap();
+
+    // Truncate the shared projection table.
+    let _ = sqlx::query("TRUNCATE TABLE counters")
+        .execute(&mut *transaction)
+        .await
+        .unwrap();
+
+    loop {
+        let a_opt: Option<&StoreEvent<EventA>> = event_a_opt.as_ref().map(|v| v.as_ref().unwrap());
+        let b_opt: Option<&StoreEvent<EventB>> = event_b_opt.as_ref().map(|v| v.as_ref().unwrap());
+
+        match (a_opt, b_opt) {
+            // If both streams returned a value, we check which event is older. If the AggregateA
+            // event is the older one, we run the projectors from AggregateA.
+            (Some(a), Some(b)) if a.occurred_on <= b.occurred_on => {
+                for projector in projectors_a.iter() {
+                    projector.project(a, &mut *transaction).await.unwrap();
+                }
+
+                // Get the next value from the AggregateA events stream
+                event_a_opt = events_a.next().await;
+            }
+            // If only the AggregateA stream still contains values, we run the projectors from
+            // AggregateA.
+            (Some(a), None) => {
+                for projector in projectors_a.iter() {
+                    projector.project(a, &mut *transaction).await.unwrap();
+                }
+
+                // Get the next value from the AggregateA events stream
+                event_a_opt = events_a.next().await;
+            }
+            // If both streams returned a value and the AggregateB event is older, or if only the
+            // AggregateB stream still contains values, we run the projectors from AggregateB.
+            (Some(_), Some(b)) | (None, Some(b)) => {
+                for projector in projectors_b.iter() {
+                    projector.project(b, &mut *transaction).await.unwrap();
+                }
+
+                // Get the next value from the AggregateB events stream
+                event_b_opt = events_b.next().await;
+            }
+            // If both streams are exhausted, we break out of the loop.
+            (None, None) => break,
+        };
+    }
+
+    // Finally commit the transaction
+    transaction.commit().await.unwrap();
+}
diff --git a/examples/rebuilder/README.md b/examples/rebuilder/README.md
deleted file mode 100644
index 33ad55f4..00000000
--- a/examples/rebuilder/README.md
+++ /dev/null
@@ -1,2 +0,0 @@
-An example of rebuilding a projection, given an aggregate_id and a Projector. See `rebuild_all_at_once` in main.rs for
-the implementation of the rebuilding itself - the rest is just setup to allow us to demonstrate.
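Note: `rebuild_single_projection_per_aggregate_id` above stubs its id list with a single random
`Uuid`. A sketch of the `sqlx::query` statement its comment alludes to might look like the
following — the `a_events` table name is an assumption based on a `{aggregate_name}_events`
naming convention, not something this diff establishes:

```rust
use sqlx::{Pool, Postgres};
use uuid::Uuid;

// Hypothetical helper: collect every distinct aggregate id present in the event store table.
async fn all_aggregate_ids(pool: &Pool<Postgres>) -> Vec<Uuid> {
    sqlx::query_scalar("SELECT DISTINCT aggregate_id FROM a_events")
        .fetch_all(pool)
        .await
        .expect("Failed to fetch aggregate ids")
}
```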
diff --git a/examples/rebuilder/migrations/20220824151818_counters.sql b/examples/rebuilder/migrations/20220824151818_counters.sql
deleted file mode 100644
index 49c29709..00000000
--- a/examples/rebuilder/migrations/20220824151818_counters.sql
+++ /dev/null
@@ -1,4 +0,0 @@
-CREATE TABLE counters (
-    "counter_id" UUID PRIMARY KEY NOT NULL,
-    "count" INTEGER NOT NULL
-);
diff --git a/examples/rebuilder/src/main.rs b/examples/rebuilder/src/main.rs
deleted file mode 100644
index 875cf0a4..00000000
--- a/examples/rebuilder/src/main.rs
+++ /dev/null
@@ -1,140 +0,0 @@
-use std::fmt::Debug;
-
-use futures_util::stream::StreamExt;
-use sqlx::migrate::MigrateDatabase;
-use sqlx::{pool::PoolOptions, Pool, Postgres, Transaction};
-use uuid::Uuid;
-
-use esrs::postgres::Projector;
-use esrs::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent};
-use simple_projection::aggregate::CounterAggregate;
-use simple_projection::projector::{Counter, CounterProjector};
-use simple_projection::structs::*;
-
-// Rebuild the projection of a single aggregation, given the aggregate, an aggregate ID, a list of
-// projectors to rebuild and a pool connection this rebuilds the projections for all aggregate ids
-// in a single transaction. An alternative (see _rebuild_per_id, below) is
-// to rebuild on a per-id basis.
-async fn rebuild_all_at_once<A>(
-    events: Vec<StoreEvent<A::Event>>,
-    projectors: &[Box<dyn Projector<A> + Send + Sync>],
-    transaction: &mut Transaction<'_, Postgres>,
-) where
-    A: AggregateManager,
-    <A as Aggregate>::Error: Debug,
-{
-    for event in events {
-        for projector in projectors {
-            projector
-                .project(&event, &mut *transaction)
-                .await
-                .expect("Failed to project event");
-        }
-    }
-}
-
-// An alternative approach to rebuilding that rebuilds the projected table for a given projection one
-// aggregate ID at a time, rather than committing the entire table all at once
-async fn _rebuild_per_id<A>(
-    aggregate: &A,
-    ids: Vec<Uuid>,
-    projectors: &[Box<dyn Projector<A> + Send + Sync>],
-    pool: &Pool<Postgres>,
-) where
-    A: AggregateManager,
-    <A as Aggregate>::Error: Debug,
-{
-    for id in ids {
-        let mut transaction = pool.begin().await.unwrap();
-
-        for projector in projectors {
-            projector.delete(id, &mut *transaction).await.unwrap();
-
-            let events = aggregate.event_store().by_aggregate_id(id).await.unwrap();
-
-            for event in events {
-                projector
-                    .project(&event, &mut transaction)
-                    .await
-                    .expect("Failed to project event");
-            }
-        }
-
-        transaction.commit().await.unwrap();
-    }
-}
-
-// A simple example demonstrating rebuilding a read-side projection from an event
-// stream
-#[tokio::main]
-async fn main() {
-    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");
-
-    Postgres::drop_database(database_url.as_str()).await.unwrap();
-    Postgres::create_database(database_url.as_str()).await.unwrap();
-
-    let pool: Pool<Postgres> = PoolOptions::new()
-        .connect(database_url.as_str())
-        .await
-        .expect("Failed to create pool");
-    sqlx::migrate!("./migrations")
-        .run(&pool)
-        .await
-        .expect("Failed to run migrations");
-
-    let count_id = Uuid::new_v4();
-
-    // Construct the aggregation, and some nil state for it
-    let aggregate = CounterAggregate::new(&pool)
-        .await
-        .expect("Failed to construct aggregate");
-    let state = AggregateState::new(count_id);
-
-    // Increment counter three times
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
-        .expect("Failed to handle increment command");
-    let state = aggregate
-        .handle_command(state, CounterCommand::Increment)
-        .await
command"); - let _state = aggregate - .handle_command(state, CounterCommand::Increment) - .await - .expect("Failed to handle increment command"); - - let projectors: Vec + Send + Sync>> = vec![Box::new(CounterProjector {})]; - - let mut transaction = pool.begin().await.unwrap(); - - let events: Vec> = aggregate - .event_store - .stream_events(&mut transaction) - .collect::, CounterError>>>() - .await - .into_iter() - .collect::>, CounterError>>() - .expect("Failed to get all events from event_table"); - - sqlx::query("TRUNCATE TABLE counters") - .execute(&mut transaction) - .await - .expect("Failed to drop table"); - - // Assert the counter doesn't exist - let res = Counter::by_id(count_id, &mut transaction).await.expect("Query failed"); - assert!(res.is_none()); - - rebuild_all_at_once(events, projectors.as_slice(), &mut transaction).await; - - transaction.commit().await.unwrap(); - - // Assert the counter has been rebuilt - let res = Counter::by_id(count_id, &pool) - .await - .expect("Query failed") - .expect("counter not found"); - - assert!(res.counter_id == count_id && res.count == 3); -} diff --git a/examples/simple_projection/migrations/20220824151818_counters.sql b/examples/simple_projection/migrations/20220824151818_counters.sql deleted file mode 100644 index 49c29709..00000000 --- a/examples/simple_projection/migrations/20220824151818_counters.sql +++ /dev/null @@ -1,4 +0,0 @@ -CREATE TABLE counters ( - "counter_id" UUID PRIMARY KEY NOT NULL, - "count" INTEGER NOT NULL -); diff --git a/examples/simple_projection/src/main.rs b/examples/simple_projection/src/main.rs deleted file mode 100644 index 81f4e0a7..00000000 --- a/examples/simple_projection/src/main.rs +++ /dev/null @@ -1,78 +0,0 @@ -use sqlx::migrate::MigrateDatabase; -use sqlx::{pool::PoolOptions, Pool, Postgres}; -use uuid::Uuid; - -use esrs::{AggregateManager, AggregateState}; -use simple_projection::{aggregate::CounterAggregate, projector::Counter, structs::CounterCommand}; - -#[tokio::main] -async fn main() { - let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set"); - - Postgres::drop_database(database_url.as_str()).await.unwrap(); - Postgres::create_database(database_url.as_str()).await.unwrap(); - - let pool: Pool = PoolOptions::new() - .connect(database_url.as_str()) - .await - .expect("Failed to create pool"); - - sqlx::migrate!("./migrations") - .run(&pool) - .await - .expect("Failed to run migrations"); - - let count_id = Uuid::new_v4(); - - // Construct the aggregation, and some nil state for it - let aggregate = CounterAggregate::new(&pool) - .await - .expect("Failed to construct aggregate"); - let state = AggregateState::new(count_id); - - // Increment counter once - let state = aggregate - .handle_command(state, CounterCommand::Increment) - .await - .expect("Failed to handle increment command"); - - // Retrieve counter projection from database and print - let counter = Counter::by_id(count_id, &pool) - .await - .expect("Failed to retrieve counter") - .expect("Failed to find counter"); - println!("Count is: {}", counter.count); - assert_eq!(counter.count, 1); - - // Increment counter twice - let state = aggregate - .handle_command(state, CounterCommand::Increment) - .await - .expect("Failed to handle increment command"); - let state = aggregate - .handle_command(state, CounterCommand::Increment) - .await - .expect("Failed to handle increment command"); - - // Retrieve counter projection from database and print - let counter = Counter::by_id(count_id, &pool) - .await - 
.expect("Failed to retrieve counter") - .expect("Failed to find counter"); - println!("Count is: {}", counter.count); - assert_eq!(counter.count, 3); - - // Decrement counter once - let _state = aggregate - .handle_command(state, CounterCommand::Decrement) - .await - .expect("Failed to handle increment command"); - - // Retrieve counter projection from database and print - let counter = Counter::by_id(count_id, &pool) - .await - .expect("Failed to retrieve counter") - .expect("Failed to find counter"); - println!("Count is: {}", counter.count); - assert_eq!(counter.count, 2); -} diff --git a/examples/simple_saga/migrations/README.md b/examples/simple_saga/migrations/README.md deleted file mode 100644 index cc91050d..00000000 --- a/examples/simple_saga/migrations/README.md +++ /dev/null @@ -1 +0,0 @@ -This example requires no migrations, as there is no projection for which a table must be set up \ No newline at end of file diff --git a/examples/simple_saga/src/aggregate.rs b/examples/simple_saga/src/lib.rs similarity index 82% rename from examples/simple_saga/src/aggregate.rs rename to examples/simple_saga/src/lib.rs index 4fe17563..96f41aaa 100644 --- a/examples/simple_saga/src/aggregate.rs +++ b/examples/simple_saga/src/lib.rs @@ -1,12 +1,12 @@ use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use sqlx::{Pool, Postgres}; +use thiserror::Error; use uuid::Uuid; use esrs::postgres::PgStore; use esrs::{Aggregate, AggregateManager, AggregateState, Policy, StoreEvent}; -use crate::structs::{LoggingCommand, LoggingError, LoggingEvent}; - #[derive(Clone)] pub struct LoggingAggregate { event_store: PgStore, @@ -26,47 +26,6 @@ impl LoggingAggregate { } } -// A very simply policy, which tries to log an event, and creates another command after it finishes -// which indicates success or failure to log -#[derive(Clone)] -struct LoggingPolicy { - aggregate: LoggingAggregate, -} - -impl LoggingPolicy { - pub fn new(aggregate: LoggingAggregate) -> Self { - Self { aggregate } - } -} - -#[async_trait] -impl Policy for LoggingPolicy { - async fn handle_event(&self, event: &StoreEvent) -> Result<(), LoggingError> { - let aggregate_id: Uuid = event.aggregate_id; - - let aggregate_state: AggregateState = self - .aggregate - .load(aggregate_id) - .await - .unwrap_or_else(|| AggregateState::new(aggregate_id)); // This should never happen - - if let LoggingEvent::Received(msg) = event.payload() { - if msg.contains("fail_policy") { - self.aggregate - .handle_command(aggregate_state, LoggingCommand::Fail) - .await?; - return Err(LoggingError::Domain(msg.clone())); - } - println!("Logged via policy from {}: {}", aggregate_id, msg); - self.aggregate - .handle_command(aggregate_state, LoggingCommand::Succeed) - .await?; - } - - Ok(()) - } -} - impl Aggregate for LoggingAggregate { type State = u64; type Command = LoggingCommand; @@ -114,3 +73,74 @@ impl AggregateManager for LoggingAggregate { &self.event_store } } + +// A very simply policy, which tries to log an event, and creates another command after it finishes +// which indicates success or failure to log +#[derive(Clone)] +struct LoggingPolicy { + aggregate: LoggingAggregate, +} + +impl LoggingPolicy { + pub fn new(aggregate: LoggingAggregate) -> Self { + Self { aggregate } + } +} + +#[async_trait] +impl Policy for LoggingPolicy { + async fn handle_event(&self, event: &StoreEvent) -> Result<(), LoggingError> { + let aggregate_id: Uuid = event.aggregate_id; + + let aggregate_state: AggregateState = self + .aggregate + .load(aggregate_id) + .await 
+            .unwrap_or_else(|| AggregateState::new(aggregate_id)); // This should never happen
+
+        if let LoggingEvent::Received(msg) = event.payload() {
+            if msg.contains("fail_policy") {
+                self.aggregate
+                    .handle_command(aggregate_state, LoggingCommand::Fail)
+                    .await?;
+                return Err(LoggingError::Domain(msg.clone()));
+            }
+            println!("Logged via policy from {}: {}", aggregate_id, msg);
+            self.aggregate
+                .handle_command(aggregate_state, LoggingCommand::Succeed)
+                .await?;
+        }
+
+        Ok(())
+    }
+}
+
+// A simple error enum for event processing errors
+#[derive(Debug, Error)]
+pub enum LoggingError {
+    #[error("Err {0}")]
+    Domain(String),
+
+    #[error(transparent)]
+    Json(#[from] esrs::error::JsonError),
+
+    #[error(transparent)]
+    Sql(#[from] esrs::error::SqlxError),
+}
+
+// The events to be processed. On receipt of a new log message, the aggregate stores a Received
+// event. If the message contained within can be logged, it is, and then a Succeeded event is
+// produced; otherwise a Failed event is produced.
+#[derive(Serialize, Deserialize, Debug)]
+pub enum LoggingEvent {
+    Received(String),
+    Succeeded,
+    Failed,
+}
+
+// The aggregate receives commands to log a message
+pub enum LoggingCommand {
+    TryLog(String),
+    Succeed,
+    Fail,
+}
diff --git a/examples/simple_saga/src/main.rs b/examples/simple_saga/src/main.rs
deleted file mode 100644
index 8e1c7de4..00000000
--- a/examples/simple_saga/src/main.rs
+++ /dev/null
@@ -1,59 +0,0 @@
-use sqlx::migrate::MigrateDatabase;
-use sqlx::{pool::PoolOptions, Pool, Postgres};
-use uuid::Uuid;
-
-use esrs::{AggregateManager, AggregateState, EventStore};
-
-use crate::{aggregate::LoggingAggregate, structs::LoggingCommand};
-
-pub mod aggregate;
-pub mod structs;
-
-#[tokio::main]
-async fn main() {
-    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");
-
-    Postgres::drop_database(database_url.as_str()).await.unwrap();
-    Postgres::create_database(database_url.as_str()).await.unwrap();
-
-    let pool: Pool<Postgres> = PoolOptions::new()
-        .connect(database_url.as_str())
-        .await
-        .expect("Failed to create pool");
-
-    let logger_id = Uuid::new_v4();
-
-    // Construct the aggregation, and some null state for it
-    let aggregate = LoggingAggregate::new(&pool)
-        .await
-        .expect("Failed to construct aggregate");
-    let state = AggregateState::new(logger_id);
-
-    // Log some messages
-    let state = aggregate
-        .handle_command(state, LoggingCommand::TryLog(String::from("First logging message")))
-        .await
-        .expect("Failed to log message");
-
-    // Due to how the saga pattern is implemented (with policies passing commands to the aggregate during another
-    // commands handling), the state we get back is always invalid, so we need to retrieve it from the DB.
-    // To demonstrate, 2 events exist in the event store, but if we check the state we get back from our call to
-    // handle_command, it only shows 1 event as applied.
-    // Loading our state from the DB again, we see the correct value of 2:
-    let events = aggregate
-        .event_store()
-        .by_aggregate_id(logger_id)
-        .await
-        .expect("Failed to get events");
-
-    assert_eq!(events.len(), 2); // 2 events in the store, 1 from our command and 1 from the policy
-    assert_eq!(*state.inner(), 1); // However, the state we get back only has 1 event applied (it isn't valid)
-    let state = aggregate.load(logger_id).await.expect("Failed to load state");
-    assert_eq!(*state.inner(), 2); // On loading the state from the DB, we get the correct number of applied events
-
-    // Now we can use the newly loaded state to log another message, but we drop the invalid returned state
-    let _ = aggregate
-        .handle_command(state, LoggingCommand::TryLog(String::from("Second logging message")))
-        .await
-        .expect("Failed to log message");
-}
diff --git a/examples/simple_saga/src/structs.rs b/examples/simple_saga/src/structs.rs
deleted file mode 100644
index 6d973b20..00000000
--- a/examples/simple_saga/src/structs.rs
+++ /dev/null
@@ -1,32 +0,0 @@
-use serde::{Deserialize, Serialize};
-use thiserror::Error;
-
-// A simple error enum for event processing errors
-#[derive(Debug, Error)]
-pub enum LoggingError {
-    #[error("Err {0}")]
-    Domain(String),
-
-    #[error(transparent)]
-    Json(#[from] esrs::error::JsonError),
-
-    #[error(transparent)]
-    Sql(#[from] esrs::error::SqlxError),
-}
-
-// The events to be processed. On receipt of a new log message, the aggregate stores a Received event.
-// If the message contained within can be logged, it is, and then a Succeeded event is produced, otherwise
-// a Failed event is produced.
-#[derive(Serialize, Deserialize, Debug)]
-pub enum LoggingEvent {
-    Received(String),
-    Succeeded,
-    Failed,
-}
-
-// The aggregate receives commands to log a message
-pub enum LoggingCommand {
-    TryLog(String),
-    Succeed,
-    Fail,
-}
diff --git a/examples/simple_side_effect/migrations/README.md b/examples/simple_side_effect/migrations/README.md
deleted file mode 100644
index cc91050d..00000000
--- a/examples/simple_side_effect/migrations/README.md
+++ /dev/null
@@ -1 +0,0 @@
-This example requires no migrations, as there is no projection for which a table must be set up
\ No newline at end of file
diff --git a/examples/simple_side_effect/src/aggregate.rs b/examples/simple_side_effect/src/lib.rs
similarity index 85%
rename from examples/simple_side_effect/src/aggregate.rs
rename to examples/simple_side_effect/src/lib.rs
index 3a22ad01..4c39e5ec 100644
--- a/examples/simple_side_effect/src/aggregate.rs
+++ b/examples/simple_side_effect/src/lib.rs
@@ -1,10 +1,56 @@
 use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
 use sqlx::{PgConnection, Pool, Postgres};
+use thiserror::Error;
 
 use esrs::postgres::{PgStore, Projector};
 use esrs::{Aggregate, AggregateManager, Policy, StoreEvent};
 
-use crate::structs::{LoggingCommand, LoggingError, LoggingEvent};
+pub struct LoggingAggregate {
+    event_store: PgStore<LoggingAggregate>,
+}
+
+impl LoggingAggregate {
+    pub async fn new(pool: &Pool<Postgres>) -> Result<Self, LoggingError> {
+        let event_store: PgStore<LoggingAggregate> = PgStore::new(pool.clone())
+            .set_projectors(vec![Box::new(LoggingProjector)])
+            .set_policies(vec![Box::new(LoggingPolicy)])
+            .setup()
+            .await?;
+
+        Ok(Self { event_store })
+    }
+}
+
+impl Aggregate for LoggingAggregate {
+    type State = u64;
+    type Command = LoggingCommand;
+    type Event = LoggingEvent;
+    type Error = LoggingError;
+
+    fn handle_command(_state: &Self::State, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
+        match command {
+            Self::Command::Log(msg) => Ok(vec![Self::Event::Logged(msg)]),
+        }
+    }
+
+    fn apply_event(state: Self::State, _: Self::Event) -> Self::State {
+        // This aggregate state just counts the number of applied events - equivalent to the
+        // number in the event store
+        state + 1
+    }
+}
+
+impl AggregateManager for LoggingAggregate {
+    type EventStore = PgStore<LoggingAggregate>;
+
+    fn name() -> &'static str {
+        "message"
+    }
+
+    fn event_store(&self) -> &Self::EventStore {
+        &self.event_store
+    }
+}
 
 // Here's a simple projection that just carries out a side effect, and can fail,
 // causing the event not to be written to the event store. In this case the
@@ -55,48 +101,26 @@ impl Policy<LoggingAggregate> for LoggingPolicy {
     }
 }
 
-pub struct LoggingAggregate {
-    event_store: PgStore<LoggingAggregate>,
-}
+// A simple error enum for event processing errors
+#[derive(Debug, Error)]
+pub enum LoggingError {
+    #[error("[Err {0}]")]
+    Domain(String),
 
-impl LoggingAggregate {
-    pub async fn new(pool: &Pool<Postgres>) -> Result<Self, LoggingError> {
-        let event_store: PgStore<LoggingAggregate> = PgStore::new(pool.clone())
-            .set_projectors(vec![Box::new(LoggingProjector)])
-            .set_policies(vec![Box::new(LoggingPolicy)])
-            .setup()
-            .await?;
+    #[error(transparent)]
+    Json(#[from] esrs::error::JsonError),
 
-        Ok(Self { event_store })
-    }
+    #[error(transparent)]
+    Sql(#[from] esrs::error::SqlxError),
 }
 
-impl Aggregate for LoggingAggregate {
-    type State = u64;
-    type Command = LoggingCommand;
-    type Event = LoggingEvent;
-    type Error = LoggingError;
-
-    fn handle_command(_state: &Self::State, command: Self::Command) -> Result<Vec<Self::Event>, Self::Error> {
-        match command {
-            Self::Command::Log(msg) => Ok(vec![Self::Event::Logged(msg)]),
-        }
-    }
-
-    fn apply_event(state: Self::State, _: Self::Event) -> Self::State {
-        // This aggregate state just counts the number of applied - equivalent to the number in the event store
-        state + 1
-    }
+// The events to be processed
+#[derive(Serialize, Deserialize, Debug)]
+pub enum LoggingEvent {
+    Logged(String),
 }
 
-impl AggregateManager for LoggingAggregate {
-    type EventStore = PgStore<LoggingAggregate>;
-
-    fn name() -> &'static str {
-        "message"
-    }
-
-    fn event_store(&self) -> &Self::EventStore {
-        &self.event_store
-    }
+// The commands received by the application, which will produce the events
+pub enum LoggingCommand {
+    Log(String),
 }
diff --git a/examples/simple_side_effect/src/main.rs b/examples/simple_side_effect/src/main.rs
deleted file mode 100644
index 8018756c..00000000
--- a/examples/simple_side_effect/src/main.rs
+++ /dev/null
@@ -1,72 +0,0 @@
-use sqlx::migrate::MigrateDatabase;
-use sqlx::{pool::PoolOptions, Pool, Postgres};
-use uuid::Uuid;
-
-use esrs::{AggregateManager, AggregateState};
-
-use crate::{aggregate::LoggingAggregate, structs::LoggingCommand};
-
-pub mod aggregate;
-pub mod structs;
-
-#[tokio::main]
-async fn main() {
-    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");
-
-    Postgres::drop_database(database_url.as_str()).await.unwrap();
-    Postgres::create_database(database_url.as_str()).await.unwrap();
-
-    let pool: Pool<Postgres> = PoolOptions::new()
-        .connect(database_url.as_str())
-        .await
-        .expect("Failed to create pool");
-
-    let logger_id = Uuid::new_v4();
-
-    // Construct the aggregation, and some null state for it
-    let aggregate = LoggingAggregate::new(&pool)
-        .await
-        .expect("Failed to construct aggregate");
-    let state = AggregateState::new(logger_id);
-
-    // Log some messages
-    let state = aggregate
-        .handle_command(state, LoggingCommand::Log(String::from("First logging message")))
-        .await
-        .expect("Failed to log message");
-    let state = aggregate
-        .handle_command(state, LoggingCommand::Log(String::from("Second logging message")))
-        .await
-        .expect("Failed to log message");
-    let state = aggregate
-        .handle_command(state, LoggingCommand::Log(String::from("Third logging message")))
-        .await
-        .expect("Failed to log message");
-
-    // Lets cause a projection error, to illustrate the difference between
-    // projection and policy errors, from an AggregateState perspective
-    let res = aggregate
-        .handle_command(
-            state,
-            LoggingCommand::Log(String::from("This will fail since it contains fail_projection")),
-        )
-        .await;
-    assert!(res.is_err());
-    // We now need to rebuild the event state - and we'll see that there are only 3 events
-    let state = aggregate.load(logger_id).await.expect("Failed to load aggregate state");
-    assert_eq!(*state.inner(), 3);
-
-    // Now we'll cause a policy error. This error is silenced by the `AggregateManager::store_events`
-    // actual impl. It is overridable
-    let res = aggregate
-        .handle_command(
-            state,
-            LoggingCommand::Log(String::from("This will fail since it contains fail_policy")),
-        )
-        .await;
-    assert!(res.is_ok());
-
-    // We now need to rebuild the event state - and we'll see that there are 4 events
-    let state = aggregate.load(logger_id).await.expect("Failed to load aggregate state");
-    assert_eq!(*state.inner(), 4);
-}
diff --git a/examples/simple_side_effect/src/structs.rs b/examples/simple_side_effect/src/structs.rs
deleted file mode 100644
index 22f6e98d..00000000
--- a/examples/simple_side_effect/src/structs.rs
+++ /dev/null
@@ -1,26 +0,0 @@
-use serde::{Deserialize, Serialize};
-use thiserror::Error;
-
-// A simple error enum for event processing errors
-#[derive(Debug, Error)]
-pub enum LoggingError {
-    #[error("[Err {0}]")]
-    Domain(String),
-
-    #[error(transparent)]
-    Json(#[from] esrs::error::JsonError),
-
-    #[error(transparent)]
-    Sql(#[from] esrs::error::SqlxError),
-}
-
-// The events to be processed
-#[derive(Serialize, Deserialize, Debug)]
-pub enum LoggingEvent {
-    Logged(String),
-}
-
-// The commands received by the application, which will produce the events
-pub enum LoggingCommand {
-    Log(String),
-}
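With the mains gone, each example crate is now a library. A minimal sketch of how a downstream
binary might exercise `simple_side_effect` follows — the names come from the lib.rs above, while
the pool setup is an assumption mirroring the deleted mains:

```rust
use sqlx::{pool::PoolOptions, Pool, Postgres};

use esrs::{AggregateManager, AggregateState};
use simple_side_effect::{LoggingAggregate, LoggingCommand};

#[tokio::main]
async fn main() {
    let database_url: String = std::env::var("DATABASE_URL").expect("DATABASE_URL variable not set");

    let pool: Pool<Postgres> = PoolOptions::new()
        .connect(database_url.as_str())
        .await
        .expect("Failed to create pool");

    let aggregate = LoggingAggregate::new(&pool).await.expect("Failed to construct aggregate");

    // A failing projector aborts the command, while a failing policy is only logged:
    // the event still reaches the store.
    let result = aggregate
        .handle_command(AggregateState::default(), LoggingCommand::Log("hello".to_string()))
        .await;

    assert!(result.is_ok());
}
```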