Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PLATFORM-1080]: Error handling #157

Merged
merged 56 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
5d46c26
Add dependencies for kafka and rabbit behind features
cottinisimone May 9, 2023
45d4286
Refactor Makefile.toml; add feature tests, lint etc
cottinisimone May 9, 2023
e51b4a3
Put sql under postgres feature. Compilation error otherwise
cottinisimone May 9, 2023
e8cec8b
Add rabbit module in event_bus
cottinisimone May 9, 2023
f0d16d8
Rabbit event bus implementation: WIP
cottinisimone May 9, 2023
6d4116b
Add kafka implementation
cottinisimone May 11, 2023
6390c8b
Add docker compose to test kafka and rabbit stuff
cottinisimone May 11, 2023
0672524
Merge branch 'master' into PLATFORM-1085/task/add-some-event-bus-spec…
cottinisimone May 15, 2023
5fb3de1
Now rabbit and kafka get a config in order to be build
cottinisimone May 15, 2023
fbf2c42
Refactor error; not requested doesn't mean an error
cottinisimone May 15, 2023
f83ad4f
WIP moving integration tests to dedicated folder pt1
cottinisimone May 15, 2023
5c67248
Add more tests for manager
cottinisimone May 16, 2023
5890b03
WIP testing rabbit stuff
cottinisimone May 16, 2023
cc4fa77
WIP trying to let kafka tests work..
cottinisimone May 16, 2023
49829b0
Fix tests
cottinisimone May 16, 2023
0110b15
Remove kafka-ui container from compose
cottinisimone May 16, 2023
097f8e9
Regen drone.yml with rabbit, kafka and zookeeper
cottinisimone May 16, 2023
23184e0
Regen drone.yml
cottinisimone May 16, 2023
2cb5312
Uncomment routing key set in rabbit eb
cottinisimone May 17, 2023
087e2f2
Add shared_view example
cottinisimone May 17, 2023
acb1f40
Fix some tests; add tests for builder
cottinisimone May 17, 2023
279596e
Other changes in shared_view example
cottinisimone May 17, 2023
d2ef8e1
add eventual persistent and transactional views examples
cottinisimone May 17, 2023
0286573
Fix warnings
cottinisimone May 17, 2023
5109601
Wip on examples
cottinisimone May 18, 2023
f158f8c
Event handler must be cloneable; change builder apis
cottinisimone May 18, 2023
a47fcc8
Completed saga example
cottinisimone May 18, 2023
0fbbe49
Some other examples
cottinisimone May 18, 2023
4985e9e
Remove examples dbg; small changes
cottinisimone May 18, 2023
94af031
Store crud example
cottinisimone May 18, 2023
1bfb030
remove expect for unwrap in examples
cottinisimone May 18, 2023
00c3d94
Locking strategies with showcase
cottinisimone May 18, 2023
962c71a
Rebuilder + rebuild example
cottinisimone May 22, 2023
a35244a
Update docker rust version to 1.69.0
cottinisimone May 22, 2023
860a35d
Rebuilder + multi aggregate rebuild
cottinisimone May 22, 2023
04408f0
Add rebuilder feature; change locking_strategies example with assertions
cottinisimone May 22, 2023
6756517
Run clippy over examples
cottinisimone May 22, 2023
46ff7cf
Kafka and Rabbit example
cottinisimone May 23, 2023
d3fff72
regenerate
cpiemontese May 23, 2023
114364a
Review cpiemontese (grammar-fixes)
cottinisimone May 25, 2023
39d69fd
Renaming some files
cottinisimone May 25, 2023
b473e98
WIP ok, everything works.. trying to improved types
cottinisimone May 26, 2023
f1ea011
Small refactor trying to put type alias for transactional event handler
cottinisimone May 26, 2023
07d0fa7
Simplify aggregate manager error
cottinisimone May 29, 2023
58bd255
Remove commented deref for aggregate
cottinisimone May 29, 2023
81f4fe3
Merge branch 'master' into PLATFORM-1080/task/error-handling
cottinisimone May 29, 2023
eb4d5a1
Remove Clone from Aggregate::State assoc type
cottinisimone Jun 7, 2023
97d4abf
Remove Sync from Aggregate::Event
cottinisimone Jun 7, 2023
f6619c0
Remove Send from Aggregate::Event
cottinisimone Jun 7, 2023
0deee77
Remove Send from Aggregate::Command
cottinisimone Jun 7, 2023
0bce26f
Remove Send from Aggregate::State
cottinisimone Jun 7, 2023
4029d70
Remove 'static in favor of for<'a> lifetime bound
cottinisimone Jun 7, 2023
7eabf63
Aggregate::Error should't be std::error::Error; tokio RwLock for hand…
cottinisimone Jun 7, 2023
0b3390c
Update documentation
cottinisimone Jun 8, 2023
a4d973d
Remove useless/wrong comments
cottinisimone Jun 8, 2023
87375ba
Some more on docs
cottinisimone Jun 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ all-features = true

[features]
sql = ["sqlx"]
postgres = ["sql", "sqlx/postgres", "typed-builder"]
postgres = ["sql", "sqlx/postgres", "typed-builder", "tokio"]
rebuilder = []
kafka = ["rdkafka", "typed-builder"]
rabbit = ["lapin", "typed-builder"]

[dependencies]
tokio = { version = "1.6", optional = true }

# Serialization/Deserialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
Expand All @@ -32,8 +34,6 @@ chrono = { version = "0.4", features = ["serde"] }
# Build async trait
async-trait = "0.1.50"

# Atomic Arc
arc-swap = "1.6"
# Self-referential structs (for PgStoreLockGuard)
ouroboros = "0.15"

Expand All @@ -50,9 +50,10 @@ typed-builder = { version = "0.14.0", optional = true }
futures = "0.3"
tracing = "0.1"

thiserror = "1.0"

[dev-dependencies]
tokio = { version = "1.6", features = ["full"] }
thiserror = "1.0"
rand = "0.8"

[[example]]
Expand Down
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

It is an opinionated library used to achieve cqrs/es in Rust.

A set of example snippets can be found in the `example` folder.
A set of example snippets could be found in the `example` folder.

## Install

Expand Down Expand Up @@ -38,12 +38,7 @@ cargo make test

Run linters.

```
```shell
cargo make clippy
```

Finally eventually unset `DATABASE_URL` environment variable.

```shell
unset DATABASE_URL
```
11 changes: 4 additions & 7 deletions entrypoint
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
#!/usr/bin/env bash

source /setup_common.sh
source /decrypt_secrets.sh
export DATABASE_URL="postgresql://postgres:postgres@postgres:5432/postgres"
export RABBIT_URL="amqp://rabbit:rabbit@rabbit/rabbit"
export KAFKA_BROKERS_URL="kafka:9092"

if [ -n "$1" ]; then
sh -c "$@"
else
cargo watch -x 'run'
fi
sh -c "$@"
2 changes: 1 addition & 1 deletion examples/event_bus/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn main() {
.await
.unwrap();

let manager: AggregateManager<BasicAggregate> = AggregateManager::new(store);
let manager: AggregateManager<PgStore<BasicAggregate>> = AggregateManager::new(store);

let content: &str = "view row content";
let aggregate_state: AggregateState<()> = AggregateState::default();
Expand Down
28 changes: 20 additions & 8 deletions examples/locking_strategies/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@ use tokio::sync::Mutex;
use uuid::Uuid;

use esrs::postgres::{PgStore, PgStoreBuilder};
use esrs::{AggregateManager, AggregateState, EventStore};
use esrs::{AggregateManager, AggregateManagerError, AggregateState, EventStore};

use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicError, BasicEvent};
use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicEvent};

#[path = "../common/lib.rs"]
mod common;

type Agg = AggregateManager<BasicAggregate>;
type Agg = AggregateManager<PgStore<BasicAggregate>>;

/// Increment the value behind this `aggregate_id` as soon as atomical access can be obtained.
/// The lock can be obtained even if there are current optimistic accesses! Avoid mixing the two strategies when writing.
pub async fn increment_atomically(manager: Agg, aggregate_id: Uuid) -> Result<(), BasicError> {
pub async fn increment_atomically(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
let aggregate_state = manager.lock_and_load(aggregate_id).await?.unwrap_or_default();
manager
.handle_command(
Expand All @@ -32,7 +35,10 @@ pub async fn increment_atomically(manager: Agg, aggregate_id: Uuid) -> Result<()

/// Increment the value behind this `aggregate_id` with an optimistic locking strategy.
/// Optimistic access ignores any current active lock! Avoid mixing the two strategies when writing.
pub async fn increment_optimistically(manager: Agg, aggregate_id: Uuid) -> Result<(), BasicError> {
pub async fn increment_optimistically(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
// Every optimistic access can take place concurrently...
let aggregate_state = manager.load(aggregate_id).await?.unwrap_or_default();
// ...and events are persisted in non-deterministic order.
Expand All @@ -51,7 +57,10 @@ pub async fn increment_optimistically(manager: Agg, aggregate_id: Uuid) -> Resul
/// Load the aggregate state for read-only purposes, preventing others (that use locking) from modifying it.
/// Avoid using atomic reads if writes are optimistic, as the state would be modified anyway!
/// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads.
pub async fn with_atomic_read(manager: Agg, aggregate_id: Uuid) -> Result<(), BasicError> {
pub async fn with_atomic_read(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
let mut aggregate_state = manager.lock_and_load(aggregate_id).await?.unwrap_or_default();
// No one else (employing locking!) can read or modify the state just loaded here,
// ensuring this really is the *latest* aggregate state.
Expand All @@ -65,7 +74,10 @@ pub async fn with_atomic_read(manager: Agg, aggregate_id: Uuid) -> Result<(), Ba
/// Load the aggregate state for read-only purposes, optimistically assuming nothing is modifying it.
/// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads.
/// Otherwise, optimistic reads are allowed: beware there are no guarantees the state loaded is actually the latest.
pub async fn with_optimistic_read(manager: Agg, aggregate_id: Uuid) -> Result<(), BasicError> {
pub async fn with_optimistic_read(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
// Read the state now, ignoring any explicit locking...
let mut aggregate_state = manager.load(aggregate_id).await?.unwrap_or_default();
// ...but nothing prevents something else from updating the data in the store in the meanwhile,
Expand All @@ -91,7 +103,7 @@ async fn main() {
};
let _ = store.persist(&mut aggregate_state, vec![event]).await.unwrap();

let manager: AggregateManager<BasicAggregate> = AggregateManager::new(store.clone());
let manager: AggregateManager<PgStore<BasicAggregate>> = AggregateManager::new(store.clone());

// It is possible to load the aggregate state multiple times.
let _state_1 = manager.load(aggregate_id).await.unwrap().unwrap();
Expand Down
23 changes: 13 additions & 10 deletions examples/multi_aggregate_rebuild/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use futures::StreamExt;
use sqlx::{PgConnection, Pool, Postgres, Transaction};
use uuid::Uuid;

use esrs::postgres::{PgStore, PgStoreBuilder};
use esrs::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::{AggregateManager, AggregateState, ReplayableEventHandler, StoreEvent, TransactionalEventHandler};

use crate::common::{
new_pool, AggregateA, AggregateB, CommandA, CommandB, CommonError, EventA, EventB, SharedEventHandler, SharedView,
new_pool, AggregateA, AggregateB, CommandA, CommandB, EventA, EventB, SharedEventHandler, SharedView,
};
use crate::transactional_event_handler::SharedTransactionalEventHandler;

Expand Down Expand Up @@ -49,17 +49,20 @@ async fn rebuild_multi_aggregate(
let event_handlers_a: Vec<Box<dyn ReplayableEventHandler<AggregateA>>> = vec![event_handler.clone()];
let event_handlers_b: Vec<Box<dyn ReplayableEventHandler<AggregateB>>> = vec![event_handler.clone()];

let transactional_event_handlers_a: Vec<Box<dyn TransactionalEventHandler<AggregateA, PgConnection>>> =
vec![transactional_event_handler.clone()];
let transactional_event_handlers_b: Vec<Box<dyn TransactionalEventHandler<AggregateB, PgConnection>>> =
vec![transactional_event_handler.clone()];
let transactional_event_handlers_a: Vec<
Box<dyn TransactionalEventHandler<AggregateA, PgStoreError, PgConnection>>,
> = vec![transactional_event_handler.clone()];

let transactional_event_handlers_b: Vec<
Box<dyn TransactionalEventHandler<AggregateB, PgStoreError, PgConnection>>,
> = vec![transactional_event_handler.clone()];

let mut events_a = store_a.stream_events(&pool);
let mut events_b = store_b.stream_events(&pool);

// Fetch first element of both the tables
let mut event_a_opt: Option<Result<StoreEvent<EventA>, CommonError>> = events_a.next().await;
let mut event_b_opt: Option<Result<StoreEvent<EventB>, CommonError>> = events_b.next().await;
let mut event_a_opt: Option<Result<StoreEvent<EventA>, PgStoreError>> = events_a.next().await;
let mut event_b_opt: Option<Result<StoreEvent<EventB>, PgStoreError>> = events_b.next().await;

// At this point it's possible to open a transaction
let mut transaction: Transaction<Postgres> = pool.begin().await.unwrap();
Expand Down Expand Up @@ -152,7 +155,7 @@ async fn setup(shared_id: Uuid, pool: Pool<Postgres>, view: SharedView, transact
.await
.unwrap();

let manager: AggregateManager<AggregateA> = AggregateManager::new(pg_store_a);
let manager: AggregateManager<PgStore<AggregateA>> = AggregateManager::new(pg_store_a);
manager
.handle_command(AggregateState::default(), CommandA { v: 10, shared_id })
.await
Expand All @@ -170,7 +173,7 @@ async fn setup(shared_id: Uuid, pool: Pool<Postgres>, view: SharedView, transact
.await
.unwrap();

let manager: AggregateManager<AggregateB> = AggregateManager::new(pg_store_b);
let manager: AggregateManager<PgStore<AggregateB>> = AggregateManager::new(pg_store_b);
manager
.handle_command(AggregateState::default(), CommandB { v: 7, shared_id })
.await
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use async_trait::async_trait;
use sqlx::PgConnection;

use esrs::postgres::PgStoreError;
use esrs::{StoreEvent, TransactionalEventHandler};

use crate::common::{AggregateA, AggregateB, CommonError, EventA, EventB, SharedView, UpsertSharedView};
use crate::common::{AggregateA, AggregateB, EventA, EventB, SharedView, UpsertSharedView};

#[derive(Clone)]
pub struct SharedTransactionalEventHandler {
pub view: SharedView,
}

#[async_trait]
impl TransactionalEventHandler<AggregateA, PgConnection> for SharedTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<EventA>, executor: &mut PgConnection) -> Result<(), CommonError> {
impl TransactionalEventHandler<AggregateA, PgStoreError, PgConnection> for SharedTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<EventA>, executor: &mut PgConnection) -> Result<(), PgStoreError> {
Ok(self
.view
.upsert(
Expand All @@ -28,8 +29,8 @@ impl TransactionalEventHandler<AggregateA, PgConnection> for SharedTransactional
}

#[async_trait]
impl TransactionalEventHandler<AggregateB, PgConnection> for SharedTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<EventB>, executor: &mut PgConnection) -> Result<(), CommonError> {
impl TransactionalEventHandler<AggregateB, PgStoreError, PgConnection> for SharedTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<EventB>, executor: &mut PgConnection) -> Result<(), PgStoreError> {
Ok(self
.view
.upsert(
Expand Down
2 changes: 1 addition & 1 deletion examples/rebuilder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async fn setup(aggregate_id: Uuid, view: BasicView, transactional_view: BasicVie
.await
.unwrap();

let manager: AggregateManager<BasicAggregate> = AggregateManager::new(pg_store);
let manager: AggregateManager<PgStore<BasicAggregate>> = AggregateManager::new(pg_store);
manager
.handle_command(
AggregateState::with_id(aggregate_id),
Expand Down
11 changes: 6 additions & 5 deletions examples/rebuilder/transactional_event_handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use async_trait::async_trait;
use sqlx::PgConnection;

use esrs::postgres::PgStoreError;
use esrs::{StoreEvent, TransactionalEventHandler};

use crate::common::{BasicAggregate, BasicError, BasicEvent, BasicView};
use crate::common::{BasicAggregate, BasicEvent, BasicView};

/// The `BasicTransactionalEventHandlerV1` and `BasicTransactionalEventHandlerV1` exists in this
/// example just for the sake of showing how a single transactional event handler, in this case called
Expand All @@ -19,8 +20,8 @@ pub struct BasicTransactionalEventHandlerV2 {
}

#[async_trait]
impl TransactionalEventHandler<BasicAggregate, PgConnection> for BasicTransactionalEventHandlerV1 {
async fn handle(&self, event: &StoreEvent<BasicEvent>, transaction: &mut PgConnection) -> Result<(), BasicError> {
impl TransactionalEventHandler<BasicAggregate, PgStoreError, PgConnection> for BasicTransactionalEventHandlerV1 {
async fn handle(&self, event: &StoreEvent<BasicEvent>, transaction: &mut PgConnection) -> Result<(), PgStoreError> {
Ok(self
.view
.upsert(
Expand All @@ -33,8 +34,8 @@ impl TransactionalEventHandler<BasicAggregate, PgConnection> for BasicTransactio
}

#[async_trait]
impl TransactionalEventHandler<BasicAggregate, PgConnection> for BasicTransactionalEventHandlerV2 {
async fn handle(&self, event: &StoreEvent<BasicEvent>, transaction: &mut PgConnection) -> Result<(), BasicError> {
impl TransactionalEventHandler<BasicAggregate, PgStoreError, PgConnection> for BasicTransactionalEventHandlerV2 {
async fn handle(&self, event: &StoreEvent<BasicEvent>, transaction: &mut PgConnection) -> Result<(), PgStoreError> {
Ok(self
.view
.upsert(
Expand Down
4 changes: 2 additions & 2 deletions examples/saga/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ async fn main() {
side_effect_mutex: side_effect_mutex.clone(),
};

store.add_event_handler(saga_event_handler);
store.add_event_handler(saga_event_handler).await;

let manager: AggregateManager<SagaAggregate> = AggregateManager::new(store.clone());
let manager: AggregateManager<PgStore<SagaAggregate>> = AggregateManager::new(store.clone());

let state: AggregateState<()> = AggregateState::default();
let id: Uuid = *state.id();
Expand Down
2 changes: 1 addition & 1 deletion examples/transactional_view/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() {
content: content.to_string(),
};

let manager: AggregateManager<BasicAggregate> = AggregateManager::new(store.clone());
let manager: AggregateManager<PgStore<BasicAggregate>> = AggregateManager::new(store.clone());

let result = manager.handle_command(state, command).await;

Expand Down
17 changes: 13 additions & 4 deletions examples/transactional_view/transactional_event_handler.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
use async_trait::async_trait;
use sqlx::PgConnection;

use esrs::postgres::PgStoreError;
use esrs::{StoreEvent, TransactionalEventHandler};

use crate::common::{BasicAggregate, BasicError, BasicEvent, BasicView};
use crate::common::{BasicAggregate, BasicEvent, BasicView};

pub struct BasicTransactionalEventHandler {
pub view: BasicView,
}

#[async_trait]
impl TransactionalEventHandler<BasicAggregate, PgConnection> for BasicTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<BasicEvent>, transaction: &mut PgConnection) -> Result<(), BasicError> {
impl TransactionalEventHandler<BasicAggregate, PgStoreError, PgConnection> for BasicTransactionalEventHandler {
async fn handle(&self, event: &StoreEvent<BasicEvent>, transaction: &mut PgConnection) -> Result<(), PgStoreError> {
// This is to show that event is rollbacked
if event.payload.content.eq("error") {
return Err(BasicError::Custom("Event contains `error` string".to_string()));
return Err(PgStoreError::Custom(Box::new(BasicEventHandlerError::Custom(
"Event contains `error` string".to_string(),
))));
}

let result = self
Expand All @@ -30,3 +33,9 @@ impl TransactionalEventHandler<BasicAggregate, PgConnection> for BasicTransactio
}
}
}

#[derive(thiserror::Error, Debug)]
pub enum BasicEventHandlerError {
#[error("{0}")]
Custom(String),
}
6 changes: 3 additions & 3 deletions src/esrs/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ pub trait Aggregate {

/// Internal aggregate state. This will be wrapped in [`AggregateState`] and could be used to validate
/// commands.
type State: Default + Clone + Send + Sync;
type State: Default;

/// A command is an action that the caller can execute over an aggregate in order to let it emit
/// an event.
type Command: Send;
type Command;

/// An event represents a fact that took place in the domain. They are the source of truth;
/// your current state is derived from the events.
type Event: Send + Sync;
type Event;

/// This associated type is used to get domain errors while handling a command.
type Error;
Expand Down
Loading