From 390cf6fe3495e5e00ec2f97dde53e5e99c82d5f2 Mon Sep 17 00:00:00 2001 From: rikimaru0345 Date: Fri, 21 May 2021 15:39:31 +0200 Subject: [PATCH] multiple changes: - remove rebalancingProtocol config, since there will never be any rebalancing in kminion's consumer groups (since every kminion instance has exactly one, unique consumer group that it uses) - change type of 'requiredAcks' from int to string (now takes 'all' or 'leader') - lots of functions have been refactored to use "early return" and/or split into logical steps so there aren't any huge functions - 'createEndToEndRecord' doesn't return an error anymore and instead panics. if serialization fails, there must be an issue that prevents kminion from running anyway - actually set 'ProduceRequestTimeout' to the 'ackSla' config property - topic.go: complete rewrite: properly validate partition count, partition assignments, and replication factor, taking into account that broker count, partition count, and replicationFactor can change; also now check for all potential errors in kafka responses (including nested errors that are, for example, local to only one partition) --- e2e/config_consumer.go | 17 +- e2e/config_producer.go | 13 +- e2e/consumer.go | 64 ++---- e2e/group_tracker.go | 2 +- e2e/message_tracker.go | 2 - e2e/producer.go | 21 +- e2e/service.go | 72 +++++-- e2e/topic.go | 444 ++++++++++++++++++++++------------ e2e/utils.go | 28 +++ 9 files changed, 357 insertions(+), 306 deletions(-) diff --git a/e2e/config_consumer.go b/e2e/config_consumer.go index 8868c40..960577a 100644 --- a/e2e/config_consumer.go +++ b/e2e/config_consumer.go @@ -5,16 +5,8 @@ import ( "time" ) -const ( - RoundRobin string = "roundRobin" - Range string = "range" - Sticky string = "sticky" - CooperativeSticky string = "cooperativeSticky" -) - type EndToEndConsumerConfig struct { - GroupIdPrefix string `koanf:"groupIdPrefix"` - RebalancingProtocol string `koanf:"rebalancingProtocol"` + GroupIdPrefix string `koanf:"groupIdPrefix"` RoundtripSla time.Duration `koanf:"roundtripSla"` CommitSla time.Duration `koanf:"commitSla"` @@ -22,19 +14,12 @@ type EndToEndConsumerConfig struct { func (c *EndToEndConsumerConfig) SetDefaults() { c.GroupIdPrefix = "kminion-end-to-end" - c.RebalancingProtocol = "cooperativeSticky" c.RoundtripSla = 20 * time.Second c.CommitSla = 10 * time.Second // no idea what to use as a good default value } func (c *EndToEndConsumerConfig) Validate() error { - switch c.RebalancingProtocol { - case RoundRobin, Range, Sticky, CooperativeSticky: - default: - return fmt.Errorf("given RebalancingProtocol '%v' is invalid", c.RebalancingProtocol) - } - if c.RoundtripSla <= 0 { return fmt.Errorf("consumer.roundtripSla must be greater than zero") } diff --git a/e2e/config_producer.go b/e2e/config_producer.go index 878e4e8..c29796f 100644 --- a/e2e/config_producer.go +++ b/e2e/config_producer.go @@ -7,23 +7,22 @@ import ( type EndToEndProducerConfig struct { AckSla time.Duration `koanf:"ackSla"` - RequiredAcks int `koanf:"requiredAcks"` + RequiredAcks string `koanf:"requiredAcks"` } func (c *EndToEndProducerConfig) SetDefaults() { c.AckSla = 5 * time.Second - c.RequiredAcks = -1 + c.RequiredAcks = "all" } func (c *EndToEndProducerConfig) Validate() error { - if c.AckSla <= 0 { - return fmt.Errorf("producer.ackSla must be greater than zero") + if c.RequiredAcks != "all" && c.RequiredAcks != "leader" { + return fmt.Errorf("producer.requiredAcks must be 'all' or 'leader'") } - // all(-1) or leader(1) - if c.RequiredAcks != -1 && c.RequiredAcks != 1 { - return
fmt.Errorf("producer.requiredAcks must be 1 or -1") + if c.AckSla <= 0 { + return fmt.Errorf("producer.ackSla must be greater than zero") } return nil diff --git a/e2e/consumer.go b/e2e/consumer.go index 4c8da11..1c04c64 100644 --- a/e2e/consumer.go +++ b/e2e/consumer.go @@ -5,7 +5,6 @@ import ( "encoding/json" "time" - "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/zap" @@ -15,19 +14,14 @@ func (s *Service) startConsumeMessages(ctx context.Context) { client := s.client topicName := s.config.TopicManagement.Name topic := kgo.ConsumeTopics(kgo.NewOffset().AtEnd(), topicName) - balancer := kgo.Balancers(kgo.CooperativeStickyBalancer()) // Default GroupBalancer - switch s.config.Consumer.RebalancingProtocol { - case RoundRobin: - balancer = kgo.Balancers(kgo.RoundRobinBalancer()) - case Range: - balancer = kgo.Balancers(kgo.RangeBalancer()) - case Sticky: - balancer = kgo.Balancers(kgo.StickyBalancer()) - } client.AssignPartitions(topic) // Create our own consumer group - client.AssignGroup(s.groupId, kgo.GroupTopics(topicName), balancer, kgo.DisableAutoCommit()) + client.AssignGroup(s.groupId, + kgo.GroupTopics(topicName), + kgo.Balancers(kgo.CooperativeStickyBalancer()), + kgo.DisableAutoCommit(), + ) s.logger.Info("Starting to consume end-to-end", zap.String("topicName", topicName), zap.String("groupId", s.groupId)) for { @@ -59,40 +53,26 @@ func (s *Service) startConsumeMessages(ctx context.Context) { func (s *Service) commitOffsets(ctx context.Context) { client := s.client + uncommittedOffset := client.UncommittedOffsets() + if uncommittedOffset == nil { + return + } - // - // Commit offsets for processed messages - // todo: the normal way to commit offsets with franz-go is pretty good, but in our special case - // we want to do it manually, seperately for each partition, so we can track how long it took - if uncommittedOffset := client.UncommittedOffsets(); uncommittedOffset != nil { - - startCommitTimestamp := time.Now() - - client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) { - // got commit response - latency := time.Since(startCommitTimestamp) - - if err != nil { - s.logger.Error("offset commit failed", zap.Error(err), zap.Int64("latencyMilliseconds", latency.Milliseconds())) - return - } + startCommitTimestamp := time.Now() + client.CommitOffsets(ctx, uncommittedOffset, func(req *kmsg.OffsetCommitRequest, r *kmsg.OffsetCommitResponse, err error) { + // Got commit response + latency := time.Since(startCommitTimestamp) - for _, t := range r.Topics { - for _, p := range t.Partitions { - err := kerr.ErrorForCode(p.ErrorCode) - if err != nil { - s.logger.Error("error committing partition offset", zap.String("topic", t.Topic), zap.Int32("partitionId", p.Partition), zap.Error(err)) - } - } - } + if s.logCommitErrors(r, err) > 0 { + return + } - // only report commit latency if the coordinator wasn't set too long ago - if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second { - coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata) - s.onOffsetCommit(coordinator.NodeID, latency) - } - }) - } + // only report commit latency if the coordinator wasn't set too long ago + if time.Since(s.clientHooks.lastCoordinatorUpdate) < 10*time.Second { + coordinator := s.clientHooks.currentCoordinator.Load().(kgo.BrokerMetadata) + s.onOffsetCommit(coordinator.NodeID, latency) + } + }) } // processMessage: 
diff --git a/e2e/group_tracker.go b/e2e/group_tracker.go index 7384ba0..83d59d4 100644 --- a/e2e/group_tracker.go +++ b/e2e/group_tracker.go @@ -134,7 +134,7 @@ func (g *groupTracker) checkAndDeleteOldConsumerGroups() error { exists, _ := containsStr(matchingGroups, name) if exists { // still there, check age and maybe delete it - age := time.Now().Sub(firstSeen) + age := time.Since(firstSeen) if age > oldGroupMaxAge { // group was unused for too long, delete it groupsToDelete = append(groupsToDelete, name) diff --git a/e2e/message_tracker.go b/e2e/message_tracker.go index 9a98f0c..c143e69 100644 --- a/e2e/message_tracker.go +++ b/e2e/message_tracker.go @@ -1,7 +1,6 @@ package e2e import ( - "context" "time" goCache "github.com/patrickmn/go-cache" @@ -28,7 +27,6 @@ import ( type messageTracker struct { svc *Service logger *zap.Logger - ctx context.Context cache *goCache.Cache } diff --git a/e2e/producer.go b/e2e/producer.go index 52c42f8..b74ecb8 100644 --- a/e2e/producer.go +++ b/e2e/producer.go @@ -15,7 +15,7 @@ type EndToEndMessage struct { MessageID string `json:"messageID"` // unique for each message Timestamp int64 `json:"createdUtcNs"` // when the message was created, unix nanoseconds - partition int + partition int // used in message tracker hasArrived bool // used in tracker } @@ -23,8 +23,7 @@ func (m *EndToEndMessage) creationTime() time.Time { return time.Unix(0, m.Timestamp) } -// Goes through each partition and sends a EndToEndMessage to it - +// Sends a EndToEndMessage to every partition func (s *Service) produceLatencyMessages(ctx context.Context) { for i := 0; i < s.partitionCount; i++ { @@ -42,11 +41,7 @@ func (s *Service) produceLatencyMessages(ctx context.Context) { func (s *Service) produceSingleMessage(ctx context.Context, partition int) error { topicName := s.config.TopicManagement.Name - - record, msg, err := createEndToEndRecord(s.minionID, topicName, partition) - if err != nil { - return err - } + record, msg := createEndToEndRecord(s.minionID, topicName, partition) for { select { @@ -83,7 +78,7 @@ func (s *Service) produceSingleMessage(ctx context.Context, partition int) error } -func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage, error) { +func createEndToEndRecord(minionID string, topicName string, partition int) (*kgo.Record, *EndToEndMessage) { message := &EndToEndMessage{ MinionID: minionID, @@ -91,13 +86,13 @@ func createEndToEndRecord(minionID string, topicName string, partition int) (*kg Timestamp: time.Now().UnixNano(), partition: partition, - // todo: maybe indicate what broker was the leader for that partition at the time of sending, - // so that when receiving the message again, we could } mjson, err := json.Marshal(message) if err != nil { - return nil, nil, err + // Should never happen since the struct is so simple, + // but if it does, something is completely broken anyway + panic("cannot serialize EndToEndMessage") } record := &kgo.Record{ @@ -106,5 +101,5 @@ func createEndToEndRecord(minionID string, topicName string, partition int) (*kg Partition: int32(partition), // we set partition for producing so our customPartitioner can make use of it } - return record, message, nil + return record, message } diff --git a/e2e/service.go b/e2e/service.go index 72b3bad..133b644 100644 --- a/e2e/service.go +++ b/e2e/service.go @@ -30,9 +30,6 @@ type Service struct { partitioner *customPartitioner // takes care of sending our end-to-end messages to the right partition partitionCount int // number 
of partitions of our test topic, used to send messages to all partitions - // todo: tracker for in-flight messages - // lastRoundtripTimestamp float64 // creation time (in utc ms) of the message that most recently passed the roundtripSla check - // Metrics endToEndMessagesProduced prometheus.Counter endToEndMessagesAcked prometheus.Counter @@ -108,15 +105,17 @@ func createKafkaClient(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, // Add RequiredAcks, as options can't be altered later kgoOpts := []kgo.Opt{} - if cfg.Enabled { - ack := kgo.AllISRAcks() - if cfg.Producer.RequiredAcks == 1 { - ack = kgo.LeaderAck() - kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite()) - } - kgoOpts = append(kgoOpts, kgo.RequiredAcks(ack)) + + if cfg.Producer.RequiredAcks == "all" { + kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.AllISRAcks())) + } else { + kgoOpts = append(kgoOpts, kgo.RequiredAcks(kgo.LeaderAck())) + kgoOpts = append(kgoOpts, kgo.DisableIdempotentWrite()) } + // produce request timeout + kgoOpts = append(kgoOpts, kgo.ProduceRequestTimeout(cfg.Producer.AckSla)) + // Prepare hooks e2eHooks := newEndToEndClientHooks(logger) kgoOpts = append(kgoOpts, kgo.WithHooks(e2eHooks)) @@ -136,11 +135,12 @@ func createKafkaClient(cfg Config, logger *zap.Logger, kafkaSvc *kafka.Service, // Start starts the service (wow) func (s *Service) Start(ctx context.Context) error { + // Ensure topic exists and is configured correctly if err := s.validateManagementTopic(ctx); err != nil { return fmt.Errorf("could not validate end-to-end topic: %w", err) } - // after ensuring the topic exists and is configured correctly, we inform our custom partitioner about the partitions + // Get up-to-date metadata and inform our custom partitioner about the partition count topicMetadata, err := s.getTopicMetadata(ctx) if err != nil { return fmt.Errorf("could not get topic metadata after validation: %w", err) @@ -155,6 +155,51 @@ func (s *Service) Start(ctx context.Context) error { return nil } +func (s *Service) initEndToEnd(ctx context.Context) { + + validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval) + produceTicker := time.NewTicker(s.config.ProbeInterval) + commitTicker := time.NewTicker(5 * time.Second) + // stop tickers when context is cancelled + go func() { + <-ctx.Done() + produceTicker.Stop() + validateTopicTicker.Stop() + commitTicker.Stop() + }() + + // keep checking end-to-end topic + go func() { + for range validateTopicTicker.C { + err := s.validateManagementTopic(ctx) + if err != nil { + s.logger.Error("failed to validate end-to-end topic", zap.Error(err)) + } + } + }() + + // keep track of groups, delete old unused groups + go s.groupTracker.start() + + // start consuming topic + go s.startConsumeMessages(ctx) + + // start committing offsets + go func() { + for range commitTicker.C { + s.commitOffsets(ctx) + } + }() + + // start producing to topic + go func() { + for range produceTicker.C { + s.produceLatencyMessages(ctx) + } + }() + +} + // called from e2e when a message is acknowledged func (s *Service) onAck(partitionId int32, duration time.Duration) { s.endToEndMessagesAcked.Inc() @@ -167,11 +212,6 @@ func (s *Service) onRoundtrip(partitionId int32, duration time.Duration) { return // message is too old } - // todo: track "lastRoundtripMessage" - // if msg.Timestamp < s.lastRoundtripTimestamp { - // return // msg older than what we recently processed (out of order, should never happen) - // } - s.endToEndMessagesReceived.Inc()
s.endToEndRoundtripLatency.WithLabelValues(fmt.Sprintf("%v", partitionId)).Observe(duration.Seconds()) } diff --git a/e2e/topic.go b/e2e/topic.go index 168586e..a3aab5c 100644 --- a/e2e/topic.go +++ b/e2e/topic.go @@ -5,229 +5,265 @@ import ( "fmt" "time" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kmsg" "go.uber.org/zap" ) +// Check our end-to-end test topic +// - does it exist? +// - is it configured correctly? +// - does it have enough partitions? +// - is the replicationFactor correct? +// - are assignments good? +// - is each broker leading at least one partition? +// - are replicas distributed correctly? func (s *Service) validateManagementTopic(ctx context.Context) error { + s.logger.Debug("validating end-to-end topic...") - s.logger.Info("validating end-to-end topic...") - - // expectedReplicationFactor := s.config.TopicManagement.ReplicationFactor - expectedPartitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker - - // Check how many brokers we have - allMeta, err := s.getAllTopicsMetadata(ctx) + meta, err := s.getTopicMetadata(ctx) if err != nil { - return fmt.Errorf("validateManagementTopic cannot get metadata of all brokers/topics: %w", err) + return fmt.Errorf("validateManagementTopic cannot get metadata of e2e topic: %w", err) } - expectedPartitions := expectedPartitionsPerBroker * len(allMeta.Brokers) - topicMeta, err := s.getTopicMetadata(ctx) - if err != nil { + // Create topic if it doesn't exist + if len(meta.Topics) == 0 { + if err = s.createManagementTopic(ctx, meta); err != nil { + return err + } + } + + // Ensure the topic has enough partitions + if err = s.ensureEnoughPartitions(ctx, meta); err != nil { return err } - // If metadata is not reachable, then there is a problem in connecting to broker or lack of Authorization - // TopicMetadataArray could be empty, therefore needs to do this check beforehand - - // todo: check if len(meta.Topics) != 1 - // todo: assign topic := meta.Topics[0] - // if len(meta.Topics) != 1 { - // return fmt.Errorf("topic metadata request returned != 1 topics, please make sure the brokers are up and/or you have right to access them. got %v topics but expected 1", len(topicMetadataArray)) - // } - - // Create the management end to end topic if it does not exist - topicExists := topicMeta.Topics[0].Partitions != nil - if !topicExists { - s.logger.Warn("end-to-end testing topic does not exist, will create it...") - err = s.createManagementTopic(ctx, topicMeta) - if err != nil { - return err - } - return nil + // Validate assignments + if err = s.validatePartitionAssignments(ctx, meta); err != nil { + return err } - // If the number of broker is less than expected Replication Factor it means the cluster brokers number is too small - // topicMetadata.Brokers will return all the available brokers from the cluster - - // todo: - // isNumBrokerValid := len(topicMeta.Brokers) >= expectedReplicationFactor - // if !isNumBrokerValid { - // return fmt.Errorf("current cluster size differs from the expected size (based on config topicManagement.replicationFactor). 
expected broker: %v NumOfBroker: %v", len(topicMeta.Brokers), expectedReplicationFactor) - // } - - // Check the number of Partitions per broker, if it is too low create partition - // topicMetadata.Topics[0].Partitions is the number of PartitionsPerBroker - if len(topicMeta.Topics[0].Partitions) < expectedPartitions { - s.logger.Warn("e2e test topic does not have enough partitions, partitionCount is less than brokerCount * partitionsPerBroker. will add partitions to the topic...", - zap.Int("expectedPartitionCount", expectedPartitions), - zap.Int("actualPartitionCount", len(topicMeta.Topics[0].Partitions)), - zap.Int("brokerCount", len(allMeta.Brokers)), - zap.Int("config.partitionsPerBroker", s.config.TopicManagement.PartitionsPerBroker), - ) - // Create partition if the number partition is lower, can't delete partition - assignment := kmsg.NewCreatePartitionsRequestTopicAssignment() - assignment.Replicas = topicMeta.Topics[0].Partitions[0].Replicas + return nil +} - topic := kmsg.NewCreatePartitionsRequestTopic() - topic.Topic = s.config.TopicManagement.Name +func (s *Service) validatePartitionAssignments(ctx context.Context, meta *kmsg.MetadataResponse) error { + // We use a very simple strategy to distribute all partitions and their replicas to the brokers + // + // For example, if we had: + // - 5 brokers + // - 5 partitions (partitionsPerBroker = 1) + // - replicationFactor of 5 + // then our assignments would look like the table below. + // Why does this example use 5 partitions? + // Because for end-to-end testing to make sense, we'd like to report the message roundtrip latency for each broker. + // That's why we ensure that we always have at least as many partitions as there are brokers, so every broker + // can be the leader of at least one partition. + // + // The numbers after each partition are the brokerIds (0, 1, 2, 3, 4) + // The first broker in an assignment array is the leader for that partition, + // the following broker ids are hosting the partitions' replicas. + // + // Partition 0: [0, 1, 2, 3, 4] + // Partition 1: [1, 2, 3, 4, 0] + // Partition 2: [2, 3, 4, 0, 1] + // Partition 3: [3, 4, 0, 1, 2] + // Partition 4: [4, 0, 1, 2, 3] + // + // In addition to being very simple, this also has the benefit that each partitionID neatly corresponds + to its leader's brokerID - at least most of the time. + // When a broker suddenly goes offline, or a new one is added to the cluster, etc. the assignments + // might be off for a short period of time, but the assignments will be fixed automatically + // in the next, periodic, topic validation (configured by 'topicManagement.reconciliationInterval') + // - topic.Count = int32(expectedPartitionsPerBroker) // Should be greater than current partition number - topic.Assignment = []kmsg.CreatePartitionsRequestTopicAssignment{assignment} + topicName := s.config.TopicManagement.Name + topicMeta := meta.Topics[0] + realPartitionCount := len(topicMeta.Partitions) + realReplicationFactor := len(topicMeta.Partitions[0].Replicas) + + // 1.
Calculate the expected assignments + allPartitionAssignments := make([][]int32, realPartitionCount) + for i := range allPartitionAssignments { + allPartitionAssignments[i] = make([]int32, realReplicationFactor) + } - create := kmsg.NewCreatePartitionsRequest() - create.Topics = []kmsg.CreatePartitionsRequestTopic{topic} - _, err := create.RequestWith(ctx, s.client) - if err != nil { - return fmt.Errorf("failed to add partitions to topic: %w", err) + // simple helper function that just keeps returning values from 0 to brokerCount forever. + brokerIterator := func(start int32) func() int32 { + brokerIndex := start + return func() int32 { + result := brokerIndex // save result we'll return now + brokerIndex = (brokerIndex + 1) % int32(len(meta.Brokers)) // prepare for next call: add one, and wrap around + return result // return current result } + } - // todo: why return? shouldn't we check for distinct leaders anyway?? - return nil + startBroker := brokerIterator(0) + for _, partitionAssignments := range allPartitionAssignments { + // determine the leader broker: first partition will get 0, next one will get 1, ... + start := startBroker() + // and create an iterator that goes over all brokers, starting at our leader + nextReplica := brokerIterator(start) + + for i := range partitionAssignments { + partitionAssignments[i] = nextReplica() + } } - // Check distinct Leader Nodes, if it is more than replicationFactor it means the partitions got assigned wrongly - // todo: rewrite this. its entirely possible that one broker leads multiple partitions (for example when a broker was temporarily offline). - // we only have to ensure that every available broker leads at least one of our partitions. - - distinctLeaderNodes := []int32{} - for _, partition := range topicMeta.Topics[0].Partitions { - if len(distinctLeaderNodes) == 0 { - distinctLeaderNodes = append(distinctLeaderNodes, partition.Leader) - } else { - // Only append on distinct - distinct := true - for _, leaderNode := range distinctLeaderNodes { - if partition.Leader == leaderNode { - distinct = false - } - } - if distinct { - distinctLeaderNodes = append(distinctLeaderNodes, partition.Leader) + // 2. Check + // Now that every partition knows which brokers are hosting its replicas, and which broker is the leader (i.e. is hosting the primary "replica"), + // we just have to check if the current/real assignments are equal to our desired assignments (and then apply them if not). + assignmentsAreEqual := true + for partitionId := 0; partitionId < realPartitionCount; partitionId++ { + expectedAssignments := allPartitionAssignments[partitionId] + actualAssignments := topicMeta.Partitions[partitionId].Replicas + + // Check if any replica is assigned to the wrong broker + for i := 0; i < realReplicationFactor; i++ { + expectedBroker := expectedAssignments[i] + actualBroker := actualAssignments[i] + if expectedBroker != actualBroker { + assignmentsAreEqual = false } } } - assignmentInvalid := len(distinctLeaderNodes) != s.config.TopicManagement.ReplicationFactor - // Reassign Partitions on invalid assignment - if assignmentInvalid { - - s.logger.Warn("e2e test topic partition assignments are invalid. not every broker leads at least one partition. 
will reassign partitions...", - zap.String("todo", "not yet implemented"), - ) + if assignmentsAreEqual { + // assignments are already exactly as they are supposed to be return nil + } - // Get the new AssignedReplicas by checking the ReplicationFactor config - assignedReplicas := make([]int32, s.config.TopicManagement.ReplicationFactor) - for index := range assignedReplicas { - assignedReplicas[index] = int32(index) - } + // 3. Apply + // Some partitions have their replicas hosted on the wrong brokers! + // Apply our desired replica configuration + partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, realPartitionCount) + for i := range partitionReassignments { + partitionReassignments[i] = kmsg.NewAlterPartitionAssignmentsRequestTopicPartition() + partitionReassignments[i].Partition = int32(i) + partitionReassignments[i].Replicas = allPartitionAssignments[i] + } - // Generate the partition assignments from PartitionPerBroker config - partitions := make([]int32, s.config.TopicManagement.PartitionsPerBroker) - reassignedPartitions := []kmsg.AlterPartitionAssignmentsRequestTopicPartition{} - for index := range partitions { - rp := kmsg.NewAlterPartitionAssignmentsRequestTopicPartition() - rp.Partition = int32(index) - rp.Replicas = assignedReplicas - reassignedPartitions = append(reassignedPartitions, rp) - } + topicReassignment := kmsg.NewAlterPartitionAssignmentsRequestTopic() + topicReassignment.Topic = topicName + topicReassignment.Partitions = partitionReassignments - managamentTopicReassignment := kmsg.NewAlterPartitionAssignmentsRequestTopic() - managamentTopicReassignment.Topic = s.config.TopicManagement.Name - managamentTopicReassignment.Partitions = reassignedPartitions + reassignRq := kmsg.NewAlterPartitionAssignmentsRequest() + reassignRq.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{topicReassignment} - reassignment := kmsg.NewAlterPartitionAssignmentsRequest() - reassignment.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{managamentTopicReassignment} + reassignRes, err := reassignRq.RequestWith(ctx, s.client) + if err != nil { + // error while sending + return fmt.Errorf("topic reassignment request failed: %w", err) + } + reassignErr := kerr.ErrorForCode(reassignRes.ErrorCode) + if reassignErr != nil || reassignRes.ErrorMessage != nil { + // global error + return fmt.Errorf("topic reassignment failed with ErrorMessage=\"%v\": %w", reassignRes.ErrorMessage, reassignErr) + } - _, err := reassignment.RequestWith(ctx, s.client) - if err != nil { - return fmt.Errorf("failed to do kmsg request on topic reassignment: %w", err) + // errors for individual partitions + for _, t := range reassignRes.Topics { + for _, p := range t.Partitions { + pErr := kerr.ErrorForCode(p.ErrorCode) + if pErr != nil || p.ErrorMessage != nil { + return fmt.Errorf("topic reassignment failed on partition %v: ErrorMessage \"%v\": %w", p.Partition, p.ErrorMessage, pErr) + } } - return nil } return nil } -func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig { +func (s *Service) ensureEnoughPartitions(ctx context.Context, meta *kmsg.MetadataResponse) error { + partitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker + expectedPartitions := partitionsPerBroker * len(meta.Brokers) - topicConfig := func(name string, value interface{}) kmsg.CreateTopicsRequestTopicConfig { - prop := kmsg.NewCreateTopicsRequestTopicConfig() - prop.Name = name - valStr := fmt.Sprintf("%v", value) - prop.Value = &valStr - return prop + if
len(meta.Topics[0].Partitions) >= expectedPartitions { + return nil // no need to add more } - minISR := 1 - if cfgTopic.ReplicationFactor >= 3 { - // Only with 3+ replicas does it make sense to require acks from 2 brokers - // todo: think about if we should change how 'producer.requiredAcks' works. - // we probably don't even need this configured on the topic directly... - minISR = 2 + partitionsToAdd := expectedPartitions - len(meta.Topics[0].Partitions) + s.logger.Warn("e2e test topic does not have enough partitions, partitionCount is less than brokerCount * partitionsPerBroker. will add partitions to the topic...", + zap.Int("expectedPartitionCount", expectedPartitions), + zap.Int("actualPartitionCount", len(meta.Topics[0].Partitions)), + zap.Int("brokerCount", len(meta.Brokers)), + zap.Int("config.partitionsPerBroker", s.config.TopicManagement.PartitionsPerBroker), + zap.Int("partitionsToAdd", partitionsToAdd), + ) + + topic := kmsg.NewCreatePartitionsRequestTopic() + topic.Topic = s.config.TopicManagement.Name + topic.Count = int32(expectedPartitions) + + // For each partition we're about to add, we need to define its replicas + for i := 0; i < partitionsToAdd; i++ { + assignment := kmsg.NewCreatePartitionsRequestTopicAssignment() + // In order to keep the code as simple as possible, just copy the assignments from the first partition. + // After the topic is created, there is another validation step that will take care of bad assignments! + assignment.Replicas = meta.Topics[0].Partitions[0].Replicas + topic.Assignment = append(topic.Assignment, assignment) } - // Even though kminion's end-to-end feature actually does not require any - // real persistence beyond a few minutes; it might be good too keep messages - // around a bit for debugging. 
- return []kmsg.CreateTopicsRequestTopicConfig{ - topicConfig("cleanup.policy", "delete"), - topicConfig("segment.ms", (time.Hour * 12).Milliseconds()), // new segment every 12h - topicConfig("retention.ms", (time.Hour * 24).Milliseconds()), // discard segments older than 24h - topicConfig("min.insync.replicas", minISR), + // Send request + create := kmsg.NewCreatePartitionsRequest() + create.Topics = []kmsg.CreatePartitionsRequestTopic{topic} + createPartitionsResponse, err := create.RequestWith(ctx, s.client) + + // Check for errors + if err != nil { + return fmt.Errorf("request to create more partitions for e2e topic failed: %w", err) + } + nestedErrors := 0 + for _, topicResponse := range createPartitionsResponse.Topics { + tErr := kerr.ErrorForCode(topicResponse.ErrorCode) + if tErr != nil || topicResponse.ErrorMessage != nil { + s.logger.Error("error in createPartitionsResponse", + zap.String("topic", topicResponse.Topic), + zap.Stringp("errorMessage", topicResponse.ErrorMessage), + zap.NamedError("topicError", tErr), + ) + nestedErrors++ + } + } + if nestedErrors > 0 { + return fmt.Errorf("request to add more partitions to e2e topic had some nested errors, see the %v log lines above", nestedErrors) } -} -func (s *Service) createManagementTopic(ctx context.Context, topicMetadata *kmsg.MetadataResponse) error { + return nil +} - s.logger.Info(fmt.Sprintf("creating topic %s for EndToEnd metrics", s.config.TopicManagement.Name)) +func (s *Service) createManagementTopic(ctx context.Context, allMeta *kmsg.MetadataResponse) error { + topicCfg := s.config.TopicManagement + brokerCount := len(allMeta.Brokers) + totalPartitions := brokerCount * topicCfg.PartitionsPerBroker - cfgTopic := s.config.TopicManagement - topicConfigs := createTopicConfig(cfgTopic) + s.logger.Info("e2e topic does not exist, creating it...", + zap.String("topicName", topicCfg.Name), + zap.Int("partitionsPerBroker", topicCfg.PartitionsPerBroker), + zap.Int("replicationFactor", topicCfg.ReplicationFactor), + zap.Int("brokerCount", brokerCount), + zap.Int("totalPartitions", totalPartitions), + ) topic := kmsg.NewCreateTopicsRequestTopic() - topic.Topic = cfgTopic.Name - topic.NumPartitions = int32(cfgTopic.PartitionsPerBroker) - topic.ReplicationFactor = int16(cfgTopic.ReplicationFactor) - topic.Configs = topicConfigs - - // Workaround for wrong assignment on 1 ReplicationFactor with automatic assignment on topic creation, this will create the assignment manually, automatic assignment works on more than 1 RepFactor - // Issue: Instead of putting the number of PartitionPerBroker in One Broker/Replica, the client will assign one partition on different Broker/Replica - // Example for 1 RepFactor and 2 PartitionPerBroker: Instead of 2 Partitions on 1 Broker, it will put 1 Partition each on 2 Brokers - if cfgTopic.ReplicationFactor == 1 { - brokerID := topicMetadata.Brokers[0].NodeID - var assignment []kmsg.CreateTopicsRequestTopicReplicaAssignment - partitions := make([]int32, cfgTopic.PartitionsPerBroker) - for index := range partitions { - replicaAssignment := kmsg.NewCreateTopicsRequestTopicReplicaAssignment() - replicaAssignment.Partition = int32(index) - replicaAssignment.Replicas = []int32{brokerID} - assignment = append(assignment, replicaAssignment) - } - topic.NumPartitions = -1 // Need to set this as -1 on Manual Assignment - topic.ReplicationFactor = -1 // Need to set this as -1 on Manual Assignment - topic.ReplicaAssignment = assignment - } + topic.Topic = topicCfg.Name + topic.NumPartitions = 
int32(totalPartitions) + topic.ReplicationFactor = int16(topicCfg.ReplicationFactor) + topic.Configs = createTopicConfig(topicCfg) req := kmsg.NewCreateTopicsRequest() req.Topics = []kmsg.CreateTopicsRequestTopic{topic} res, err := req.RequestWith(ctx, s.client) - // Sometimes it won't throw Error, but the Error will be abstracted to res.Topics[0].ErrorMessage - if res.Topics[0].ErrorMessage != nil { - return fmt.Errorf("failed to create topic: %s", *res.Topics[0].ErrorMessage) - } - if err != nil { - return fmt.Errorf("failed to create topic: %w", err) + return fmt.Errorf("failed to create e2e topic: %w", err) + } + if len(res.Topics) > 0 && res.Topics[0].ErrorMessage != nil { + return fmt.Errorf("failed to create e2e topic: %s", *res.Topics[0].ErrorMessage) } return nil } func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse, error) { - topicReq := kmsg.NewMetadataRequestTopic() topicName := s.config.TopicManagement.Name topicReq.Topic = &topicName @@ -237,57 +273,47 @@ func (s *Service) getTopicMetadata(ctx context.Context) (*kmsg.MetadataResponse, return req.RequestWith(ctx, s.client) } -func (s *Service) getAllTopicsMetadata(ctx context.Context) (*kmsg.MetadataResponse, error) { - // need to request metadata of all topics in order to get metadata of all brokers - // not sure if there is a better way (copied from kowl) - req := kmsg.NewMetadataRequest() - req.Topics = []kmsg.MetadataRequestTopic{} // empty array should get us all topics, in all kafka versions +func (s *Service) getTopicsConfigs(ctx context.Context, configNames []string) (*kmsg.DescribeConfigsResponse, error) { + req := kmsg.NewDescribeConfigsRequest() + req.IncludeDocumentation = false + req.IncludeSynonyms = false + req.Resources = []kmsg.DescribeConfigsRequestResource{ + { + ResourceType: kmsg.ConfigResourceTypeTopic, + ResourceName: s.config.TopicManagement.Name, + ConfigNames: configNames, + }, + } return req.RequestWith(ctx, s.client) } -func (s *Service) initEndToEnd(ctx context.Context) { - - validateTopicTicker := time.NewTicker(s.config.TopicManagement.ReconciliationInterval) - produceTicker := time.NewTicker(s.config.ProbeInterval) - commitTicker := time.NewTicker(5 * time.Second) - // stop tickers when context is cancelled - go func() { - <-ctx.Done() - produceTicker.Stop() - validateTopicTicker.Stop() - commitTicker.Stop() - }() - - // keep checking end-to-end topic - go func() { - for range validateTopicTicker.C { - err := s.validateManagementTopic(ctx) - if err != nil { - s.logger.Error("failed to validate end-to-end topic", zap.Error(err)) - } - } - }() - - // keep track of groups, delete old unused groups - go s.groupTracker.start() - - // start consuming topic - go s.startConsumeMessages(ctx) +func createTopicConfig(cfgTopic EndToEndTopicConfig) []kmsg.CreateTopicsRequestTopicConfig { - // start comitting offsets - go func() { - for range commitTicker.C { - s.commitOffsets(ctx) - } - }() + topicConfig := func(name string, value interface{}) kmsg.CreateTopicsRequestTopicConfig { + prop := kmsg.NewCreateTopicsRequestTopicConfig() + prop.Name = name + valStr := fmt.Sprintf("%v", value) + prop.Value = &valStr + return prop + } - // start producing to topic - go func() { - for range produceTicker.C { - s.produceLatencyMessages(ctx) - } - }() + minISR := 1 + if cfgTopic.ReplicationFactor >= 3 { + // Only with 3+ replicas does it make sense to require acks from 2 brokers + // todo: think about if we should change how 'producer.requiredAcks' works. 
+ // we probably don't even need this configured on the topic directly... + minISR = 2 + } + // Even though kminion's end-to-end feature actually does not require any + // real persistence beyond a few minutes, it might be good to keep messages + // around a bit for debugging. + return []kmsg.CreateTopicsRequestTopicConfig{ + topicConfig("cleanup.policy", "delete"), + topicConfig("segment.ms", (time.Hour * 12).Milliseconds()), // new segment every 12h + topicConfig("retention.ms", (time.Hour * 24).Milliseconds()), // discard segments older than 24h + topicConfig("min.insync.replicas", minISR), + } } diff --git a/e2e/utils.go b/e2e/utils.go index df98803..4a0b4a4 100644 --- a/e2e/utils.go +++ b/e2e/utils.go @@ -5,6 +5,9 @@ import ( "time" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kerr" + "github.com/twmb/franz-go/pkg/kmsg" + "go.uber.org/zap" ) // create histogram buckets for metrics reported by 'end-to-end' @@ -41,3 +44,28 @@ func containsStr(ar []string, x string) (bool, int) { } return false, -1 } + +// logCommitErrors logs all errors in the given offset commit response and returns the number of errors +func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) int { + if err != nil { + s.logger.Error("offset commit failed", zap.Error(err)) + return 1 + } + + errCount := 0 + for _, t := range r.Topics { + for _, p := range t.Partitions { + err := kerr.ErrorForCode(p.ErrorCode) + if err != nil { + s.logger.Error("error committing partition offset", + zap.String("topic", t.Topic), + zap.Int32("partitionId", p.Partition), + zap.Error(err), + ) + errCount++ + } + } + } + + return errCount +}
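As a worked example of the assignment strategy documented in validatePartitionAssignments, the standalone sketch below (illustrative only, not part of the patch) computes the same table shown in that comment for 5 brokers, partitionsPerBroker = 1 and a replicationFactor of 5. It also illustrates why ensureEnoughPartitions can simply copy the replicas of the first partition when adding partitions: the next periodic validation reassigns everything to this layout anyway.

package main

import "fmt"

// expectedAssignments reproduces the distribution scheme from validatePartitionAssignments:
// partition p is led by broker p % brokerCount, and its replicas continue on the
// following brokers, wrapping around at the end of the broker list.
func expectedAssignments(brokerCount, partitionCount, replicationFactor int) [][]int32 {
	assignments := make([][]int32, partitionCount)
	for p := range assignments {
		assignments[p] = make([]int32, replicationFactor)
		for r := range assignments[p] {
			assignments[p][r] = int32((p + r) % brokerCount)
		}
	}
	return assignments
}

func main() {
	// 5 brokers, 5 partitions, replicationFactor 5 prints exactly the table from the comment:
	// Partition 0: [0 1 2 3 4], Partition 1: [1 2 3 4 0], Partition 2: [2 3 4 0 1], ...
	for p, replicas := range expectedAssignments(5, 5, 5) {
		fmt.Printf("Partition %d: %v\n", p, replicas)
	}
}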