Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-1073]: Remove projectors and policies in favour of event handlers #148

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
port examples to queries
cpiemontese committed Apr 21, 2023
commit 18562e5ea56327c9351341ee27b910230c5beec2
6 changes: 3 additions & 3 deletions examples/aggregate_merging/src/aggregates.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use sqlx::{Pool, Postgres};
use esrs::postgres::PgStore;
use esrs::{Aggregate, AggregateManager};

use crate::projectors::CounterProjector;
use crate::projectors::CounterTransactionalQuery;
use crate::structs::{CommandA, CommandB, CounterError, EventA, EventB};

// We use a template here to make instantiating the near-identical
@@ -15,7 +15,7 @@ pub struct AggregateA {
impl AggregateA {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<AggregateA> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_queries(vec![Box::new(CounterTransactionalQuery)])
.setup()
.await?;

@@ -63,7 +63,7 @@ pub struct AggregateB {
impl AggregateB {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<AggregateB> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_queries(vec![Box::new(CounterTransactionalQuery)])
.setup()
.await?;

13 changes: 6 additions & 7 deletions examples/aggregate_merging/src/projectors.rs
Original file line number Diff line number Diff line change
@@ -2,19 +2,18 @@ use async_trait::async_trait;
use sqlx::{Executor, PgConnection, Postgres};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::StoreEvent;
use esrs::{StoreEvent, TransactionalQuery};

use crate::aggregates::{AggregateA, AggregateB};
use crate::structs::{CounterError, EventA, EventB};

#[derive(Clone)]
pub struct CounterProjector;
pub struct CounterTransactionalQuery;

// This is a projector template that will project AggregateA events into a shared projection (DB table).
#[async_trait]
impl Projector<AggregateA> for CounterProjector {
async fn project(&self, event: &StoreEvent<EventA>, connection: &mut PgConnection) -> Result<(), CounterError> {
impl TransactionalQuery<AggregateA, PgConnection> for CounterTransactionalQuery {
async fn handle(&self, event: &StoreEvent<EventA>, connection: &mut PgConnection) -> Result<(), CounterError> {
match event.payload() {
EventA::Inner { shared_id: id } => {
let existing = Counter::by_id(*id, &mut *connection).await?;
@@ -33,8 +32,8 @@ impl Projector<AggregateA> for CounterProjector {

// This is a projector template that will project AggregateB events into a shared projection (DB table).
#[async_trait]
impl Projector<AggregateB> for CounterProjector {
async fn project(&self, event: &StoreEvent<EventB>, connection: &mut PgConnection) -> Result<(), CounterError> {
impl TransactionalQuery<AggregateB, PgConnection> for CounterTransactionalQuery {
async fn handle(&self, event: &StoreEvent<EventB>, connection: &mut PgConnection) -> Result<(), CounterError> {
match event.payload() {
EventB::Inner { shared_id: id } => {
let existing = Counter::by_id(*id, &mut *connection).await?;
19 changes: 8 additions & 11 deletions examples/customize_persistence_flow/src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@ use esrs::types::SequenceNumber;
use esrs::{Aggregate, AggregateManager};
use esrs::{AggregateState, StoreEvent};

use crate::projector::CounterProjector;
use crate::projector::CounterTransactionalQuery;
use crate::structs::{CounterCommand, CounterError, CounterEvent};

pub struct CounterAggregate {
@@ -18,7 +18,7 @@ pub struct CounterAggregate {
impl CounterAggregate {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<CounterAggregate> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_queries(vec![Box::new(CounterTransactionalQuery)])
.setup()
.await?;

@@ -85,10 +85,10 @@ impl AggregateManager for CounterAggregate {
}

// Acquiring the list of projectors early, as it is an expensive operation.
let projectors = self.event_store().projectors();
let transactional_queries = self.event_store().transactional_queries();
for store_event in store_events.iter() {
for projector in projectors.iter() {
projector.project(store_event, &mut connection).await?;
for transactional_query in transactional_queries.iter() {
transactional_query.handle(store_event, &mut connection).await?;
}
}

@@ -98,15 +98,12 @@ impl AggregateManager for CounterAggregate {
drop(aggregate_state.take_lock());

// Acquiring the list of policies early, as it is an expensive operation.
let policies = self.event_store().policies();
let queries = self.event_store().queries();
for store_event in store_events.iter() {
for policy in policies.iter() {
for query in queries.iter() {
// We want to just log errors instead of return them. This is the customization
// we wanted.
match policy.handle_event(store_event).await {
Ok(_) => (),
Err(error) => println!("{:?}", error),
}
query.handle(store_event).await;
}
}

9 changes: 4 additions & 5 deletions examples/customize_persistence_flow/src/projector.rs
Original file line number Diff line number Diff line change
@@ -2,18 +2,17 @@ use async_trait::async_trait;
use sqlx::{Executor, PgConnection, Postgres};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::StoreEvent;
use esrs::{StoreEvent, TransactionalQuery};

use crate::aggregate::CounterAggregate;
use crate::structs::{CounterError, CounterEvent};

#[derive(Clone)]
pub struct CounterProjector;
pub struct CounterTransactionalQuery;

#[async_trait]
impl Projector<CounterAggregate> for CounterProjector {
async fn project(
impl TransactionalQuery<CounterAggregate, PgConnection> for CounterTransactionalQuery {
async fn handle(
&self,
event: &StoreEvent<CounterEvent>,
connection: &mut PgConnection,
4 changes: 2 additions & 2 deletions examples/delete_aggregate/src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ use sqlx::{Pool, Postgres};
use esrs::postgres::PgStore;
use esrs::{Aggregate, AggregateManager};

use crate::projector::CounterProjector;
use crate::projector::CounterTransactionalQuery;
use crate::structs::{CounterCommand, CounterError, CounterEvent};

pub struct CounterAggregate {
@@ -14,7 +14,7 @@ pub struct CounterAggregate {
impl CounterAggregate {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<CounterAggregate> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_queries(vec![Box::new(CounterTransactionalQuery)])
.setup()
.await?;

9 changes: 4 additions & 5 deletions examples/delete_aggregate/src/projector.rs
Original file line number Diff line number Diff line change
@@ -2,18 +2,17 @@ use async_trait::async_trait;
use sqlx::{Executor, PgConnection, Postgres};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::StoreEvent;
use esrs::{StoreEvent, TransactionalQuery};

use crate::aggregate::CounterAggregate;
use crate::structs::{CounterError, CounterEvent};

#[derive(Clone)]
pub struct CounterProjector;
pub struct CounterTransactionalQuery;

#[async_trait]
impl Projector<CounterAggregate> for CounterProjector {
async fn project(
impl TransactionalQuery<CounterAggregate, PgConnection> for CounterTransactionalQuery {
async fn handle(
&self,
event: &StoreEvent<CounterEvent>,
connection: &mut PgConnection,
4 changes: 2 additions & 2 deletions examples/simple_projection/src/aggregate.rs
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@ use sqlx::{Pool, Postgres};
use esrs::postgres::PgStore;
use esrs::{Aggregate, AggregateManager};

use crate::projector::CounterProjector;
use crate::projector::CounterTransactionalQuery;
use crate::structs::{CounterCommand, CounterError, CounterEvent};

pub struct CounterAggregate {
@@ -13,7 +13,7 @@ pub struct CounterAggregate {
impl CounterAggregate {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<CounterAggregate> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_queries(vec![Box::new(CounterTransactionalQuery)])
.setup()
.await?;

9 changes: 4 additions & 5 deletions examples/simple_projection/src/projector.rs
Original file line number Diff line number Diff line change
@@ -2,18 +2,17 @@ use async_trait::async_trait;
use sqlx::{Executor, PgConnection, Postgres};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::StoreEvent;
use esrs::{StoreEvent, TransactionalQuery};

use crate::aggregate::CounterAggregate;
use crate::structs::{CounterError, CounterEvent};

#[derive(Clone)]
pub struct CounterProjector;
pub struct CounterTransactionalQuery;

#[async_trait]
impl Projector<CounterAggregate> for CounterProjector {
async fn project(
impl TransactionalQuery<CounterAggregate, PgConnection> for CounterTransactionalQuery {
async fn handle(
&self,
event: &StoreEvent<CounterEvent>,
connection: &mut PgConnection,
46 changes: 27 additions & 19 deletions examples/simple_saga/src/lib.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@ use thiserror::Error;
use uuid::Uuid;

use esrs::postgres::PgStore;
use esrs::{Aggregate, AggregateManager, AggregateState, Policy, StoreEvent};
use esrs::{Aggregate, AggregateManager, AggregateState, Query, StoreEvent};

#[derive(Clone)]
pub struct LoggingAggregate {
@@ -21,7 +21,7 @@ impl LoggingAggregate {
event_store: event_store.clone(),
};

event_store.set_policies(vec![Box::new(LoggingPolicy::new(this.clone()))]);
event_store.set_queries(vec![Box::new(LoggingQuery::new(this.clone()))]);
Ok(this)
}
}
@@ -77,41 +77,49 @@ impl AggregateManager for LoggingAggregate {
// A very simply policy, which tries to log an event, and creates another command after it finishes
// which indicates success or failure to log
#[derive(Clone)]
struct LoggingPolicy {
struct LoggingQuery {
aggregate: LoggingAggregate,
}

impl LoggingPolicy {
impl LoggingQuery {
pub fn new(aggregate: LoggingAggregate) -> Self {
Self { aggregate }
}
}

#[async_trait]
impl Policy<LoggingAggregate> for LoggingPolicy {
async fn handle_event(&self, event: &StoreEvent<LoggingEvent>) -> Result<(), LoggingError> {
impl Query<LoggingAggregate> for LoggingQuery {
async fn handle(&self, event: &StoreEvent<LoggingEvent>) {
let aggregate_id: Uuid = event.aggregate_id;

let aggregate_state: AggregateState<u64> = self
.aggregate
.load(aggregate_id)
.await?
.unwrap_or_else(|| AggregateState::with_id(aggregate_id)); // This should never happen
let aggregate_state: AggregateState<u64> = match self.aggregate.load(aggregate_id).await {
Ok(Some(aggregate_state)) => aggregate_state,
Ok(None) => AggregateState::with_id(aggregate_id),
Err(e) => return println!("Error loading aggregate {}", e),
}; // This should never happen

if let LoggingEvent::Received(msg) = event.payload() {
if msg.contains("fail_policy") {
self.aggregate
match self
.aggregate
.handle_command(aggregate_state, LoggingCommand::Fail)
.await?;
return Err(LoggingError::Domain(msg.clone()));
.await
{
Ok(_) => (),
Err(e) => println!("Error handling command {}", e),
};
return;
}
println!("Logged via policy from {}: {}", aggregate_id, msg);
self.aggregate
match self
.aggregate
.handle_command(aggregate_state, LoggingCommand::Succeed)
.await?;
}

Ok(())
.await
{
Ok(_) => (),
Err(e) => println!("Error handling command {}", e),
};
};
}
}

25 changes: 12 additions & 13 deletions examples/simple_side_effect/src/lib.rs
Original file line number Diff line number Diff line change
@@ -3,8 +3,8 @@ use serde::{Deserialize, Serialize};
use sqlx::{PgConnection, Pool, Postgres};
use thiserror::Error;

use esrs::postgres::{PgStore, Projector};
use esrs::{Aggregate, AggregateManager, Policy, StoreEvent};
use esrs::postgres::PgStore;
use esrs::{Aggregate, AggregateManager, Query, StoreEvent, TransactionalQuery};

pub struct LoggingAggregate {
event_store: PgStore<Self>,
@@ -13,8 +13,8 @@ pub struct LoggingAggregate {
impl LoggingAggregate {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, LoggingError> {
let event_store: PgStore<LoggingAggregate> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(LoggingProjector)])
.set_policies(vec![Box::new(LoggingPolicy)])
.set_transactional_queries(vec![Box::new(LoggingTransactionalQuery)])
.set_queries(vec![Box::new(LoggingQuery)])
.setup()
.await?;

@@ -57,11 +57,11 @@ impl AggregateManager for LoggingAggregate {
// failure is due to a simple log message filter rule, but you can imagine a
// side effect which interacts with some 3rd party service in a failable way instead
#[derive(Clone)]
pub struct LoggingProjector;
pub struct LoggingTransactionalQuery;

#[async_trait]
impl Projector<LoggingAggregate> for LoggingProjector {
async fn project(&self, event: &StoreEvent<LoggingEvent>, _: &mut PgConnection) -> Result<(), LoggingError> {
impl TransactionalQuery<LoggingAggregate, PgConnection> for LoggingTransactionalQuery {
async fn handle(&self, event: &StoreEvent<LoggingEvent>, _: &mut PgConnection) -> Result<(), LoggingError> {
let id = event.aggregate_id;
match event.payload() {
LoggingEvent::Logged(msg) => {
@@ -83,21 +83,20 @@ impl Projector<LoggingAggregate> for LoggingProjector {
// stops the event from being persisted to the event store, whereas a failure in
// a policy does not.
#[derive(Clone)]
pub struct LoggingPolicy;
pub struct LoggingQuery;

#[async_trait]
impl Policy<LoggingAggregate> for LoggingPolicy {
async fn handle_event(&self, event: &StoreEvent<LoggingEvent>) -> Result<(), LoggingError> {
impl Query<LoggingAggregate> for LoggingQuery {
async fn handle(&self, event: &StoreEvent<LoggingEvent>) {
let id = event.aggregate_id;
match event.payload() {
LoggingEvent::Logged(msg) => {
if msg.contains("fail_policy") {
return Err(LoggingError::Domain(msg.clone()));
return;
}
println!("Logged via policy from {}: {}", id, msg);
}
}
Ok(())
};
}
}

1 change: 1 addition & 0 deletions src/esrs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod aggregate;
pub mod policy;
pub mod query;
pub mod state;
pub mod store;

9 changes: 7 additions & 2 deletions src/esrs/query.rs
Original file line number Diff line number Diff line change
@@ -31,10 +31,15 @@ pub trait Query<M: AggregateManager>: Send + Sync {
// }

#[async_trait]
pub trait TransactionalQuery<AM: AggregateManager, E> {
pub trait TransactionalQuery<AM, E>: Sync
where
AM: AggregateManager,
{
async fn handle(&self, event: &StoreEvent<AM::Event>, executor: &mut E) -> Result<(), AM::Error>;

async fn delete(&self, aggregate_id: Uuid, executor: &mut E) -> Result<(), AM::Error>;
async fn delete(&self, _aggregate_id: Uuid, _executor: &mut E) -> Result<(), AM::Error> {
Ok(())
}

/// The name of the projector. By default, this is the type name of the projector,
/// but it can be overridden to provide a custom name. This name is used as
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@

pub use crate::esrs::aggregate::{Aggregate, AggregateManager};
pub use crate::esrs::policy::Policy;
pub use crate::esrs::query::{Query, TransactionalQuery};
pub use crate::esrs::state::AggregateState;
pub use crate::esrs::store::{EventStore, EventStoreLockGuard, StoreEvent, UnlockOnDrop};