From baea371cd3370b2b6efe49786a0711743a3e0d53 Mon Sep 17 00:00:00 2001 From: Max Ekman Date: Thu, 21 Jan 2021 13:09:24 +0100 Subject: [PATCH] Retryable event handling --- eventbus/acceptance_testing.go | 28 ++++++- eventbus/gcp/eventbus.go | 9 ++- eventbus/kafka/eventbus.go | 7 +- eventbus/local/eventbus.go | 18 ++++- eventbus/redis/eventbus.go | 19 ++++- eventhandler.go | 19 +++++ eventhandler/projector/eventhandler.go | 77 ++++++++++++------- eventhandler/projector/eventhandler_test.go | 7 ++ .../commandhandler/tracing/middleware.go | 7 +- .../eventhandler/async/middleware_test.go | 2 +- middleware/eventhandler/tracing/middleware.go | 13 ++-- mocks/mocks.go | 21 +++-- repo.go | 16 ++-- repo/tracing/repo.go | 30 ++++---- repo/version/repo.go | 13 +++- repo/version/repo_test.go | 2 +- 16 files changed, 210 insertions(+), 78 deletions(-) diff --git a/eventbus/acceptance_testing.go b/eventbus/acceptance_testing.go index dccd1160..6a9275cf 100644 --- a/eventbus/acceptance_testing.go +++ b/eventbus/acceptance_testing.go @@ -213,7 +213,7 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration) // Test async errors from handlers. errorHandler := mocks.NewEventHandler("error_handler") - errorHandler.Err = errors.New("handler error") + errorHandler.ErrOnce = errors.New("handler error") if err := bus1.AddHandler(ctx, eh.MatchAll{}, errorHandler); err != nil { t.Fatal("there should be no error:", err) } @@ -234,9 +234,35 @@ func AcceptanceTest(t *testing.T, bus1, bus2 eh.EventBus, timeout time.Duration) // Good case. if err.Error() != "could not handle event (error_handler): handler error: (Event@3)" { t.Error("incorrect error sent on event bus:", err) + t.Logf("%#v", err.Event) } } + // Retryable events. + retryHandler := mocks.NewEventHandler("retry_handler") + retryHandler.ErrOnce = eh.RetryableEventError{Err: errors.New("retryable error")} + bus1.AddHandler(ctx, eh.MatchAll{}, retryHandler) + + time.Sleep(timeout) // Need to wait here for handlers to be added. + + event4 := eh.NewEvent(mocks.EventType, &mocks.EventData{Content: "event4"}, timestamp, + eh.ForAggregate(mocks.AggregateType, id, 4), + eh.WithMetadata(map[string]interface{}{"meta": "data", "num": int32(42)}), + ) + if err := bus1.HandleEvent(ctx, event4); err != nil { + t.Error("there should be no error:", err) + } + select { + case <-time.After(timeout): + t.Error("there should be a retried event in time") + case <-retryHandler.Recv: + } + retryHandler.Lock() + if retryHandler.NumHandleEvent != 2 { + t.Error("the handler should have been called twice") + } + retryHandler.Unlock() + // Cancel all handlers and wait. cancel() bus1.Wait() diff --git a/eventbus/gcp/eventbus.go b/eventbus/gcp/eventbus.go index 916bb1b3..6d44f030 100644 --- a/eventbus/gcp/eventbus.go +++ b/eventbus/gcp/eventbus.go @@ -253,14 +253,19 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler) func(ctx contex // Handle the event if it did match. if err := h.HandleEvent(ctx, event); err != nil { + // Retryable errors are not logged and will be retried. + if _, ok := err.(eh.RetryableEventError); ok { + msg.Nack() + return + } + + // Log unhandled events, they will NOT be retried. err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err) select { case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in GCP event bus: %s", err) } - msg.Nack() - return } msg.Ack() diff --git a/eventbus/kafka/eventbus.go b/eventbus/kafka/eventbus.go index 122493c4..b0d21ae0 100644 --- a/eventbus/kafka/eventbus.go +++ b/eventbus/kafka/eventbus.go @@ -240,13 +240,18 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, r *kafka.Reader // Handle the event if it did match. if err := h.HandleEvent(ctx, event); err != nil { + // Retryable errors are not logged and will be retried. + if _, ok := err.(eh.RetryableEventError); ok { + return + } + + // Log unhandled events, they will NOT be retried. err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err) select { case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in Kafka event bus: %s", err) } - return } r.CommitMessages(ctx, msg) diff --git a/eventbus/local/eventbus.go b/eventbus/local/eventbus.go index b84ef0ad..2dca8633 100644 --- a/eventbus/local/eventbus.go +++ b/eventbus/local/eventbus.go @@ -137,7 +137,7 @@ type evt struct { } // Handles all events coming in on the channel. -func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch <-chan []byte) { +func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHandler, ch chan []byte) { defer b.wg.Done() for { @@ -161,7 +161,19 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand // Handle the event if it did match. if err := h.HandleEvent(ctx, event); err != nil { - err = fmt.Errorf("could not handle event (%s): %s", h.HandlerType(), err.Error()) + // Retryable errors are not logged and will be retried. + if _, ok := err.(eh.RetryableEventError); ok { + select { + case ch <- data: + // Retry event by putting it back on the bus. + default: + log.Printf("eventhorizon: publish queue full for retry in local event bus") + } + continue + } + + // Log unhandled events, they will NOT be retried. + err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err) select { case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: @@ -187,7 +199,7 @@ func NewGroup() *Group { } } -func (g *Group) channel(id string) <-chan []byte { +func (g *Group) channel(id string) chan []byte { g.busMu.Lock() defer g.busMu.Unlock() diff --git a/eventbus/redis/eventbus.go b/eventbus/redis/eventbus.go index 2b716f79..2379c54c 100644 --- a/eventbus/redis/eventbus.go +++ b/eventbus/redis/eventbus.go @@ -190,11 +190,12 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand defer b.wg.Done() msgHandler := b.handler(m, h, groupName) + readOpt := ">" for { streams, err := b.client.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: groupName, Consumer: groupName + "_" + b.clientID, - Streams: []string{b.streamName, ">"}, + Streams: []string{b.streamName, readOpt}, }).Result() if err != nil && err != context.Canceled { err = fmt.Errorf("could not receive: %w", err) @@ -219,6 +220,13 @@ func (b *EventBus) handle(ctx context.Context, m eh.EventMatcher, h eh.EventHand msgHandler(ctx, &msg) } } + + // Flip flop the read option to read new and non-acked messages every other time. + if readOpt == ">" { + readOpt = "0" + } else { + readOpt = ">" + } } } @@ -253,14 +261,19 @@ func (b *EventBus) handler(m eh.EventMatcher, h eh.EventHandler, groupName strin // Handle the event if it did match. if err := h.HandleEvent(ctx, event); err != nil { + // Retryable errors are not logged and will be retried. + if _, ok := err.(eh.RetryableEventError); ok { + // TODO: Nack if possible. + return + } + + // Log unhandled events, they will NOT be retried. err = fmt.Errorf("could not handle event (%s): %w", h.HandlerType(), err) select { case b.errCh <- eh.EventBusError{Err: err, Ctx: ctx, Event: event}: default: log.Printf("eventhorizon: missed error in Redis event bus: %s", err) } - // TODO: Nack if possible. - return } _, err = b.client.XAck(ctx, b.streamName, groupName, msg.ID).Result() diff --git a/eventhandler.go b/eventhandler.go index c6391e5c..920e1273 100644 --- a/eventhandler.go +++ b/eventhandler.go @@ -16,6 +16,7 @@ package eventhorizon import ( "context" + "fmt" "reflect" "runtime" "strings" @@ -40,6 +41,24 @@ type EventHandler interface { HandleEvent(context.Context, Event) error } +// RetryableEventError is a "soft" error that handlers should return if they want the +// handler to be retried. This will often be the case when handling events (for +// example in a saga) where related read models have not yet been projected. +// NOTE: The retry behavior is dependent on the eventbus implementation used. +type RetryableEventError struct { + Err error +} + +// Error implements the Error method of the error interface. +func (e RetryableEventError) Error() string { + return fmt.Sprintf("retryable: %s", e.Err) +} + +// Cause returns the cause of this error. +func (e RetryableEventError) Cause() error { + return e.Err +} + // EventHandlerFunc is a function that can be used as a event handler. type EventHandlerFunc func(context.Context, Event) error diff --git a/eventhandler/projector/eventhandler.go b/eventhandler/projector/eventhandler.go index 9a86e89e..d0974e9e 100644 --- a/eventhandler/projector/eventhandler.go +++ b/eventhandler/projector/eventhandler.go @@ -84,6 +84,12 @@ func (e Error) Cause() error { // ErrModelNotSet is when a model factory is not set on the EventHandler. var ErrModelNotSet = errors.New("model not set") +// ErrIncorrectEntityVersion is when an entity has an incorrect version. +var ErrIncorrectEntityVersion = errors.New("incorrect entity version") + +// ErrIncorrectProjectedEntityVersion is when an entity has an incorrect version after projection. +var ErrIncorrectProjectedEntityVersion = errors.New("incorrect projected entity version") + // NewEventHandler creates a new EventHandler. func NewEventHandler(projector Projector, repo eh.ReadWriteRepo, options ...Option) *EventHandler { h := &EventHandler{ @@ -124,23 +130,38 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { defer cancel() } entity, err := h.repo.Find(findCtx, event.AggregateID()) - if rrErr, ok := err.(eh.RepoError); ok && rrErr.Err == eh.ErrEntityNotFound { - if h.factoryFn == nil { + if err != nil { + if errors.Is(err, eh.ErrEntityNotFound) { + // Create the model if there was no previous. + // TODO: Consider that the event can still have been projected elsewhere + // but not yet available in this find. Handle this before/when saving! + if h.factoryFn == nil { + return Error{ + Err: ErrModelNotSet, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), + EventVersion: event.Version(), + } + } + entity = h.factoryFn() + } else if errors.Is(err, version.ErrIncorrectLoadedEntityVersion) { + // Retry handling the event if model had the incorrect version. + return eh.RetryableEventError{ + Err: Error{ + Err: err, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), + EventVersion: event.Version(), + }, + } + } else { return Error{ - Err: ErrModelNotSet, + Err: err, Projector: h.projector.ProjectorType().String(), Namespace: eh.NamespaceFromContext(ctx), EventVersion: event.Version(), } } - entity = h.factoryFn() - } else if err != nil { - return Error{ - Err: err, - Projector: h.projector.ProjectorType().String(), - Namespace: eh.NamespaceFromContext(ctx), - EventVersion: event.Version(), - } } // The entity should be one version behind the event. @@ -149,17 +170,19 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { entityVersion = entity.AggregateVersion() // Ignore old/duplicate events. - if entity.AggregateVersion() >= event.Version() { + if event.Version() <= entity.AggregateVersion() { return nil } - if entity.AggregateVersion()+1 != event.Version() { - return Error{ - Err: eh.ErrIncorrectEntityVersion, - Projector: h.projector.ProjectorType().String(), - Namespace: eh.NamespaceFromContext(ctx), - EventVersion: event.Version(), - EntityVersion: entityVersion, + if event.Version() != entity.AggregateVersion()+1 { + return eh.RetryableEventError{ + Err: Error{ + Err: ErrIncorrectEntityVersion, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), + EventVersion: event.Version(), + EntityVersion: entityVersion, + }, } } } @@ -181,7 +204,7 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { entityVersion = newEntity.AggregateVersion() if newEntity.AggregateVersion() != event.Version() { return Error{ - Err: eh.ErrIncorrectEntityVersion, + Err: ErrIncorrectProjectedEntityVersion, Projector: h.projector.ProjectorType().String(), Namespace: eh.NamespaceFromContext(ctx), EventVersion: event.Version(), @@ -203,12 +226,14 @@ func (h *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { } } else { if err := h.repo.Remove(ctx, event.AggregateID()); err != nil { - return Error{ - Err: err, - Projector: h.projector.ProjectorType().String(), - Namespace: eh.NamespaceFromContext(ctx), - EventVersion: event.Version(), - EntityVersion: entityVersion, + return eh.RetryableEventError{ + Err: Error{ + Err: err, + Projector: h.projector.ProjectorType().String(), + Namespace: eh.NamespaceFromContext(ctx), + EventVersion: event.Version(), + EntityVersion: entityVersion, + }, } } } diff --git a/eventhandler/projector/eventhandler_test.go b/eventhandler/projector/eventhandler_test.go index 4bbaf82f..4444f6e5 100644 --- a/eventhandler/projector/eventhandler_test.go +++ b/eventhandler/projector/eventhandler_test.go @@ -282,6 +282,13 @@ func TestEventHandler_SaveError(t *testing.T) { timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) event := eh.NewEvent(mocks.EventType, eventData, timestamp, eh.ForAggregate(mocks.AggregateType, id, 1)) + repo.LoadErr = eh.RepoError{ + Err: eh.ErrEntityNotFound, + } + projector.newEntity = &mocks.SimpleModel{ + ID: id, + } + saveErr := errors.New("save error") repo.SaveErr = saveErr expectedErr := Error{ diff --git a/middleware/commandhandler/tracing/middleware.go b/middleware/commandhandler/tracing/middleware.go index 3ea20e5d..4551299f 100644 --- a/middleware/commandhandler/tracing/middleware.go +++ b/middleware/commandhandler/tracing/middleware.go @@ -29,17 +29,16 @@ func NewMiddleware() eh.CommandHandlerMiddleware { return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error { opName := fmt.Sprintf("Command(%s)", cmd.CommandType()) sp, ctx := opentracing.StartSpanFromContext(ctx, opName) - - err := h.HandleCommand(ctx, cmd) - sp.SetTag("eh.command_type", cmd.CommandType()) sp.SetTag("eh.aggregate_type", cmd.AggregateType()) sp.SetTag("eh.aggregate_id", cmd.AggregateID()) + + err := h.HandleCommand(ctx, cmd) if err != nil { ext.LogError(sp, err) } - sp.Finish() + sp.Finish() return err }) }) diff --git a/middleware/eventhandler/async/middleware_test.go b/middleware/eventhandler/async/middleware_test.go index ffc09bab..0c69ea39 100644 --- a/middleware/eventhandler/async/middleware_test.go +++ b/middleware/eventhandler/async/middleware_test.go @@ -55,7 +55,7 @@ func TestEventHandler(t *testing.T) { m, errCh = NewMiddleware() h = eh.UseEventHandlerMiddleware(inner, m) handlingErr := errors.New("handling error") - inner.Err = handlingErr + inner.ErrOnce = handlingErr ctx := context.Background() if err := h.HandleEvent(ctx, event); err != nil { t.Error("there should never be an error:", err) diff --git a/middleware/eventhandler/tracing/middleware.go b/middleware/eventhandler/tracing/middleware.go index 31aa7a9f..34de1261 100644 --- a/middleware/eventhandler/tracing/middleware.go +++ b/middleware/eventhandler/tracing/middleware.go @@ -38,17 +38,20 @@ type eventHandler struct { func (h *eventHandler) HandleEvent(ctx context.Context, event eh.Event) error { opName := fmt.Sprintf("%s.Event(%s)", h.HandlerType(), event.EventType()) sp, ctx := opentracing.StartSpanFromContext(ctx, opName) - - err := h.EventHandler.HandleEvent(ctx, event) - sp.SetTag("eh.event_type", event.EventType()) sp.SetTag("eh.aggregate_type", event.AggregateType()) sp.SetTag("eh.aggregate_id", event.AggregateID()) sp.SetTag("eh.version", event.Version()) + + err := h.EventHandler.HandleEvent(ctx, event) if err != nil { - ext.LogError(sp, err) + if _, ok := err.(eh.RetryableEventError); ok { + // Ignore logging retryable errors. + } else { + ext.LogError(sp, err) + } } - sp.Finish() + sp.Finish() return err } diff --git a/mocks/mocks.go b/mocks/mocks.go index f9a9d45c..87a3d02d 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -197,13 +197,14 @@ func (h *CommandHandler) HandleCommand(ctx context.Context, cmd eh.Command) erro type EventHandler struct { sync.RWMutex - Type string - Events []eh.Event - Context context.Context - Time time.Time - Recv chan eh.Event + Type string + Events []eh.Event + Context context.Context + Time time.Time + Recv chan eh.Event + NumHandleEvent int // Used to simulate errors when publishing. - Err error + ErrOnce error } var _ = eh.EventHandler(&EventHandler{}) @@ -228,8 +229,12 @@ func (m *EventHandler) HandleEvent(ctx context.Context, event eh.Event) error { m.Lock() defer m.Unlock() - if m.Err != nil { - return m.Err + m.NumHandleEvent++ + + if m.ErrOnce != nil { + err := m.ErrOnce + m.ErrOnce = nil + return err } m.Events = append(m.Events, event) m.Context = ctx diff --git a/repo.go b/repo.go index 14db7e48..8d895485 100644 --- a/repo.go +++ b/repo.go @@ -17,6 +17,7 @@ package eventhorizon import ( "context" "errors" + "fmt" "github.com/google/uuid" ) @@ -56,6 +57,9 @@ type Versionable interface { AggregateVersion() int } +// ErrEntityHasNoVersion is when an entity has no aggregate version. +var ErrEntityHasNoVersion = errors.New("entity has no version") + // Iter is a stateful iterator object that when called Next() readies the next // value that can be retrieved from Value(). Enables incremental object retrieval // from repos that support it. You must call Close() on each Iter even when @@ -78,16 +82,14 @@ var ( ErrCouldNotRemoveEntity = errors.New("could not remove entity") // ErrMissingEntityID is when a entity has no ID. ErrMissingEntityID = errors.New("missing entity ID") - // ErrEntityHasNoVersion is when an entity has no version number. - ErrEntityHasNoVersion = errors.New("entity has no version") - // ErrIncorrectEntityVersion is when an entity has an incorrect version. - ErrIncorrectEntityVersion = errors.New("incorrect entity version") ) // RepoError is an error in the read repository, with the namespace. type RepoError struct { // Err is the error. Err error + // EntityVersion is the version of the entity. + EntityVersion int // BaseErr is an optional underlying error, for example from the DB driver. BaseErr error // Namespace is the namespace for the error. @@ -100,7 +102,11 @@ func (e RepoError) Error() string { if e.BaseErr != nil { errStr += ": " + e.BaseErr.Error() } - return errStr + " (" + e.Namespace + ")" + if e.EntityVersion > 0 { + return fmt.Sprintf("%s, v%d (%s)", + errStr, e.EntityVersion, e.Namespace) + } + return fmt.Sprintf("%s (%s)", errStr, e.Namespace) } // Unwrap implements the errors.Unwrap method. diff --git a/repo/tracing/repo.go b/repo/tracing/repo.go index 6229284c..739f9b75 100644 --- a/repo/tracing/repo.go +++ b/repo/tracing/repo.go @@ -16,9 +16,11 @@ package tracing import ( "context" + "errors" "github.com/google/uuid" eh "github.com/looplab/eventhorizon" + "github.com/looplab/eventhorizon/repo/version" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" ) @@ -43,16 +45,19 @@ func (r *Repo) Parent() eh.ReadRepo { // Find implements the Find method of the eventhorizon.ReadModel interface. func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "Repo.Find") + sp.SetTag("eh.aggregate_id", id) entity, err := r.ReadWriteRepo.Find(ctx, id) - - sp.SetTag("eh.aggregate_id", id) - if rrErr, ok := err.(eh.RepoError); err != nil && - !(ok && rrErr.Err == eh.ErrEntityNotFound) { - ext.LogError(sp, err) + if err != nil { + if errors.Is(err, eh.ErrEntityNotFound) || + errors.Is(err, version.ErrIncorrectLoadedEntityVersion) { + // Ignore logging as error. + } else { + ext.LogError(sp, err) + } } - sp.Finish() + sp.Finish() return entity, err } @@ -61,42 +66,39 @@ func (r *Repo) FindAll(ctx context.Context) ([]eh.Entity, error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "Repo.FindAll") entities, err := r.ReadWriteRepo.FindAll(ctx) - if err != nil { ext.LogError(sp, err) } - sp.Finish() + sp.Finish() return entities, err } // Save implements the Save method of the eventhorizon.WriteRepo interface. func (r *Repo) Save(ctx context.Context, entity eh.Entity) error { sp, ctx := opentracing.StartSpanFromContext(ctx, "Repo.Save") + sp.SetTag("eh.aggregate_id", entity.EntityID()) err := r.ReadWriteRepo.Save(ctx, entity) - - sp.SetTag("eh.aggregate_id", entity.EntityID()) if err != nil { ext.LogError(sp, err) } - sp.Finish() + sp.Finish() return err } // Remove implements the Remove method of the eventhorizon.WriteRepo interface. func (r *Repo) Remove(ctx context.Context, id uuid.UUID) error { sp, ctx := opentracing.StartSpanFromContext(ctx, "Repo.Remove") + sp.SetTag("eh.aggregate_id", id) err := r.ReadWriteRepo.Remove(ctx, id) - - sp.SetTag("eh.aggregate_id", id) if err != nil { ext.LogError(sp, err) } - sp.Finish() + sp.Finish() return err } diff --git a/repo/version/repo.go b/repo/version/repo.go index 7384da85..17d192e6 100644 --- a/repo/version/repo.go +++ b/repo/version/repo.go @@ -16,6 +16,7 @@ package version import ( "context" + "errors" "time" "github.com/google/uuid" @@ -28,6 +29,9 @@ type Repo struct { eh.ReadWriteRepo } +// ErrIncorrectLoadedEntityVersion is when an entity has an incorrect version. +var ErrIncorrectLoadedEntityVersion = errors.New("incorrect loaded entity version") + // NewRepo creates a new Repo. func NewRepo(repo eh.ReadWriteRepo) *Repo { return &Repo{ @@ -62,8 +66,8 @@ func (r *Repo) Find(ctx context.Context, id uuid.UUID) (eh.Entity, error) { _, hasDeadline := ctx.Deadline() for { entity, err := r.findMinVersion(ctx, id, minVersion) - if rrErr, ok := err.(eh.RepoError); ok && - (rrErr.Err == eh.ErrIncorrectEntityVersion || rrErr.Err == eh.ErrEntityNotFound) { + if errors.Is(err, ErrIncorrectLoadedEntityVersion) || + errors.Is(err, eh.ErrEntityNotFound) { // Try again for incorrect version or if the entity was not found. } else if err != nil { // Return any real error. @@ -104,8 +108,9 @@ func (r *Repo) findMinVersion(ctx context.Context, id uuid.UUID, minVersion int) if versionable.AggregateVersion() < minVersion { return nil, eh.RepoError{ - Err: eh.ErrIncorrectEntityVersion, - Namespace: eh.NamespaceFromContext(ctx), + Err: ErrIncorrectLoadedEntityVersion, + EntityVersion: versionable.AggregateVersion(), + Namespace: eh.NamespaceFromContext(ctx), } } diff --git a/repo/version/repo_test.go b/repo/version/repo_test.go index a146a7d9..103f873b 100644 --- a/repo/version/repo_test.go +++ b/repo/version/repo_test.go @@ -95,7 +95,7 @@ func extraRepoTests(t *testing.T, ctx context.Context, r *Repo, baseRepo *memory // Find with min version, too low. ctxVersion = NewContextWithMinVersion(ctx, 2) model, err = r.Find(ctxVersion, m1.ID) - if !errors.As(err, &repoErr) || !errors.Is(err, eh.ErrIncorrectEntityVersion) { + if !errors.As(err, &repoErr) || !errors.Is(err, ErrIncorrectLoadedEntityVersion) { t.Error("there should be a incorrect model version error:", err) }