From 5e63ca25a2dcc166da51482ff7cceba62d5cdc2b Mon Sep 17 00:00:00 2001 From: Cristiano Piemontese Date: Fri, 21 Apr 2023 12:27:38 +0200 Subject: [PATCH 1/6] add queries --- src/esrs/postgres/store.rs | 88 ++++++++++++++++++-------------------- src/esrs/query.rs | 47 ++++++++++++++++++++ 2 files changed, 88 insertions(+), 47 deletions(-) create mode 100644 src/esrs/query.rs diff --git a/src/esrs/postgres/store.rs b/src/esrs/postgres/store.rs index 95ac51b4..a9dbd461 100644 --- a/src/esrs/postgres/store.rs +++ b/src/esrs/postgres/store.rs @@ -10,19 +10,18 @@ use futures::StreamExt; use sqlx::pool::PoolConnection; use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockGuard, PgAdvisoryLockKey, PgQueryResult}; use sqlx::types::Json; -use sqlx::{Executor, Pool, Postgres, Transaction}; +use sqlx::{Executor, PgConnection, Pool, Postgres, Transaction}; use uuid::Uuid; -use crate::esrs::policy; -use crate::esrs::postgres::projector::ProjectorPersistence; +use crate::esrs::query; use crate::esrs::store::{EventStoreLockGuard, UnlockOnDrop}; use crate::types::SequenceNumber; use crate::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent}; -use super::{event, projector, statement::Statements}; +use super::{event, statement::Statements}; -type Projector = Box + Send + Sync>; -type Policy = Box + Send + Sync>; +type Query = Box + Send + Sync>; +type TransactionalQuery = Box + Send + Sync>; /// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a /// pre-made implementation of an [`EventStore`] persisting on Postgres. @@ -43,8 +42,8 @@ where { pool: Pool, statements: Statements, - projectors: ArcSwap>>, - policies: ArcSwap>>, + queries: ArcSwap>>, + transactional_queries: ArcSwap>>, } impl PgStore @@ -59,22 +58,22 @@ where let inner: InnerPgStore = InnerPgStore { pool, statements: Statements::new::(), - projectors: ArcSwap::from_pointee(vec![]), - policies: ArcSwap::from_pointee(vec![]), + queries: ArcSwap::from_pointee(vec![]), + transactional_queries: ArcSwap::from_pointee(vec![]), }; Self { inner: Arc::new(inner) } } - /// Set the list of projectors to the store - pub fn set_projectors(self, projectors: Vec>) -> Self { - self.inner.projectors.store(Arc::new(projectors)); + /// Set the list of (non transactional) queries to the store + pub fn set_queries(self, queries: Vec>) -> Self { + self.inner.queries.store(Arc::new(queries)); self } - /// Set the list of policies to the store - pub fn set_policies(self, policies: Vec>) -> Self { - self.inner.policies.store(Arc::new(policies)); + /// Set the list of transactional queries to the store + pub fn set_transactional_queries(self, queries: Vec>) -> Self { + self.inner.transactional_queries.store(Arc::new(queries)); self } @@ -160,14 +159,14 @@ where /// This function returns the list of all projections added to this store. This function should /// mostly used while creating a custom persistence flow using [`PgStore::persist`]. - pub fn projectors(&self) -> Arc>> { - self.inner.projectors.load().clone() + pub fn transactional_queries(&self) -> Arc>> { + self.inner.transactional_queries.load().clone() } /// This function returns the list of all policies added to this store. This function should /// mostly used while creating a custom persistence flow using [`PgStore::persist`]. - pub fn policies(&self) -> Arc>> { - self.inner.policies.load().clone() + pub fn queries(&self) -> Arc>> { + self.inner.queries.load().clone() } /// This function could be used in order to customize the way the store persist the events. @@ -266,30 +265,26 @@ where } // Acquiring the list of projectors early, as it is an expensive operation. - let projectors = self.projectors(); + let transactional_queries = self.transactional_queries(); for store_event in &store_events { - for projector in projectors.iter() { + for transactional_query in transactional_queries.iter() { let span = tracing::trace_span!( - "esrs.project_event", + "esrs.transactional_query", event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, - persistence = projector.persistence().as_ref(), - projector = projector.name() + query = transactional_query.name() ); let _e = span.enter(); - if let Err(error) = projector.project(store_event, &mut transaction).await { + if let Err(error) = transactional_query.handle(store_event, &mut transaction).await { tracing::error!({ event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, - projector = projector.name(), - persistence = projector.persistence().as_ref(), + query = transactional_query.name(), error = ?error, - }, "projector failed to project event"); + }, "transactional query failed to handle event"); - if let ProjectorPersistence::Mandatory = projector.persistence() { - return Err(error); - } + return Err(error); } } } @@ -301,26 +296,20 @@ where // 2. the policies below might need to access this aggregate atomically (causing a deadlock!). drop(aggregate_state.take_lock()); - // Acquiring the list of policies early, as it is an expensive operation. - let policies = self.policies(); + // Acquiring the list of queries early, as it is an expensive operation. + let queries = self.queries(); for store_event in &store_events { - for policy in policies.iter() { + // NOTE: should this be parallelized? + for query in queries.iter() { let span = tracing::debug_span!( - "esrs.apply_policy", + "esrs.query", event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, - policy = policy.name() + query = query.name() ); let _e = span.enter(); - if let Err(error) = policy.handle_event(store_event).await { - tracing::error!({ - event_id = %store_event.id, - aggregate_id = %store_event.aggregate_id, - policy = policy.name(), - error = ?error, - }, "policy failed to handle event") - } + query.handle(store_event).await; } } @@ -336,12 +325,17 @@ where .await .map(|_| ())?; - for projector in self.projectors().iter() { - projector.delete(aggregate_id, &mut transaction).await?; + for transactional_query in self.transactional_queries().iter() { + transactional_query.delete(aggregate_id, &mut transaction).await?; } transaction.commit().await?; + // NOTE: should this be parallelized? + for query in self.queries().iter() { + query.delete(aggregate_id).await; + } + Ok(()) } } diff --git a/src/esrs/query.rs b/src/esrs/query.rs new file mode 100644 index 00000000..e548ff8c --- /dev/null +++ b/src/esrs/query.rs @@ -0,0 +1,47 @@ +use async_trait::async_trait; +use uuid::Uuid; + +use crate::{AggregateManager, StoreEvent}; + +#[async_trait] +pub trait Query: Send + Sync { + async fn handle(&self, event: &StoreEvent); + + async fn delete(&self, _aggregate_id: Uuid) {} + + /// The name of the projector. By default, this is the type name of the projector, + /// but it can be overridden to provide a custom name. This name is used as + /// part of tracing spans, to identify the projector being run. + fn name(&self) -> &'static str { + std::any::type_name::() + } +} +// +// #[async_trait] +// impl Query for T +// where +// AM: AggregateManager, +// M::Event: Send + Sync, +// Q: Query, +// T: Deref + Send + Sync, +// { +// async fn handle(&self, event: StoreEvent) { +// self.deref().handle(event).await; +// } +// } + +#[async_trait] +pub trait TransactionalQuery { + async fn handle(&self, event: &StoreEvent, executor: &mut E) -> Result<(), AM::Error>; + + async fn delete(&self, aggregate_id: Uuid, executor: &mut E) -> Result<(), AM::Error>; + + /// The name of the projector. By default, this is the type name of the projector, + /// but it can be overridden to provide a custom name. This name is used as + /// part of tracing spans, to identify the projector being run. + fn name(&self) -> &'static str { + std::any::type_name::() + } +} + +pub trait QueryError: std::error::Error {} From 7e8a6d77e556dda7efcd7641405c5029dbd27b66 Mon Sep 17 00:00:00 2001 From: Cristiano Piemontese Date: Fri, 21 Apr 2023 12:31:45 +0200 Subject: [PATCH 2/6] refactor test --- src/esrs/postgres/tests/mod.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/esrs/postgres/tests/mod.rs b/src/esrs/postgres/tests/mod.rs index 7cd1c6e4..16a0831b 100644 --- a/src/esrs/postgres/tests/mod.rs +++ b/src/esrs/postgres/tests/mod.rs @@ -5,8 +5,9 @@ use chrono::{DateTime, Utc}; use sqlx::{PgConnection, Pool, Postgres}; use uuid::Uuid; -use crate::postgres::{PgStore, Projector}; -use crate::{Aggregate, AggregateManager, AggregateState, EventStore, Policy, StoreEvent}; +use crate::esrs::query::{Query, TransactionalQuery}; +use crate::postgres::PgStore; +use crate::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent}; #[sqlx::test] fn setup_database_test(pool: Pool) { @@ -136,7 +137,7 @@ fn persist_multiple_events_test(pool: Pool) { #[sqlx::test] fn event_projection_test(pool: Pool) { let store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(TestProjector {})]) + .set_transactional_queries(vec![Box::new(TestTransactionalQuery {})]) .setup() .await .unwrap(); @@ -165,7 +166,7 @@ fn event_projection_test(pool: Pool) { #[sqlx::test] fn delete_store_events_and_projections_test(pool: Pool) { let store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(TestProjector {})]) + .set_transactional_queries(vec![Box::new(TestTransactionalQuery {})]) .setup() .await .unwrap(); @@ -209,12 +210,12 @@ fn delete_store_events_and_projections_test(pool: Pool) { #[sqlx::test] fn policy_test(pool: Pool) { let last_id: Arc> = Arc::new(Mutex::new(Uuid::default())); - let policy: Box = Box::new(TestPolicy { + let query: Box = Box::new(TestQuery { last_id: last_id.clone(), }); let store: PgStore = PgStore::new(pool.clone()) - .set_policies(vec![policy]) + .set_queries(vec![query]) .setup() .await .unwrap(); @@ -306,11 +307,11 @@ impl AggregateManager for TestAggregate { } #[derive(Clone)] -struct TestProjector; +struct TestTransactionalQuery; #[async_trait::async_trait] -impl Projector for TestProjector { - async fn project(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), TestError> { +impl TransactionalQuery for TestTransactionalQuery { + async fn handle(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), TestError> { Ok( sqlx::query("INSERT INTO test_projection (id, projection_id) VALUES ($1, $2)") .bind(event.payload.id) @@ -337,15 +338,14 @@ struct ProjectionRow { } #[derive(Clone)] -struct TestPolicy { +struct TestQuery { last_id: Arc>, } #[async_trait::async_trait] -impl Policy for TestPolicy { - async fn handle_event(&self, event: &StoreEvent) -> Result<(), TestError> { +impl Query for TestQuery { + async fn handle(&self, event: &StoreEvent) { let mut guard = self.last_id.lock().unwrap(); *guard = event.payload.id; - Ok(()) } } From 18562e5ea56327c9351341ee27b910230c5beec2 Mon Sep 17 00:00:00 2001 From: Cristiano Piemontese Date: Fri, 21 Apr 2023 12:57:42 +0200 Subject: [PATCH 3/6] port examples to queries --- examples/aggregate_merging/src/aggregates.rs | 6 +-- examples/aggregate_merging/src/projectors.rs | 13 +++--- .../src/aggregate.rs | 19 ++++---- .../src/projector.rs | 9 ++-- examples/delete_aggregate/src/aggregate.rs | 4 +- examples/delete_aggregate/src/projector.rs | 9 ++-- examples/simple_projection/src/aggregate.rs | 4 +- examples/simple_projection/src/projector.rs | 9 ++-- examples/simple_saga/src/lib.rs | 46 +++++++++++-------- examples/simple_side_effect/src/lib.rs | 25 +++++----- src/esrs/mod.rs | 1 + src/esrs/query.rs | 9 +++- src/lib.rs | 1 + 13 files changed, 81 insertions(+), 74 deletions(-) diff --git a/examples/aggregate_merging/src/aggregates.rs b/examples/aggregate_merging/src/aggregates.rs index cb81a61d..824ae847 100644 --- a/examples/aggregate_merging/src/aggregates.rs +++ b/examples/aggregate_merging/src/aggregates.rs @@ -3,7 +3,7 @@ use sqlx::{Pool, Postgres}; use esrs::postgres::PgStore; use esrs::{Aggregate, AggregateManager}; -use crate::projectors::CounterProjector; +use crate::projectors::CounterTransactionalQuery; use crate::structs::{CommandA, CommandB, CounterError, EventA, EventB}; // We use a template here to make instantiating the near-identical @@ -15,7 +15,7 @@ pub struct AggregateA { impl AggregateA { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(CounterProjector)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) .setup() .await?; @@ -63,7 +63,7 @@ pub struct AggregateB { impl AggregateB { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(CounterProjector)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) .setup() .await?; diff --git a/examples/aggregate_merging/src/projectors.rs b/examples/aggregate_merging/src/projectors.rs index 89a1ba74..f60081e7 100644 --- a/examples/aggregate_merging/src/projectors.rs +++ b/examples/aggregate_merging/src/projectors.rs @@ -2,19 +2,18 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::postgres::Projector; -use esrs::StoreEvent; +use esrs::{StoreEvent, TransactionalQuery}; use crate::aggregates::{AggregateA, AggregateB}; use crate::structs::{CounterError, EventA, EventB}; #[derive(Clone)] -pub struct CounterProjector; +pub struct CounterTransactionalQuery; // This is a projector template that will project AggregateA events into a shared projection (DB table). #[async_trait] -impl Projector for CounterProjector { - async fn project(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), CounterError> { +impl TransactionalQuery for CounterTransactionalQuery { + async fn handle(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), CounterError> { match event.payload() { EventA::Inner { shared_id: id } => { let existing = Counter::by_id(*id, &mut *connection).await?; @@ -33,8 +32,8 @@ impl Projector for CounterProjector { // This is a projector template that will project AggregateB events into a shared projection (DB table). #[async_trait] -impl Projector for CounterProjector { - async fn project(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), CounterError> { +impl TransactionalQuery for CounterTransactionalQuery { + async fn handle(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), CounterError> { match event.payload() { EventB::Inner { shared_id: id } => { let existing = Counter::by_id(*id, &mut *connection).await?; diff --git a/examples/customize_persistence_flow/src/aggregate.rs b/examples/customize_persistence_flow/src/aggregate.rs index dcaf84b6..542535c6 100644 --- a/examples/customize_persistence_flow/src/aggregate.rs +++ b/examples/customize_persistence_flow/src/aggregate.rs @@ -8,7 +8,7 @@ use esrs::types::SequenceNumber; use esrs::{Aggregate, AggregateManager}; use esrs::{AggregateState, StoreEvent}; -use crate::projector::CounterProjector; +use crate::projector::CounterTransactionalQuery; use crate::structs::{CounterCommand, CounterError, CounterEvent}; pub struct CounterAggregate { @@ -18,7 +18,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(CounterProjector)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) .setup() .await?; @@ -85,10 +85,10 @@ impl AggregateManager for CounterAggregate { } // Acquiring the list of projectors early, as it is an expensive operation. - let projectors = self.event_store().projectors(); + let transactional_queries = self.event_store().transactional_queries(); for store_event in store_events.iter() { - for projector in projectors.iter() { - projector.project(store_event, &mut connection).await?; + for transactional_query in transactional_queries.iter() { + transactional_query.handle(store_event, &mut connection).await?; } } @@ -98,15 +98,12 @@ impl AggregateManager for CounterAggregate { drop(aggregate_state.take_lock()); // Acquiring the list of policies early, as it is an expensive operation. - let policies = self.event_store().policies(); + let queries = self.event_store().queries(); for store_event in store_events.iter() { - for policy in policies.iter() { + for query in queries.iter() { // We want to just log errors instead of return them. This is the customization // we wanted. - match policy.handle_event(store_event).await { - Ok(_) => (), - Err(error) => println!("{:?}", error), - } + query.handle(store_event).await; } } diff --git a/examples/customize_persistence_flow/src/projector.rs b/examples/customize_persistence_flow/src/projector.rs index 633400c3..4752d1a1 100644 --- a/examples/customize_persistence_flow/src/projector.rs +++ b/examples/customize_persistence_flow/src/projector.rs @@ -2,18 +2,17 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::postgres::Projector; -use esrs::StoreEvent; +use esrs::{StoreEvent, TransactionalQuery}; use crate::aggregate::CounterAggregate; use crate::structs::{CounterError, CounterEvent}; #[derive(Clone)] -pub struct CounterProjector; +pub struct CounterTransactionalQuery; #[async_trait] -impl Projector for CounterProjector { - async fn project( +impl TransactionalQuery for CounterTransactionalQuery { + async fn handle( &self, event: &StoreEvent, connection: &mut PgConnection, diff --git a/examples/delete_aggregate/src/aggregate.rs b/examples/delete_aggregate/src/aggregate.rs index 2957f589..02cec41d 100644 --- a/examples/delete_aggregate/src/aggregate.rs +++ b/examples/delete_aggregate/src/aggregate.rs @@ -4,7 +4,7 @@ use sqlx::{Pool, Postgres}; use esrs::postgres::PgStore; use esrs::{Aggregate, AggregateManager}; -use crate::projector::CounterProjector; +use crate::projector::CounterTransactionalQuery; use crate::structs::{CounterCommand, CounterError, CounterEvent}; pub struct CounterAggregate { @@ -14,7 +14,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(CounterProjector)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) .setup() .await?; diff --git a/examples/delete_aggregate/src/projector.rs b/examples/delete_aggregate/src/projector.rs index c0ca9c41..4b5ff6d0 100644 --- a/examples/delete_aggregate/src/projector.rs +++ b/examples/delete_aggregate/src/projector.rs @@ -2,18 +2,17 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::postgres::Projector; -use esrs::StoreEvent; +use esrs::{StoreEvent, TransactionalQuery}; use crate::aggregate::CounterAggregate; use crate::structs::{CounterError, CounterEvent}; #[derive(Clone)] -pub struct CounterProjector; +pub struct CounterTransactionalQuery; #[async_trait] -impl Projector for CounterProjector { - async fn project( +impl TransactionalQuery for CounterTransactionalQuery { + async fn handle( &self, event: &StoreEvent, connection: &mut PgConnection, diff --git a/examples/simple_projection/src/aggregate.rs b/examples/simple_projection/src/aggregate.rs index 16eaf0bf..66f45bff 100644 --- a/examples/simple_projection/src/aggregate.rs +++ b/examples/simple_projection/src/aggregate.rs @@ -3,7 +3,7 @@ use sqlx::{Pool, Postgres}; use esrs::postgres::PgStore; use esrs::{Aggregate, AggregateManager}; -use crate::projector::CounterProjector; +use crate::projector::CounterTransactionalQuery; use crate::structs::{CounterCommand, CounterError, CounterEvent}; pub struct CounterAggregate { @@ -13,7 +13,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(CounterProjector)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) .setup() .await?; diff --git a/examples/simple_projection/src/projector.rs b/examples/simple_projection/src/projector.rs index 633400c3..4752d1a1 100644 --- a/examples/simple_projection/src/projector.rs +++ b/examples/simple_projection/src/projector.rs @@ -2,18 +2,17 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::postgres::Projector; -use esrs::StoreEvent; +use esrs::{StoreEvent, TransactionalQuery}; use crate::aggregate::CounterAggregate; use crate::structs::{CounterError, CounterEvent}; #[derive(Clone)] -pub struct CounterProjector; +pub struct CounterTransactionalQuery; #[async_trait] -impl Projector for CounterProjector { - async fn project( +impl TransactionalQuery for CounterTransactionalQuery { + async fn handle( &self, event: &StoreEvent, connection: &mut PgConnection, diff --git a/examples/simple_saga/src/lib.rs b/examples/simple_saga/src/lib.rs index 672f266d..d853dbf5 100644 --- a/examples/simple_saga/src/lib.rs +++ b/examples/simple_saga/src/lib.rs @@ -5,7 +5,7 @@ use thiserror::Error; use uuid::Uuid; use esrs::postgres::PgStore; -use esrs::{Aggregate, AggregateManager, AggregateState, Policy, StoreEvent}; +use esrs::{Aggregate, AggregateManager, AggregateState, Query, StoreEvent}; #[derive(Clone)] pub struct LoggingAggregate { @@ -21,7 +21,7 @@ impl LoggingAggregate { event_store: event_store.clone(), }; - event_store.set_policies(vec![Box::new(LoggingPolicy::new(this.clone()))]); + event_store.set_queries(vec![Box::new(LoggingQuery::new(this.clone()))]); Ok(this) } } @@ -77,41 +77,49 @@ impl AggregateManager for LoggingAggregate { // A very simply policy, which tries to log an event, and creates another command after it finishes // which indicates success or failure to log #[derive(Clone)] -struct LoggingPolicy { +struct LoggingQuery { aggregate: LoggingAggregate, } -impl LoggingPolicy { +impl LoggingQuery { pub fn new(aggregate: LoggingAggregate) -> Self { Self { aggregate } } } #[async_trait] -impl Policy for LoggingPolicy { - async fn handle_event(&self, event: &StoreEvent) -> Result<(), LoggingError> { +impl Query for LoggingQuery { + async fn handle(&self, event: &StoreEvent) { let aggregate_id: Uuid = event.aggregate_id; - let aggregate_state: AggregateState = self - .aggregate - .load(aggregate_id) - .await? - .unwrap_or_else(|| AggregateState::with_id(aggregate_id)); // This should never happen + let aggregate_state: AggregateState = match self.aggregate.load(aggregate_id).await { + Ok(Some(aggregate_state)) => aggregate_state, + Ok(None) => AggregateState::with_id(aggregate_id), + Err(e) => return println!("Error loading aggregate {}", e), + }; // This should never happen if let LoggingEvent::Received(msg) = event.payload() { if msg.contains("fail_policy") { - self.aggregate + match self + .aggregate .handle_command(aggregate_state, LoggingCommand::Fail) - .await?; - return Err(LoggingError::Domain(msg.clone())); + .await + { + Ok(_) => (), + Err(e) => println!("Error handling command {}", e), + }; + return; } println!("Logged via policy from {}: {}", aggregate_id, msg); - self.aggregate + match self + .aggregate .handle_command(aggregate_state, LoggingCommand::Succeed) - .await?; - } - - Ok(()) + .await + { + Ok(_) => (), + Err(e) => println!("Error handling command {}", e), + }; + }; } } diff --git a/examples/simple_side_effect/src/lib.rs b/examples/simple_side_effect/src/lib.rs index 4c39e5ec..3805f6cf 100644 --- a/examples/simple_side_effect/src/lib.rs +++ b/examples/simple_side_effect/src/lib.rs @@ -3,8 +3,8 @@ use serde::{Deserialize, Serialize}; use sqlx::{PgConnection, Pool, Postgres}; use thiserror::Error; -use esrs::postgres::{PgStore, Projector}; -use esrs::{Aggregate, AggregateManager, Policy, StoreEvent}; +use esrs::postgres::PgStore; +use esrs::{Aggregate, AggregateManager, Query, StoreEvent, TransactionalQuery}; pub struct LoggingAggregate { event_store: PgStore, @@ -13,8 +13,8 @@ pub struct LoggingAggregate { impl LoggingAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_projectors(vec![Box::new(LoggingProjector)]) - .set_policies(vec![Box::new(LoggingPolicy)]) + .set_transactional_queries(vec![Box::new(LoggingTransactionalQuery)]) + .set_queries(vec![Box::new(LoggingQuery)]) .setup() .await?; @@ -57,11 +57,11 @@ impl AggregateManager for LoggingAggregate { // failure is due to a simple log message filter rule, but you can imagine a // side effect which interacts with some 3rd party service in a failable way instead #[derive(Clone)] -pub struct LoggingProjector; +pub struct LoggingTransactionalQuery; #[async_trait] -impl Projector for LoggingProjector { - async fn project(&self, event: &StoreEvent, _: &mut PgConnection) -> Result<(), LoggingError> { +impl TransactionalQuery for LoggingTransactionalQuery { + async fn handle(&self, event: &StoreEvent, _: &mut PgConnection) -> Result<(), LoggingError> { let id = event.aggregate_id; match event.payload() { LoggingEvent::Logged(msg) => { @@ -83,21 +83,20 @@ impl Projector for LoggingProjector { // stops the event from being persisted to the event store, whereas a failure in // a policy does not. #[derive(Clone)] -pub struct LoggingPolicy; +pub struct LoggingQuery; #[async_trait] -impl Policy for LoggingPolicy { - async fn handle_event(&self, event: &StoreEvent) -> Result<(), LoggingError> { +impl Query for LoggingQuery { + async fn handle(&self, event: &StoreEvent) { let id = event.aggregate_id; match event.payload() { LoggingEvent::Logged(msg) => { if msg.contains("fail_policy") { - return Err(LoggingError::Domain(msg.clone())); + return; } println!("Logged via policy from {}: {}", id, msg); } - } - Ok(()) + }; } } diff --git a/src/esrs/mod.rs b/src/esrs/mod.rs index f36165b3..98b32394 100644 --- a/src/esrs/mod.rs +++ b/src/esrs/mod.rs @@ -1,5 +1,6 @@ pub mod aggregate; pub mod policy; +pub mod query; pub mod state; pub mod store; diff --git a/src/esrs/query.rs b/src/esrs/query.rs index e548ff8c..aa3a713d 100644 --- a/src/esrs/query.rs +++ b/src/esrs/query.rs @@ -31,10 +31,15 @@ pub trait Query: Send + Sync { // } #[async_trait] -pub trait TransactionalQuery { +pub trait TransactionalQuery: Sync +where + AM: AggregateManager, +{ async fn handle(&self, event: &StoreEvent, executor: &mut E) -> Result<(), AM::Error>; - async fn delete(&self, aggregate_id: Uuid, executor: &mut E) -> Result<(), AM::Error>; + async fn delete(&self, _aggregate_id: Uuid, _executor: &mut E) -> Result<(), AM::Error> { + Ok(()) + } /// The name of the projector. By default, this is the type name of the projector, /// but it can be overridden to provide a custom name. This name is used as diff --git a/src/lib.rs b/src/lib.rs index 23d8aab1..c0672ec4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ pub use crate::esrs::aggregate::{Aggregate, AggregateManager}; pub use crate::esrs::policy::Policy; +pub use crate::esrs::query::{Query, TransactionalQuery}; pub use crate::esrs::state::AggregateState; pub use crate::esrs::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; From 13f37f6bd1b45f0cf409e4a30683de7546c212ab Mon Sep 17 00:00:00 2001 From: Simone Cottini Date: Wed, 26 Apr 2023 10:42:12 +0200 Subject: [PATCH 4/6] From Query to EventHandler renaming --- examples/aggregate_merging/src/aggregates.rs | 6 +++--- examples/aggregate_merging/src/projectors.rs | 8 ++++---- .../src/aggregate.rs | 4 ++-- .../src/projector.rs | 6 +++--- examples/delete_aggregate/src/aggregate.rs | 4 ++-- examples/delete_aggregate/src/projector.rs | 6 +++--- examples/simple_projection/src/aggregate.rs | 4 ++-- examples/simple_projection/src/projector.rs | 6 +++--- examples/simple_saga/src/lib.rs | 10 +++++----- examples/simple_side_effect/src/lib.rs | 14 +++++++------- src/esrs/{query.rs => event_handler.rs} | 10 +++++----- src/esrs/mod.rs | 2 +- src/esrs/postgres/store.rs | 18 +++++++++--------- src/esrs/postgres/tests/mod.rs | 16 ++++++++-------- src/lib.rs | 2 +- 15 files changed, 58 insertions(+), 58 deletions(-) rename src/esrs/{query.rs => event_handler.rs} (85%) diff --git a/examples/aggregate_merging/src/aggregates.rs b/examples/aggregate_merging/src/aggregates.rs index 824ae847..947eec00 100644 --- a/examples/aggregate_merging/src/aggregates.rs +++ b/examples/aggregate_merging/src/aggregates.rs @@ -3,7 +3,7 @@ use sqlx::{Pool, Postgres}; use esrs::postgres::PgStore; use esrs::{Aggregate, AggregateManager}; -use crate::projectors::CounterTransactionalQuery; +use crate::projectors::CounterTransactionalEventHandler; use crate::structs::{CommandA, CommandB, CounterError, EventA, EventB}; // We use a template here to make instantiating the near-identical @@ -15,7 +15,7 @@ pub struct AggregateA { impl AggregateA { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; @@ -63,7 +63,7 @@ pub struct AggregateB { impl AggregateB { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; diff --git a/examples/aggregate_merging/src/projectors.rs b/examples/aggregate_merging/src/projectors.rs index f60081e7..6a172734 100644 --- a/examples/aggregate_merging/src/projectors.rs +++ b/examples/aggregate_merging/src/projectors.rs @@ -2,17 +2,17 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::{StoreEvent, TransactionalQuery}; +use esrs::{StoreEvent, TransactionalEventHandler}; use crate::aggregates::{AggregateA, AggregateB}; use crate::structs::{CounterError, EventA, EventB}; #[derive(Clone)] -pub struct CounterTransactionalQuery; +pub struct CounterTransactionalEventHandler; // This is a projector template that will project AggregateA events into a shared projection (DB table). #[async_trait] -impl TransactionalQuery for CounterTransactionalQuery { +impl TransactionalEventHandler for CounterTransactionalEventHandler { async fn handle(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), CounterError> { match event.payload() { EventA::Inner { shared_id: id } => { @@ -32,7 +32,7 @@ impl TransactionalQuery for CounterTransactionalQuery // This is a projector template that will project AggregateB events into a shared projection (DB table). #[async_trait] -impl TransactionalQuery for CounterTransactionalQuery { +impl TransactionalEventHandler for CounterTransactionalEventHandler { async fn handle(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), CounterError> { match event.payload() { EventB::Inner { shared_id: id } => { diff --git a/examples/customize_persistence_flow/src/aggregate.rs b/examples/customize_persistence_flow/src/aggregate.rs index 542535c6..1924af30 100644 --- a/examples/customize_persistence_flow/src/aggregate.rs +++ b/examples/customize_persistence_flow/src/aggregate.rs @@ -8,7 +8,7 @@ use esrs::types::SequenceNumber; use esrs::{Aggregate, AggregateManager}; use esrs::{AggregateState, StoreEvent}; -use crate::projector::CounterTransactionalQuery; +use crate::projector::CounterTransactionalEventHandler; use crate::structs::{CounterCommand, CounterError, CounterEvent}; pub struct CounterAggregate { @@ -18,7 +18,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; diff --git a/examples/customize_persistence_flow/src/projector.rs b/examples/customize_persistence_flow/src/projector.rs index 4752d1a1..140c540a 100644 --- a/examples/customize_persistence_flow/src/projector.rs +++ b/examples/customize_persistence_flow/src/projector.rs @@ -2,16 +2,16 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::{StoreEvent, TransactionalQuery}; +use esrs::{StoreEvent, TransactionalEventHandler}; use crate::aggregate::CounterAggregate; use crate::structs::{CounterError, CounterEvent}; #[derive(Clone)] -pub struct CounterTransactionalQuery; +pub struct CounterTransactionalEventHandler; #[async_trait] -impl TransactionalQuery for CounterTransactionalQuery { +impl TransactionalEventHandler for CounterTransactionalEventHandler { async fn handle( &self, event: &StoreEvent, diff --git a/examples/delete_aggregate/src/aggregate.rs b/examples/delete_aggregate/src/aggregate.rs index 02cec41d..1c9f7bcb 100644 --- a/examples/delete_aggregate/src/aggregate.rs +++ b/examples/delete_aggregate/src/aggregate.rs @@ -4,7 +4,7 @@ use sqlx::{Pool, Postgres}; use esrs::postgres::PgStore; use esrs::{Aggregate, AggregateManager}; -use crate::projector::CounterTransactionalQuery; +use crate::projector::CounterTransactionalEventHandler; use crate::structs::{CounterCommand, CounterError, CounterEvent}; pub struct CounterAggregate { @@ -14,7 +14,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; diff --git a/examples/delete_aggregate/src/projector.rs b/examples/delete_aggregate/src/projector.rs index 4b5ff6d0..70917f52 100644 --- a/examples/delete_aggregate/src/projector.rs +++ b/examples/delete_aggregate/src/projector.rs @@ -2,16 +2,16 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::{StoreEvent, TransactionalQuery}; +use esrs::{StoreEvent, TransactionalEventHandler}; use crate::aggregate::CounterAggregate; use crate::structs::{CounterError, CounterEvent}; #[derive(Clone)] -pub struct CounterTransactionalQuery; +pub struct CounterTransactionalEventHandler; #[async_trait] -impl TransactionalQuery for CounterTransactionalQuery { +impl TransactionalEventHandler for CounterTransactionalEventHandler { async fn handle( &self, event: &StoreEvent, diff --git a/examples/simple_projection/src/aggregate.rs b/examples/simple_projection/src/aggregate.rs index 66f45bff..ce1acc95 100644 --- a/examples/simple_projection/src/aggregate.rs +++ b/examples/simple_projection/src/aggregate.rs @@ -3,7 +3,7 @@ use sqlx::{Pool, Postgres}; use esrs::postgres::PgStore; use esrs::{Aggregate, AggregateManager}; -use crate::projector::CounterTransactionalQuery; +use crate::projector::CounterTransactionalEventHandler; use crate::structs::{CounterCommand, CounterError, CounterEvent}; pub struct CounterAggregate { @@ -13,7 +13,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalQuery)]) + .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; diff --git a/examples/simple_projection/src/projector.rs b/examples/simple_projection/src/projector.rs index 4752d1a1..140c540a 100644 --- a/examples/simple_projection/src/projector.rs +++ b/examples/simple_projection/src/projector.rs @@ -2,16 +2,16 @@ use async_trait::async_trait; use sqlx::{Executor, PgConnection, Postgres}; use uuid::Uuid; -use esrs::{StoreEvent, TransactionalQuery}; +use esrs::{StoreEvent, TransactionalEventHandler}; use crate::aggregate::CounterAggregate; use crate::structs::{CounterError, CounterEvent}; #[derive(Clone)] -pub struct CounterTransactionalQuery; +pub struct CounterTransactionalEventHandler; #[async_trait] -impl TransactionalQuery for CounterTransactionalQuery { +impl TransactionalEventHandler for CounterTransactionalEventHandler { async fn handle( &self, event: &StoreEvent, diff --git a/examples/simple_saga/src/lib.rs b/examples/simple_saga/src/lib.rs index d853dbf5..8e194957 100644 --- a/examples/simple_saga/src/lib.rs +++ b/examples/simple_saga/src/lib.rs @@ -5,7 +5,7 @@ use thiserror::Error; use uuid::Uuid; use esrs::postgres::PgStore; -use esrs::{Aggregate, AggregateManager, AggregateState, Query, StoreEvent}; +use esrs::{Aggregate, AggregateManager, AggregateState, EventHandler, StoreEvent}; #[derive(Clone)] pub struct LoggingAggregate { @@ -21,7 +21,7 @@ impl LoggingAggregate { event_store: event_store.clone(), }; - event_store.set_queries(vec![Box::new(LoggingQuery::new(this.clone()))]); + event_store.set_queries(vec![Box::new(LoggingEventHandler::new(this.clone()))]); Ok(this) } } @@ -77,18 +77,18 @@ impl AggregateManager for LoggingAggregate { // A very simply policy, which tries to log an event, and creates another command after it finishes // which indicates success or failure to log #[derive(Clone)] -struct LoggingQuery { +struct LoggingEventHandler { aggregate: LoggingAggregate, } -impl LoggingQuery { +impl LoggingEventHandler { pub fn new(aggregate: LoggingAggregate) -> Self { Self { aggregate } } } #[async_trait] -impl Query for LoggingQuery { +impl EventHandler for LoggingEventHandler { async fn handle(&self, event: &StoreEvent) { let aggregate_id: Uuid = event.aggregate_id; diff --git a/examples/simple_side_effect/src/lib.rs b/examples/simple_side_effect/src/lib.rs index 3805f6cf..971eac8a 100644 --- a/examples/simple_side_effect/src/lib.rs +++ b/examples/simple_side_effect/src/lib.rs @@ -4,7 +4,7 @@ use sqlx::{PgConnection, Pool, Postgres}; use thiserror::Error; use esrs::postgres::PgStore; -use esrs::{Aggregate, AggregateManager, Query, StoreEvent, TransactionalQuery}; +use esrs::{Aggregate, AggregateManager, EventHandler, StoreEvent, TransactionalEventHandler}; pub struct LoggingAggregate { event_store: PgStore, @@ -13,8 +13,8 @@ pub struct LoggingAggregate { impl LoggingAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(LoggingTransactionalQuery)]) - .set_queries(vec![Box::new(LoggingQuery)]) + .set_transactional_queries(vec![Box::new(LoggingTransactionalEventHandler)]) + .set_queries(vec![Box::new(LoggingEventHandler)]) .setup() .await?; @@ -57,10 +57,10 @@ impl AggregateManager for LoggingAggregate { // failure is due to a simple log message filter rule, but you can imagine a // side effect which interacts with some 3rd party service in a failable way instead #[derive(Clone)] -pub struct LoggingTransactionalQuery; +pub struct LoggingTransactionalEventHandler; #[async_trait] -impl TransactionalQuery for LoggingTransactionalQuery { +impl TransactionalEventHandler for LoggingTransactionalEventHandler { async fn handle(&self, event: &StoreEvent, _: &mut PgConnection) -> Result<(), LoggingError> { let id = event.aggregate_id; match event.payload() { @@ -83,10 +83,10 @@ impl TransactionalQuery for LoggingTransactional // stops the event from being persisted to the event store, whereas a failure in // a policy does not. #[derive(Clone)] -pub struct LoggingQuery; +pub struct LoggingEventHandler; #[async_trait] -impl Query for LoggingQuery { +impl EventHandler for LoggingEventHandler { async fn handle(&self, event: &StoreEvent) { let id = event.aggregate_id; match event.payload() { diff --git a/src/esrs/query.rs b/src/esrs/event_handler.rs similarity index 85% rename from src/esrs/query.rs rename to src/esrs/event_handler.rs index aa3a713d..5d4c6963 100644 --- a/src/esrs/query.rs +++ b/src/esrs/event_handler.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use crate::{AggregateManager, StoreEvent}; #[async_trait] -pub trait Query: Send + Sync { +pub trait EventHandler: Send + Sync { async fn handle(&self, event: &StoreEvent); async fn delete(&self, _aggregate_id: Uuid) {} @@ -18,11 +18,11 @@ pub trait Query: Send + Sync { } // // #[async_trait] -// impl Query for T +// impl EventHandler for T // where // AM: AggregateManager, // M::Event: Send + Sync, -// Q: Query, +// Q: EventHandler, // T: Deref + Send + Sync, // { // async fn handle(&self, event: StoreEvent) { @@ -31,7 +31,7 @@ pub trait Query: Send + Sync { // } #[async_trait] -pub trait TransactionalQuery: Sync +pub trait TransactionalEventHandler: Sync where AM: AggregateManager, { @@ -49,4 +49,4 @@ where } } -pub trait QueryError: std::error::Error {} +pub trait EventHandlerError: std::error::Error {} diff --git a/src/esrs/mod.rs b/src/esrs/mod.rs index 98b32394..4f67673a 100644 --- a/src/esrs/mod.rs +++ b/src/esrs/mod.rs @@ -1,6 +1,6 @@ pub mod aggregate; +pub mod event_handler; pub mod policy; -pub mod query; pub mod state; pub mod store; diff --git a/src/esrs/postgres/store.rs b/src/esrs/postgres/store.rs index a9dbd461..8e1a5dd9 100644 --- a/src/esrs/postgres/store.rs +++ b/src/esrs/postgres/store.rs @@ -13,15 +13,15 @@ use sqlx::types::Json; use sqlx::{Executor, PgConnection, Pool, Postgres, Transaction}; use uuid::Uuid; -use crate::esrs::query; +use crate::esrs::event_handler; use crate::esrs::store::{EventStoreLockGuard, UnlockOnDrop}; use crate::types::SequenceNumber; use crate::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent}; use super::{event, statement::Statements}; -type Query = Box + Send + Sync>; -type TransactionalQuery = Box + Send + Sync>; +type EventHandler = Box + Send + Sync>; +type TransactionalEventHandler = Box + Send + Sync>; /// Default Postgres implementation for the [`EventStore`]. Use this struct in order to have a /// pre-made implementation of an [`EventStore`] persisting on Postgres. @@ -42,8 +42,8 @@ where { pool: Pool, statements: Statements, - queries: ArcSwap>>, - transactional_queries: ArcSwap>>, + queries: ArcSwap>>, + transactional_queries: ArcSwap>>, } impl PgStore @@ -66,13 +66,13 @@ where } /// Set the list of (non transactional) queries to the store - pub fn set_queries(self, queries: Vec>) -> Self { + pub fn set_queries(self, queries: Vec>) -> Self { self.inner.queries.store(Arc::new(queries)); self } /// Set the list of transactional queries to the store - pub fn set_transactional_queries(self, queries: Vec>) -> Self { + pub fn set_transactional_queries(self, queries: Vec>) -> Self { self.inner.transactional_queries.store(Arc::new(queries)); self } @@ -159,13 +159,13 @@ where /// This function returns the list of all projections added to this store. This function should /// mostly used while creating a custom persistence flow using [`PgStore::persist`]. - pub fn transactional_queries(&self) -> Arc>> { + pub fn transactional_queries(&self) -> Arc>> { self.inner.transactional_queries.load().clone() } /// This function returns the list of all policies added to this store. This function should /// mostly used while creating a custom persistence flow using [`PgStore::persist`]. - pub fn queries(&self) -> Arc>> { + pub fn queries(&self) -> Arc>> { self.inner.queries.load().clone() } diff --git a/src/esrs/postgres/tests/mod.rs b/src/esrs/postgres/tests/mod.rs index 16a0831b..06a65ab2 100644 --- a/src/esrs/postgres/tests/mod.rs +++ b/src/esrs/postgres/tests/mod.rs @@ -5,7 +5,7 @@ use chrono::{DateTime, Utc}; use sqlx::{PgConnection, Pool, Postgres}; use uuid::Uuid; -use crate::esrs::query::{Query, TransactionalQuery}; +use crate::esrs::event_handler::{EventHandler, TransactionalEventHandler}; use crate::postgres::PgStore; use crate::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent}; @@ -137,7 +137,7 @@ fn persist_multiple_events_test(pool: Pool) { #[sqlx::test] fn event_projection_test(pool: Pool) { let store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(TestTransactionalQuery {})]) + .set_transactional_queries(vec![Box::new(TestTransactionalEventHandler {})]) .setup() .await .unwrap(); @@ -166,7 +166,7 @@ fn event_projection_test(pool: Pool) { #[sqlx::test] fn delete_store_events_and_projections_test(pool: Pool) { let store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(TestTransactionalQuery {})]) + .set_transactional_queries(vec![Box::new(TestTransactionalEventHandler {})]) .setup() .await .unwrap(); @@ -210,7 +210,7 @@ fn delete_store_events_and_projections_test(pool: Pool) { #[sqlx::test] fn policy_test(pool: Pool) { let last_id: Arc> = Arc::new(Mutex::new(Uuid::default())); - let query: Box = Box::new(TestQuery { + let query: Box = Box::new(TestEventHandler { last_id: last_id.clone(), }); @@ -307,10 +307,10 @@ impl AggregateManager for TestAggregate { } #[derive(Clone)] -struct TestTransactionalQuery; +struct TestTransactionalEventHandler; #[async_trait::async_trait] -impl TransactionalQuery for TestTransactionalQuery { +impl TransactionalEventHandler for TestTransactionalEventHandler { async fn handle(&self, event: &StoreEvent, connection: &mut PgConnection) -> Result<(), TestError> { Ok( sqlx::query("INSERT INTO test_projection (id, projection_id) VALUES ($1, $2)") @@ -338,12 +338,12 @@ struct ProjectionRow { } #[derive(Clone)] -struct TestQuery { +struct TestEventHandler { last_id: Arc>, } #[async_trait::async_trait] -impl Query for TestQuery { +impl EventHandler for TestEventHandler { async fn handle(&self, event: &StoreEvent) { let mut guard = self.last_id.lock().unwrap(); *guard = event.payload.id; diff --git a/src/lib.rs b/src/lib.rs index c0672ec4..fb0c56b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,8 @@ //! performed over the event store table. pub use crate::esrs::aggregate::{Aggregate, AggregateManager}; +pub use crate::esrs::event_handler::{EventHandler, TransactionalEventHandler}; pub use crate::esrs::policy::Policy; -pub use crate::esrs::query::{Query, TransactionalQuery}; pub use crate::esrs::state::AggregateState; pub use crate::esrs::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; From 31fb036dd88aae81094eb8378fdb1f8a9db1592d Mon Sep 17 00:00:00 2001 From: Simone Cottini Date: Wed, 26 Apr 2023 11:06:48 +0200 Subject: [PATCH 5/6] Fix docs and some other renamings --- examples/aggregate_merging/src/aggregates.rs | 4 +- .../src/aggregate.rs | 6 +- examples/delete_aggregate/src/aggregate.rs | 2 +- examples/simple_projection/src/aggregate.rs | 2 +- examples/simple_saga/src/lib.rs | 2 +- examples/simple_side_effect/src/lib.rs | 4 +- src/esrs/aggregate.rs | 16 ++-- src/esrs/event_handler.rs | 13 ++- src/esrs/mod.rs | 1 - src/esrs/policy.rs | 23 ------ src/esrs/postgres/mod.rs | 1 - src/esrs/postgres/projector.rs | 79 ------------------ src/esrs/postgres/store.rs | 82 ++++++++++--------- src/esrs/postgres/tests/mod.rs | 14 ++-- src/esrs/store.rs | 4 +- src/lib.rs | 2 - 16 files changed, 76 insertions(+), 179 deletions(-) delete mode 100644 src/esrs/policy.rs delete mode 100644 src/esrs/postgres/projector.rs diff --git a/examples/aggregate_merging/src/aggregates.rs b/examples/aggregate_merging/src/aggregates.rs index 947eec00..70d3ca62 100644 --- a/examples/aggregate_merging/src/aggregates.rs +++ b/examples/aggregate_merging/src/aggregates.rs @@ -15,7 +15,7 @@ pub struct AggregateA { impl AggregateA { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) + .set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; @@ -63,7 +63,7 @@ pub struct AggregateB { impl AggregateB { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) + .set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; diff --git a/examples/customize_persistence_flow/src/aggregate.rs b/examples/customize_persistence_flow/src/aggregate.rs index 1924af30..5baf8e2f 100644 --- a/examples/customize_persistence_flow/src/aggregate.rs +++ b/examples/customize_persistence_flow/src/aggregate.rs @@ -18,7 +18,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) + .set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; @@ -85,7 +85,7 @@ impl AggregateManager for CounterAggregate { } // Acquiring the list of projectors early, as it is an expensive operation. - let transactional_queries = self.event_store().transactional_queries(); + let transactional_queries = self.event_store().transactional_event_handlers(); for store_event in store_events.iter() { for transactional_query in transactional_queries.iter() { transactional_query.handle(store_event, &mut connection).await?; @@ -98,7 +98,7 @@ impl AggregateManager for CounterAggregate { drop(aggregate_state.take_lock()); // Acquiring the list of policies early, as it is an expensive operation. - let queries = self.event_store().queries(); + let queries = self.event_store().event_handlers(); for store_event in store_events.iter() { for query in queries.iter() { // We want to just log errors instead of return them. This is the customization diff --git a/examples/delete_aggregate/src/aggregate.rs b/examples/delete_aggregate/src/aggregate.rs index 1c9f7bcb..9d6be452 100644 --- a/examples/delete_aggregate/src/aggregate.rs +++ b/examples/delete_aggregate/src/aggregate.rs @@ -14,7 +14,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) + .set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; diff --git a/examples/simple_projection/src/aggregate.rs b/examples/simple_projection/src/aggregate.rs index ce1acc95..9386bb07 100644 --- a/examples/simple_projection/src/aggregate.rs +++ b/examples/simple_projection/src/aggregate.rs @@ -13,7 +13,7 @@ pub struct CounterAggregate { impl CounterAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(CounterTransactionalEventHandler)]) + .set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)]) .setup() .await?; diff --git a/examples/simple_saga/src/lib.rs b/examples/simple_saga/src/lib.rs index 8e194957..fb219358 100644 --- a/examples/simple_saga/src/lib.rs +++ b/examples/simple_saga/src/lib.rs @@ -21,7 +21,7 @@ impl LoggingAggregate { event_store: event_store.clone(), }; - event_store.set_queries(vec![Box::new(LoggingEventHandler::new(this.clone()))]); + event_store.set_event_handlers(vec![Box::new(LoggingEventHandler::new(this.clone()))]); Ok(this) } } diff --git a/examples/simple_side_effect/src/lib.rs b/examples/simple_side_effect/src/lib.rs index 971eac8a..612387a8 100644 --- a/examples/simple_side_effect/src/lib.rs +++ b/examples/simple_side_effect/src/lib.rs @@ -13,8 +13,8 @@ pub struct LoggingAggregate { impl LoggingAggregate { pub async fn new(pool: &Pool) -> Result { let event_store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(LoggingTransactionalEventHandler)]) - .set_queries(vec![Box::new(LoggingEventHandler)]) + .set_transactional_event_handlers(vec![Box::new(LoggingTransactionalEventHandler)]) + .set_event_handlers(vec![Box::new(LoggingEventHandler)]) .setup() .await?; diff --git a/src/esrs/aggregate.rs b/src/esrs/aggregate.rs index 38fd04ba..64f18046 100644 --- a/src/esrs/aggregate.rs +++ b/src/esrs/aggregate.rs @@ -48,8 +48,9 @@ pub trait Aggregate { /// can be persisted when handled, and the state can be reconstructed by loading and apply events sequentially. /// /// It comes batteries-included, as you only need to implement the `event_store` getter. The basic API is: -/// 1. execute_command +/// 1. handle_command /// 2. load +/// 3. lock_and_load /// The other functions are used internally, but can be overridden if needed. #[async_trait] pub trait AggregateManager: Aggregate { @@ -58,7 +59,7 @@ pub trait AggregateManager: Aggregate { /// The `name` function is responsible for naming an aggregate type. /// Each aggregate type should have a name that is unique among all the aggregate types in your application. /// - /// Aggregates are linked to their instances & events using their `name` and their `aggregate_id`. Be very careful when changing + /// Aggregates are linked to their instances & events using their `name` and their `aggregate_id`. Be very careful when changing /// `name`, as doing so will break the link between all the aggregates of their type, and their events! fn name() -> &'static str where @@ -124,15 +125,10 @@ pub trait AggregateManager: Aggregate { } /// Transactional persists events in store - recording it in the aggregate instance's history. - /// The store will also project the events. If an error occurs whilst persisting the events, - /// the whole transaction is rolled back and the error is returned. - /// - /// The policies associated to the store are run here. A failure at this point will be silently - /// ignored, and the new state returned successfully anyway. + /// The store will also handle the events creating read side projections. If an error occurs whilst + /// persisting the events, the whole transaction is rolled back and the error is returned. /// /// You should _avoid_ implementing this function, and be _very_ careful if you decide to do so. - /// The only scenario where this function needs to be overwritten is if you need to change the - /// behaviour of policies, e.g. if you want to log something on error. async fn store_events( &self, aggregate_state: &mut AggregateState, @@ -142,7 +138,7 @@ pub trait AggregateManager: Aggregate { } /// `delete` should either complete the aggregate instance, along with all its associated events - /// and projections, or fail. + /// and read side projections, or fail. /// /// If the deletion succeeds only partially, it _must_ return an error. async fn delete(&self, aggregate_id: impl Into + Send) -> Result<(), Self::Error> { diff --git a/src/esrs/event_handler.rs b/src/esrs/event_handler.rs index 5d4c6963..37fffde5 100644 --- a/src/esrs/event_handler.rs +++ b/src/esrs/event_handler.rs @@ -5,13 +5,15 @@ use crate::{AggregateManager, StoreEvent}; #[async_trait] pub trait EventHandler: Send + Sync { + // TODO: doc async fn handle(&self, event: &StoreEvent); + // TODO: doc async fn delete(&self, _aggregate_id: Uuid) {} - /// The name of the projector. By default, this is the type name of the projector, + /// The name of the event handler. By default, this is the type name of the event handler, /// but it can be overridden to provide a custom name. This name is used as - /// part of tracing spans, to identify the projector being run. + /// part of tracing spans, to identify the event handler being run. fn name(&self) -> &'static str { std::any::type_name::() } @@ -35,18 +37,21 @@ pub trait TransactionalEventHandler: Sync where AM: AggregateManager, { + // TODO: doc async fn handle(&self, event: &StoreEvent, executor: &mut E) -> Result<(), AM::Error>; + // TODO: doc async fn delete(&self, _aggregate_id: Uuid, _executor: &mut E) -> Result<(), AM::Error> { Ok(()) } - /// The name of the projector. By default, this is the type name of the projector, + /// The name of the event handler. By default, this is the type name of the event handler, /// but it can be overridden to provide a custom name. This name is used as - /// part of tracing spans, to identify the projector being run. + /// part of tracing spans, to identify the event handler being run. fn name(&self) -> &'static str { std::any::type_name::() } } +// TODO: doc pub trait EventHandlerError: std::error::Error {} diff --git a/src/esrs/mod.rs b/src/esrs/mod.rs index 4f67673a..8d36351a 100644 --- a/src/esrs/mod.rs +++ b/src/esrs/mod.rs @@ -1,6 +1,5 @@ pub mod aggregate; pub mod event_handler; -pub mod policy; pub mod state; pub mod store; diff --git a/src/esrs/policy.rs b/src/esrs/policy.rs deleted file mode 100644 index 6eff424d..00000000 --- a/src/esrs/policy.rs +++ /dev/null @@ -1,23 +0,0 @@ -use async_trait::async_trait; - -use crate::{AggregateManager, StoreEvent}; - -/// This trait is used to implement a `Policy`. A policy is intended to be an entity where to put -/// non-transactional side effects. -#[async_trait] -pub trait Policy -where - Manager: AggregateManager, -{ - /// This function intercepts the event and, matching on the type of such event - /// produces the appropriate side effects. - /// The result is meant to catch generic errors. - async fn handle_event(&self, event: &StoreEvent) -> Result<(), Manager::Error>; - - /// The name of the policy. By default, this is the type name of the policy, - /// but it can be overridden to provide a custom name. This name is used as - /// part of tracing spans, to identify the policy being run. - fn name(&self) -> &'static str { - std::any::type_name::() - } -} diff --git a/src/esrs/postgres/mod.rs b/src/esrs/postgres/mod.rs index c6fdd127..78a26c02 100644 --- a/src/esrs/postgres/mod.rs +++ b/src/esrs/postgres/mod.rs @@ -1,5 +1,4 @@ mod event; -pub mod projector; mod statement; pub mod store; diff --git a/src/esrs/postgres/projector.rs b/src/esrs/postgres/projector.rs deleted file mode 100644 index f3a84471..00000000 --- a/src/esrs/postgres/projector.rs +++ /dev/null @@ -1,79 +0,0 @@ -use async_trait::async_trait; -use sqlx::PgConnection; -use uuid::Uuid; - -use crate::{AggregateManager, StoreEvent}; - -/// This enum is used to instruct via [`Projector::persistence`] function which guarantees to have -/// while projecting an event in the read side. -/// - [`ProjectorPersistence::Mandatory`] means that the projected data will be always available in the read -/// side. In the actual default store implementation it implies that if a mandatory persistent -/// projector fails the event will not be stored in the event store and the transaction rollbacks. -/// - [`ProjectorPersistence::Fallible`] means that there are no guarantees for the projected data to be -/// persisted in the read side. In the actual default store implementation it implies that if an -/// fallible persistent projector fails that event is stored anyway but nothing will be persisted -/// in the read side. If no other projector fails the event will be stored in the event store with -/// all the other projections and the transaction will be committed. -pub enum ProjectorPersistence { - Mandatory, - Fallible, -} - -impl AsRef for ProjectorPersistence { - fn as_ref(&self) -> &str { - match self { - ProjectorPersistence::Mandatory => "mandatory", - ProjectorPersistence::Fallible => "fallible", - } - } -} - -/// This trait is used to implement a `Projector`. A projector is intended to be an entity where to -/// create, update and delete a read side. Every projector should be responsible to update a single -/// read model. -#[async_trait] -pub trait Projector: Sync -where - Manager: AggregateManager, -{ - /// This function could be used to instruct the [`Projector`] about its the - /// [`ProjectorPersistence`] level. - /// - /// It has a default implementation that returns [`ProjectorPersistence::Mandatory`]. - /// Override this function to change its [`ProjectorPersistence`] level. - fn persistence(&self) -> ProjectorPersistence { - ProjectorPersistence::Mandatory - } - - /// This function projects one event in each read model that implements this trait. - /// The result is meant to catch generic errors. - /// - /// Note: in actual implementation the second parameter is an &mut PgConnection. In further releases - /// of sqlx package this could be changed. At this time the connection could be a simple connection - /// acquired by a pool or a deref of a transaction. - async fn project( - &self, - event: &StoreEvent, - connection: &mut PgConnection, - ) -> Result<(), Manager::Error>; - - /// Delete the read model entry. It is here because of the eventual need of delete an entire - /// aggregate. - /// - /// Default implementation *does nothing* and always returns an Ok. Override this function to - /// implement deletion behaviour for custom projections. - /// - /// Note: in actual implementation the second parameter is an &mut PgConnection. In further releases - /// of sqlx package this could be changed. At this time the connection could be a simple connection - /// acquired by a pool or a deref of a transaction. - async fn delete(&self, _aggregate_id: Uuid, _connection: &mut PgConnection) -> Result<(), Manager::Error> { - Ok(()) - } - - /// The name of the projector. By default, this is the type name of the projector, - /// but it can be overridden to provide a custom name. This name is used as - /// part of tracing spans, to identify the projector being run. - fn name(&self) -> &'static str { - std::any::type_name::() - } -} diff --git a/src/esrs/postgres/store.rs b/src/esrs/postgres/store.rs index 8e1a5dd9..99e3cc88 100644 --- a/src/esrs/postgres/store.rs +++ b/src/esrs/postgres/store.rs @@ -42,8 +42,8 @@ where { pool: Pool, statements: Statements, - queries: ArcSwap>>, - transactional_queries: ArcSwap>>, + event_handlers: ArcSwap>>, + transactional_event_handlers: ArcSwap>>, } impl PgStore @@ -58,22 +58,25 @@ where let inner: InnerPgStore = InnerPgStore { pool, statements: Statements::new::(), - queries: ArcSwap::from_pointee(vec![]), - transactional_queries: ArcSwap::from_pointee(vec![]), + event_handlers: ArcSwap::from_pointee(vec![]), + transactional_event_handlers: ArcSwap::from_pointee(vec![]), }; Self { inner: Arc::new(inner) } } - /// Set the list of (non transactional) queries to the store - pub fn set_queries(self, queries: Vec>) -> Self { - self.inner.queries.store(Arc::new(queries)); + /// Set the list of (non transactional) event handlers to the store + pub fn set_event_handlers(self, event_handlers: Vec>) -> Self { + self.inner.event_handlers.store(Arc::new(event_handlers)); self } - /// Set the list of transactional queries to the store - pub fn set_transactional_queries(self, queries: Vec>) -> Self { - self.inner.transactional_queries.store(Arc::new(queries)); + /// Set the list of transactional event handlers to the store + pub fn set_transactional_event_handlers( + self, + event_handlers: Vec>, + ) -> Self { + self.inner.transactional_event_handlers.store(Arc::new(event_handlers)); self } @@ -157,24 +160,21 @@ where }) } - /// This function returns the list of all projections added to this store. This function should - /// mostly used while creating a custom persistence flow using [`PgStore::persist`]. - pub fn transactional_queries(&self) -> Arc>> { - self.inner.transactional_queries.load().clone() + /// This function returns the list of all transactional event handlers added to this store. + /// This function should mostly used while creating a custom persistence flow using [`PgStore::persist`]. + pub fn transactional_event_handlers(&self) -> Arc>> { + self.inner.transactional_event_handlers.load().clone() } - /// This function returns the list of all policies added to this store. This function should + /// This function returns the list of all event handlers added to this store. This function should /// mostly used while creating a custom persistence flow using [`PgStore::persist`]. - pub fn queries(&self) -> Arc>> { - self.inner.queries.load().clone() + pub fn event_handlers(&self) -> Arc>> { + self.inner.event_handlers.load().clone() } /// This function could be used in order to customize the way the store persist the events. - /// For example could be used to avoid having projectors in transaction with event saving. Or to - /// let the policies return or not an error if one of them fails. /// - /// An example of how to use this function is in `examples/customize_persistence_flow` example - /// folder. + /// An example of how to use this function is in `examples/customize_persistence_flow` example folder. /// /// # Errors /// @@ -264,25 +264,25 @@ where store_events.push(store_event); } - // Acquiring the list of projectors early, as it is an expensive operation. - let transactional_queries = self.transactional_queries(); + // Acquiring the list of transactional event handlers early, as it is an expensive operation. + let transactional_event_handlers = self.transactional_event_handlers(); for store_event in &store_events { - for transactional_query in transactional_queries.iter() { + for transactional_event_handler in transactional_event_handlers.iter() { let span = tracing::trace_span!( - "esrs.transactional_query", + "esrs.transactional_event_handler", event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, - query = transactional_query.name() + transactional_event_handler = transactional_event_handler.name() ); let _e = span.enter(); - if let Err(error) = transactional_query.handle(store_event, &mut transaction).await { + if let Err(error) = transactional_event_handler.handle(store_event, &mut transaction).await { tracing::error!({ event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, - query = transactional_query.name(), + transactional_event_handler = transactional_event_handler.name(), error = ?error, - }, "transactional query failed to handle event"); + }, "transactional event handler failed to handle event"); return Err(error); } @@ -293,23 +293,23 @@ where // We need to drop the lock on the aggregate state here as: // 1. the events have already been persisted, hence the DB has the latest aggregate; - // 2. the policies below might need to access this aggregate atomically (causing a deadlock!). + // 2. the event handlers below might need to access this aggregate atomically (causing a deadlock!). drop(aggregate_state.take_lock()); - // Acquiring the list of queries early, as it is an expensive operation. - let queries = self.queries(); + // Acquiring the list of event handlers early, as it is an expensive operation. + let event_handlers = self.event_handlers(); for store_event in &store_events { // NOTE: should this be parallelized? - for query in queries.iter() { + for event_handler in event_handlers.iter() { let span = tracing::debug_span!( - "esrs.query", + "esrs.event_handler", event_id = %store_event.id, aggregate_id = %store_event.aggregate_id, - query = query.name() + event_handler = event_handler.name() ); let _e = span.enter(); - query.handle(store_event).await; + event_handler.handle(store_event).await; } } @@ -325,15 +325,17 @@ where .await .map(|_| ())?; - for transactional_query in self.transactional_queries().iter() { - transactional_query.delete(aggregate_id, &mut transaction).await?; + for transactional_event_handler in self.transactional_event_handlers().iter() { + transactional_event_handler + .delete(aggregate_id, &mut transaction) + .await?; } transaction.commit().await?; // NOTE: should this be parallelized? - for query in self.queries().iter() { - query.delete(aggregate_id).await; + for event_handler in self.event_handlers().iter() { + event_handler.delete(aggregate_id).await; } Ok(()) diff --git a/src/esrs/postgres/tests/mod.rs b/src/esrs/postgres/tests/mod.rs index 06a65ab2..c5f3ca1b 100644 --- a/src/esrs/postgres/tests/mod.rs +++ b/src/esrs/postgres/tests/mod.rs @@ -135,9 +135,9 @@ fn persist_multiple_events_test(pool: Pool) { } #[sqlx::test] -fn event_projection_test(pool: Pool) { +fn event_handling_test(pool: Pool) { let store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(TestTransactionalEventHandler {})]) + .set_transactional_event_handlers(vec![Box::new(TestTransactionalEventHandler {})]) .setup() .await .unwrap(); @@ -164,9 +164,9 @@ fn event_projection_test(pool: Pool) { } #[sqlx::test] -fn delete_store_events_and_projections_test(pool: Pool) { +fn delete_store_events_and_handle_events_test(pool: Pool) { let store: PgStore = PgStore::new(pool.clone()) - .set_transactional_queries(vec![Box::new(TestTransactionalEventHandler {})]) + .set_transactional_event_handlers(vec![Box::new(TestTransactionalEventHandler {})]) .setup() .await .unwrap(); @@ -208,14 +208,14 @@ fn delete_store_events_and_projections_test(pool: Pool) { } #[sqlx::test] -fn policy_test(pool: Pool) { +fn event_handler_test(pool: Pool) { let last_id: Arc> = Arc::new(Mutex::new(Uuid::default())); - let query: Box = Box::new(TestEventHandler { + let event_handler: Box = Box::new(TestEventHandler { last_id: last_id.clone(), }); let store: PgStore = PgStore::new(pool.clone()) - .set_queries(vec![query]) + .set_event_handlers(vec![event_handler]) .setup() .await .unwrap(); diff --git a/src/esrs/store.rs b/src/esrs/store.rs index 3748b8f3..be30fd35 100644 --- a/src/esrs/store.rs +++ b/src/esrs/store.rs @@ -47,7 +47,7 @@ pub trait EventStore { /// Persists multiple events into the database. This should be done in a single transaction - either /// all the events are persisted correctly, or none are. /// - /// Persisting events may additionally trigger configured Projectors. + /// Persisting events may additionally trigger configured event handlers (transactional and non-transactional). async fn persist( &self, aggregate_state: &mut AggregateState<::State>, @@ -56,7 +56,7 @@ pub trait EventStore { /// Delete all events from events store related to given `aggregate_id`. /// - /// Moreover it should delete all the projections. + /// Moreover it should delete all the read side projections triggered by event handlers. async fn delete(&self, aggregate_id: Uuid) -> Result<(), ::Error>; } diff --git a/src/lib.rs b/src/lib.rs index fb0c56b6..d4aaf197 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,6 @@ pub use crate::esrs::aggregate::{Aggregate, AggregateManager}; pub use crate::esrs::event_handler::{EventHandler, TransactionalEventHandler}; -pub use crate::esrs::policy::Policy; pub use crate::esrs::state::AggregateState; pub use crate::esrs::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop}; @@ -20,7 +19,6 @@ mod esrs; #[cfg(feature = "postgres")] pub mod postgres { //! Provides implementation of the [`EventStore`] for Postgres. - pub use crate::esrs::postgres::projector::{Projector, ProjectorPersistence}; pub use crate::esrs::postgres::store::PgStore; } From 170d68b81ba9927d631c87d1768576e5b6f02f80 Mon Sep 17 00:00:00 2001 From: Simone Cottini Date: Wed, 26 Apr 2023 11:30:46 +0200 Subject: [PATCH 6/6] Fix rebuild strategies example (still in WIP) --- examples/rebuild_strategies/src/lib.rs | 229 ++++++++++++++----------- src/esrs/event_handler.rs | 5 +- src/lib.rs | 2 +- 3 files changed, 133 insertions(+), 103 deletions(-) diff --git a/examples/rebuild_strategies/src/lib.rs b/examples/rebuild_strategies/src/lib.rs index 3f35a49a..c571fae7 100644 --- a/examples/rebuild_strategies/src/lib.rs +++ b/examples/rebuild_strategies/src/lib.rs @@ -1,19 +1,19 @@ use futures_util::stream::StreamExt; -use sqlx::{Pool, Postgres, Transaction}; +use sqlx::{PgConnection, Pool, Postgres, Transaction}; use uuid::Uuid; -use aggregate_merging::aggregates::{AggregateA, AggregateB}; -use aggregate_merging::structs::{CounterError, EventA, EventB}; -use esrs::postgres::{PgStore, Projector}; -use esrs::{AggregateManager, EventStore, StoreEvent}; +use aggregate_merging::aggregates::AggregateA; +use aggregate_merging::structs::{CounterError, EventA}; +use esrs::{AggregateManager, EventStore, ReplayableEventHandler, StoreEvent, TransactionalEventHandler}; /// A simple example demonstrating rebuilding a single projection table from an aggregate. pub async fn rebuild_single_projection_all_at_once(pool: Pool) { let aggregate: AggregateA = AggregateA::new(&pool).await.unwrap(); - // Put here all the projector you want to rebuild, but remember to add a truncate table statement - // for every table the projectors inside this vec insist on. - let projectors: Vec>> = vec![]; + // Put here all the replayable event handlers you want to rebuild, but remember to add a truncate table statement + // for every table the event handlers inside this vec insist on. + let event_handlers: Vec>> = vec![]; + let transactional_event_handlers: Vec>> = vec![]; // Start the transaction let mut transaction: Transaction = pool.begin().await.unwrap(); @@ -35,26 +35,33 @@ pub async fn rebuild_single_projection_all_at_once(pool: Pool) { .expect("Failed to drop table"); // Then fully rebuild the projection table - for event in events { - for projector in projectors.iter() { - projector - .project(&event, &mut transaction) + for event in &events { + for transactional_event_handler in transactional_event_handlers.iter() { + transactional_event_handler + .handle(event, &mut transaction) .await - .expect("Failed to project event"); + .expect("Failed to handle event"); } } - // And finally commit your transaction + // And commit your transaction transaction.commit().await.unwrap(); + + for event in &events { + for event_handler in event_handlers.iter() { + event_handler.handle(event).await; + } + } } -/// An alternative approach to rebuilding that rebuilds the projected table for a given projection one +/// An alternative approach to rebuilding that rebuilds the projected table for a given event handler one /// aggregate ID at a time, rather than committing the entire table all at once pub async fn rebuild_single_projection_per_aggregate_id(pool: Pool) { let aggregate: AggregateA = AggregateA::new(&pool).await.unwrap(); - // Put here all the projector for all the projections you want to rebuild - let projectors: Vec>> = vec![]; + // Put here all the event handlers for all the projections you want to rebuild + let event_handlers: Vec>> = vec![]; + let transactional_event_handlers: Vec>> = vec![]; // Get all unique aggregate_ids from event_store table. This should be a sqlx::query statement. let aggregate_ids: Vec = vec![Uuid::new_v4()]; @@ -64,100 +71,120 @@ pub async fn rebuild_single_projection_per_aggregate_id(pool: Pool) { // .. open a transaction.. let mut transaction: Transaction = pool.begin().await.unwrap(); - // .. and for every projector.. - for projector in projectors.iter() { - // .. delete all the records in the projection that has that aggregate_id as key. In order - // to achieve this remember to override default `delete` implementation in the projector. - projector.delete(aggregate_id, &mut transaction).await.unwrap(); + // ..then queries for all the events in the event store table.. + let events = aggregate.event_store().by_aggregate_id(aggregate_id).await.unwrap(); - // Then queries for all the events in the event store table.. - let events = aggregate.event_store().by_aggregate_id(aggregate_id).await.unwrap(); + // .. and for every transactional event handler.. + for transactional_event_handler in transactional_event_handlers.iter() { + // .. delete all the records in the projection that has that aggregate_id as key. In order + // to achieve this remember to override default `delete` implementation in the event handler. + transactional_event_handler + .delete(aggregate_id, &mut transaction) + .await + .unwrap(); // .. and rebuild all those events. - for event in events { - projector - .project(&event, &mut transaction) + for event in &events { + transactional_event_handler + .handle(event, &mut transaction) .await - .expect("Failed to project event"); + .expect("Failed to handle event"); } } - // And finally commit your transaction + // And commit your transaction transaction.commit().await.unwrap(); - } -} -/// A simple example demonstrating rebuilding a shared projection streaming on two different event -/// stores -pub async fn rebuild_shared_projection_streaming(pool: Pool) { - // Build both the stores - let store_a: PgStore = PgStore::new(pool.clone()); - let store_b: PgStore = PgStore::new(pool.clone()); - - // Put here all the projector from AggregateA you want to rebuild, but remember to add a truncate - // table statement for every table the projectors inside this vec insist on. - let projectors_a: Vec>> = vec![]; - // Put here all the projector from AggregateB you want to rebuild, but remember to add a truncate - // table statement for every table the projectors inside this vec insist on. - let projectors_b: Vec>> = vec![]; - - // Get two streams from both the tables - let mut events_a = store_a.stream_events(&pool); - let mut events_b = store_b.stream_events(&pool); - - // Fetch first element of both the tables - let mut event_a_opt: Option, CounterError>> = events_a.next().await; - let mut event_b_opt: Option, CounterError>> = events_b.next().await; - - // At this point is possible to open a transaction - let mut transaction: Transaction = pool.begin().await.unwrap(); + // Then for every event handler.. + for event_handler in event_handlers.iter() { + // .. delete all the records in the projection that has that aggregate_id as key. In order + // to achieve this remember to override default `delete` implementation in the event handler. + event_handler.delete(aggregate_id).await; - // Truncate the shared projection table. - let _ = sqlx::query("TRUNCATE TABLE counters") - .execute(&mut *transaction) - .await - .unwrap(); - - loop { - let a_opt: Option<&StoreEvent> = event_a_opt.as_ref().map(|v| v.as_ref().unwrap()); - let b_opt: Option<&StoreEvent> = event_b_opt.as_ref().map(|v| v.as_ref().unwrap()); - - match (a_opt, b_opt) { - // If both the streams returned a value we check what's the oldest. If the oldest is a - // we proceed to run the projectors from AggregateA. - (Some(a), Some(b)) if a.occurred_on <= b.occurred_on => { - for projector in projectors_a.iter() { - projector.project(a, &mut transaction).await.unwrap(); - } - - // Get next value from AggregateA events stream - event_a_opt = events_a.next().await; - } - // If only the stream on AggregateA events contains values we proceed to run the projectors - // from AggregateA. - (Some(a), None) => { - for projector in projectors_a.iter() { - projector.project(a, &mut transaction).await.unwrap(); - } - - // Get next value from AggregateA events stream - event_a_opt = events_a.next().await; - } - // If both the streams returned a value and AggregateB event is older or if only the stream - // on AggregateB events contains values we proceed to run the projectors from AggregateB. - (Some(_), Some(b)) | (None, Some(b)) => { - for projector in projectors_b.iter() { - projector.project(b, &mut transaction).await.unwrap(); - } - - // Get next value from AggregateB events stream - event_b_opt = events_b.next().await; + // .. and rebuild all those events. + for event in &events { + event_handler.handle(event).await; } - // If both the streams are empty then we break the loop. - (None, None) => break, - }; + } } - - // Finally commit the transaction - transaction.commit().await.unwrap(); } + +// FIXME: need to fix this code. Maybe when implementing new rebuild logic in library +// /// A simple example demonstrating rebuilding a shared projection streaming on two different event +// /// stores +// pub async fn rebuild_shared_projection_streaming(pool: Pool) { +// // Build both the stores +// let store_a: PgStore = PgStore::new(pool.clone()); +// let store_b: PgStore = PgStore::new(pool.clone()); +// +// // Put here all the event handlers and transactional event handlers from AggregateA you want to +// // rebuild, but remember to add a truncate table statement for every table the projectors inside +// // this vec insist on. +// let event_handlers: Vec>> = vec![]; +// let transactional_event_handlers: Vec>> = vec![]; +// // Put here all the event handlers and transactional event handlers from AggregateB you want to +// // rebuild, but remember to add a truncate table statement for every table the projectors inside +// // this vec insist on. +// let event_handlers: Vec>> = vec![]; +// let transactional_event_handlers: Vec>> = vec![]; +// +// // Get two streams from both the tables +// let mut events_a = store_a.stream_events(&pool); +// let mut events_b = store_b.stream_events(&pool); +// +// // Fetch first element of both the tables +// let mut event_a_opt: Option, CounterError>> = events_a.next().await; +// let mut event_b_opt: Option, CounterError>> = events_b.next().await; +// +// // At this point is possible to open a transaction +// let mut transaction: Transaction = pool.begin().await.unwrap(); +// +// // Truncate the shared projection table. +// let _ = sqlx::query("TRUNCATE TABLE counters") +// .execute(&mut *transaction) +// .await +// .unwrap(); +// +// loop { +// let a_opt: Option<&StoreEvent> = event_a_opt.as_ref().map(|v| v.as_ref().unwrap()); +// let b_opt: Option<&StoreEvent> = event_b_opt.as_ref().map(|v| v.as_ref().unwrap()); +// +// match (a_opt, b_opt) { +// // If both the streams returned a value we check what's the oldest. If the oldest is a +// // we proceed to run the transactional event handlers from AggregateA. +// (Some(a), Some(b)) if a.occurred_on <= b.occurred_on => { +// for projector in projectors_a.iter() { +// projector.project(a, &mut transaction).await.unwrap(); +// } +// +// // Get next value from AggregateA events stream +// event_a_opt = events_a.next().await; +// } +// // If only the stream on AggregateA events contains values we proceed to run the projectors +// // from AggregateA. +// (Some(a), None) => { +// for projector in projectors_a.iter() { +// projector.project(a, &mut transaction).await.unwrap(); +// } +// +// // Get next value from AggregateA events stream +// event_a_opt = events_a.next().await; +// } +// // If both the streams returned a value and AggregateB event is older or if only the stream +// // on AggregateB events contains values we proceed to run the projectors from AggregateB. +// (Some(_), Some(b)) | (None, Some(b)) => { +// for projector in projectors_b.iter() { +// projector.project(b, &mut transaction).await.unwrap(); +// } +// +// // Get next value from AggregateB events stream +// event_b_opt = events_b.next().await; +// } +// // If both the streams are empty then we break the loop. +// (None, None) => break, +// }; +// } +// +// // Finally commit the transaction +// transaction.commit().await.unwrap(); +// } diff --git a/src/esrs/event_handler.rs b/src/esrs/event_handler.rs index 37fffde5..902a670d 100644 --- a/src/esrs/event_handler.rs +++ b/src/esrs/event_handler.rs @@ -18,7 +18,8 @@ pub trait EventHandler: Send + Sync { std::any::type_name::() } } -// + +// FIXME: uncomment // #[async_trait] // impl EventHandler for T // where @@ -53,5 +54,7 @@ where } } +pub trait ReplayableEventHandler: EventHandler + Send + Sync {} + // TODO: doc pub trait EventHandlerError: std::error::Error {} diff --git a/src/lib.rs b/src/lib.rs index d4aaf197..da212f49 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,7 +10,7 @@ //! performed over the event store table. pub use crate::esrs::aggregate::{Aggregate, AggregateManager}; -pub use crate::esrs::event_handler::{EventHandler, TransactionalEventHandler}; +pub use crate::esrs::event_handler::{EventHandler, ReplayableEventHandler, TransactionalEventHandler}; pub use crate::esrs::state::AggregateState; pub use crate::esrs::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop};