Skip to content

Commit

Permalink
typed error to handle the retry / skip of events
Browse files Browse the repository at this point in the history
  • Loading branch information
EdouardBavoux committed Dec 12, 2024
1 parent 3dd9676 commit 84a4a3f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 14 deletions.
52 changes: 42 additions & 10 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,34 @@ import (
"github.com/opentracing/opentracing-go"
)

var (
ErrEventUnretriable = errors.New("the event will not be retried")
ErrEventOmitted = errors.New("the event will be omitted")
)
// EventProcessingError is to be used to indicate that an error occurred during the processing of an event.
// It can be used to indicate that the event should not be retried, or that it should be omitted, in replacement
// of the deprecated errors ErrEventUnretriable and ErrEventOmitted.
type EventProcessingError struct {
msg string
isUnretriable bool
isToBeSkipped bool
}

func (e *EventProcessingError) Error() string {
return e.msg
}

func (e *EventProcessingError) IsUnretriable() bool {
return e.isUnretriable
}

func (e *EventProcessingError) IsToBeSkipped() bool {
return e.isToBeSkipped
}

func NewEventProcessingError(error error, isUnretriable bool, isToBeSkipped bool) *EventProcessingError {
return &EventProcessingError{
msg: error.Error(),
isUnretriable: isUnretriable,
isToBeSkipped: isToBeSkipped,
}
}

type HandlerConfig struct {
ConsumerMaxRetries *int
Expand Down Expand Up @@ -247,9 +271,11 @@ func (l *listener) onNewMessage(msg *sarama.ConsumerMessage, session sarama.Cons
}

func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *sarama.ConsumerMessage) {
if errors.Is(initialError, ErrEventOmitted) {
l.handleOmittedMessage(initialError, msg)
return
if err, ok := initialError.(*EventProcessingError); ok {
if err.IsToBeSkipped() {
l.handleOmittedMessage(initialError, msg)
return
}
}

// Log
Expand Down Expand Up @@ -323,7 +349,14 @@ func forwardToTopic(l *listener, msg *sarama.ConsumerMessage, topicName string)
}

func isRetriableError(initialError error) bool {
return !errors.Is(initialError, ErrEventUnretriable) && !errors.Is(initialError, ErrEventOmitted)
if eperr, ok := initialError.(*EventProcessingError); ok {
if eperr.IsUnretriable() {
return false
} else if eperr.IsToBeSkipped() {
return false
}
}
return true
}

func (l *listener) handleOmittedMessage(initialError error, msg *sarama.ConsumerMessage) {
Expand Down Expand Up @@ -362,9 +395,8 @@ func shouldRetry(retries int, err error) bool {
return false
}

if errors.Is(err, ErrEventUnretriable) || errors.Is(err, ErrEventOmitted) {
if !isRetriableError(err) {
return false
}

return true
}
8 changes: 4 additions & 4 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func Test_ConsumeClaim_Message_Error_WithHandlerSpecificRetryTopic(t *testing.T)

func Test_handleErrorMessage_OmittedError(t *testing.T) {

omittedError := errors.New("This error should be omitted")
omittedError := errors.New("this error should be omitted")

l := listener{}

Expand All @@ -384,7 +384,7 @@ func Test_handleErrorMessage_OmittedError(t *testing.T) {
}).Once()
ErrorLogger = mockLogger

l.handleErrorMessage(fmt.Errorf("%w: %w", omittedError, ErrEventOmitted), Handler{}, nil)
l.handleErrorMessage(NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", omittedError), false, true), Handler{}, nil)

assert.True(t, errorLogged)
}
Expand All @@ -397,7 +397,7 @@ func Test_handleMessageWithRetry(t *testing.T) {
handlerCalled := 0
handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++
return err
return NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", err), false, false)
}
handler := Handler{
Processor: handlerProcessor,
Expand All @@ -415,7 +415,7 @@ func Test_handleMessageWithRetry_UnretriableError(t *testing.T) {
handlerCalled := 0
handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++
return fmt.Errorf("%w: %w", err, ErrEventUnretriable)
return NewEventProcessingError(fmt.Errorf("failed in context blablah. %w", err), true, false)
}
handler := Handler{
Processor: handlerProcessor,
Expand Down

0 comments on commit 84a4a3f

Please sign in to comment.