Add exponential backoff possibility in case of retries
ErwanVP committed Dec 26, 2024
1 parent 5107ae6 commit a4d7000
Showing 3 changed files with 158 additions and 12 deletions.
16 changes: 12 additions & 4 deletions README.md
@@ -110,12 +110,13 @@ Depending on the Retry topic/Deadletter topic/Max retries configuration, the eve

### Blocking Retries

By default, failed events consumptions will be retried 3 times (each attempt is separated by 2 seconds).
By default, failed events consumptions will be retried 3 times (each attempt is separated by 2 seconds) with no exponential backoff.
This can be globally configured through the following properties:
* `ConsumerMaxRetries`
* `DurationBeforeRetry`
* `ConsumerMaxRetries` (int)
* `DurationBeforeRetry` (duration)
* `ExponentialBackoff` (boolean)

These properties can also be configured on a per-topic basis by setting the `ConsumerMaxRetries` and `DurationBeforeRetry` properties on the handler.
These properties can also be configured on a per-topic basis by setting the `ConsumerMaxRetries`, `DurationBeforeRetry` and `ExponentialBackoff` properties on the handler.

If you want to achieve a blocking retry pattern (ie. continuously retrying until the event is successfully consumed), you can set `ConsumerMaxRetries` to `InfiniteRetries` (-1).
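
For example, a per-topic override could look like the following sketch; it assumes the package is imported as `kafka` alongside `sarama` and uses the `Handler`/`HandlerConfig` fields shown in `listener.go` below, with placeholder values:

```go
maxRetries := 5               // or kafka.InfiniteRetries for a blocking retry pattern
retryDelay := 2 * time.Second

handler := kafka.Handler{
	Processor: func(ctx context.Context, msg *sarama.ConsumerMessage) error {
		// Process the message here; returning an error triggers the retry logic.
		return nil
	},
	Config: kafka.HandlerConfig{
		ConsumerMaxRetries:  &maxRetries,
		DurationBeforeRetry: &retryDelay,
		ExponentialBackoff:  true,
	},
}
// Register the handler for its topic as usual (not shown).
```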

@@ -128,6 +129,13 @@ return errors.Wrap(kafka.ErrNonRetriable, err.Error())
// This error will also not be retried
return kafka.ErrNonRetriable
```

#### Exponential Backoff

The exponential backoff algorithm is defined as follows. You can activate it by setting the `ExponentialBackoff` configuration property to `true`:

```
retryDuration = durationBeforeRetry * 2^retries
```
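
For illustration, here is a standalone sketch of the same calculation (it mirrors the `calculateExponentialBackoffDuration` helper added in `listener.go` below; the 2-second base duration is just an example value):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	base := 2 * time.Second // example DurationBeforeRetry
	for retries := 0; retries <= 3; retries++ {
		delay := base * time.Duration(math.Pow(2, float64(retries)))
		fmt.Printf("retries=%d -> retryDuration=%s\n", retries, delay)
	}
}
```

With a 2-second base this prints delays of 2s, 4s, 8s and 16s for `retries` 0 through 3.
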
### Deadletter And Retry topics
26 changes: 22 additions & 4 deletions listener.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"time"

@@ -19,6 +20,7 @@ var (
type HandlerConfig struct {
ConsumerMaxRetries *int
DurationBeforeRetry *time.Duration
ExponentialBackoff bool
RetryTopic string
DeadletterTopic string
}
@@ -243,7 +245,7 @@ func (l *listener) onNewMessage(msg *sarama.ConsumerMessage, session sarama.Cons
handler = l.instrumenting.Instrumentation(handler)
}

err := l.handleMessageWithRetry(messageContext, handler, msg, *handler.Config.ConsumerMaxRetries)
err := l.handleMessageWithRetry(messageContext, handler, msg, *handler.Config.ConsumerMaxRetries, handler.Config.ExponentialBackoff)
if err != nil {
err = fmt.Errorf("processing failed: %w", err)
l.handleErrorMessage(err, handler, msg)
@@ -353,7 +355,7 @@ func (l *listener) handleOmittedMessage(initialError error, msg *sarama.Consumer
}

// handleMessageWithRetry call the handler function and retry if it fails
func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, retries int) (err error) {
func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, retries int, exponentialBackoff bool) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic happened during handle of message: %v", r)
@@ -367,15 +369,21 @@ func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler,

err = handler.Processor(ctx, msg)
if err != nil && shouldRetry(retries, err) {
time.Sleep(*handler.Config.DurationBeforeRetry)
if exponentialBackoff {
backoffDuration := calculateExponentialBackoffDuration(retries, handler.Config.DurationBeforeRetry)
Logger.Printf("exponential backoff enable we will retry in %s", backoffDuration)
time.Sleep(backoffDuration)
} else {
time.Sleep(*handler.Config.DurationBeforeRetry)
}
if retries != InfiniteRetries {
retries--
} else {
errLog := []interface{}{ctx, err, "error", "unable to process message we retry indefinitely"}
errLog = append(errLog, extractMessageInfoForLog(msg)...)
ErrorLogger.Println(errLog...)
}
return l.handleMessageWithRetry(ctx, handler, msg, retries)
return l.handleMessageWithRetry(ctx, handler, msg, retries, exponentialBackoff)
}

return err
@@ -399,3 +407,13 @@ func extractMessageInfoForLog(msg *sarama.ConsumerMessage) []interface{} {
}
return []interface{}{"message_topic", msg.Topic, "topic_partition", msg.Partition, "message_offset", msg.Offset, "message_key", string(msg.Key)}
}

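// calculateExponentialBackoffDuration returns baseDuration * 2^retries; a nil baseDuration yields a zero delay.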
func calculateExponentialBackoffDuration(retries int, baseDuration *time.Duration) time.Duration {
var duration time.Duration
if baseDuration == nil {
duration = 0
} else {
duration = *baseDuration
}
return duration * time.Duration(math.Pow(2, float64(retries)))
}
128 changes: 124 additions & 4 deletions listener_test.go
@@ -457,7 +457,28 @@ func Test_handleMessageWithRetry(t *testing.T) {
}

l := listener{}
l.handleMessageWithRetry(context.Background(), handler, nil, 3)
l.handleMessageWithRetry(context.Background(), handler, nil, 3, false)

assert.Equal(t, 4, handlerCalled)
}

func Test_handleMessageWithRetryWithBackoff(t *testing.T) {
// Reduce the retry interval to speed up the test
DurationBeforeRetry = 1 * time.Millisecond

err := errors.New("This error should be retried")
handlerCalled := 0
handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++
return err
}
handler := Handler{
Processor: handlerProcessor,
Config: testHandlerConfig,
}

l := listener{}
l.handleMessageWithRetry(context.Background(), handler, nil, 3, true)

assert.Equal(t, 4, handlerCalled)
}
@@ -475,7 +496,25 @@ func Test_handleMessageWithRetry_UnretriableError(t *testing.T) {
}

l := listener{}
l.handleMessageWithRetry(context.Background(), handler, nil, 3)
l.handleMessageWithRetry(context.Background(), handler, nil, 3, false)

assert.Equal(t, 1, handlerCalled)
}

func Test_handleMessageWithRetry_UnretriableErrorWithBackoff(t *testing.T) {
err := errors.New("This error should not be retried")
handlerCalled := 0
handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++
return fmt.Errorf("%w: %w", err, ErrEventUnretriable)
}
handler := Handler{
Processor: handlerProcessor,
Config: testHandlerConfig,
}

l := listener{}
l.handleMessageWithRetry(context.Background(), handler, nil, 3, true)

assert.Equal(t, 1, handlerCalled)
}
@@ -503,7 +542,35 @@ func Test_handleMessageWithRetry_InfiniteRetries(t *testing.T) {
}

l := listener{}
l.handleMessageWithRetry(context.Background(), handler, nil, InfiniteRetries)
l.handleMessageWithRetry(context.Background(), handler, nil, InfiniteRetries, false)

assert.Equal(t, 5, handlerCalled)

}

func Test_handleMessageWithRetry_InfiniteRetriesWithBackoff(t *testing.T) {
// Reduce the retry interval to speed up the test
DurationBeforeRetry = 1 * time.Millisecond

err := errors.New("This error should be retried")
handlerCalled := 0
handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error {
handlerCalled++

// We simulate an infinite retry by failing 5 times, and then succeeding,
// which is above the 3 retries normally expected
if handlerCalled < 5 {
return err
}
return nil
}

handler := Handler{
Processor: handlerProcessor,
Config: testHandlerConfig,
}

l := listener{}
l.handleMessageWithRetry(context.Background(), handler, nil, InfiniteRetries, true)

assert.Equal(t, 5, handlerCalled)

@@ -534,7 +601,7 @@ func Test_handleMessageWithRetry_InfiniteRetriesWithContextCancel(t *testing.T)
}

l := listener{}
l.handleMessageWithRetry(ctx, handler, nil, InfiniteRetries)
l.handleMessageWithRetry(ctx, handler, nil, InfiniteRetries, false)

assert.Equal(t, 5, handlerCalled)

@@ -633,3 +700,56 @@ func Test_Listen_ContextCanceled(t *testing.T) {
assert.Equal(t, context.Canceled, err)
consumerGroup.AssertExpectations(t)
}

func Test_calculateExponentialBackoffDuration(t *testing.T) {
tests := []struct {
name string
retries int
baseDuration *time.Duration
expectedDelay time.Duration
}{
{
name: "nil base duration",
retries: 3,
baseDuration: nil,
expectedDelay: 0,
},
{
name: "zero retries",
retries: 0,
baseDuration: Ptr(1 * time.Second),
expectedDelay: 1 * time.Second,
},
{
name: "one retry",
retries: 1,
baseDuration: Ptr(1 * time.Second),
expectedDelay: 2 * time.Second,
},
{
name: "two retries",
retries: 2,
baseDuration: Ptr(1 * time.Second),
expectedDelay: 4 * time.Second,
},
{
name: "three retries",
retries: 3,
baseDuration: Ptr(1 * time.Second),
expectedDelay: 8 * time.Second,
},
{
name: "three retries with different base duration",
retries: 3,
baseDuration: Ptr(500 * time.Millisecond),
expectedDelay: 4 * time.Second,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
delay := calculateExponentialBackoffDuration(tt.retries, tt.baseDuration)
assert.Equal(t, tt.expectedDelay, delay)
})
}
}
