Skip to content

Commit

Permalink
Merge pull request #34 from ricardo-ch/override-config-at-topic-level
Browse files Browse the repository at this point in the history
Allow to define topic-level config to override global attributes
  • Loading branch information
EdouardBavoux authored Apr 16, 2024
2 parents 50cb591 + fef35c7 commit 235363c
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 84 deletions.
89 changes: 72 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@
Go-kafka provides an easy way to use kafka listeners and producers with only a few lines of code.
The listener is able to consume from multiple topics, and will execute a separate handler for each topic.

> 📘 Important note for v3 upgrade:
> - The library now relies on the IBM/sarama library instead of Shopify/sarama, which is no longer maintained.
> - The `kafka.Handler` type has been changed to a struct containing both the function to execute and the handler's optional configuration.
> - The global variable `PushConsumerErrorsToTopic` has been replaced by the `PushConsumerErrorsToRetryTopic` and `PushConsumerErrorsToDeadletterTopic` properties on the handler.
>
> These two changes should be the only breaking changes in the v3 release. The rest of the library should be compatible with the previous version.
## Quick start

Simple consumer
Expand Down Expand Up @@ -53,33 +60,66 @@ _ = producer.Produce(message)

## Consumer error handling

You can customize the error handling of the consumer.
And if there's still an error after all possible retries (3 by default), the error is logged and the faulty event can be pushed to a deadletter topic.

### Deadletter
You can customize the error handling of the consumer, using various patterns:
* Blocking retries of the same event (Max number, and delay are configurable by handler)
* Forward to retry topic for automatic retry without blocking the consumer
* Forward to deadletter topic for manual investigation

Here is the overall logic applied to handle errors:
```mermaid
stateDiagram-v2
init: Error processing an event
state is_omitable_err <<choice>>
skipWithoutCounting: Skip the event without impacting counters
state is_retriable_err <<choice>>
state is_deadletter_configured <<choice>>
skip: Skip the event
forwardDL: Forward to deadletter topic
state should_retry <<choice>>
blocking_retry : Blocking Retry of this event
state is_retry_topic_configured <<choice>>
state is_deadletter_configured2 <<choice>>
forwardRQ: Forward to Retry topic
skip2: Skip the event
defaultDL: Forward to Deadletter topic
init --> is_omitable_err
is_omitable_err --> skipWithoutCounting: Error is of type ErrEventOmitted
is_omitable_err --> is_retriable_err: Error is not an ErrEventOmitted
is_retriable_err --> is_deadletter_configured: Error is of type ErrEventUnretriable
is_retriable_err --> should_retry: Error is retriable
should_retry --> blocking_retry: There are some retries left
should_retry --> is_retry_topic_configured : No more blocking retry
is_deadletter_configured --> skip: No Deadletter topic configured
is_deadletter_configured --> forwardDL: Deadletter topic configured
is_retry_topic_configured --> forwardRQ: Retry Topic Configured
is_retry_topic_configured --> is_deadletter_configured2: No Retry Topic Configured
is_deadletter_configured2 --> skip2: No Deadletter topic configured
is_deadletter_configured2 --> defaultDL: Deadletter topic configured
By default, events that have exceeded the maximum number of retries will be pushed to a dead letter topic.
This behaviour can be disabled through the `PushConsumerErrorsToTopic` property.
```go
PushConsumerErrorsToTopic = false
```
The name of the deadletter topic is dynamically generated based on the original topic name and the consumer group.
For example, if the original topic is `my-topic` and the consumer group is `my-consumer-group`, the deadletter topic will be `my-consumer-group-my-topic-error`.
This pattern can be overridden through the `ErrorTopicPattern` property.
```go
ErrorTopicPattern = "custom-deadletter-topic"
```
### Error types
Two types of errors are introduced, so that application code can return them whenever relevant
* `kafka.ErrEventUnretriable` - Errors that should not be retried
* `kafka.ErrEventOmitted` - Errors that should lead to the event being omitted without impacting metrics

All the other errors will be considered as "retryable" errors.

Depending on the Retry topic/Deadletter topic/Max retries configuration, the event will be retried, forwarded to a retry topic, or forwarded to a deadletter topic.

### Retries
### Blocking Retries

By default, failed events consumptions will be retried 3 times (each attempt is separated by 2 seconds).
This can be configured through the following properties:
This can be globally configured through the following properties:
* `ConsumerMaxRetries`
* `DurationBeforeRetry`

These properties can also be configured on a per-topic basis by setting the `ConsumerMaxRetries` and `DurationBeforeRetry` properties on the handler.

If you want to achieve a blocking retry pattern (ie. continuously retrying until the event is successfully consumed), you can set `ConsumerMaxRetries` to `InfiniteRetries` (-1).

If you want to **not** retry specific errors, you can wrap them in a `kafka.ErrNonRetriable` error before returning them, or return a `kafka.ErrNonRetriable` directly.
If you want to **not** retry specific errors, you can wrap them in a `kafka.ErrEventUnretriable` error before returning them, or return a `kafka.ErrNonRetriable` directly.
```go
// This error will not be retried
err := errors.New("my error")
Expand All @@ -89,6 +129,21 @@ return errors.Wrap(kafka.ErrNonRetriable, err.Error())
return kafka.ErrNonRetriable
```

### Deadletter And Retry topics

By default, events that have exceeded the maximum number of blocking retries will be pushed to a retry topic or dead letter topic.
This behaviour can be disabled through the `PushConsumerErrorsToRetryTopic` and `PushConsumerErrorsToDeadletterTopic` properties.
```go
PushConsumerErrorsToRetryTopic = false
PushConsumerErrorsToDeadletterTopic = false
```
If these switches are ON, the names of the deadletter and retry topics are dynamically generated based on the original topic name and the consumer group.
For example, if the original topic is `my-topic` and the consumer group is `my-consumer-group`, the deadletter topic will be `my-consumer-group-my-topic-deadletter`.
This pattern can be overridden through the `ErrorTopicPattern` property.
Also, the retry and deadletter topics name can be overridden through the `RetryTopic` and `DeadLetterTopic` properties on the handler.

Note that, if global `PushConsumerErrorsToRetryTopic` or `PushConsumerErrorsToDeadletterTopic` property are false, but you configure `RetryTopic` or `DeadLetterTopic` properties on a handler, then the events in error will be forwarder to the error topics only for this handler.

### Omitting specific errors

In certain scenarios, you might want to omit some errors. For example, you might want to discard outdated events that are not relevant anymore.
Expand Down
19 changes: 13 additions & 6 deletions example/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@ package main
import (
"context"
"encoding/json"
"time"

"github.com/IBM/sarama"
"github.com/ricardo-ch/go-kafka/v3"
)

func makeUserHandler(s Service) kafka.Handler {
return func(ctx context.Context, msg *sarama.ConsumerMessage) error {
parsedMsg, err := decodeUserEvent(msg.Value)
if err != nil {
return err
}
return kafka.Handler{
Processor: func(ctx context.Context, msg *sarama.ConsumerMessage) error {
parsedMsg, err := decodeUserEvent(msg.Value)
if err != nil {
return err
}

return s.OnUserEvent(parsedMsg)
return s.OnUserEvent(parsedMsg)
},
Config: kafka.HandlerConfig{
ConsumerMaxRetries: kafka.Ptr(2),
DurationBeforeRetry: kafka.Ptr(5 * time.Second),
},
}
}

Expand Down
19 changes: 14 additions & 5 deletions go-kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,23 @@ const InfiniteRetries = -1
// By default 2 seconds.
var DurationBeforeRetry = 2 * time.Second

// PushConsumerErrorsToTopic is a boolean to define if messages in error have to be pushed to an error topic.
var PushConsumerErrorsToTopic = true
// PushConsumerErrorsToRetryTopic is a boolean to define if messages in error have to be pushed to a retry topic.
var PushConsumerErrorsToRetryTopic = true

// ErrorTopicPattern is the error topic name pattern.
// By default "consumergroup-topicname-error"
// PushConsumerErrorsToDeadletterTopic is a boolean to define if messages in error have to be pushed to a deadletter topic.
var PushConsumerErrorsToDeadletterTopic = true

// RetryTopicPattern is the retry topic name pattern.
// By default "consumergroup-topicname-retry"
// Use $$CG$$ as consumer group placeholder
// Use $$T$$ as original topic name placeholder
var RetryTopicPattern = "$$CG$$-$$T$$-retry"

// DeadletterTopicPattern is the deadletter topic name pattern.
// By default "consumergroup-topicname-deadletter"
// Use $$CG$$ as consumer group placeholder
// Use $$T$$ as original topic name placeholder
var ErrorTopicPattern = "$$CG$$-$$T$$-error"
var DeadletterTopicPattern = "$$CG$$-$$T$$-deadletter"

// Config is the sarama (cluster) config used for the consumer and producer.
var Config = sarama.NewConfig()
Expand Down
45 changes: 24 additions & 21 deletions instrumenting.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,27 +157,30 @@ func NewConsumerMetricsService(groupID string) *ConsumerMetricsService {

// Instrumentation middleware used to add metrics
func (c *ConsumerMetricsService) Instrumentation(next Handler) Handler {
return func(ctx context.Context, msg *sarama.ConsumerMessage) (err error) {
defer func(begin time.Time) {
c.recordConsumedLatency.WithLabelValues(msg.Topic, c.groupID).Observe(time.Since(begin).Seconds())
}(time.Now())

err = next(ctx, msg)
if err == nil {
c.recordConsumedCounter.WithLabelValues(msg.Topic, c.groupID).Inc()

// If sarama sets the timestamp to the block timestamp, it means that the message was
// produced with the LogAppendTime timestamp type. Otherwise, it was produced with the
// CreateTime timestamp type.
// Since sarama anyways sets msg.BlockTimestamp to the block timestamp,
// we can compare it with msg.Timestamp to know if the message was produced with the
// LogAppendTime timestamp type or not.
timestampType := TimestampTypeLogAppendTime
if msg.Timestamp != msg.BlockTimestamp {
timestampType = TimestampTypeCreateTime
return Handler{
Processor: func(ctx context.Context, msg *sarama.ConsumerMessage) (err error) {
defer func(begin time.Time) {
c.recordConsumedLatency.WithLabelValues(msg.Topic, c.groupID).Observe(time.Since(begin).Seconds())
}(time.Now())

err = next.Processor(ctx, msg)
if err == nil {
c.recordConsumedCounter.WithLabelValues(msg.Topic, c.groupID).Inc()

// If sarama sets the timestamp to the block timestamp, it means that the message was
// produced with the LogAppendTime timestamp type. Otherwise, it was produced with the
// CreateTime timestamp type.
// Since sarama anyways sets msg.BlockTimestamp to the block timestamp,
// we can compare it with msg.Timestamp to know if the message was produced with the
// LogAppendTime timestamp type or not.
timestampType := TimestampTypeLogAppendTime
if msg.Timestamp != msg.BlockTimestamp {
timestampType = TimestampTypeCreateTime
}
c.currentMessageTimestamp.WithLabelValues(msg.Topic, c.groupID, string(msg.Partition), timestampType).Set(float64(msg.Timestamp.Unix()))
}
c.currentMessageTimestamp.WithLabelValues(msg.Topic, c.groupID, string(msg.Partition), timestampType).Set(float64(msg.Timestamp.Unix()))
}
return
return
},
Config: next.Config,
}
}
8 changes: 5 additions & 3 deletions instrumenting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ var testEncodedMessage = []byte{10, 3, 49, 50, 51}
func Test_NewConsumerMetricsService_Should_Return_Success_When_Success(t *testing.T) {
// Arrange
s := NewConsumerMetricsService("test_ok")
h := func(context.Context, *sarama.ConsumerMessage) error { return nil }

hp := func(context.Context, *sarama.ConsumerMessage) error { return nil }
h := Handler{
Processor: hp,
}
// Act
handler := s.Instrumentation(h)

err := handler(context.Background(), &sarama.ConsumerMessage{Value: testEncodedMessage, Topic: "test-topic"})
err := handler.Processor(context.Background(), &sarama.ConsumerMessage{Value: testEncodedMessage, Topic: "test-topic"})

// Assert
assert.Nil(t, err)
Expand Down
Loading

0 comments on commit 235363c

Please sign in to comment.