diff --git a/e2e/client_hooks.go b/e2e/client_hooks.go index 93b720b..de6571d 100644 --- a/e2e/client_hooks.go +++ b/e2e/client_hooks.go @@ -2,6 +2,7 @@ package e2e import ( "net" + "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kgo" @@ -12,6 +13,9 @@ import ( // in e2e we only use client hooks for logging connect/disconnect messages type clientHooks struct { logger *zap.Logger + + lastCoordinatorUpdate time.Time + currentCoordinator *atomic.Value // kgo.BrokerMetadata } func newEndToEndClientHooks(logger *zap.Logger) *clientHooks { @@ -19,11 +23,11 @@ func newEndToEndClientHooks(logger *zap.Logger) *clientHooks { logger = logger.With(zap.String("source", "end_to_end")) return &clientHooks{ - logger: logger, + currentCoordinator: &atomic.Value{}, } } -func (c clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) { +func (c *clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ net.Conn, err error) { if err != nil { c.logger.Error("kafka connection failed", zap.String("broker_host", meta.Host), zap.Int32("broker_id", meta.NodeID), zap.Error(err)) return @@ -33,7 +37,7 @@ func (c clientHooks) OnConnect(meta kgo.BrokerMetadata, dialDur time.Duration, _ zap.Duration("dial_duration", dialDur)) } -func (c clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { +func (c *clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { c.logger.Warn("kafka broker disconnected", zap.Int32("broker_id", meta.NodeID), zap.String("host", meta.Host)) } @@ -47,23 +51,17 @@ func (c clientHooks) OnDisconnect(meta kgo.BrokerMetadata, _ net.Conn) { // OnWrite is called after a write to a broker. // // OnWrite(meta BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) -func (c clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { +func (c *clientHooks) OnWrite(meta kgo.BrokerMetadata, key int16, bytesWritten int, writeWait, timeToWrite time.Duration, err error) { keyName := kmsg.NameForKey(key) if keyName != "OffsetCommit" { return } - c.logger.Info("hooks onWrite", - zap.Duration("timeToWrite", timeToWrite), - zap.NamedError("err", err)) - - offsetCommitStarted = time.Now() + // c.logger.Info("hooks onWrite", + // zap.Duration("timeToWrite", timeToWrite), + // zap.NamedError("err", err)) } -var ( - offsetCommitStarted time.Time -) - // OnRead is passed the broker metadata, the key for the response that // was read, the number of bytes read, how long the Client waited // before reading the response, how long it took to read the response, @@ -72,18 +70,15 @@ var ( // The bytes written does not count any tls overhead. // OnRead is called after a read from a broker. // OnRead(meta BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) -func (c clientHooks) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) { +func (c *clientHooks) OnRead(meta kgo.BrokerMetadata, key int16, bytesRead int, readWait, timeToRead time.Duration, err error) { keyName := kmsg.NameForKey(key) if keyName != "OffsetCommit" { return } - dur := time.Since(offsetCommitStarted) - - c.logger.Info("hooks onRead", - zap.Int64("timeToReadMs", timeToRead.Milliseconds()), - zap.Int64("totalTime", dur.Milliseconds()), - zap.NamedError("err", err)) - + if err == nil { + c.currentCoordinator.Store(meta) + c.lastCoordinatorUpdate = time.Now() + } } diff --git a/e2e/consumer.go b/e2e/consumer.go index 02cae47..9e599c1 100644 --- a/e2e/consumer.go +++ b/e2e/consumer.go @@ -3,7 +3,6 @@ package e2e import ( "context" "encoding/json" - "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kerr" @@ -31,49 +30,6 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error { client.AssignGroup(s.groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit()) s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", s.groupId)) - // Keep checking for the coordinator - var currentCoordinator atomic.Value - currentCoordinator.Store(kgo.BrokerMetadata{}) - - go func() { - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - describeReq := kmsg.NewDescribeGroupsRequest() - describeReq.Groups = []string{s.groupId} - describeReq.IncludeAuthorizedOperations = false - - shards := client.RequestSharded(ctx, &describeReq) - for _, shard := range shards { - // since we're only interested in the coordinator, we only check for broker errors on the response that contains our group - response, ok := shard.Resp.(*kmsg.DescribeGroupsResponse) - if !ok { - s.logger.Warn("cannot cast shard response to DescribeGroupsResponse") - continue - } - if len(response.Groups) == 0 { - s.logger.Warn("DescribeGroupsResponse contained no groups") - continue - } - group := response.Groups[0] - groupErr := kerr.ErrorForCode(group.ErrorCode) - if groupErr != nil { - s.logger.Error("couldn't describe end-to-end consumer group, error in group", zap.Error(groupErr), zap.Any("broker", shard.Meta)) - continue - } - - currentCoordinator.Store(shard.Meta) - break - } - } - } - }() - for { select { case <-ctx.Done(): @@ -109,16 +65,11 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error { // Commit offsets for processed messages // todo: the normal way to commit offsets with franz-go is pretty good, but in our special case // we want to do it manually, seperately for each partition, so we can track how long it took - - // todo: use findGroupCoordinatorID - // maybe ask travis about return value, we want to know what coordinator the offsets was committed to - // kminion probably already exposed coordinator for every group - if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil { startCommitTimestamp := timeNowMs() - client.CommitOffsets(ctx, uncommittedOffset, func(_ *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) { + client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) { // got commit response latencyMs := timeNowMs() - startCommitTimestamp @@ -129,16 +80,20 @@ func (s *Service) ConsumeFromManagementTopic(ctx context.Context) error { return } - // todo: check each partitions error code + for _, t := range r.Topics { + for _, p := range t.Partitions { + err := kerr.ErrorForCode(p.ErrorCode) + if err != nil { + s.logger.Error("error committing partition offset", zap.String("topic", t.Topic), zap.Int32("partitionId", p.Partition), zap.Error(err)) + } + } + } - // only report commit latency if the coordinator is known - coordinator := currentCoordinator.Load().(kgo.BrokerMetadata) - if len(coordinator.Host) > 0 { + // only report commit latency if the coordinator wasn't set too long ago + if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second { + coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata) s.onOffsetCommit(coordinator.NodeID, commitLatency) - } else { - s.logger.Warn("won't report commit latency since broker coordinator is still unknown", zap.Int64("latencyMilliseconds", commitLatency.Milliseconds())) } - }) }