From 84a4a3fed37d88fde1fc93d6f1f60bc6b1ef275c Mon Sep 17 00:00:00 2001 From: Edouard Bavoux Date: Thu, 12 Dec 2024 16:14:30 +0100 Subject: [PATCH] typed error to handle the retry / skip of events --- listener.go | 52 ++++++++++++++++++++++++++++++++++++++---------- listener_test.go | 8 ++++---- 2 files changed, 46 insertions(+), 14 deletions(-) diff --git a/listener.go b/listener.go index 5e98eb0..e8f41e7 100644 --- a/listener.go +++ b/listener.go @@ -11,10 +11,34 @@ import ( "github.com/opentracing/opentracing-go" ) -var ( - ErrEventUnretriable = errors.New("the event will not be retried") - ErrEventOmitted = errors.New("the event will be omitted") -) +// EventProcessingError is to be used to indicate that an error occurred during the processing of an event. +// It can be used to indicate that the event should not be retried, or that it should be omitted, in replacement +// of the deprecated errors ErrEventUnretriable and ErrEventOmitted. +type EventProcessingError struct { + msg string + isUnretriable bool + isToBeSkipped bool +} + +func (e *EventProcessingError) Error() string { + return e.msg +} + +func (e *EventProcessingError) IsUnretriable() bool { + return e.isUnretriable +} + +func (e *EventProcessingError) IsToBeSkipped() bool { + return e.isToBeSkipped +} + +func NewEventProcessingError(error error, isUnretriable bool, isToBeSkipped bool) *EventProcessingError { + return &EventProcessingError{ + msg: error.Error(), + isUnretriable: isUnretriable, + isToBeSkipped: isToBeSkipped, + } +} type HandlerConfig struct { ConsumerMaxRetries *int @@ -247,9 +271,11 @@ func (l *listener) onNewMessage(msg *sarama.ConsumerMessage, session sarama.Cons } func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *sarama.ConsumerMessage) { - if errors.Is(initialError, ErrEventOmitted) { - l.handleOmittedMessage(initialError, msg) - return + if err, ok := initialError.(*EventProcessingError); ok { + if err.IsToBeSkipped() { + l.handleOmittedMessage(initialError, msg) + return + } } // Log @@ -323,7 +349,14 @@ func forwardToTopic(l *listener, msg *sarama.ConsumerMessage, topicName string) } func isRetriableError(initialError error) bool { - return !errors.Is(initialError, ErrEventUnretriable) && !errors.Is(initialError, ErrEventOmitted) + if eperr, ok := initialError.(*EventProcessingError); ok { + if eperr.IsUnretriable() { + return false + } else if eperr.IsToBeSkipped() { + return false + } + } + return true } func (l *listener) handleOmittedMessage(initialError error, msg *sarama.ConsumerMessage) { @@ -362,9 +395,8 @@ func shouldRetry(retries int, err error) bool { return false } - if errors.Is(err, ErrEventUnretriable) || errors.Is(err, ErrEventOmitted) { + if !isRetriableError(err) { return false } - return true } diff --git a/listener_test.go b/listener_test.go index 9c30c64..448704b 100644 --- a/listener_test.go +++ b/listener_test.go @@ -370,7 +370,7 @@ func Test_ConsumeClaim_Message_Error_WithHandlerSpecificRetryTopic(t *testing.T) func Test_handleErrorMessage_OmittedError(t *testing.T) { - omittedError := errors.New("This error should be omitted") + omittedError := errors.New("this error should be omitted") l := listener{} @@ -384,7 +384,7 @@ func Test_handleErrorMessage_OmittedError(t *testing.T) { }).Once() ErrorLogger = mockLogger - l.handleErrorMessage(fmt.Errorf("%w: %w", omittedError, ErrEventOmitted), Handler{}, nil) + l.handleErrorMessage(NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", omittedError), false, true), Handler{}, nil) assert.True(t, errorLogged) } @@ -397,7 +397,7 @@ func Test_handleMessageWithRetry(t *testing.T) { handlerCalled := 0 handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error { handlerCalled++ - return err + return NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", err), false, false) } handler := Handler{ Processor: handlerProcessor, @@ -415,7 +415,7 @@ func Test_handleMessageWithRetry_UnretriableError(t *testing.T) { handlerCalled := 0 handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error { handlerCalled++ - return fmt.Errorf("%w: %w", err, ErrEventUnretriable) + return NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", err), true, false) } handler := Handler{ Processor: handlerProcessor,