Skip to content

Commit

Permalink
Merge pull request #26 from ricardo-ch/adr-35-instrumentation
Browse files Browse the repository at this point in the history
Go-kafka V2
  • Loading branch information
EtienneGuerlais authored Jun 5, 2023
2 parents 0b113de + 835d0bd commit 4f215a3
Show file tree
Hide file tree
Showing 16 changed files with 743 additions and 175 deletions.
10 changes: 1 addition & 9 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
version: 2.1
orbs:
coveralls: coveralls/[email protected]
jobs:
quality-gate:
docker:
Expand All @@ -16,15 +14,9 @@ jobs:
- run:
name: Test
command: make test
- run:
name: install coveralls
command: go install github.com/mattn/goveralls@latest
- run:
name: code coverage
command: goveralls -service=circle-ci -ignore "mocks/*" -repotoken=$COVERALLS_REPO_TOKEN
workflows:
version: 2
coveralls_workflow:
quality-gate:
jobs:
- quality-gate:
context: dev
118 changes: 87 additions & 31 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,116 @@
[![Coverage Status](https://coveralls.io/repos/github/ricardo-ch/go-kafka/badge.svg?branch=master)](https://coveralls.io/github/ricardo-ch/go-kafka?branch=master)
[![Go Report Card](https://goreportcard.com/badge/github.com/ricardo-ch/go-kafka)](https://goreportcard.com/report/github.com/ricardo-ch/go-kafka)

Go-kafka provides an easy way to use kafka listener, producer and go-kit like server with only few lines of code.
The listener is able to listen multiple topics, and will execute a defined go-kit endpoint by topic message.
Go-kafka provides an easy way to use kafka listeners and producers with only a few lines of code.
The listener is able to consume from multiple topics, and will execute a separate handler for each topic.

## Quick start

Simple consumer
```golang
// go-kit event endpoints
var endpointEvent1 endpoint.Endpoint
var endpointEvent2 endpoint.Endpoint
// topic-specific handlers
var handler1 kafka.Handler
var handler2 kafka.Handler

// set your handlers
myEvent1Handler := kafka.NewServer(endpointEvent1, myDecodeMethod1, nil)
myEvent2Handler := kafka.NewServer(endpointEvent2, myDecodeMethod2, nil)
// map your topics to their handlers
handlers := map[string]kafka.Handler{
"topic-event-1": myEvent1Handler,
"topic-event-2": myEvent2Handler,
"topic-1": handler1,
"topic-2": handler2,
}

// define your listener
listener, _ := kafka.NewListener(brokers, "my-consumer-group", handlers)
kafka.Brokers = []string{"localhost:9092"}
listener, _ := kafka.NewListener("my-consumer-group", handlers)
defer listener.Close()

// listen and enjoy
errc <- listener.Listen(ctx)
```

Simple producer
```golang
// define your producer
kafka.Brokers = []string{"localhost:9092"}
producer, _ := kafka.NewProducer()

// send your message
message := &sarama.ProducerMessage{
Topic: "my-topic",
Value: sarama.StringEncoder("my-message"),
}
_ = producer.Produce(message)
```

## Features

* Create a listener on multiple topics
* Retry policy on message handling
* Create a producer
* Create a go-kit like server
* Add instrumenting on Prometheus
* Prometheus instrumenting

## Consumer error handling

You can customize the error handling of the consumer.
And if there's still an error after all possible retries (3 by default), the error is logged and the faulty event can be pushed to a deadletter topic.

### Deadletter

By default, events that have exceeded the maximum number of retries will be pushed to a dead letter topic.
This behaviour can be disabled through the `PushConsumerErrorsToTopic` property.
```go
PushConsumerErrorsToTopic = false
```
The name of the deadletter topic is dynamically generated based on the original topic name and the consumer group.
For example, if the original topic is `my-topic` and the consumer group is `my-consumer-group`, the deadletter topic will be `my-consumer-group-my-topic-error`.
This pattern can be overridden through the `ErrorTopicPattern` property.
```go
ErrorTopicPattern = "custom-deadletter-topic"
```

### Retries

By default, failed events consumptions will be retried 3 times (each attempt is separated by 2 seconds).
This can be configured through the following properties:
* `ConsumerMaxRetries`
* `DurationBeforeRetry`

If you want to **not** retry specific errors, you can wrap them in a `kafka.ErrNonRetriable` error before returning them, or return a `kafka.ErrNonRetriable` directly.
```go
// This error will not be retried
err := errors.New("my error")
return errors.Wrap(kafka.ErrNonRetriable, err.Error())

// This error will also not be retried
return kafka.ErrNonRetriable
```

### Omitting specific errors

In certain scenarios, you might want to omit some errors. For example, you might want to discard outdated events that are not relevant anymore.
Such events would increase a separate, dedicated metric instead of the error one, and would not be retried.
To do so, wrap the errors that should lead to omitted events in a ErrEventOmitted, or return a kafka.ErrEventOmitted directly.
```go
// This error will be omitted
err := errors.New("my error")
return errors.Wrap(kafka.ErrEventOmitted, err.Error())

// This error will also be omitted
return kafka.ErrEventOmitted
```

## Instrumenting

Currently the instrumenting is implemented only on consumer part.
The metrics are exported on prometheus
The metrics are :
* Number of requests processed (label: kafka_topic, success)
* Total duration in milliseconds (label: kafka_topic)
Metrics for the listener and the producer can be exported to Prometheus.
The following metrics are available:
| Metric name | Labels | Description |
|-------------|--------|-------------|
| kafka_consumer_record_consumed_total | kafka_topic, consumer_group | Number of messages consumed |
| kafka_consumer_record_latency_seconds | kafka_topic, consumer_group | Latency of consuming a message |
| kafka_consumer_record_omitted_total | kafka_topic, consumer_group | Number of messages omitted |
| kafka_consumer_record_error_total | kafka_topic, consumer_group | Number of errors when consuming a message |
| kafka_producer_record_send_total | kafka_topic | Number of messages sent |
| kafka_producer_dead_letter_created_total | kafka_topic | Number of messages sent to a dead letter topic |
| kafka_producer_record_error_total | kafka_topic | Number of errors when sending a message |

To activate the tracing on go-Kafka:

Expand All @@ -62,19 +131,6 @@ go func() {

```

## Consumer error handling

The listener object is able to have a specific handle for consuming errors.
By default, if an error occurs, it's retried 3 times (each attempt is separated by 2 seconds).
And if there's still an error after the 3 retries, the error is logged and pushed to a topic named like `"group-id"-"original-topic-name"-error`.

All this strategy can be overridden through the following config variables:

* ConsumerMaxRetries
* DurationBeforeRetry
* PushConsumerErrorsToTopic
* ErrorTopicPattern

## Default configuration

Configuration of consumer/producer is opinionated. It aim to resolve simply problems that have taken us by surprise in the past.
Expand Down
39 changes: 39 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package kafka

import (
"sync"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
)

var (
client *sarama.Client
clientMutex = &sync.Mutex{}
)

func getClient() (*sarama.Client, error) {
if client != nil {
return client, nil
}

clientMutex.Lock()
defer clientMutex.Unlock()

if client == nil {
var c sarama.Client
var err error

if len(Brokers) == 0 {
return nil, errors.New("cannot create new client, Brokers must be specified")
}
c, err = sarama.NewClient(Brokers, Config)
if err != nil {
return nil, err
}

client = &c
}

return client, nil
}
3 changes: 2 additions & 1 deletion example/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package main
import (
"context"
"encoding/json"

"github.com/Shopify/sarama"
"github.com/ricardo-ch/go-kafka"
"github.com/ricardo-ch/go-kafka/v2"
)

func makeUserHandler(s Service) kafka.Handler {
Expand Down
7 changes: 5 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package main

import (
"context"
"github.com/ricardo-ch/go-kafka"
"log"

"github.com/ricardo-ch/go-kafka/v2"
)

var (
Expand All @@ -14,7 +15,9 @@ var (
func main() {
handlers := kafka.Handlers{}
handlers["test-users"] = makeUserHandler(NewService())
listener, err := kafka.NewListener(brokers, appName, handlers)
kafka.Brokers = brokers

listener, err := kafka.NewListener(appName, handlers)
if err != nil {
log.Fatalln("could not initialise listener:", err)
}
Expand Down
3 changes: 3 additions & 0 deletions go-kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"github.com/Shopify/sarama"
)

// Brokers is the list of Kafka brokers to connect to.
var Brokers []string

// StdLogger is used to log messages.
type StdLogger interface {
Print(v ...interface{})
Expand Down
6 changes: 1 addition & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/ricardo-ch/go-kafka
module github.com/ricardo-ch/go-kafka/v2

go 1.19

Expand Down Expand Up @@ -41,14 +41,10 @@ require (
github.com/stretchr/objx v0.4.0 // indirect
github.com/uber/jaeger-client-go v2.23.1+incompatible // indirect
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
github.com/vektra/mockery v1.1.2 // indirect
go.uber.org/atomic v1.6.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/mod v0.3.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
5 changes: 0 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ github.com/uber/jaeger-client-go v2.23.1+incompatible h1:uArBYHQR0HqLFFAypI7RsWT
github.com/uber/jaeger-client-go v2.23.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/vektra/mockery v1.1.2 h1:uc0Yn67rJpjt8U/mAZimdCKn9AeA97BOkjpmtBSlfP4=
github.com/vektra/mockery v1.1.2/go.mod h1:VcfZjKaFOPO+MpN4ZvwPjs4c48lkq1o3Ym8yHZJu0jU=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down Expand Up @@ -306,7 +304,6 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -446,7 +443,6 @@ golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200227222343-706bc42d1f0d/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200312045724-11d5b4c81c7d/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200323144430-8dcfad9e016e/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand All @@ -459,7 +455,6 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
Expand Down
Loading

0 comments on commit 4f215a3

Please sign in to comment.