From 9fbf95899a2e767a213fa041db1edfe8933054ee Mon Sep 17 00:00:00 2001 From: Charly Fau Date: Tue, 1 Aug 2023 09:30:29 -0300 Subject: [PATCH 1/7] Sort events before applying them to aggregate --- aggregatestore/events/aggregatestore.go | 42 ++++- .../events/aggregatestore_sort_test.go | 177 ++++++++++++++++++ 2 files changed, 218 insertions(+), 1 deletion(-) create mode 100644 aggregatestore/events/aggregatestore_sort_test.go diff --git a/aggregatestore/events/aggregatestore.go b/aggregatestore/events/aggregatestore.go index def3420a..a9e49b6e 100644 --- a/aggregatestore/events/aggregatestore.go +++ b/aggregatestore/events/aggregatestore.go @@ -229,7 +229,7 @@ func (r *AggregateStore) takeSnapshot(ctx context.Context, agg eh.Aggregate, las } func (r *AggregateStore) applyEvents(ctx context.Context, a VersionedAggregate, events []eh.Event) error { - for _, event := range events { + for _, event := range sortEventsByVersion(events) { if event.AggregateType() != a.AggregateType() { return ErrMismatchedEventType } @@ -243,3 +243,43 @@ func (r *AggregateStore) applyEvents(ctx context.Context, a VersionedAggregate, return nil } + +func sortEventsByVersion(events []eh.Event) []eh.Event { + if len(events) == 0 { + return events + } + + min, max := findMinAndMaxVersions(events) + sortedEvents := make([]eh.Event, max-min+1) + for _, event := range events { + sortedEvents[event.Version()-min] = event + } + + if len(sortedEvents) == len(events) { + return sortedEvents + } + + // remove version gaps (this should not happen) + i := 0 + for _, event := range sortedEvents { + if event != nil { + sortedEvents[i] = event + i++ + } + } + return sortedEvents[:i] +} + +func findMinAndMaxVersions(events []eh.Event) (int, int) { + min := events[0].Version() + max := min + for _, event := range events { + v := event.Version() + if v < min { + min = v + } else if v > max { + max = v + } + } + return min, max +} diff --git a/aggregatestore/events/aggregatestore_sort_test.go b/aggregatestore/events/aggregatestore_sort_test.go new file mode 100644 index 00000000..4645916e --- /dev/null +++ b/aggregatestore/events/aggregatestore_sort_test.go @@ -0,0 +1,177 @@ +// Copyright (c) 2014 - The Event Horizon authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "fmt" + "testing" + "time" + + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/mocks" + "github.com/looplab/eventhorizon/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_findMinAndMaxVersions(t *testing.T) { + tests := []struct { + name string + events []eh.Event + wantMin int + wantMax int + }{ + { + name: "one event", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), + }, + wantMin: 1, + wantMax: 1, + }, + { + name: "sorted events", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 2)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 3)), + }, + wantMin: 1, + wantMax: 3, + }, + { + name: "unsorted events", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 13)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 11)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 12)), + }, + wantMin: 11, + wantMax: 13, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, got1 := findMinAndMaxVersions(tt.events) + assert.Equalf(t, tt.wantMin, got, "findMinAndMaxVersions(%v)", tt.events) + assert.Equalf(t, tt.wantMax, got1, "findMinAndMaxVersions(%v)", tt.events) + }) + } +} + +func Test_sortEventsByVersion(t *testing.T) { + tests := []struct { + name string + events []eh.Event + }{ + { + name: "no events", + events: []eh.Event{}, + }, + { + name: "one event", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), + }, + }, + { + name: "one event with bigger version", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 17)), + }, + }, + { + name: "two sorted events", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 41)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 42)), + }, + }, + { + name: "two unsorted events", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 42)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 41)), + }, + }, + { + name: "several events with version gaps", + events: []eh.Event{ + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 11)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 12)), + eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 14)), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := sortEventsByVersion(tt.events) + + require.ElementsMatchf(t, tt.events, got, "same elements in sortEventsByVersion(%v)", tt.events) + // they are sorted + for i := 1; i < len(got); i++ { + assert.Truef(t, got[i-1].Version() < got[i].Version(), "sorted elements in sortEventsByVersion(%v)", tt.events) + } + }) + } +} + +const Sorted = true +const Unsorted = false + +func BenchmarkSort(b *testing.B) { + benchmarkSort(b, Sorted) +} +func BenchmarkSortUnsorted(b *testing.B) { + fmt.Println("BenchmarkSortUnsorted") + benchmarkSort(b, Unsorted) +} + +var sortedEvents []eh.Event + +func benchmarkSort(b *testing.B, sorted bool) { + // Use same data for all benchmarks to not influence times. + data := &mocks.EventData{} + aggId := uuid.New() + now := time.Now() + + for eventCount := 1; eventCount <= 1000; eventCount *= 10 { + + events := make([]eh.Event, eventCount, eventCount) + + if sorted { + for i := 0; i < eventCount; i++ { + events[i] = eh.NewEvent("test", data, now, eh.ForAggregate("test", aggId, i+7)) + } + } else { + for i := 0; i < eventCount; i++ { + events[i] = eh.NewEvent("test", data, now, eh.ForAggregate("test", aggId, eventCount-i+7)) + } + } + + b.Run(fmt.Sprintf("sortEventsByVersion-%d", eventCount), func(b *testing.B) { + var evs []eh.Event + for i := 0; i < b.N; i++ { + evs = sortEventsByVersion(events) + } + sortedEvents = evs + }) + + // check to force the compiler optimisation to not remove the variable + if len(sortedEvents) != eventCount { + b.Errorf("sortedEvents has wrong length: %d", len(sortedEvents)) + } + } +} From eaed3821c2f0ef6e2308b16047900be03acbe2d4 Mon Sep 17 00:00:00 2001 From: Charly Fau Date: Mon, 11 Dec 2023 12:52:32 -0300 Subject: [PATCH 2/7] Revert "Sort events before applying them to aggregate" This reverts commit 9fbf95899a2e767a213fa041db1edfe8933054ee. --- aggregatestore/events/aggregatestore.go | 42 +---- .../events/aggregatestore_sort_test.go | 177 ------------------ 2 files changed, 1 insertion(+), 218 deletions(-) delete mode 100644 aggregatestore/events/aggregatestore_sort_test.go diff --git a/aggregatestore/events/aggregatestore.go b/aggregatestore/events/aggregatestore.go index a9e49b6e..def3420a 100644 --- a/aggregatestore/events/aggregatestore.go +++ b/aggregatestore/events/aggregatestore.go @@ -229,7 +229,7 @@ func (r *AggregateStore) takeSnapshot(ctx context.Context, agg eh.Aggregate, las } func (r *AggregateStore) applyEvents(ctx context.Context, a VersionedAggregate, events []eh.Event) error { - for _, event := range sortEventsByVersion(events) { + for _, event := range events { if event.AggregateType() != a.AggregateType() { return ErrMismatchedEventType } @@ -243,43 +243,3 @@ func (r *AggregateStore) applyEvents(ctx context.Context, a VersionedAggregate, return nil } - -func sortEventsByVersion(events []eh.Event) []eh.Event { - if len(events) == 0 { - return events - } - - min, max := findMinAndMaxVersions(events) - sortedEvents := make([]eh.Event, max-min+1) - for _, event := range events { - sortedEvents[event.Version()-min] = event - } - - if len(sortedEvents) == len(events) { - return sortedEvents - } - - // remove version gaps (this should not happen) - i := 0 - for _, event := range sortedEvents { - if event != nil { - sortedEvents[i] = event - i++ - } - } - return sortedEvents[:i] -} - -func findMinAndMaxVersions(events []eh.Event) (int, int) { - min := events[0].Version() - max := min - for _, event := range events { - v := event.Version() - if v < min { - min = v - } else if v > max { - max = v - } - } - return min, max -} diff --git a/aggregatestore/events/aggregatestore_sort_test.go b/aggregatestore/events/aggregatestore_sort_test.go deleted file mode 100644 index 4645916e..00000000 --- a/aggregatestore/events/aggregatestore_sort_test.go +++ /dev/null @@ -1,177 +0,0 @@ -// Copyright (c) 2014 - The Event Horizon authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package events - -import ( - "fmt" - "testing" - "time" - - eh "github.com/looplab/eventhorizon" - "github.com/looplab/eventhorizon/mocks" - "github.com/looplab/eventhorizon/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func Test_findMinAndMaxVersions(t *testing.T) { - tests := []struct { - name string - events []eh.Event - wantMin int - wantMax int - }{ - { - name: "one event", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), - }, - wantMin: 1, - wantMax: 1, - }, - { - name: "sorted events", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 2)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 3)), - }, - wantMin: 1, - wantMax: 3, - }, - { - name: "unsorted events", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 13)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 11)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 12)), - }, - wantMin: 11, - wantMax: 13, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, got1 := findMinAndMaxVersions(tt.events) - assert.Equalf(t, tt.wantMin, got, "findMinAndMaxVersions(%v)", tt.events) - assert.Equalf(t, tt.wantMax, got1, "findMinAndMaxVersions(%v)", tt.events) - }) - } -} - -func Test_sortEventsByVersion(t *testing.T) { - tests := []struct { - name string - events []eh.Event - }{ - { - name: "no events", - events: []eh.Event{}, - }, - { - name: "one event", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), - }, - }, - { - name: "one event with bigger version", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 17)), - }, - }, - { - name: "two sorted events", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 41)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 42)), - }, - }, - { - name: "two unsorted events", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 42)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 41)), - }, - }, - { - name: "several events with version gaps", - events: []eh.Event{ - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 11)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 12)), - eh.NewEvent("test", &mocks.EventData{}, time.Now(), eh.ForAggregate("test", uuid.New(), 14)), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := sortEventsByVersion(tt.events) - - require.ElementsMatchf(t, tt.events, got, "same elements in sortEventsByVersion(%v)", tt.events) - // they are sorted - for i := 1; i < len(got); i++ { - assert.Truef(t, got[i-1].Version() < got[i].Version(), "sorted elements in sortEventsByVersion(%v)", tt.events) - } - }) - } -} - -const Sorted = true -const Unsorted = false - -func BenchmarkSort(b *testing.B) { - benchmarkSort(b, Sorted) -} -func BenchmarkSortUnsorted(b *testing.B) { - fmt.Println("BenchmarkSortUnsorted") - benchmarkSort(b, Unsorted) -} - -var sortedEvents []eh.Event - -func benchmarkSort(b *testing.B, sorted bool) { - // Use same data for all benchmarks to not influence times. - data := &mocks.EventData{} - aggId := uuid.New() - now := time.Now() - - for eventCount := 1; eventCount <= 1000; eventCount *= 10 { - - events := make([]eh.Event, eventCount, eventCount) - - if sorted { - for i := 0; i < eventCount; i++ { - events[i] = eh.NewEvent("test", data, now, eh.ForAggregate("test", aggId, i+7)) - } - } else { - for i := 0; i < eventCount; i++ { - events[i] = eh.NewEvent("test", data, now, eh.ForAggregate("test", aggId, eventCount-i+7)) - } - } - - b.Run(fmt.Sprintf("sortEventsByVersion-%d", eventCount), func(b *testing.B) { - var evs []eh.Event - for i := 0; i < b.N; i++ { - evs = sortEventsByVersion(events) - } - sortedEvents = evs - }) - - // check to force the compiler optimisation to not remove the variable - if len(sortedEvents) != eventCount { - b.Errorf("sortedEvents has wrong length: %d", len(sortedEvents)) - } - } -} From abedad925a59fc492fb684d11d9347adcfec1f0f Mon Sep 17 00:00:00 2001 From: Charly Fau Date: Mon, 11 Dec 2023 14:02:03 -0300 Subject: [PATCH 3/7] Add EventSorter to warrant event order Add order to mongodb_v2 eventstore --- eventstore.go | 1 + eventstore/eventsorter/event_sorter.go | 61 ++++++++++++ eventstore/eventsorter/event_sorter_test.go | 103 ++++++++++++++++++++ eventstore/mongodb/eventstore.go | 3 +- eventstore/mongodb_v2/eventstore.go | 7 +- 5 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 eventstore/eventsorter/event_sorter.go create mode 100644 eventstore/eventsorter/event_sorter_test.go diff --git a/eventstore.go b/eventstore.go index a5fb01e5..d8ba2384 100644 --- a/eventstore.go +++ b/eventstore.go @@ -32,6 +32,7 @@ type EventStore interface { Load(context.Context, uuid.UUID) ([]Event, error) // LoadFrom loads all events from version for the aggregate id from the store. + // Event store should provide events in version order LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]Event, error) // Close closes the EventStore. diff --git a/eventstore/eventsorter/event_sorter.go b/eventstore/eventsorter/event_sorter.go new file mode 100644 index 00000000..26a0caf3 --- /dev/null +++ b/eventstore/eventsorter/event_sorter.go @@ -0,0 +1,61 @@ +package eventsorter + +import ( + "context" + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" + "sort" +) + +// EventSorter is an event store wrapper that warrants events are provided in version order. +// Version order is required for event sourcing to work correctly. +// Use it with an event store that does not warrant version order. +type EventSorter struct { + inner eh.EventStore +} + +var _ eh.EventStore = (*EventSorter)(nil) + +// NewEventSorter creates a new EventSorter wrapping the provided event store +func NewEventSorter(inner eh.EventStore) *EventSorter { + return &EventSorter{inner: inner} +} + +func (e EventSorter) Save(ctx context.Context, events []eh.Event, originalVersion int) error { + return e.inner.Save(ctx, events, originalVersion) +} + +func (e EventSorter) Load(ctx context.Context, uuid uuid.UUID) ([]eh.Event, error) { + events, err := e.inner.Load(ctx, uuid) + + if err != nil { + return nil, err + } + + return e.SortEvents(events), nil +} + +func (e EventSorter) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) { + events, err := e.inner.LoadFrom(ctx, id, version) + + if err != nil { + return nil, err + } + + return e.SortEvents(events), nil +} + +func (e EventSorter) Close() error { + return e.inner.Close() +} + +func (e EventSorter) SortEvents(events []eh.Event) []eh.Event { + byVersion := func(i, j int) bool { + return events[i].Version() < events[j].Version() + } + + // It is ok to sort in place, events slice is already the inner store response + sort.Slice(events, byVersion) + + return events +} diff --git a/eventstore/eventsorter/event_sorter_test.go b/eventstore/eventsorter/event_sorter_test.go new file mode 100644 index 00000000..b87994c8 --- /dev/null +++ b/eventstore/eventsorter/event_sorter_test.go @@ -0,0 +1,103 @@ +package eventsorter + +import ( + "context" + "github.com/AltScore/lcib-api/pkg/xeh/ehmocks" + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "testing" + "time" +) + +type EventSorterTestSuite struct { + suite.Suite + + innerStore *ehmocks.EventStoreMock + eventSorter *EventSorter + + unsortedEventList []eh.Event +} + +// In order for 'go test' to run this suite, we need to create +// a normal test function and pass our suite to suite.Run +func TestEventSorterTestSuite(t *testing.T) { + suite.Run(t, &EventSorterTestSuite{}) +} + +// before each test +func (s *EventSorterTestSuite) SetupTest() { + s.innerStore = &ehmocks.EventStoreMock{} + + s.eventSorter = NewEventSorter(s.innerStore) + + s.unsortedEventList = []eh.Event{ + eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 3)), + eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 2)), + eh.NewEvent("test", nil, time.Now(), eh.ForAggregate("test", uuid.New(), 1)), + } +} + +func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_Load() { + // Given a event store with no events + s.innerStore.On("Load", mock.Anything, mock.Anything).Return([]eh.Event{}, nil) + + // When we load the events + events, err := s.eventSorter.Load(context.TODO(), uuid.New()) + + // Then no error is returned + s.NoError(err) + + // And empty event list is returned + s.Len(events, 0) +} + +func (s *EventSorterTestSuite) Test_can_sort_empty_event_list_on_LoafFrom() { + // Given a event store with no events + s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, mock.Anything).Return([]eh.Event{}, nil) + + // When we load the events + events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 8) + + // Then no error is returned + s.NoError(err) + + // And empty event list is returned + s.Len(events, 0) +} + +func (s *EventSorterTestSuite) Test_can_sort_event_list_on_Load() { + // Given a event store with no events + s.innerStore.On("Load", mock.Anything, mock.Anything).Return(s.unsortedEventList, nil) + + // When we load the events + events, err := s.eventSorter.Load(context.TODO(), uuid.New()) + + // Then no error is returned + s.NoError(err) + + // And the events are returned in version order + s.Len(events, 3) + + s.Equal(1, events[0].Version()) + s.Equal(2, events[1].Version()) + s.Equal(3, events[2].Version()) +} + +func (s *EventSorterTestSuite) Test_can_sort_event_list_on_LoadFrom() { + // Given a event store with no events + s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, 2).Return(s.unsortedEventList, nil) + + // When we load the events + events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 2) + + // Then no error is returned + s.NoError(err) + + // And the events are returned in version order + s.Len(events, 2) + + s.Equal(2, events[0].Version()) + s.Equal(3, events[1].Version()) +} diff --git a/eventstore/mongodb/eventstore.go b/eventstore/mongodb/eventstore.go index a6efd070..715cf5a2 100644 --- a/eventstore/mongodb/eventstore.go +++ b/eventstore/mongodb/eventstore.go @@ -16,6 +16,7 @@ package mongodb import ( "context" + "errors" "fmt" "time" @@ -332,7 +333,7 @@ func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([ var aggregate aggregateRecord if err := s.aggregates.FindOne(ctx, bson.M{"_id": id}).Decode(&aggregate); err != nil { // Translate to our own not found error. - if err == mongo.ErrNoDocuments { + if errors.Is(err, mongo.ErrNoDocuments) { err = eh.ErrAggregateNotFound } diff --git a/eventstore/mongodb_v2/eventstore.go b/eventstore/mongodb_v2/eventstore.go index 7329ca47..99ed8270 100644 --- a/eventstore/mongodb_v2/eventstore.go +++ b/eventstore/mongodb_v2/eventstore.go @@ -43,6 +43,7 @@ import ( // EventStore is an eventhorizon.EventStore for MongoDB, using one collection // for all events and another to keep track of all aggregates/streams. It also // keeps track of the global position of events, stored as metadata. +// This implementation warrants event order by Version on Load and LoadFrom methods. type EventStore struct { client *mongo.Client clientOwnership clientOwnership @@ -430,7 +431,8 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Load implements the Load method of the eventhorizon.EventStore interface. func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) { - cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}) + opts := options.Find().SetSort(bson.M{"version": 1}) + cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, opts) if err != nil { return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not find event: %w", err), @@ -444,7 +446,8 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) // LoadFrom implements LoadFrom method of the eventhorizon.SnapshotStore interface. func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) { - cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}) + opts := options.Find().SetSort(bson.M{"version": 1}) + cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, opts) if err != nil { return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not find event: %w", err), From dd4717ca7d29897eeeaf9d800390187f20f21ace Mon Sep 17 00:00:00 2001 From: Charly Fau Date: Mon, 11 Dec 2023 14:15:30 -0300 Subject: [PATCH 4/7] Fix invalid dependency --- eventstore/eventsorter/event_sorter_test.go | 7 ++-- eventstore/eventsorter/eventstore_stub.go | 40 +++++++++++++++++++++ 2 files changed, 43 insertions(+), 4 deletions(-) create mode 100644 eventstore/eventsorter/eventstore_stub.go diff --git a/eventstore/eventsorter/event_sorter_test.go b/eventstore/eventsorter/event_sorter_test.go index b87994c8..16bb6356 100644 --- a/eventstore/eventsorter/event_sorter_test.go +++ b/eventstore/eventsorter/event_sorter_test.go @@ -2,7 +2,6 @@ package eventsorter import ( "context" - "github.com/AltScore/lcib-api/pkg/xeh/ehmocks" eh "github.com/looplab/eventhorizon" "github.com/looplab/eventhorizon/uuid" "github.com/stretchr/testify/mock" @@ -14,7 +13,7 @@ import ( type EventSorterTestSuite struct { suite.Suite - innerStore *ehmocks.EventStoreMock + innerStore *EventStoreMock eventSorter *EventSorter unsortedEventList []eh.Event @@ -28,7 +27,7 @@ func TestEventSorterTestSuite(t *testing.T) { // before each test func (s *EventSorterTestSuite) SetupTest() { - s.innerStore = &ehmocks.EventStoreMock{} + s.innerStore = &EventStoreMock{} s.eventSorter = NewEventSorter(s.innerStore) @@ -87,7 +86,7 @@ func (s *EventSorterTestSuite) Test_can_sort_event_list_on_Load() { func (s *EventSorterTestSuite) Test_can_sort_event_list_on_LoadFrom() { // Given a event store with no events - s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, 2).Return(s.unsortedEventList, nil) + s.innerStore.On("LoadFrom", mock.Anything, mock.Anything, 2).Return(s.unsortedEventList[0:2], nil) // When we load the events events, err := s.eventSorter.LoadFrom(context.TODO(), uuid.New(), 2) diff --git a/eventstore/eventsorter/eventstore_stub.go b/eventstore/eventsorter/eventstore_stub.go new file mode 100644 index 00000000..0f4fc811 --- /dev/null +++ b/eventstore/eventsorter/eventstore_stub.go @@ -0,0 +1,40 @@ +package eventsorter + +import ( + "context" + eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/uuid" + "github.com/stretchr/testify/mock" +) + +type EventStoreMock struct { + mock.Mock +} + +var _ eh.EventStore = (*EventStoreMock)(nil) + +func (e *EventStoreMock) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) { + args := e.Called(ctx, id, version) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]eh.Event), args.Error(1) +} + +func (e *EventStoreMock) Save(ctx context.Context, events []eh.Event, originalVersion int) error { + args := e.Called(ctx, events, originalVersion) + return args.Error(0) +} + +func (e *EventStoreMock) Load(ctx context.Context, u uuid.UUID) ([]eh.Event, error) { + args := e.Called(ctx, u) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).([]eh.Event), args.Error(1) +} + +func (e *EventStoreMock) Close() error { + args := e.Called() + return args.Error(0) +} From e0dce6070531eb46667b4b85264d3f76366a49c4 Mon Sep 17 00:00:00 2001 From: Charly Fau Date: Mon, 11 Dec 2023 14:02:03 -0300 Subject: [PATCH 5/7] Let configure sort option for mongodb event store --- eventstore/mongodb_v2/eventstore.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/eventstore/mongodb_v2/eventstore.go b/eventstore/mongodb_v2/eventstore.go index 99ed8270..64bc1dd5 100644 --- a/eventstore/mongodb_v2/eventstore.go +++ b/eventstore/mongodb_v2/eventstore.go @@ -43,7 +43,7 @@ import ( // EventStore is an eventhorizon.EventStore for MongoDB, using one collection // for all events and another to keep track of all aggregates/streams. It also // keeps track of the global position of events, stored as metadata. -// This implementation warrants event order by Version on Load and LoadFrom methods. +// This implementation warrants event order by Version on Load and LoadFrom methods (configurable, see WithSortEventsOnDB). type EventStore struct { client *mongo.Client clientOwnership clientOwnership @@ -53,6 +53,7 @@ type EventStore struct { eventHandlerAfterSave eh.EventHandler eventHandlerInTX eh.EventHandler skipNonRegisteredEvents bool + sortEventsOnDb bool // if true, events will be sorted on DB side. Default is false for backward compatibility. } type clientOwnership int @@ -224,6 +225,16 @@ func WithSnapshotCollectionName(snapshotColl string) Option { } } +// WithSortEventsOnDB enables sorting events on DB. +// Without this option, events order should be warranted by DB default ordering. This is not the case for MongoDB. +func WithSortEventsOnDB() Option { + return func(s *EventStore) error { + s.sortEventsOnDb = true + + return nil + } +} + // Save implements the Save method of the eventhorizon.EventStore interface. func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error { if len(events) == 0 { @@ -431,8 +442,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio // Load implements the Load method of the eventhorizon.EventStore interface. func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) { - opts := options.Find().SetSort(bson.M{"version": 1}) - cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, opts) + cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id}, s.makeFindOptions()) if err != nil { return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not find event: %w", err), @@ -446,8 +456,7 @@ func (s *EventStore) Load(ctx context.Context, id uuid.UUID) ([]eh.Event, error) // LoadFrom implements LoadFrom method of the eventhorizon.SnapshotStore interface. func (s *EventStore) LoadFrom(ctx context.Context, id uuid.UUID, version int) ([]eh.Event, error) { - opts := options.Find().SetSort(bson.M{"version": 1}) - cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, opts) + cursor, err := s.events.Find(ctx, bson.M{"aggregate_id": id, "version": bson.M{"$gte": version}}, s.makeFindOptions()) if err != nil { return nil, &eh.EventStoreError{ Err: fmt.Errorf("could not find event: %w", err), @@ -527,7 +536,7 @@ func (s *EventStore) loadFromCursor(ctx context.Context, id uuid.UUID, cursor *m } func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapshot, error) { - result := s.snapshots.FindOne(ctx, bson.M{"aggregate_id": id}, options.FindOne().SetSort(bson.M{"version": -1})) + result := s.snapshots.FindOne(ctx, bson.M{"aggregate_id": id}) if err := result.Err(); err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, nil @@ -577,6 +586,13 @@ func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapsh return snapshot, nil } +func (s *EventStore) makeFindOptions() *mongoOptions.FindOptions { + if s.sortEventsOnDb { + return options.Find().SetSort(bson.M{"version": -1}) + } + return options.Find() +} + func (s *EventStore) SaveSnapshot(ctx context.Context, id uuid.UUID, snapshot eh.Snapshot) (err error) { if snapshot.AggregateType == "" { return &eh.EventStoreError{ From 872116c29804149e8cbfdf482b6b1ea342179c24 Mon Sep 17 00:00:00 2001 From: Charly Fau Date: Thu, 14 Dec 2023 10:05:21 -0300 Subject: [PATCH 6/7] Revert incorrect change --- eventstore/mongodb_v2/eventstore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eventstore/mongodb_v2/eventstore.go b/eventstore/mongodb_v2/eventstore.go index 64bc1dd5..6a13cff3 100644 --- a/eventstore/mongodb_v2/eventstore.go +++ b/eventstore/mongodb_v2/eventstore.go @@ -536,7 +536,7 @@ func (s *EventStore) loadFromCursor(ctx context.Context, id uuid.UUID, cursor *m } func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapshot, error) { - result := s.snapshots.FindOne(ctx, bson.M{"aggregate_id": id}) + result := s.snapshots.FindOne(ctx, bson.M{"aggregate_id": id}, options.FindOne().SetSort(bson.M{"version": -1})) if err := result.Err(); err != nil { if errors.Is(err, mongo.ErrNoDocuments) { return nil, nil From b92591c29a193821aa7ad157581403a0d1da9b23 Mon Sep 17 00:00:00 2001 From: Charly Fau Date: Thu, 28 Dec 2023 13:01:49 -0300 Subject: [PATCH 7/7] Fix event sort order on load --- eventstore/mongodb_v2/eventstore.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/eventstore/mongodb_v2/eventstore.go b/eventstore/mongodb_v2/eventstore.go index 6a13cff3..2071fda3 100644 --- a/eventstore/mongodb_v2/eventstore.go +++ b/eventstore/mongodb_v2/eventstore.go @@ -40,6 +40,8 @@ import ( "github.com/looplab/eventhorizon/uuid" ) +const Ascending = 1 + // EventStore is an eventhorizon.EventStore for MongoDB, using one collection // for all events and another to keep track of all aggregates/streams. It also // keeps track of the global position of events, stored as metadata. @@ -588,7 +590,7 @@ func (s *EventStore) LoadSnapshot(ctx context.Context, id uuid.UUID) (*eh.Snapsh func (s *EventStore) makeFindOptions() *mongoOptions.FindOptions { if s.sortEventsOnDb { - return options.Find().SetSort(bson.M{"version": -1}) + return options.Find().SetSort(bson.M{"version": Ascending}) } return options.Find() }