Skip to content

Commit

Permalink
use hooks to track which broker sent a response to our offset commit
Browse files Browse the repository at this point in the history
  • Loading branch information
rikimaru0345 committed May 19, 2021
1 parent 6f7eb6d commit 0546a23
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 78 deletions.
37 changes: 16 additions & 21 deletions e2e/client_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package e2e

import (
"net"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kgo"
Expand All @@ -12,18 +13,21 @@ import (
// in e2e we only use client hooks for logging connect/disconnect messages
type clientHooks struct {
logger *zap.Logger

lastCoordinatorUpdate time.Time
currentCoordinator *atomic.Value // kgo.BrokerMetadata
}

func newEndToEndClientHooks(logger *zap.Logger) *clientHooks {

logger = logger.With(zap.String("source", "end_to_end"))

return &clientHooks{
logger: logger,
currentCoordinator: &atomic.Value{},
}
}

func (c clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
func (c *clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) {
if err != nil {
c.logger.Error("kafka connection failed", zap.String("broker_host", meta.Host), zap.Int32("broker_id", meta.NodeID), zap.Error(err))
return
Expand All @@ -33,7 +37,7 @@ func (c clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _
zap.Duration("dial_duration", dialDur))
}

func (c clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
func (c *clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
c.logger.Warn("kafka broker disconnected", zap.Int32("broker_id", meta.NodeID),
zap.String("host", meta.Host))
}
Expand All @@ -47,23 +51,17 @@ func (c clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) {
// OnWrite is called after a write to a broker.
//
// OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error)
func (c clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
func (c *clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) {
keyName := kmsg.NameForKey(key)
if keyName != "OffsetCommit" {
return
}

c.logger.Info("hooks onWrite",
zap.Duration("timeToWrite", timeToWrite),
zap.NamedError("err", err))

offsetCommitStarted = time.Now()
// c.logger.Info("hooks onWrite",
// zap.Duration("timeToWrite", timeToWrite),
// zap.NamedError("err", err))
}

var (
offsetCommitStarted time.Time
)

// OnRead is passed the broker metadata, the key for the response that
// was read, the number of bytes read, how long the Client waited
// before reading the response, how long it took to read the response,
Expand All @@ -72,18 +70,15 @@ var (
// The bytes written does not count any tls overhead.
// OnRead is called after a read from a broker.
// OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error)
func (c clientHooks) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) {
func (c *clientHooks) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) {

keyName := kmsg.NameForKey(key)
if keyName != "OffsetCommit" {
return
}

dur := time.Since(offsetCommitStarted)

c.logger.Info("hooks onRead",
zap.Int64("timeToReadMs", timeToRead.Milliseconds()),
zap.Int64("totalTime", dur.Milliseconds()),
zap.NamedError("err", err))

if err == nil {
c.currentCoordinator.Store(meta)
c.lastCoordinatorUpdate = time.Now()
}
}
69 changes: 12 additions & 57 deletions e2e/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package e2e
import (
"context"
"encoding/json"
"sync/atomic"
"time"

"github.com/twmb/franz-go/pkg/kerr"
Expand Down Expand Up @@ -31,49 +30,6 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
client.AssignGroup(s.groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", s.groupId))

// Keep checking for the coordinator
var currentCoordinator atomic.Value
currentCoordinator.Store(kgo.BrokerMetadata{})

go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return

case <-ticker.C:
describeReq := kmsg.NewDescribeGroupsRequest()
describeReq.Groups = []string{s.groupId}
describeReq.IncludeAuthorizedOperations = false

shards := client.RequestSharded(ctx, &describeReq)
for _, shard := range shards {
// since we're only interested in the coordinator, we only check for broker errors on the response that contains our group
response, ok := shard.Resp.(*kmsg.DescribeGroupsResponse)
if !ok {
s.logger.Warn("cannot cast shard response to DescribeGroupsResponse")
continue
}
if len(response.Groups) == 0 {
s.logger.Warn("DescribeGroupsResponse contained no groups")
continue
}
group := response.Groups[0]
groupErr := kerr.ErrorForCode(group.ErrorCode)
if groupErr != nil {
s.logger.Error("couldn't describe end-to-end consumer group, error in group", zap.Error(groupErr), zap.Any("broker", shard.Meta))
continue
}

currentCoordinator.Store(shard.Meta)
break
}
}
}
}()

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -109,16 +65,11 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
// Commit offsets for processed messages
// todo: the normal way to commit offsets with franz-go is pretty good, but in our special case
// we want to do it manually, seperately for each partition, so we can track how long it took

// todo: use findGroupCoordinatorID
// maybe ask travis about return value, we want to know what coordinator the offsets was committed to
// kminion probably already exposed coordinator for every group

if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {

startCommitTimestamp := timeNowMs()

client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// got commit response

latencyMs := timeNowMs() - startCommitTimestamp
Expand All @@ -129,16 +80,20 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error {
return
}

// todo: check each partitions error code
for _, t := range r.Topics {
for _, p := range t.Partitions {
err := kerr.ErrorForCode(p.ErrorCode)
if err != nil {
s.logger.Error("error committing partition offset", zap.String("topic", t.Topic), zap.Int32("partitionId", p.Partition), zap.Error(err))
}
}
}

// only report commit latency if the coordinator is known
coordinator := currentCoordinator.Load().(kgo.BrokerMetadata)
if len(coordinator.Host) > 0 {
// only report commit latency if the coordinator wasn't set too long ago
if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second {
coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata)
s.onOffsetCommit(coordinator.NodeID, commitLatency)
} else {
s.logger.Warn("won't report commit latency since broker coordinator is still unknown", zap.Int64("latencyMilliseconds", commitLatency.Milliseconds()))
}

})
}

Expand Down

0 comments on commit 0546a23

Please sign in to comment.