From b2324a6b9c7160b27d5746815cabe86aba2a6568 Mon Sep 17 00:00:00 2001 From: Simone Cottini Date: Thu, 29 Sep 2022 15:04:33 +0200 Subject: [PATCH 1/3] Some docs; changelog and remove dead code --- CHANGELOG.md | 11 +++++++++-- examples/rebuilder/src/main.rs | 23 ----------------------- src/esrs/state.rs | 10 ++++++++++ 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d5b36621..839567b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,15 +8,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] Note: this version contains hard breaking changes and may take a lot of time in order to upgrade library version! -Refer to: [#107] +Refer to: [#107], [#108] and [#109] ### Added - `AggregateManager` - should implement `name` function that act as `Identifier`. Be sure to not change the name previously set in - `Aggregate::name` function. This would cause the store to create a new table, losing pre-migration events. + `Identifier::name` function. This would cause the store to create a new table, losing pre-migration events. + - depends on `Aggregate`, so user must implement `Aggregate` trait in order to implement `AggregateManager` trait. - should implement `EventStore` associated type. +- `EventStore::delete` function with which an entire aggregate could be deleted by `aggregate_id`. + - `PgStore` - `setup` function to create table and indexes if not exists. This function should be used only once at your application startup. It tries to create the event table and its indexes if they not exist. @@ -80,6 +83,7 @@ Refer to: [#107] - `EventStore` - `run_policies`. To customize the way policies behave override `Aggregate::store_events` using `EventStore::persist` function. + - `close` function. - `PgStore` - `test` function. Use `#[sqlx::test]` in your tests to test the store. @@ -110,4 +114,7 @@ Refer to: [#107] [Unreleased]: https://github.com/primait/event_sourcing.rs/compare/0.6.2...HEAD [0.6.2]: https://github.com/primait/event_sourcing.rs/compare/0.6.1...0.6.2 + [#107]: https://github.com/primait/event_sourcing.rs/pull/107 +[#108]: https://github.com/primait/event_sourcing.rs/pull/108 +[#109]: https://github.com/primait/event_sourcing.rs/pull/109 diff --git a/examples/rebuilder/src/main.rs b/examples/rebuilder/src/main.rs index b2cc3332..32f323f3 100644 --- a/examples/rebuilder/src/main.rs +++ b/examples/rebuilder/src/main.rs @@ -142,26 +142,3 @@ async fn main() { assert!(res.counter_id == count_id && res.count == 3); } - -#[derive(sqlx::FromRow, serde::Serialize, serde::Deserialize, Debug)] -pub struct Event { - pub id: Uuid, - pub aggregate_id: Uuid, - pub payload: Value, - pub occurred_on: DateTime, - pub sequence_number: SequenceNumber, -} - -impl TryInto> for Event { - type Error = serde_json::Error; - - fn try_into(self) -> Result, Self::Error> { - Ok(StoreEvent { - id: self.id, - aggregate_id: self.aggregate_id, - payload: serde_json::from_value::(self.payload)?, - occurred_on: self.occurred_on, - sequence_number: self.sequence_number, - }) - } -} diff --git a/src/esrs/state.rs b/src/esrs/state.rs index 4222a6db..bcc2670b 100644 --- a/src/esrs/state.rs +++ b/src/esrs/state.rs @@ -2,6 +2,9 @@ use uuid::Uuid; use crate::types::SequenceNumber; +/// The internal state for an Aggregate. +/// It contains and id representing the aggregate id, an incremental sequence number and a state +/// defined by the user of this library. #[derive(Clone)] pub struct AggregateState { pub(crate) id: Uuid, @@ -9,6 +12,7 @@ pub struct AggregateState { pub(crate) inner: S, } +/// Default implementation for [AggregateState] impl Default for AggregateState { fn default() -> Self { Self::new(Uuid::new_v4()) @@ -16,6 +20,9 @@ impl Default for AggregateState { } impl AggregateState { + /// Creates a new instance of an [AggregateState] with the given aggregate id. The use of this is discouraged + /// being that that aggregate id could be already existing and a clash of ids might happen. + /// Prefer [Default] implementation. #[must_use] pub fn new(id: Uuid) -> Self { Self { @@ -25,14 +32,17 @@ impl AggregateState { } } + /// Returns an Uuid representing the aggregate id pub const fn id(&self) -> &Uuid { &self.id } + /// Returns the internal state pub const fn inner(&self) -> &S { &self.inner } + /// Returns the internal sequence number incremented by 1. pub const fn next_sequence_number(&self) -> SequenceNumber { self.sequence_number + 1 } From ad166e688cac6cf27e73a703d9db56ed388c42de Mon Sep 17 00:00:00 2001 From: Simone Cottini Date: Thu, 29 Sep 2022 15:14:31 +0200 Subject: [PATCH 2/3] Remove unused imports --- examples/rebuilder/src/main.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/rebuilder/src/main.rs b/examples/rebuilder/src/main.rs index 32f323f3..875cf0a4 100644 --- a/examples/rebuilder/src/main.rs +++ b/examples/rebuilder/src/main.rs @@ -1,15 +1,11 @@ -use std::convert::TryInto; use std::fmt::Debug; -use chrono::{DateTime, Utc}; use futures_util::stream::StreamExt; -use serde_json::Value; use sqlx::migrate::MigrateDatabase; use sqlx::{pool::PoolOptions, Pool, Postgres, Transaction}; use uuid::Uuid; use esrs::postgres::Projector; -use esrs::types::SequenceNumber; use esrs::{Aggregate, AggregateManager, AggregateState, EventStore, StoreEvent}; use simple_projection::aggregate::CounterAggregate; use simple_projection::projector::{Counter, CounterProjector}; From e6349d43d8f4709c18786090b83db729c9801994 Mon Sep 17 00:00:00 2001 From: Simone Cottini Date: Thu, 29 Sep 2022 15:56:13 +0200 Subject: [PATCH 3/3] resolve some clippy::pedantics checks --- .clippy.toml | 1 + src/esrs/aggregate.rs | 7 +++++- src/esrs/postgres/store.rs | 41 +++++++++++++++++++++++----------- src/esrs/postgres/tests/mod.rs | 4 ++-- src/esrs/state.rs | 8 ++++--- src/esrs/store.rs | 2 +- 6 files changed, 43 insertions(+), 20 deletions(-) create mode 100644 .clippy.toml diff --git a/.clippy.toml b/.clippy.toml new file mode 100644 index 00000000..48204101 --- /dev/null +++ b/.clippy.toml @@ -0,0 +1 @@ +msrv = "1.58.0" diff --git a/src/esrs/aggregate.rs b/src/esrs/aggregate.rs index 1628f4a2..11948b58 100644 --- a/src/esrs/aggregate.rs +++ b/src/esrs/aggregate.rs @@ -31,6 +31,11 @@ pub trait Aggregate { type Error; /// Handles, validate a command and emits events. + /// + /// # Errors + /// + /// Will return `Err` if the user of this library set up command validations. Every error here + /// could be just a "domain error". No technical errors. fn handle_command(state: &Self::State, command: Self::Command) -> Result, Self::Error>; /// Updates the aggregate state using the new event. This assumes that the event can be correctly applied @@ -91,8 +96,8 @@ pub trait AggregateManager: Aggregate { ); AggregateState { - inner, sequence_number, + inner, ..aggregate_state } } diff --git a/src/esrs/postgres/store.rs b/src/esrs/postgres/store.rs index fa6bdaeb..28d39650 100644 --- a/src/esrs/postgres/store.rs +++ b/src/esrs/postgres/store.rs @@ -14,7 +14,7 @@ use uuid::Uuid; use crate::esrs::policy; use crate::types::SequenceNumber; -use crate::{AggregateManager, EventStore, StoreEvent}; +use crate::{Aggregate, AggregateManager, EventStore, StoreEvent}; use super::{event, projector, statement::Statements}; @@ -48,6 +48,7 @@ where Manager::Error: From + From + std::error::Error, { /// Creates a new implementation of an aggregate + #[must_use] pub fn new(pool: Pool) -> Self { let inner: InnerPgStore = InnerPgStore { pool, @@ -78,6 +79,10 @@ where /// /// This function should be used only once at your application startup. It tries to create the /// event table and its indexes if they not exist. + /// + /// # Errors + /// + /// Will return an `Err` if there's an error connecting with database or creating tables/indexes. pub async fn setup(self) -> Result { let mut transaction: Transaction = self.inner.pool.begin().await?; @@ -102,6 +107,10 @@ where } /// Save an event in the event store and return a new `StoreEvent` instance. + /// + /// # Errors + /// + /// Will return an `Err` if the insert of the values into the database fails. pub async fn save_event( &self, aggregate_id: Uuid, @@ -161,9 +170,14 @@ where /// /// An example of how to use this function is in `examples/customize_persistence_flow` example /// folder. - pub async fn persist<'a, F: Send, T>(&'a self, fun: F) -> Result>, Manager::Error> + /// + /// # Errors + /// + /// Will return an `Err` if the given `fun` returns an `Err`. In the `EventStore` implementation + /// for `PgStore` this function return an `Err` if the event insertion or its projection fails. + pub async fn persist<'a, F, T>(&'a self, fun: F) -> Result>, Manager::Error> where - F: FnOnce(&'a Pool) -> T, + F: Send + FnOnce(&'a Pool) -> T, T: Future>, Manager::Error>> + Send, { fun(&self.inner.pool).await @@ -203,20 +217,21 @@ where let occurred_on: DateTime = Utc::now(); let mut store_events: Vec> = vec![]; - for (index, event) in events.into_iter().enumerate() { - store_events.push( - self.save_event( + for (index, event) in (0..).zip(events.into_iter()) { + let store_event: StoreEvent<::Event> = self + .save_event( aggregate_id, event, occurred_on, - starting_sequence_number + index as i32, + starting_sequence_number + index, &mut *transaction, ) - .await?, - ) + .await?; + + store_events.push(store_event); } - for store_event in store_events.iter() { + for store_event in &store_events { for projector in self.projectors().iter() { projector.project(store_event, &mut transaction).await?; } @@ -224,9 +239,9 @@ where transaction.commit().await?; - for store_event in store_events.iter() { + for store_event in &store_events { for policy in self.policies().iter() { - let _ = policy.handle_event(store_event).await; + let _policy_result = policy.handle_event(store_event).await; } } @@ -243,7 +258,7 @@ where .map(|_| ())?; for projector in self.projectors().iter() { - projector.delete(aggregate_id, &mut *transaction).await?; + projector.delete(aggregate_id, &mut transaction).await?; } transaction.commit().await?; diff --git a/src/esrs/postgres/tests/mod.rs b/src/esrs/postgres/tests/mod.rs index c18f45c9..389eaaa0 100644 --- a/src/esrs/postgres/tests/mod.rs +++ b/src/esrs/postgres/tests/mod.rs @@ -205,7 +205,7 @@ fn delete_store_events_and_projections_test(pool: Pool) { #[sqlx::test] fn policy_test(pool: Pool) { - let last_id: Arc> = Arc::new(Mutex::new(Default::default())); + let last_id: Arc> = Arc::new(Mutex::new(Uuid::default())); let policy: Box = Box::new(TestPolicy { last_id: last_id.clone(), }); @@ -225,7 +225,7 @@ fn policy_test(pool: Pool) { .unwrap(); let guard: MutexGuard = last_id.lock().unwrap(); - assert_eq!(*guard, event_internal_id) + assert_eq!(*guard, event_internal_id); } async fn create_test_projection_table(pool: &Pool) { diff --git a/src/esrs/state.rs b/src/esrs/state.rs index bcc2670b..ffbf828e 100644 --- a/src/esrs/state.rs +++ b/src/esrs/state.rs @@ -12,7 +12,7 @@ pub struct AggregateState { pub(crate) inner: S, } -/// Default implementation for [AggregateState] +/// Default implementation for [`AggregateState`] impl Default for AggregateState { fn default() -> Self { Self::new(Uuid::new_v4()) @@ -20,8 +20,10 @@ impl Default for AggregateState { } impl AggregateState { - /// Creates a new instance of an [AggregateState] with the given aggregate id. The use of this is discouraged - /// being that that aggregate id could be already existing and a clash of ids might happen. + /// Creates a new instance of an [`AggregateState`] with the given aggregate id. The use of this + /// is discouraged being that that aggregate id could be already existing and a clash of ids + /// might happen. + /// /// Prefer [Default] implementation. #[must_use] pub fn new(id: Uuid) -> Self { diff --git a/src/esrs/store.rs b/src/esrs/store.rs index 5fd4fa5a..923ce751 100644 --- a/src/esrs/store.rs +++ b/src/esrs/store.rs @@ -34,7 +34,7 @@ pub trait EventStore { async fn delete(&self, aggregate_id: Uuid) -> Result<(), ::Error>; } -/// A StoreEvent contains the payload (the original event) alongside the event's metadata. +/// A `StoreEvent` contains the payload (the original event) alongside the event's metadata. pub struct StoreEvent { /// Uniquely identifies an event among all events emitted from all aggregates. pub id: Uuid,