From a4d7000b5dd560d34762311897730df68d7a2b4a Mon Sep 17 00:00:00 2001
From: Erwan van Peteghem
Date: Thu, 26 Dec 2024 12:07:05 +0100
Subject: [PATCH] Add exponential backoff possibility in case of retries

---
 README.md        |  16 ++++--
 listener.go      |  26 ++++++++--
 listener_test.go | 128 +++++++++++++++++++++++++++++++++++++++++++++--
 3 files changed, 158 insertions(+), 12 deletions(-)

diff --git a/README.md b/README.md
index 0cc2e35..e1072a6 100644
--- a/README.md
+++ b/README.md
@@ -110,12 +110,13 @@ Depending on the Retry topic/Deadletter topic/Max retries configuration, the eve
 
 ### Blocking Retries
 
-By default, failed events consumptions will be retried 3 times (each attempt is separated by 2 seconds).
+By default, failed event consumptions will be retried 3 times (each attempt is separated by 2 seconds), with no exponential backoff.
 This can be globally configured through the following properties:
-* `ConsumerMaxRetries`
-* `DurationBeforeRetry`
+* `ConsumerMaxRetries` (int)
+* `DurationBeforeRetry` (duration)
+* `ExponentialBackoff` (boolean)
 
-These properties can also be configured on a per-topic basis by setting the `ConsumerMaxRetries` and `DurationBeforeRetry` properties on the handler.
+These properties can also be configured on a per-topic basis by setting the `ConsumerMaxRetries`, `DurationBeforeRetry` and `ExponentialBackoff` properties on the handler.
 
 If you want to achieve a blocking retry pattern (ie. continuously retrying until the event is successfully consumed), you can set `ConsumerMaxRetries` to `InfiniteRetries` (-1).
 
@@ -128,6 +129,13 @@ return errors.Wrap(kafka.ErrNonRetriable, err.Error())
 // This error will also not be retried
 return kafka.ErrNonRetriable
 ```
+
+#### Exponential Backoff
+The exponential backoff algorithm is defined as follows. You can activate it by setting the `ExponentialBackoff` property to `true`:
+```
+retryDuration = durationBeforeRetry * 2^retries
+```
+
 
 ### Deadletter And Retry topics
 
diff --git a/listener.go b/listener.go
index 50d13aa..8831313 100644
--- a/listener.go
+++ b/listener.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
 	"strings"
 	"time"
 
@@ -19,6 +20,7 @@ var (
 type HandlerConfig struct {
 	ConsumerMaxRetries  *int
 	DurationBeforeRetry *time.Duration
+	ExponentialBackoff  bool
 	RetryTopic          string
 	DeadletterTopic     string
 }
@@ -243,7 +245,7 @@ func (l *listener) onNewMessage(msg *sarama.ConsumerMessage, session sarama.Cons
 		handler = l.instrumenting.Instrumentation(handler)
 	}
 
-	err := l.handleMessageWithRetry(messageContext, handler, msg, *handler.Config.ConsumerMaxRetries)
+	err := l.handleMessageWithRetry(messageContext, handler, msg, *handler.Config.ConsumerMaxRetries, handler.Config.ExponentialBackoff)
 	if err != nil {
 		err = fmt.Errorf("processing failed: %w", err)
 		l.handleErrorMessage(err, handler, msg)
@@ -353,7 +355,7 @@ func (l *listener) handleOmittedMessage(initialError error, msg *sarama.Consumer
 }
 
 // handleMessageWithRetry call the handler function and retry if it fails
-func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, retries int) (err error) {
+func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler, msg *sarama.ConsumerMessage, retries int, exponentialBackoff bool) (err error) {
 	defer func() {
 		if r := recover(); r != nil {
 			err = fmt.Errorf("panic happened during handle of message: %v", r)
@@ -367,7 +369,13 @@ func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler,
 	err = handler.Processor(ctx, msg)
 
 	if err != nil && shouldRetry(retries, err) {
-		time.Sleep(*handler.Config.DurationBeforeRetry)
+		if exponentialBackoff {
+			backoffDuration := calculateExponentialBackoffDuration(retries, handler.Config.DurationBeforeRetry)
+			Logger.Printf("exponential backoff enabled, will retry in %s", backoffDuration)
+			time.Sleep(backoffDuration)
+		} else {
+			time.Sleep(*handler.Config.DurationBeforeRetry)
+		}
 		if retries != InfiniteRetries {
 			retries--
 		} else {
@@ -375,7 +383,7 @@ func (l *listener) handleMessageWithRetry(ctx context.Context, handler Handler,
 			errLog = append(errLog, extractMessageInfoForLog(msg)...)
 			ErrorLogger.Println(errLog...)
} - return l.handleMessageWithRetry(ctx, handler, msg, retries) + return l.handleMessageWithRetry(ctx, handler, msg, retries, exponentialBackoff) } return err @@ -399,3 +407,13 @@ func extractMessageInfoForLog(msg *sarama.ConsumerMessage) []interface{} { } return []interface{}{"message_topic", msg.Topic, "topic_partition", msg.Partition, "message_offset", msg.Offset, "message_key", string(msg.Key)} } + +func calculateExponentialBackoffDuration(retries int, baseDuration *time.Duration) time.Duration { + var duration time.Duration + if baseDuration == nil { + duration = 0 + } else { + duration = *baseDuration + } + return duration * time.Duration(math.Pow(2, float64(retries))) +} diff --git a/listener_test.go b/listener_test.go index 6a6885c..250710e 100644 --- a/listener_test.go +++ b/listener_test.go @@ -457,7 +457,28 @@ func Test_handleMessageWithRetry(t *testing.T) { } l := listener{} - l.handleMessageWithRetry(context.Background(), handler, nil, 3) + l.handleMessageWithRetry(context.Background(), handler, nil, 3, false) + + assert.Equal(t, 4, handlerCalled) +} + +func Test_handleMessageWithRetryWithBackoff(t *testing.T) { + // Reduce the retry interval to speed up the test + DurationBeforeRetry = 1 * time.Millisecond + + err := errors.New("This error should be retried") + handlerCalled := 0 + handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error { + handlerCalled++ + return err + } + handler := Handler{ + Processor: handlerProcessor, + Config: testHandlerConfig, + } + + l := listener{} + l.handleMessageWithRetry(context.Background(), handler, nil, 3, true) assert.Equal(t, 4, handlerCalled) } @@ -475,7 +496,25 @@ func Test_handleMessageWithRetry_UnretriableError(t *testing.T) { } l := listener{} - l.handleMessageWithRetry(context.Background(), handler, nil, 3) + l.handleMessageWithRetry(context.Background(), handler, nil, 3, false) + + assert.Equal(t, 1, handlerCalled) +} + +func Test_handleMessageWithRetry_UnretriableErrorWithBackoff(t *testing.T) { + err := errors.New("This error should not be retried") + handlerCalled := 0 + handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error { + handlerCalled++ + return fmt.Errorf("%w: %w", err, ErrEventUnretriable) + } + handler := Handler{ + Processor: handlerProcessor, + Config: testHandlerConfig, + } + + l := listener{} + l.handleMessageWithRetry(context.Background(), handler, nil, 3, true) assert.Equal(t, 1, handlerCalled) } @@ -503,7 +542,35 @@ func Test_handleMessageWithRetry_InfiniteRetries(t *testing.T) { } l := listener{} - l.handleMessageWithRetry(context.Background(), handler, nil, InfiniteRetries) + l.handleMessageWithRetry(context.Background(), handler, nil, InfiniteRetries, false) + + assert.Equal(t, 5, handlerCalled) + +} +func Test_handleMessageWithRetry_InfiniteRetriesWithBackoff(t *testing.T) { + // Reduce the retry interval to speed up the test + DurationBeforeRetry = 1 * time.Millisecond + + err := errors.New("This error should be retried") + handlerCalled := 0 + handlerProcessor := func(ctx context.Context, msg *sarama.ConsumerMessage) error { + handlerCalled++ + + // We simulate an infinite retry by failing 5 times, and then succeeding, + // which is above the 3 retries normally expected + if handlerCalled < 5 { + return err + } + return nil + } + + handler := Handler{ + Processor: handlerProcessor, + Config: testHandlerConfig, + } + + l := listener{} + l.handleMessageWithRetry(context.Background(), handler, nil, InfiniteRetries, true) assert.Equal(t, 5, handlerCalled) @@ 
-534,7 +601,7 @@ func Test_handleMessageWithRetry_InfiniteRetriesWithContextCancel(t *testing.T) } l := listener{} - l.handleMessageWithRetry(ctx, handler, nil, InfiniteRetries) + l.handleMessageWithRetry(ctx, handler, nil, InfiniteRetries, false) assert.Equal(t, 5, handlerCalled) @@ -633,3 +700,56 @@ func Test_Listen_ContextCanceled(t *testing.T) { assert.Equal(t, context.Canceled, err) consumerGroup.AssertExpectations(t) } + +func Test_calculateExponentialBackoffDuration(t *testing.T) { + tests := []struct { + name string + retries int + baseDuration *time.Duration + expectedDelay time.Duration + }{ + { + name: "nil base duration", + retries: 3, + baseDuration: nil, + expectedDelay: 0, + }, + { + name: "zero retries", + retries: 0, + baseDuration: Ptr(1 * time.Second), + expectedDelay: 1 * time.Second, + }, + { + name: "one retry", + retries: 1, + baseDuration: Ptr(1 * time.Second), + expectedDelay: 2 * time.Second, + }, + { + name: "two retries", + retries: 2, + baseDuration: Ptr(1 * time.Second), + expectedDelay: 4 * time.Second, + }, + { + name: "three retries", + retries: 3, + baseDuration: Ptr(1 * time.Second), + expectedDelay: 8 * time.Second, + }, + { + name: "three retries with different base duration", + retries: 3, + baseDuration: Ptr(500 * time.Millisecond), + expectedDelay: 4 * time.Second, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + delay := calculateExponentialBackoffDuration(tt.retries, tt.baseDuration) + assert.Equal(t, tt.expectedDelay, delay) + }) + } +}
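
As an illustration of the formula documented above (not part of the patch itself), here is a minimal standalone Go sketch that mirrors the patched `calculateExponentialBackoffDuration` helper. The 2-second base value is simply the README's default `DurationBeforeRetry`, and the retry counts are example inputs:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// backoffDuration mirrors the formula introduced by this patch:
// retryDuration = durationBeforeRetry * 2^retries, with a nil base yielding 0.
func backoffDuration(retries int, baseDuration *time.Duration) time.Duration {
	if baseDuration == nil {
		return 0
	}
	return *baseDuration * time.Duration(math.Pow(2, float64(retries)))
}

func main() {
	base := 2 * time.Second // README default for DurationBeforeRetry
	for retries := 0; retries <= 3; retries++ {
		fmt.Printf("retries=%d -> delay=%s\n", retries, backoffDuration(retries, &base))
	}
}
```

With the 2-second base this prints delays of 2s, 4s, 8s and 16s for retries 0 through 3, consistent with the expectations in `Test_calculateExponentialBackoffDuration` (which uses a 1-second base).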