Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-1073]: Remove projectors and policies in favour of event handlers #148

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/aggregate_merging/src/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use sqlx::{Pool, Postgres};
use esrs::postgres::PgStore;
use esrs::{Aggregate, AggregateManager};

use crate::projectors::CounterProjector;
use crate::projectors::CounterTransactionalEventHandler;
use crate::structs::{CommandA, CommandB, CounterError, EventA, EventB};

// We use a template here to make instantiating the near-identical
Expand All @@ -15,7 +15,7 @@ pub struct AggregateA {
impl AggregateA {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<AggregateA> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)])
.setup()
.await?;

Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct AggregateB {
impl AggregateB {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<AggregateB> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)])
.setup()
.await?;

Expand Down
13 changes: 6 additions & 7 deletions examples/aggregate_merging/src/projectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ use async_trait::async_trait;
use sqlx::{Executor, PgConnection, Postgres};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::StoreEvent;
use esrs::{StoreEvent, TransactionalEventHandler};

use crate::aggregates::{AggregateA, AggregateB};
use crate::structs::{CounterError, EventA, EventB};

#[derive(Clone)]
pub struct CounterProjector;
pub struct CounterTransactionalEventHandler;

// This is a projector template that will project AggregateA events into a shared projection (DB table).
#[async_trait]
impl Projector<AggregateA> for CounterProjector {
async fn project(&self, event: &StoreEvent<EventA>, connection: &mut PgConnection) -> Result<(), CounterError> {
impl TransactionalEventHandler<AggregateA, PgConnection> for CounterTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<EventA>, connection: &mut PgConnection) -> Result<(), CounterError> {
match event.payload() {
EventA::Inner { shared_id: id } => {
let existing = Counter::by_id(*id, &mut *connection).await?;
Expand All @@ -33,8 +32,8 @@ impl Projector<AggregateA> for CounterProjector {

// This is a projector template that will project AggregateB events into a shared projection (DB table).
#[async_trait]
impl Projector<AggregateB> for CounterProjector {
async fn project(&self, event: &StoreEvent<EventB>, connection: &mut PgConnection) -> Result<(), CounterError> {
impl TransactionalEventHandler<AggregateB, PgConnection> for CounterTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<EventB>, connection: &mut PgConnection) -> Result<(), CounterError> {
match event.payload() {
EventB::Inner { shared_id: id } => {
let existing = Counter::by_id(*id, &mut *connection).await?;
Expand Down
19 changes: 8 additions & 11 deletions examples/customize_persistence_flow/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use esrs::types::SequenceNumber;
use esrs::{Aggregate, AggregateManager};
use esrs::{AggregateState, StoreEvent};

use crate::projector::CounterProjector;
use crate::projector::CounterTransactionalEventHandler;
use crate::structs::{CounterCommand, CounterError, CounterEvent};

pub struct CounterAggregate {
Expand All @@ -18,7 +18,7 @@ pub struct CounterAggregate {
impl CounterAggregate {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<CounterAggregate> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)])
.setup()
.await?;

Expand Down Expand Up @@ -85,10 +85,10 @@ impl AggregateManager for CounterAggregate {
}

// Acquiring the list of projectors early, as it is an expensive operation.
let projectors = self.event_store().projectors();
let transactional_queries = self.event_store().transactional_event_handlers();
for store_event in store_events.iter() {
for projector in projectors.iter() {
projector.project(store_event, &mut connection).await?;
for transactional_query in transactional_queries.iter() {
transactional_query.handle(store_event, &mut connection).await?;
}
}

Expand All @@ -98,15 +98,12 @@ impl AggregateManager for CounterAggregate {
drop(aggregate_state.take_lock());

// Acquiring the list of policies early, as it is an expensive operation.
let policies = self.event_store().policies();
let queries = self.event_store().event_handlers();
for store_event in store_events.iter() {
for policy in policies.iter() {
for query in queries.iter() {
// We want to just log errors instead of return them. This is the customization
// we wanted.
match policy.handle_event(store_event).await {
Ok(_) => (),
Err(error) => println!("{:?}", error),
}
query.handle(store_event).await;
}
}

Expand Down
9 changes: 4 additions & 5 deletions examples/customize_persistence_flow/src/projector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ use async_trait::async_trait;
use sqlx::{Executor, PgConnection, Postgres};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::StoreEvent;
use esrs::{StoreEvent, TransactionalEventHandler};

use crate::aggregate::CounterAggregate;
use crate::structs::{CounterError, CounterEvent};

#[derive(Clone)]
pub struct CounterProjector;
pub struct CounterTransactionalEventHandler;

#[async_trait]
impl Projector<CounterAggregate> for CounterProjector {
async fn project(
impl TransactionalEventHandler<CounterAggregate, PgConnection> for CounterTransactionalEventHandler {
async fn handle(
&self,
event: &StoreEvent<CounterEvent>,
connection: &mut PgConnection,
Expand Down
4 changes: 2 additions & 2 deletions examples/delete_aggregate/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use sqlx::{Pool, Postgres};
use esrs::postgres::PgStore;
use esrs::{Aggregate, AggregateManager};

use crate::projector::CounterProjector;
use crate::projector::CounterTransactionalEventHandler;
use crate::structs::{CounterCommand, CounterError, CounterEvent};

pub struct CounterAggregate {
Expand All @@ -14,7 +14,7 @@ pub struct CounterAggregate {
impl CounterAggregate {
pub async fn new(pool: &Pool<Postgres>) -> Result<Self, CounterError> {
let event_store: PgStore<CounterAggregate> = PgStore::new(pool.clone())
.set_projectors(vec![Box::new(CounterProjector)])
.set_transactional_event_handlers(vec![Box::new(CounterTransactionalEventHandler)])
.setup()
.await?;

Expand Down
9 changes: 4 additions & 5 deletions examples/delete_aggregate/src/projector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,17 @@ use async_trait::async_trait;
use sqlx::{Executor, PgConnection, Postgres};
use uuid::Uuid;

use esrs::postgres::Projector;
use esrs::StoreEvent;
use esrs::{StoreEvent, TransactionalEventHandler};

use crate::aggregate::CounterAggregate;
use crate::structs::{CounterError, CounterEvent};

#[derive(Clone)]
pub struct CounterProjector;
pub struct CounterTransactionalEventHandler;

#[async_trait]
impl Projector<CounterAggregate> for CounterProjector {
async fn project(
impl TransactionalEventHandler<CounterAggregate, PgConnection> for CounterTransactionalEventHandler {
async fn handle(
&self,
event: &StoreEvent<CounterEvent>,
connection: &mut PgConnection,
Expand Down
Loading