Skip to content

Commit

Permalink
Rename get_all in stram_events; update rebuilder example (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
cottinisimone authored Sep 29, 2022
1 parent 22491db commit 0547676
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 23 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Refer to: [#107]
- `new` function is now sync and its return value is no longer a `Result` but `Self`. Removed `Aggregate` type param.
- `new` takes ownership of pool; removed projectors and policies params. Use `set_projectors` or `set_policies`
instead to add them to the store.
- `rebuild_events` renamed into `get_all`.
- `rebuild_events` renamed into `stream_events`. Now it takes an `sqlx::Executor` parameter.
- policies behaviour is now that if one of them fails they fail silently. (override this behaviour with
`Aggregate::store_events` using `EventStore::persist` function).
- `Event` and `Error` trait generic params removed in favour of `Manager: AggregateManager`.
Expand Down
4 changes: 2 additions & 2 deletions examples/rebuild_shared_projection/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ async fn main() {
let store_a = AggregateA::new(&pool).await.unwrap().event_store;
let store_b = AggregateB::new(&pool).await.unwrap().event_store;

let mut events_a = store_a.get_all();
let mut events_b = store_b.get_all();
let mut events_a = store_a.stream_events(&pool);
let mut events_b = store_b.stream_events(&pool);

let mut event_a_opt: Option<Result<StoreEvent<EventA>, CounterError>> = events_a.next().await;
let mut event_b_opt: Option<Result<StoreEvent<EventB>, CounterError>> = events_b.next().await;
Expand Down
1 change: 1 addition & 0 deletions examples/rebuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ version = "0.1.0"

# async main
tokio = { version = "1.6", features = ["full"] }
futures-util = "0.3"

esrs = { version = "*", path = "../../", features = ["postgres"] }
simple_projection = { version = "*", path = "../simple_projection/" }
Expand Down
25 changes: 12 additions & 13 deletions examples/rebuilder/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use chrono::{DateTime, Utc};
use serde_json::Value;
use std::convert::TryInto;
use std::fmt::Debug;

use chrono::{DateTime, Utc};
use futures_util::stream::StreamExt;
use serde_json::Value;
use sqlx::migrate::MigrateDatabase;
use sqlx::{pool::PoolOptions, Pool, Postgres, Transaction};
use uuid::Uuid;
Expand Down Expand Up @@ -111,27 +112,25 @@ async fn main() {

let mut transaction = pool.begin().await.unwrap();

let events = sqlx::query_as::<_, Event>("SELECT * FROM counter_events")
.fetch_all(&mut *transaction)
let events: Vec<StoreEvent<CounterEvent>> = aggregate
.event_store
.stream_events(&mut transaction)
.collect::<Vec<Result<StoreEvent<CounterEvent>, CounterError>>>()
.await
.unwrap();

let counter_events: Vec<StoreEvent<CounterEvent>> = events
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<StoreEvent<CounterEvent>>, serde_json::Error>>()
.unwrap();
.collect::<Result<Vec<StoreEvent<CounterEvent>>, CounterError>>()
.expect("Failed to get all events from event_table");

sqlx::query("TRUNCATE TABLE counters")
.execute(&mut *transaction)
.execute(&mut transaction)
.await
.expect("Failed to drop table");

// Assert the counter doesn't exist
let res = Counter::by_id(count_id, &mut *transaction).await.expect("Query failed");
let res = Counter::by_id(count_id, &mut transaction).await.expect("Query failed");
assert!(res.is_none());

rebuild_all_at_once(counter_events, projectors.as_slice(), &mut transaction).await;
rebuild_all_at_once(events, projectors.as_slice(), &mut transaction).await;

transaction.commit().await.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/simple_projection/src/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::projector::CounterProjector;
use crate::structs::{CounterCommand, CounterError, CounterEvent};

pub struct CounterAggregate {
event_store: PgStore<Self>,
pub event_store: PgStore<Self>,
}

impl CounterAggregate {
Expand Down
12 changes: 6 additions & 6 deletions src/esrs/postgres/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,14 @@ where

/// This function returns a stream representing the full event store table content. This should
/// be mainly used to rebuild read models.
pub fn get_all(&self) -> BoxStream<Result<StoreEvent<Manager::Event>, Manager::Error>> {
pub fn stream_events<'s>(
&'s self,
executor: impl Executor<'s, Database = Postgres> + 's,
) -> BoxStream<Result<StoreEvent<Manager::Event>, Manager::Error>> {
Box::pin({
sqlx::query_as::<_, event::Event>(self.inner.statements.select_all())
.fetch(&self.inner.pool)
.map(|res| match res {
Ok(event) => event.try_into().map_err(Into::into),
Err(error) => Err(error.into()),
})
.fetch(executor)
.map(|res| Ok(res?.try_into()?))
})
}

Expand Down

0 comments on commit 0547676

Please sign in to comment.