Skip to content

Commit

Permalink
Introduce PushConsumerErrorsToRetryTopic and PushConsumerErrorsToDead…
Browse files Browse the repository at this point in the history
…letterTopic global variables
  • Loading branch information
EdouardBavoux committed Apr 16, 2024
1 parent ec52866 commit fef35c7
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
12 changes: 7 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The listener is able to consume from multiple topics, and will execute a separat
> 📘 Important note for v3 upgrade:
> - The library now relies on the IBM/sarama library instead of Shopify/sarama, which is no longer maintained.
> - The `kafka.Handler` type has been changed to a struct containing both the function to execute and the handler's optional configuration.
> - The global variable `PushConsumerErrorsToTopic` has been replaced by the `PushConsumerErrorsToRetryTopic` and `PushConsumerErrorsToDeadletterTopic` properties on the handler.
>
> These two changes should be the only breaking changes in the v3 release. The rest of the library should be compatible with the previous version.
Expand Down Expand Up @@ -130,17 +131,18 @@ return kafka.ErrNonRetriable

### Deadletter And Retry topics

By default, events that have exceeded the maximum number of blocking retries will be pushed to a dead letter topic.
This behaviour can be disabled through the `PushConsumerErrorsToTopic` property.
By default, events that have exceeded the maximum number of blocking retries will be pushed to a retry topic or dead letter topic.
This behaviour can be disabled through the `PushConsumerErrorsToRetryTopic` and `PushConsumerErrorsToDeadletterTopic` properties.
```go
PushConsumerErrorsToTopic = false
PushConsumerErrorsToRetryTopic = false
PushConsumerErrorsToDeadletterTopic = false
```
The name of the deadletter and retry topics are dynamically generated based on the original topic name and the consumer group.
If these switches are ON, the names of the deadletter and retry topics are dynamically generated based on the original topic name and the consumer group.
For example, if the original topic is `my-topic` and the consumer group is `my-consumer-group`, the deadletter topic will be `my-consumer-group-my-topic-deadletter`.
This pattern can be overridden through the `ErrorTopicPattern` property.
Also, the retry and deadletter topics name can be overridden through the `RetryTopic` and `DeadLetterTopic` properties on the handler.

Note that, if global `PushConsumerErrorsToTopic` property is false, but you configure `RetryTopic` or `DeadLetterTopic` properties on a handler, then the events in error will be forwarder to the error topics only for this handler.
Note that, if global `PushConsumerErrorsToRetryTopic` or `PushConsumerErrorsToDeadletterTopic` property are false, but you configure `RetryTopic` or `DeadLetterTopic` properties on a handler, then the events in error will be forwarder to the error topics only for this handler.

### Omitting specific errors

Expand Down
7 changes: 5 additions & 2 deletions go-kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@ const InfiniteRetries = -1
// By default 2 seconds.
var DurationBeforeRetry = 2 * time.Second

// PushConsumerErrorsToTopic is a boolean to define if messages in error have to be pushed to an error topic.
var PushConsumerErrorsToTopic = true
// PushConsumerErrorsToRetryTopic is a boolean to define if messages in error have to be pushed to a retry topic.
var PushConsumerErrorsToRetryTopic = true

// PushConsumerErrorsToDeadletterTopic is a boolean to define if messages in error have to be pushed to a deadletter topic.
var PushConsumerErrorsToDeadletterTopic = true

// RetryTopicPattern is the retry topic name pattern.
// By default "consumergroup-topicname-retry"
Expand Down
6 changes: 3 additions & 3 deletions listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *
}

// If not, check if global retry topic pattern is defined
if PushConsumerErrorsToTopic {
if PushConsumerErrorsToRetryTopic {
topicName := l.deduceTopicNameFromPattern(msg.Topic, RetryTopicPattern)
Logger.Printf("Sending message to retry topic: %s", topicName)
err := forwardToTopic(l, msg, topicName)
Expand All @@ -282,8 +282,8 @@ func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *
return
}
}
// If the error is not retriable, or if there is no retry topic defined at all, then try to send to dead letter topic

// If the error is not retriable, or if there is no retry topic defined at all, then try to send to dead letter topic
// First, check if handler's config defines deadletter topic
if handler.Config.DeadletterTopic != "" {
Logger.Printf("Sending message to handler's deadletter topic: %s", handler.Config.DeadletterTopic)
Expand All @@ -295,7 +295,7 @@ func (l *listener) handleErrorMessage(initialError error, handler Handler, msg *
}

// If not, check if global deadletter topic pattern is defined
if PushConsumerErrorsToTopic {
if PushConsumerErrorsToDeadletterTopic {
topicName := l.deduceTopicNameFromPattern(msg.Topic, DeadletterTopicPattern)
Logger.Printf("Sending message to deadletter topic: %s", topicName)
err := forwardToTopic(l, msg, topicName)
Expand Down
8 changes: 4 additions & 4 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func Test_ConsumeClaim_Message_Error_WithErrorTopic(t *testing.T) {
// Reduce the retry interval to speed up the test
DurationBeforeRetry = 1 * time.Millisecond

PushConsumerErrorsToTopic = true
PushConsumerErrorsToDeadletterTopic = true

msgChanel := make(chan *sarama.ConsumerMessage, 1)
msgChanel <- &sarama.ConsumerMessage{
Expand Down Expand Up @@ -253,7 +253,7 @@ func Test_ConsumeClaim_Message_Error_WithErrorTopic(t *testing.T) {
}

func Test_ConsumeClaim_Message_Error_WithPanicTopic(t *testing.T) {
PushConsumerErrorsToTopic = true
PushConsumerErrorsToDeadletterTopic = true

msgChanel := make(chan *sarama.ConsumerMessage, 1)
msgChanel <- &sarama.ConsumerMessage{
Expand Down Expand Up @@ -303,7 +303,7 @@ func Test_ConsumeClaim_Message_Error_WithPanicTopic(t *testing.T) {
}

func Test_ConsumeClaim_Message_Error_WithHandlerSpecificRetryTopic(t *testing.T) {
PushConsumerErrorsToTopic = false // global value that is overwritten for the handler in this test
PushConsumerErrorsToRetryTopic = false // global value that is overwritten for the handler in this test

// Arrange
msgChanel := make(chan *sarama.ConsumerMessage, 1)
Expand Down Expand Up @@ -372,7 +372,7 @@ func Test_handleErrorMessage_OmittedError(t *testing.T) {
}).Once()
ErrorLogger = mockLogger

l.handleErrorMessage(context.Background(), fmt.Errorf("%w: %w", omittedError, ErrEventOmitted), Handler{}, nil)
l.handleErrorMessage(fmt.Errorf("%w: %w", omittedError, ErrEventOmitted), Handler{}, nil)

assert.True(t, errorLogged)
}
Expand Down

0 comments on commit fef35c7

Please sign in to comment.