Skip to content
This repository has been archived by the owner on Aug 7, 2023. It is now read-only.

Commit

Permalink
add a groups reaper to remove non existing groups
Browse files Browse the repository at this point in the history
Burrow currently keeps reporting lag for non-existent consumer groups.

The only way to remove groups from Burrow automatically is to configure
expire-group, which is not ideal as it can conflict with consumers that
have no members.

This PR introduces a goroutine that fetches the existing consumer groups from
Kafka and compares them against Burrow's stored consumers to reap the non-existent ones.
  • Loading branch information
d1egoaz committed Sep 11, 2020
1 parent 656ca7d commit 9fbb606
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 17 deletions.
1 change: 1 addition & 0 deletions config/burrow.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ servers=[ "kafka01.example.com:10251", "kafka02.example.com:10251", "kafka03.exa
client-profile="test"
topic-refresh=120
offset-refresh=30
groups-reaper-refresh=0

[consumer.local]
class-name="kafka"
Expand Down
102 changes: 85 additions & 17 deletions core/internal/cluster/kafka_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package cluster

import (
"fmt"
"sync"
"time"

Expand All @@ -34,16 +35,18 @@ type KafkaCluster struct {
// fields that are appropriate to identify this coordinator
Log *zap.Logger

name string
saramaConfig *sarama.Config
servers []string
offsetRefresh int
topicRefresh int
name string
saramaConfig *sarama.Config
servers []string
offsetRefresh int
topicRefresh int
groupsReaperRefresh int

offsetTicker *time.Ticker
metadataTicker *time.Ticker
quitChannel chan struct{}
running sync.WaitGroup
offsetTicker *time.Ticker
metadataTicker *time.Ticker
groupsReaperTicker *time.Ticker
quitChannel chan struct{}
running sync.WaitGroup

fetchMetadata bool
topicPartitions map[string][]int32
Expand Down Expand Up @@ -72,8 +75,10 @@ func (module *KafkaCluster) Configure(name, configRoot string) {
// Set defaults for configs if needed
viper.SetDefault(configRoot+".offset-refresh", 10)
viper.SetDefault(configRoot+".topic-refresh", 60)
viper.SetDefault(configRoot+".groups-reaper-refresh", 0)
module.offsetRefresh = viper.GetInt(configRoot + ".offset-refresh")
module.topicRefresh = viper.GetInt(configRoot + ".topic-refresh")
module.groupsReaperRefresh = viper.GetInt(configRoot + ".groups-reaper-refresh")
}

// Start connects to the Kafka cluster using the Shopify/sarama client. Any error connecting to the cluster is returned
Expand All @@ -98,6 +103,15 @@ func (module *KafkaCluster) Start() error {
// Start main loop that has a timer for offset and topic fetches
module.offsetTicker = time.NewTicker(time.Duration(module.offsetRefresh) * time.Second)
module.metadataTicker = time.NewTicker(time.Duration(module.topicRefresh) * time.Second)

if module.groupsReaperRefresh != 0 {
if module.saramaConfig.Version.IsAtLeast(sarama.V0_11_0_0) {
module.groupsReaperTicker = time.NewTicker(time.Duration(module.groupsReaperRefresh) * time.Second)
} else {
module.Log.Warn("groups reaper disabled, it needs at least kafka v0.11.0.0 to get the list of consumer groups")
module.groupsReaperRefresh = 0
}
}
go module.mainLoop(helperClient)

return nil
Expand All @@ -109,6 +123,10 @@ func (module *KafkaCluster) Stop() error {

module.metadataTicker.Stop()
module.offsetTicker.Stop()

if module.groupsReaperRefresh != 0 {
module.groupsReaperTicker.Stop()
}
close(module.quitChannel)
module.running.Wait()

Expand All @@ -120,14 +138,29 @@ func (module *KafkaCluster) mainLoop(client helpers.SaramaClient) {
defer module.running.Done()

for {
select {
case <-module.offsetTicker.C:
module.getOffsets(client)
case <-module.metadataTicker.C:
// Update metadata on next offset fetch
module.fetchMetadata = true
case <-module.quitChannel:
return
// NOTE: or I could simple create a very long tick ¯\_(ツ)_/¯
if module.groupsReaperRefresh != 0 {
select {
case <-module.offsetTicker.C:
module.getOffsets(client)
case <-module.metadataTicker.C:
// Update metadata on next offset fetch
module.fetchMetadata = true
case <-module.groupsReaperTicker.C:
module.reapNonExistingGroups(client)
case <-module.quitChannel:
return
}
} else {
select {
case <-module.offsetTicker.C:
module.getOffsets(client)
case <-module.metadataTicker.C:
// Update metadata on next offset fetch
module.fetchMetadata = true
case <-module.quitChannel:
return
}
}
}
}
Expand Down Expand Up @@ -278,3 +311,38 @@ func (module *KafkaCluster) getOffsets(client helpers.SaramaClient) {
return false
})
}

// reapNonExistingGroups removes consumer groups that Burrow is still
// tracking but that no longer exist in Kafka. It fetches the live group
// list from the cluster, asks the storage module for the groups it knows
// about for this cluster, and issues a delete request for every stored
// group that Kafka no longer reports (except Burrow's own client group).
func (module *KafkaCluster) reapNonExistingGroups(client helpers.SaramaClient) {
	kafkaGroups, err := client.ListConsumerGroups()
	if err != nil {
		// Without an authoritative group list we must not reap anything:
		// iterating with a nil map here would otherwise delete every
		// stored group.
		module.Log.Warn("failed to get the list of available consumer groups", zap.Error(err))
		return
	}

	req := &protocol.StorageRequest{
		RequestType: protocol.StorageFetchConsumers,
		Reply:       make(chan interface{}),
		Cluster:     module.name,
	}
	helpers.TimeoutSendStorageRequest(module.App.StorageChannel, req, 20)

	res := <-req.Reply
	if res == nil {
		module.Log.Warn("groups reaper: couldn't get list of consumer groups from storage")
		return
	}

	// TODO: find how to get reportedConsumerGroup from KafkaClient
	burrowIgnoreGroupName := "burrow-" + module.name

	burrowGroups, ok := res.([]string)
	if !ok {
		// Defensive: storage is expected to reply with []string; bail out
		// rather than silently doing nothing on an unexpected type.
		module.Log.Warn("groups reaper: unexpected reply type from storage")
		return
	}
	for _, g := range burrowGroups {
		// Keep groups that still exist in Kafka, and never reap Burrow's
		// own consumer group.
		if _, exists := kafkaGroups[g]; exists || g == burrowIgnoreGroupName {
			continue
		}
		module.Log.Info(fmt.Sprintf("groups reaper: removing non existing kafka consumer group (%s) from burrow", g))
		request := &protocol.StorageRequest{
			RequestType: protocol.StorageSetDeleteGroup,
			Cluster:     module.name,
			Group:       g,
		}
		helpers.TimeoutSendStorageRequest(module.App.StorageChannel, request, 1)
	}
}
22 changes: 22 additions & 0 deletions core/internal/cluster/kafka_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestKafkaCluster_Configure_DefaultIntervals(t *testing.T) {

assert.Equal(t, int(10), module.offsetRefresh, "Default OffsetRefresh value of 10 did not get set")
assert.Equal(t, int(60), module.topicRefresh, "Default TopicRefresh value of 60 did not get set")
assert.Equal(t, int(0), module.groupsReaperRefresh, "Default GroupsReaperRefresh value of 0 did not get set")
}

func TestKafkaCluster_maybeUpdateMetadataAndDeleteTopics_NoUpdate(t *testing.T) {
Expand Down Expand Up @@ -280,3 +281,24 @@ func TestKafkaCluster_getOffsets_BrokerFailed(t *testing.T) {
broker.AssertExpectations(t)
client.AssertExpectations(t)
}

// TestKafkaCluster_reapNonExistingGroups verifies that the reaper first asks
// storage for the consumer groups known for this cluster, then issues a
// delete only for groups Kafka no longer reports (group2 here), leaving
// live groups (group1) alone.
func TestKafkaCluster_reapNonExistingGroups(t *testing.T) {
	module := fixtureModule()
	module.Configure("test", "cluster.test")

	client := &helpers.MockSaramaClient{}
	// Kafka only knows about group1; any other stored group is stale.
	client.On("ListConsumerGroups").Return(map[string]string{"group1": ""}, nil)

	go module.reapNonExistingGroups(client)

	// First storage request fetches the stored consumer list.
	request := <-module.App.StorageChannel
	client.AssertExpectations(t)

	assert.Equalf(t, protocol.StorageFetchConsumers, request.RequestType, "Expected request sent with type StorageFetchConsumers, not %v", request.RequestType)
	assert.Equalf(t, "test", request.Cluster, "Expected request sent with cluster test, not %v", request.Cluster)

	// Reply with one live and one stale group; only the stale one should
	// produce a delete request.
	request.Reply <- []string{"group1", "group2"}
	request = <-module.App.StorageChannel
	assert.Equalf(t, protocol.StorageSetDeleteGroup, request.RequestType, "Expected request sent with type StorageSetDeleteGroup, not %v", request.RequestType)
	assert.Equalf(t, "test", request.Cluster, "Expected request sent with cluster test, not %v", request.Cluster)
	assert.Equalf(t, "group2", request.Group, "Expected request sent with group group2, not %v", request.Group)
}
6 changes: 6 additions & 0 deletions docker-config/burrow.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@ servers=[ "zookeeper:2181" ]
timeout=6
root-path="/burrow"

[client-profile.profile]
kafka-version="0.11.0"
client-id="docker-client"

[cluster.local]
client-profile="profile"
class-name="kafka"
servers=[ "kafka:9092" ]
topic-refresh=60
offset-refresh=30
groups-reaper-refresh=30

[consumer.local]
class-name="kafka"
Expand Down

0 comments on commit 9fbb606

Please sign in to comment.