Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AggregateManager::handle command is generic in Error #185

Merged
merged 12 commits into from
Jan 9, 2024
1 change: 0 additions & 1 deletion .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: taiki-e/install-action@cargo-release
- uses: taiki-e/install-action@cargo-make
- name: Get version
run: |
Expand Down
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

---
## [0.13.1] - 2023-09-12

### Added

- [[#185]]: `AggregateManager` exposes the internal EventStore.

---
## [0.13.0] - 2023-09-12

Expand Down Expand Up @@ -285,7 +292,8 @@ Refer to: [#107], [#108] and [#109]
- Bump min version of supported Rust to 1.58 since <1.58 fails to resolve sqlx-core dep


[Unreleased]: https://github.com/primait/event_sourcing.rs/compare/0.13.0...HEAD
[Unreleased]: https://github.com/primait/event_sourcing.rs/compare/0.13.1...HEAD
[0.13.1]: https://github.com/primait/event_sourcing.rs/compare/0.13.0...0.13.1
[0.13.0]: https://github.com/primait/event_sourcing.rs/compare/0.12.0...0.13.0
[0.12.0]: https://github.com/primait/event_sourcing.rs/compare/0.11.0...0.12.0
[0.11.0]: https://github.com/primait/event_sourcing.rs/compare/0.10.2...0.11.0
Expand All @@ -299,6 +307,7 @@ Refer to: [#107], [#108] and [#109]
[0.6.2]: https://github.com/primait/event_sourcing.rs/compare/0.6.1...0.6.2


[#185]: https://github.com/primait/event_sourcing.rs/pull/185
[#175]: https://github.com/primait/event_sourcing.rs/pull/175
[#164]: https://github.com/primait/event_sourcing.rs/pull/164
[#161]: https://github.com/primait/event_sourcing.rs/pull/161
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ name = "esrs"
readme = "README.md"
repository = "https://github.com/primait/event_sourcing.rs"
rust-version = "1.58.0"
version = "0.13.0"
version = "0.13.1"

[package.metadata.docs.rs]
all-features = true
Expand Down Expand Up @@ -42,7 +42,7 @@ ouroboros = "0.18"
# Sql library for async impl
sqlx = { version = "0.7.0", features = ["runtime-tokio-native-tls", "uuid", "json", "chrono"], optional = true }
# Kafka library
rdkafka = { version = "0.36.*", features = ["ssl-vendored"], optional = true }
rdkafka = { version = "0.35.*", features = ["ssl-vendored"], optional = true }
# Rabbit library
lapin = { version = "2.1.1", optional = true }
# Builder pattern
Expand Down
6 changes: 5 additions & 1 deletion Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ default_to_workspace = false
# Check
[tasks.check]
description = "Run checks for each feature"
dependencies = ["check-scripts"]

[tasks.check-scripts]
description = "Run the per-feature check scripts"
script = [
"cargo check",
"cargo check --features=postgres",
Expand Down Expand Up @@ -107,4 +111,4 @@ env = { "RUSTDOCFLAGS" = "-Dwarnings" }
[tasks.release]
description = "Task to release the package to crates.io"
command = "cargo"
args = ["release", "publish", "--no-confirm", "--allow-branch \"*\"", "--all-features", "--execute"]
args = ["publish", "--no-verify"]
14 changes: 11 additions & 3 deletions examples/aggregate_deletion/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,23 @@ use sqlx::{Pool, Postgres};
use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::{EventStore, StoreEvent};
use esrs::AggregateState;

use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicEvent, BasicEventHandler, BasicView};
use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicError, BasicEvent, BasicEventHandler, BasicView};

#[path = "../common/lib.rs"]
mod common;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand All @@ -39,7 +47,7 @@ async fn main() {

let manager = AggregateManager::new(store.clone());

manager.handle_command(state, command).await.unwrap();
manager.handle_command::<Error>(state, command).await.unwrap();

let row = view.by_id(id, &pool).await.unwrap().unwrap();

Expand Down
3 changes: 3 additions & 0 deletions examples/common/basic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ pub struct BasicEvent {
impl esrs::event::Upcaster for BasicEvent {}

#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
pub enum BasicError {
#[error("Empty content")]
EmptyContent,
#[error("Custom error: {}", .0)]
Custom(String),
}
1 change: 1 addition & 0 deletions examples/common/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ mod shared;

mod util;

#[derive(Debug, thiserror::Error)]
pub enum CommonError {}
14 changes: 11 additions & 3 deletions examples/event_bus/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ use uuid::Uuid;
use esrs::bus::kafka::{KafkaEventBus, KafkaEventBusConfig};
use esrs::bus::rabbit::{RabbitEventBus, RabbitEventBusConfig};
use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::AggregateState;

use crate::common::{new_pool, random_letters, BasicAggregate, BasicCommand, BasicEventHandler, BasicView};
use crate::common::{new_pool, random_letters, BasicAggregate, BasicCommand, BasicError, BasicEventHandler, BasicView};
use crate::kafka::KafkaEventBusConsumer;
use crate::rabbit::RabbitEventBusConsumer;

Expand All @@ -29,6 +29,14 @@ mod common;
mod kafka;
mod rabbit;

#[derive(Debug, thiserror::Error)]
pub enum EventBusError {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand Down Expand Up @@ -107,7 +115,7 @@ async fn main() {
let command = BasicCommand {
content: content.to_string(),
};
manager.handle_command(aggregate_state, command).await.unwrap();
let _: Result<(), EventBusError> = manager.handle_command(aggregate_state, command).await;

let (rabbit_timeout_result, kafka_timeout_result) = tokio::join!(rabbit_join_handle, kafka_join_handle);

Expand Down
14 changes: 11 additions & 3 deletions examples/eventual_view/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@ use sqlx::{Pool, Postgres};
use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::AggregateState;

use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicEventHandler, BasicView};
use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicError, BasicEventHandler, BasicView};

#[path = "../common/lib.rs"]
mod common;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand All @@ -38,7 +46,7 @@ async fn main() {
};

AggregateManager::new(store)
.handle_command(state, command)
.handle_command::<Error>(state, command)
.await
.unwrap();

Expand Down
40 changes: 17 additions & 23 deletions examples/locking_strategies/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ use sqlx::{Pool, Postgres};
use tokio::sync::Mutex;
use uuid::Uuid;

use esrs::manager::{AggregateManager, AggregateManagerError};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::manager::AggregateManager;
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::store::EventStore;
use esrs::AggregateState;

use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicEvent};
use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicError, BasicEvent};

#[path = "../common/lib.rs"]
mod common;
Expand All @@ -54,10 +54,7 @@ type Agg = AggregateManager<PgStore<BasicAggregate>>;

/// Increment the value behind this `aggregate_id` as soon as atomical access can be obtained.
/// The lock can be obtained even if there are current optimistic accesses! Avoid mixing the two strategies when writing.
pub async fn increment_atomically(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
pub async fn increment_atomically(manager: Agg, aggregate_id: Uuid) -> Result<(), LockExampleError> {
let aggregate_state = manager.lock_and_load(aggregate_id).await?.unwrap_or_default();
manager
.handle_command(
Expand All @@ -66,16 +63,12 @@ pub async fn increment_atomically(
content: "whatever".to_string(),
},
)
.await?;
Ok(())
.await
}

/// Increment the value behind this `aggregate_id` with an optimistic locking strategy.
/// Optimistic access ignores any current active lock! Avoid mixing the two strategies when writing.
pub async fn increment_optimistically(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
pub async fn increment_optimistically(manager: Agg, aggregate_id: Uuid) -> Result<(), LockExampleError> {
// Every optimistic access can take place concurrently...
let aggregate_state = manager.load(aggregate_id).await?.unwrap_or_default();
// ...and events are persisted in non-deterministic order.
Expand All @@ -87,17 +80,13 @@ pub async fn increment_optimistically(
content: "whatever".to_string(),
},
)
.await?;
Ok(())
.await
}

/// Load the aggregate state for read-only purposes, preventing others (that use locking) from modifying it.
/// Avoid using atomic reads if writes are optimistic, as the state would be modified anyway!
/// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads.
pub async fn with_atomic_read(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
pub async fn with_atomic_read(manager: Agg, aggregate_id: Uuid) -> Result<(), LockExampleError> {
let mut aggregate_state = manager.lock_and_load(aggregate_id).await?.unwrap_or_default();
// No one else (employing locking!) can read or modify the state just loaded here,
// ensuring this really is the *latest* aggregate state.
Expand All @@ -111,10 +100,7 @@ pub async fn with_atomic_read(
/// Load the aggregate state for read-only purposes, optimistically assuming nothing is modifying it.
/// If writes are atomic, it is perfectly fine to use a mixture of atomic and optimistic reads.
/// Otherwise, optimistic reads are allowed: beware there are no guarantees the state loaded is actually the latest.
pub async fn with_optimistic_read(
manager: Agg,
aggregate_id: Uuid,
) -> Result<(), AggregateManagerError<PgStore<BasicAggregate>>> {
pub async fn with_optimistic_read(manager: Agg, aggregate_id: Uuid) -> Result<(), LockExampleError> {
// Read the state now, ignoring any explicit locking...
let mut aggregate_state = manager.load(aggregate_id).await?.unwrap_or_default();
// ...but nothing prevents something else from updating the data in the store in the meanwhile,
Expand All @@ -126,6 +112,14 @@ pub async fn with_optimistic_read(
Ok(())
}

#[derive(Debug, thiserror::Error)]
pub enum LockExampleError {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

/// Locking showcase
#[tokio::main]
async fn main() {
Expand Down
14 changes: 11 additions & 3 deletions examples/multi_aggregate_rebuild/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,22 @@ use esrs::store::StoreEvent;
use esrs::AggregateState;

use crate::common::{
new_pool, AggregateA, AggregateB, CommandA, CommandB, EventA, EventB, SharedEventHandler, SharedView,
new_pool, AggregateA, AggregateB, CommandA, CommandB, CommonError, EventA, EventB, SharedEventHandler, SharedView,
};
use crate::transactional_event_handler::SharedTransactionalEventHandler;

#[path = "../common/lib.rs"]
mod common;
mod transactional_event_handler;

#[derive(Debug, thiserror::Error)]
pub enum RebuilderError {
#[error(transparent)]
Aggregate(#[from] CommonError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand Down Expand Up @@ -173,7 +181,7 @@ async fn setup(shared_id: Uuid, pool: Pool<Postgres>, view: SharedView, transact

let manager: AggregateManager<PgStore<AggregateA>> = AggregateManager::new(pg_store_a);
manager
.handle_command(AggregateState::default(), CommandA { v: 10, shared_id })
.handle_command::<RebuilderError>(AggregateState::default(), CommandA { v: 10, shared_id })
.await
.unwrap();

Expand All @@ -191,7 +199,7 @@ async fn setup(shared_id: Uuid, pool: Pool<Postgres>, view: SharedView, transact

let manager: AggregateManager<PgStore<AggregateB>> = AggregateManager::new(pg_store_b);
manager
.handle_command(AggregateState::default(), CommandB { v: 7, shared_id })
.handle_command::<RebuilderError>(AggregateState::default(), CommandB { v: 7, shared_id })
.await
.unwrap();

Expand Down
12 changes: 11 additions & 1 deletion examples/readme/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,21 @@ async fn main() {
.expect("Failed to create PgStore");

let manager: AggregateManager<_> = AggregateManager::new(store);
let _ = manager
let _: Result<(), Error> = manager
.handle_command(Default::default(), BookCommand::Buy { num_of_copies: 1 })
.await;
}

//////////////////////////////
// Global error
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error(transparent)]
Aggregate(#[from] BookError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

//////////////////////////////
// Aggregate

Expand Down
14 changes: 11 additions & 3 deletions examples/rebuilder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ use uuid::Uuid;

use esrs::manager::AggregateManager;
use esrs::rebuilder::{PgRebuilder, Rebuilder};
use esrs::store::postgres::{PgStore, PgStoreBuilder};
use esrs::store::postgres::{PgStore, PgStoreBuilder, PgStoreError};
use esrs::AggregateState;

use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicView};
use crate::common::{new_pool, BasicAggregate, BasicCommand, BasicError, BasicView};
use crate::event_handler::{AnotherEventHandler, BasicEventHandlerV1, BasicEventHandlerV2};
use crate::transactional_event_handler::{BasicTransactionalEventHandlerV1, BasicTransactionalEventHandlerV2};

Expand All @@ -49,6 +49,14 @@ mod common;
mod event_handler;
mod transactional_event_handler;

#[derive(Debug, thiserror::Error)]
pub enum RebuilderError {
#[error(transparent)]
Aggregate(#[from] BasicError),
#[error(transparent)]
Store(#[from] PgStoreError),
}

#[tokio::main]
async fn main() {
let pool: Pool<Postgres> = new_pool().await;
Expand Down Expand Up @@ -86,7 +94,7 @@ async fn setup(aggregate_id: Uuid, view: BasicView, transactional_view: BasicVie

let manager: AggregateManager<PgStore<BasicAggregate>> = AggregateManager::new(pg_store);
manager
.handle_command(
.handle_command::<RebuilderError>(
AggregateState::with_id(aggregate_id),
BasicCommand {
content: "basic_command".to_string(),
Expand Down
Loading