Skip to content

Commit

Permalink
Updated error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
sonnes committed Jun 11, 2024
1 parent b2d0c66 commit 0258b8a
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 11 deletions.
3 changes: 1 addition & 2 deletions examples/xkafka/sequential.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"math/rand"
"time"

"log/slog"

"github.com/rs/xid"
"github.com/urfave/cli/v2"
"log/slog"

"github.com/gojekfarm/xrun"
"github.com/gojekfarm/xtools/xkafka"
Expand Down
3 changes: 2 additions & 1 deletion xkafka/middleware/retry/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package retry_test

import (
"context"
"log/slog"
"time"

"log/slog"

"github.com/gojekfarm/xtools/xkafka"
"github.com/gojekfarm/xtools/xkafka/middleware/retry"
)
Expand Down
36 changes: 31 additions & 5 deletions xkafka/middleware/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"time"

"github.com/cenkalti/backoff/v4"

"github.com/gojekfarm/xtools/xkafka"
)

// ErrRetryLimitExceeded is returned when the maximum number of retries is exceeded.
var ErrRetryLimitExceeded = errors.New("[xkafka/retry] retry limit exceeded")
var (
// ErrPermanent is returned when the error should not be retried.
ErrPermanent = errors.New("[xkafka/retry] permanent error")
)

// Option configures the retry middleware.
type Option interface {
Expand Down Expand Up @@ -53,7 +56,13 @@ type config struct {
}

func newConfig(opts ...Option) *config {
c := &config{}
c := &config{
maxRetries: 100,
maxLifetime: time.Hour,
delay: time.Second,
jitter: 100 * time.Millisecond,
multiplier: 1.5,
}

for _, opt := range opts {
opt.apply(c)
Expand All @@ -63,6 +72,14 @@ func newConfig(opts ...Option) *config {
}

// ExponentialBackoff is a middleware with exponential backoff retry strategy.
// It retries the handler until the maximum number of retries or the maximum
// lifetime is reached.
// Default values:
// - MaxRetries: 100
// - MaxLifetime: 1 hour
// - Delay: 1 second
// - Jitter: 100 milliseconds
// - Multiplier: 1.5
func ExponentialBackoff(opts ...Option) xkafka.MiddlewareFunc {
cfg := newConfig(opts...)

Expand All @@ -77,13 +94,22 @@ func ExponentialBackoff(opts ...Option) xkafka.MiddlewareFunc {
attempt := 0

return backoff.Retry(func() error {
err := next.Handle(ctx, msg)
if err == nil {
return nil
}

if errors.Is(err, ErrPermanent) {
return backoff.Permanent(err)
}

attempt++

if attempt > cfg.maxRetries {
return backoff.Permanent(ErrRetryLimitExceeded)
return err
}

return next.Handle(ctx, msg)
return err
}, expBackoff)
})
}
Expand Down
26 changes: 23 additions & 3 deletions xkafka/middleware/retry/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"testing"
"time"

"github.com/gojekfarm/xtools/xkafka"
"github.com/stretchr/testify/assert"

"github.com/gojekfarm/xtools/xkafka"
)

func TestExponentialBackoff_MaxRetries(t *testing.T) {
Expand All @@ -29,7 +30,7 @@ func TestExponentialBackoff_MaxRetries(t *testing.T) {
mw := ExponentialBackoff(MaxRetries(3))

err := mw(handler).Handle(context.TODO(), msg)
assert.ErrorIs(t, err, ErrRetryLimitExceeded)
assert.ErrorIs(t, err, assert.AnError)
assert.Equal(t, 3, attempts)
}

Expand All @@ -56,6 +57,25 @@ func TestExponentialBackoff_MaxLifetime(t *testing.T) {

start := time.Now()
err := mw(handler).Handle(context.TODO(), msg)
assert.Error(t, err)
assert.ErrorIs(t, err, assert.AnError)
assert.WithinDuration(t, start, time.Now(), 1*time.Second)
}

func TestExponentialBackoff_PermanentError(t *testing.T) {
msg := &xkafka.Message{
Topic: "test-topic",
Group: "test-group",
Partition: 2,
Key: []byte("key"),
Value: []byte("value"),
}

handler := xkafka.HandlerFunc(func(ctx context.Context, m *xkafka.Message) error {
return ErrPermanent
})

mw := ExponentialBackoff(MaxRetries(1000))

err := mw(handler).Handle(context.TODO(), msg)
assert.ErrorIs(t, err, ErrPermanent)
}

0 comments on commit 0258b8a

Please sign in to comment.