Skip to content

Commit

Permalink
Merge pull request #27 from ricardo-ch/errorhandling
Browse files Browse the repository at this point in the history
Small error handling improvements
  • Loading branch information
EtienneGuerlais authored Jun 9, 2023
2 parents 4f215a3 + ce9084e commit e9a1b9d
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 27 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: 2.1
jobs:
quality-gate:
docker:
- image: 'cimg/go:1.19'
- image: 'cimg/go:1.20'
steps:
- checkout
- run:
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package kafka

import (
"errors"
"sync"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go 1.19
require (
github.com/Shopify/sarama v1.37.2
github.com/opentracing/opentracing-go v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/ricardo-ch/go-tracing v0.5.1
github.com/stretchr/testify v1.8.0
Expand All @@ -32,6 +31,7 @@ require (
github.com/klauspost/compress v1.15.12 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand Down
29 changes: 8 additions & 21 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package kafka

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/Shopify/sarama"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

baseerrors "errors"
)

var (
ErrEventUnretriable = baseerrors.New("the event will not be retried")
ErrEventOmitted = baseerrors.New("the event will be omitted")
ErrEventUnretriable = errors.New("the event will not be retried")
ErrEventOmitted = errors.New("the event will be omitted")
)

// Handler that handle received kafka messages
Expand Down Expand Up @@ -192,15 +191,15 @@ func (l *listener) onNewMessage(msg *sarama.ConsumerMessage, session sarama.Cons

err := l.handleMessageWithRetry(messageContext, handler, msg, ConsumerMaxRetries)
if err != nil {
err = errors.Wrapf(err, "processing failed after all possible attempts")
err = fmt.Errorf("processing failed after all possible attempts attempts: %w", err)
l.handleErrorMessage(messageContext, err, msg)
}

session.MarkMessage(msg, "")
}

func (l *listener) handleErrorMessage(ctx context.Context, initialError error, msg *sarama.ConsumerMessage) {
if is(initialError, ErrEventOmitted) {
if errors.Is(initialError, ErrEventOmitted) {
l.handleOmittedMessage(initialError, msg)
return
}
Expand Down Expand Up @@ -248,7 +247,7 @@ func (l *listener) handleOmittedMessage(initialError error, msg *sarama.Consumer
func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, retries int) (err error) {
defer func() {
if r := recover(); r != nil {
err = errors.Errorf("Panic happened during handle of message: %v", r)
err = fmt.Errorf("Panic happened during handle of message: %v", r)
}
}()

Expand All @@ -266,21 +265,9 @@ func shouldRetry(retries int, err error) bool {
return false
}

if is(err, ErrEventOmitted, ErrEventUnretriable) {
if errors.Is(err, ErrEventUnretriable) || errors.Is(err, ErrEventOmitted) {
return false
}

return true
}

// is() checks if the error is one of the targets.
// this is achieved using a plain string comparison, as using errors.Is() would not work with some custom errors types.
// this could be improved later, but it comes with a similar performance cost so it's not a priority for now.
func is(err error, targets ...error) bool {
for _, target := range targets {
if strings.Contains(err.Error(), target.Error()) {
return true
}
}
return false
}
7 changes: 4 additions & 3 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"testing"
"time"

"errors"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
"github.com/ricardo-ch/go-kafka/v2/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -228,7 +229,7 @@ func Test_handleErrorMessage_OmittedError(t *testing.T) {
}).Once()
ErrorLogger = mockLogger

l.handleErrorMessage(context.Background(), errors.Wrap(ErrEventOmitted, omittedError.Error()), nil)
l.handleErrorMessage(context.Background(), fmt.Errorf("%w: %w", omittedError, ErrEventOmitted), nil)

assert.True(t, errorLogged)
}
Expand Down Expand Up @@ -256,7 +257,7 @@ func Test_handleMessageWithRetry_UnretriableError(t *testing.T) {
handlerCalled := 0
handler := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++
return errors.Wrap(ErrEventUnretriable, err.Error())
return fmt.Errorf("%w: %w", err, ErrEventUnretriable)
}

l := listener{}
Expand Down

0 comments on commit e9a1b9d

Please sign in to comment.