multiple changes:
- remove rebalancingProtocol config, since there will never be any rebalancing in kminion's consumer groups (every kminion instance has exactly one, unique consumer group that it uses)
- change type of 'requiredAcks' from int to string (now takes 'all' or 'leader'); see the sketch before the diff below for how it maps onto client options
- lots of functions have been refactored to use "early return" and/or split into logical steps, so there aren't any huge functions
- 'createEndToEndRecord' doesn't return an error anymore and instead panics: if serialization fails, there must be an issue that prevents kminion from running anyway
- actually set 'ProduceRequestTimeout' to the 'ackSla' config property
- topic.go: complete rewrite: properly validate partition count, partition assignments, and replication factor, accounting for the broker/partition count and replicationFactor potentially changing; also check for all potential errors in Kafka responses, including nested errors that are, for example, local to only one partition (a rough sketch of this pattern follows the diff)
rikimaru0345 committed May 21, 2021
1 parent 192bf30 commit 390cf6f
Showing 9 changed files with 357 additions and 306 deletions.
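
The producer-side changes are easiest to see together: requiredAcks is now a string ('all' or 'leader'), and ackSla now drives the produce request timeout. Below is a minimal sketch of how those settings map onto franz-go client options, mirroring the code in e2e/service.go further down; the helper function itself is illustrative and not part of the commit.

```go
package e2e

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// producerOpts (hypothetical helper) maps the string-typed requiredAcks
// setting and the ackSla duration onto franz-go client options, the same
// way createKafkaClient in e2e/service.go does below.
func producerOpts(requiredAcks string, ackSla time.Duration) []kgo.Opt {
	opts := []kgo.Opt{}
	if requiredAcks == "all" {
		opts = append(opts, kgo.RequiredAcks(kgo.AllISRAcks()))
	} else {
		// "leader": leader-only acks; idempotent writes need acks from all
		// in-sync replicas, so idempotence is disabled in this case
		opts = append(opts, kgo.RequiredAcks(kgo.LeaderAck()), kgo.DisableIdempotentWrite())
	}
	// the produce request timeout is now tied to the ackSla config property
	opts = append(opts, kgo.ProduceRequestTimeout(ackSla))
	return opts
}
```

Since Validate() rejects anything other than 'all' or 'leader', the mapping only needs to distinguish those two values.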
17 changes: 1 addition & 16 deletions e2e/config_consumer.go
@@ -5,36 +5,21 @@ import (
"time"
)

const (
RoundRobin string = "roundRobin"
Range string = "range"
Sticky string = "sticky"
CooperativeSticky string = "cooperativeSticky"
)

type EndToEndConsumerConfig struct {
GroupIdPrefix string `koanf:"groupIdPrefix"`
RebalancingProtocol string `koanf:"rebalancingProtocol"`
GroupIdPrefix string `koanf:"groupIdPrefix"`

RoundtripSla time.Duration `koanf:"roundtripSla"`
CommitSla time.Duration `koanf:"commitSla"`
}

func (c *EndToEndConsumerConfig) SetDefaults() {
c.GroupIdPrefix = "kminion-end-to-end"
c.RebalancingProtocol = "cooperativeSticky"
c.RoundtripSla = 20 * time.Second
c.CommitSla = 10 * time.Second // no idea what to use as a good default value
}

func (c *EndToEndConsumerConfig) Validate() error {

switch c.RebalancingProtocol {
case RoundRobin, Range, Sticky, CooperativeSticky:
default:
return fmt.Errorf("given RebalancingProtocol '%v' is invalid", c.RebalancingProtocol)
}

if c.RoundtripSla <= 0 {
return fmt.Errorf("consumer.roundtripSla must be greater than zero")
}
13 changes: 6 additions & 7 deletions e2e/config_producer.go
@@ -7,23 +7,22 @@ import (

type EndToEndProducerConfig struct {
AckSla time.Duration `koanf:"ackSla"`
RequiredAcks int `koanf:"requiredAcks"`
RequiredAcks string `koanf:"requiredAcks"`
}

func (c *EndToEndProducerConfig) SetDefaults() {
c.AckSla = 5 * time.Second
c.RequiredAcks = -1
c.RequiredAcks = "all"
}

func (c *EndToEndProducerConfig) Validate() error {

if c.AckSla <= 0 {
return fmt.Errorf("producer.ackSla must be greater than zero")
if c.RequiredAcks != "all" && c.RequiredAcks != "leader" {
return fmt.Errorf("producer.requiredAcks must be 'all' or 'leader")
}

// all(-1) or leader(1)
if c.RequiredAcks != -1 && c.RequiredAcks != 1 {
return fmt.Errorf("producer.requiredAcks must be 1 or -1")
if c.AckSla <= 0 {
return fmt.Errorf("producer.ackSla must be greater than zero")
}

return nil
64 changes: 22 additions & 42 deletions e2e/consumer.go
@@ -5,7 +5,6 @@ import (
"encoding/json"
"time"

"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"go.uber.org/zap"
@@ -15,19 +14,14 @@ func (s *Service) startConsumeMessages(ctx context.Context) {
client := s.client
topicName := s.config.TopicManagement.Name
topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName)
balancer := kgo.Balancers(kgo.CooperativeStickyBalancer()) // Default GroupBalancer
switch s.config.Consumer.RebalancingProtocol {
case RoundRobin:
balancer = kgo.Balancers(kgo.RoundRobinBalancer())
case Range:
balancer = kgo.Balancers(kgo.RangeBalancer())
case Sticky:
balancer = kgo.Balancers(kgo.StickyBalancer())
}
client.AssignPartitions(topic)

// Create our own consumer group
client.AssignGroup(s.groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit())
client.AssignGroup(s.groupId,
kgo.GroupTopics(topicName),
kgo.Balancers(kgo.CooperativeStickyBalancer()),
kgo.DisableAutoCommit(),
)
s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", s.groupId))

for {
@@ -59,40 +53,26 @@ func (s *Service) startConsumeMessages(ctx context.Context) {

func (s *Service) commitOffsets(ctx context.Context) {
client := s.client
uncommittedOffset := client.UncommittedOffsets()
if uncommittedOffset == nil {
return
}

//
// Commit offsets for processed messages
// todo: the normal way to commit offsets with franz-go is pretty good, but in our special case
// we want to do it manually, separately for each partition, so we can track how long it took
if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil {

startCommitTimestamp := time.Now()

client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// got commit response
latency := time.Since(startCommitTimestamp)

if err != nil {
s.logger.Error("offset commit failed", zap.Error(err), zap.Int64("latencyMilliseconds", latency.Milliseconds()))
return
}
startCommitTimestamp := time.Now()
client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) {
// Got commit response
latency := time.Since(startCommitTimestamp)

for _, t := range r.Topics {
for _, p := range t.Partitions {
err := kerr.ErrorForCode(p.ErrorCode)
if err != nil {
s.logger.Error("error committing partition offset", zap.String("topic", t.Topic), zap.Int32("partitionId", p.Partition), zap.Error(err))
}
}
}
if s.logCommitErrors(r, err) > 0 {
return
}

// only report commit latency if the coordinator wasn't set too long ago
if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second {
coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata)
s.onOffsetCommit(coordinator.NodeID, latency)
}
})
}
// only report commit latency if the coordinator wasn't set too long ago
if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second {
coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata)
s.onOffsetCommit(coordinator.NodeID, latency)
}
})
}

// processMessage:
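
The per-partition commit error handling that used to be inlined above is now hidden behind s.logCommitErrors(r, err), which is not part of this hunk. A minimal sketch of what such a helper plausibly looks like, reconstructed from the removed loop (the standalone signature and the returned error count are assumptions):

```go
package e2e

import (
	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kmsg"
	"go.uber.org/zap"
)

// logCommitErrors (reconstruction, not the committed code) logs the overall
// commit error plus any per-partition errors and returns how many errors
// were seen, so the caller can return early.
func logCommitErrors(logger *zap.Logger, r *kmsg.OffsetCommitResponse, err error) int {
	if err != nil {
		logger.Error("offset commit failed", zap.Error(err))
		return 1
	}

	errCount := 0
	for _, t := range r.Topics {
		for _, p := range t.Partitions {
			if pErr := kerr.ErrorForCode(p.ErrorCode); pErr != nil {
				logger.Error("error committing partition offset",
					zap.String("topic", t.Topic),
					zap.Int32("partitionId", p.Partition),
					zap.Error(pErr))
				errCount++
			}
		}
	}
	return errCount
}
```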
2 changes: 1 addition & 1 deletion e2e/group_tracker.go
@@ -134,7 +134,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error {
exists, _ := containsStr(matchingGroups, name)
if exists {
// still there, check age and maybe delete it
age := time.Now().Sub(firstSeen)
age := time.Since(firstSeen)
if age > oldGroupMaxAge {
// group was unused for too long, delete it
groupsToDelete = append(groupsToDelete, name)
2 changes: 0 additions & 2 deletions e2e/message_tracker.go
@@ -1,7 +1,6 @@
package e2e

import (
"context"
"time"

goCache "github.com/patrickmn/go-cache"
@@ -28,7 +27,6 @@ import (
type messageTracker struct {
svc *Service
logger *zap.Logger
ctx context.Context
cache *goCache.Cache
}

21 changes: 8 additions & 13 deletions e2e/producer.go
@@ -15,16 +15,15 @@ type EndToEndMessage struct {
MessageID string `json:"messageID"` // unique for each message
Timestamp int64 `json:"createdUtcNs"` // when the message was created, unix nanoseconds

partition int
partition int // used in message tracker
hasArrived bool // used in tracker
}

func (m *EndToEndMessage) creationTime() time.Time {
return time.Unix(0, m.Timestamp)
}

// Goes through each partition and sends a EndToEndMessage to it

// Sends an EndToEndMessage to every partition
func (s *Service) produceLatencyMessages(ctx context.Context) {

for i := 0; i < s.partitionCount; i++ {
@@ -42,11 +41,7 @@ func (s *Service) produceLatencyMessages(ctx context.Context) {
func (s *Service) produceSingleMessage(ctx context.Context, partition int) error {

topicName := s.config.TopicManagement.Name

record, msg, err := createEndToEndRecord(s.minionID, topicName, partition)
if err != nil {
return err
}
record, msg := createEndToEndRecord(s.minionID, topicName, partition)

for {
select {
@@ -83,21 +78,21 @@

}

func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage, error) {
func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage) {

message := &EndToEndMessage{
MinionID: minionID,
MessageID: uuid.NewString(),
Timestamp: time.Now().UnixNano(),

partition: partition,
// todo: maybe indicate what broker was the leader for that partition at the time of sending,
// so that when receiving the message again, we could
}

mjson, err := json.Marshal(message)
if err != nil {
return nil, nil, err
// Should never happen since the struct is so simple,
// but if it does, something is completely broken anyway
panic("cannot serialize EndToEndMessage")
}

record := &kgo.Record{
@@ -106,5 +101,5 @@ func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage) {
Partition: int32(partition), // we set partition for producing so our customPartitioner can make use of it
}

return record, message, nil
return record, message
}
72 changes: 56 additions & 16 deletions e2e/service.go
@@ -30,9 +30,6 @@ type Service struct {
partitioner *customPartitioner // takes care of sending our end-to-end messages to the right partition
partitionCount int // number of partitions of our test topic, used to send messages to all partitions

// todo: tracker for in-flight messages
// lastRoundtripTimestamp float64 // creation time (in utc ms) of the message that most recently passed the roundtripSla check

// Metrics
endToEndMessagesProduced prometheus.Counter
endToEndMessagesAcked prometheus.Counter
@@ -108,15 +105,17 @@ func createKafkaClient(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service,

// Add RequiredAcks, as options can't be altered later
kgoOpts := []kgo.Opt{}
if cfg.Enabled {
ack := kgo.AllISRAcks()
if cfg.Producer.RequiredAcks == 1 {
ack = kgo.LeaderAck()
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}
kgoOpts = append(kgoOpts, kgo.RequiredAcks(ack))

if cfg.Producer.RequiredAcks == "all" {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.AllISRAcks()))
} else {
kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.LeaderAck()))
kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite())
}

// produce request timeout
kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(cfg.Producer.AckSla))

// Prepare hooks
e2eHooks := newEndToEndClientHooks(logger)
kgoOpts = append(kgoOpts, kgo.WithHooks(e2eHooks))
@@ -136,11 +135,12 @@
// Start starts the service (wow)
func (s *Service) Start(ctx context.Context) error {

// Ensure topic exists and is configured correctly
if err := s.validateManagementTopic(ctx); err != nil {
return fmt.Errorf("could not validate end-to-end topic: %w", err)
}

// after ensuring the topic exists and is configured correctly, we inform our custom partitioner about the partitions
// Get up-to-date metadata and inform our custom partitioner about the partition count
topicMetadata, err := s.getTopicMetadata(ctx)
if err != nil {
return fmt.Errorf("could not get topic metadata after validation: %w", err)
@@ -155,6 +155,51 @@
return nil
}

func (s *Service) initEndToEnd(ctx context.Context) {

validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval)
produceTicker := time.NewTicker(s.config.ProbeInterval)
commitTicker := time.NewTicker(5 * time.Second)
// stop tickers when context is cancelled
go func() {
<-ctx.Done()
produceTicker.Stop()
validateTopicTicker.Stop()
commitTicker.Stop()
}()

// keep checking end-to-end topic
go func() {
for range validateTopicTicker.C {
err := s.validateManagementTopic(ctx)
if err != nil {
s.logger.Error("failed to validate end-to-end topic", zap.Error(err))
}
}
}()

// keep track of groups, delete old unused groups
go s.groupTracker.start()

// start consuming topic
go s.startConsumeMessages(ctx)

// start committing offsets
go func() {
for range commitTicker.C {
s.commitOffsets(ctx)
}
}()

// start producing to topic
go func() {
for range produceTicker.C {
s.produceLatencyMessages(ctx)
}
}()

}

// called from e2e when a message is acknowledged
func (s *Service) onAck(partitionId int32, duration time.Duration) {
s.endToEndMessagesAcked.Inc()
@@ -167,11 +212,6 @@ func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) {
return // message is too old
}

// todo: track "lastRoundtripMessage"
// if msg.Timestamp < s.lastRoundtripTimestamp {
// return // msg older than what we recently processed (out of order, should never happen)
// }

s.endToEndMessagesReceived.Inc()
s.endToEndRoundtripLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds())
}
(diffs for the remaining two files in this commit, including the rewritten topic.go, are not loaded here)
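
As a rough illustration of the "nested error" checking the commit message describes for the rewritten topic.go, the sketch below inspects every per-topic and per-partition error code in a metadata response, since a partition-local failure does not surface as a top-level error. The function name and the choice of response type are assumptions, not code from the commit.

```go
package e2e

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kerr"
	"github.com/twmb/franz-go/pkg/kmsg"
)

// checkMetadataErrors (illustrative only) walks a metadata response and
// returns the first error found, whether it is reported at the topic level
// or only for a single partition.
func checkMetadataErrors(r *kmsg.MetadataResponse) error {
	for _, t := range r.Topics {
		if err := kerr.ErrorForCode(t.ErrorCode); err != nil {
			return fmt.Errorf("metadata response has an error for a topic: %w", err)
		}
		for _, p := range t.Partitions {
			if err := kerr.ErrorForCode(p.ErrorCode); err != nil {
				return fmt.Errorf("metadata response has an error for partition %d: %w", p.Partition, err)
			}
		}
	}
	return nil
}
```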
