From 7b6bd7e2c428d1d5423a78c5999b0bce7994f680 Mon Sep 17 00:00:00 2001
From: Martin Schneppenheim
Date: Wed, 25 Aug 2021 15:43:58 +0200
Subject: [PATCH] Fix partition reassignment and creation

---
 e2e/topic.go      | 340 ++++++++++++++++++++++++----------------------
 e2e/topic_test.go | 165 ++++++++++++++++++++++
 e2e/utils.go      |  29 ++++
 3 files changed, 371 insertions(+), 163 deletions(-)
 create mode 100644 e2e/topic_test.go

diff --git a/e2e/topic.go b/e2e/topic.go
index 07b06f6..ac79013 100644
--- a/e2e/topic.go
+++ b/e2e/topic.go
@@ -3,6 +3,7 @@ package e2e
 import (
 	"context"
 	"fmt"
+	"math"
 	"time"
 
 	"github.com/twmb/franz-go/pkg/kerr"
@@ -39,215 +40,228 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
 		}
 	}
 
-	// Ensure the topic has enough partitions
-	if err = s.ensureEnoughPartitions(ctx, meta); err != nil {
-		return err
+	alterReq, createReq, err := s.calculatePartitionReassignments(meta)
+	if err != nil {
+		return fmt.Errorf("failed to calculate partition reassignments: %w", err)
 	}
 
-	// Validate assignments
-	if err = s.validatePartitionAssignments(ctx, meta); err != nil {
-		return err
+	err = s.executeAlterPartitionAssignments(ctx, alterReq)
+	if err != nil {
+		return fmt.Errorf("failed to alter partition assignments: %w", err)
+	}
+
+	err = s.executeCreatePartitions(ctx, createReq)
+	if err != nil {
+		return fmt.Errorf("failed to create partitions: %w", err)
 	}
 
 	return nil
 }
 
-func (s *Service) validatePartitionAssignments(ctx context.Context, meta *kmsg.MetadataResponse) error {
-	if !s.config.TopicManagement.Enabled {
+func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreatePartitionsRequest) error {
+	if req == nil {
 		return nil
 	}
 
-	// We use a very simple strategy to distribute all partitions and its replicas to the brokers
-	//
-	// For example if we had:
-	// - 5 brokers
-	// - 5 partitions (partitionsPerBroker = 1)
-	// - replicationFactor of 5
-	// then our assignments would look like the table below.
-	// Why does this example use 5 partitions?
-	// Because for end-to-end testing to make sense, we'd like to report the message roundtrip latency for each broker.
-	// That's why we ensure that we always have at least as many partitions as there are brokers, so every broker
-	// can be the leader of at least one partition.
-	//
-	// The numbers after each partition are the brokerIds (0, 1, 2, 3, 4)
-	// The first broker in an assignment array is the leader for that partition,
-	// the following broker ids are hosting the partitions' replicas.
-	//
-	// Partition 0: [0, 1, 2, 3, 4]
-	// Partition 1: [1, 2, 3, 4, 0]
-	// Partition 2: [2, 3, 4, 0, 1]
-	// Partition 3: [3, 4, 0, 1, 2]
-	// Partition 4: [4, 0, 1, 2, 3]
-	//
-	// In addition to being very simple, this also has the benefit that each partitionID neatly corresponds
-	// its leaders brokerID - at least most of the time.
-	// When a broker suddenly goes offline, or a new one is added to the cluster, etc the assignments
-	// might be off for a short period of time, but the assignments will be fixed automatically
-	// in the next, periodic, topic validation (configured by 'topicManagement.reconciliationInterval')
-	//
-
-	topicName := s.config.TopicManagement.Name
-	topicMeta := meta.Topics[0]
-	realPartitionCount := len(topicMeta.Partitions)
-	realReplicationFactor := len(topicMeta.Partitions[0].Replicas)
-
-	// 1. Calculate the expected assignments
-	allPartitionAssignments := make([][]int32, realPartitionCount)
-	for i := range allPartitionAssignments {
-		allPartitionAssignments[i] = make([]int32, realReplicationFactor)
+	res, err := req.RequestWith(ctx, s.client)
+	if err != nil {
+		return err
 	}
 
-	// simple helper function that just keeps returning values from 0 to brokerCount forever.
-	brokerIterator := func(start int32) func() int32 {
-		brokerIndex := start
-		return func() int32 {
-			result := brokerIndex                                      // save result we'll return now
-			brokerIndex = (brokerIndex + 1) % int32(len(meta.Brokers)) // prepare for next call: add one, and wrap around
-			return result                                              // return current result
+	for _, topic := range res.Topics {
+		typedErr := kerr.TypedErrorForCode(topic.ErrorCode)
+		if typedErr != nil {
+			return fmt.Errorf("inner Kafka error: %w", typedErr)
 		}
 	}
 
-	startBroker := brokerIterator(0)
-	for _, partitionAssignments := range allPartitionAssignments {
-		// determine the leader broker: first partition will get 0, next one will get 1, ...
-		start := startBroker()
-		// and create an iterator that goes over all brokers, starting at our leader
-		nextReplica := brokerIterator(start)
+	return nil
+}
 
-		for i := range partitionAssignments {
-			partitionAssignments[i] = nextReplica()
-		}
+func (s *Service) executeAlterPartitionAssignments(ctx context.Context, req *kmsg.AlterPartitionAssignmentsRequest) error {
+	if req == nil {
+		return nil
 	}
 
-	// 2. Check
-	// Now that every partition knows which brokers are hosting its replicas, and which broker is the leader (i.e. is hosting the primary "replica"),
-	// we just have to check if the current/real assignments are equal to our desired assignments (and then apply them if not).
-	assignmentsAreEqual := true
-	for partitionId := 0; partitionId < realPartitionCount; partitionId++ {
-		expectedAssignments := allPartitionAssignments[partitionId]
-		actualAssignments := topicMeta.Partitions[partitionId].Replicas
-
-		// Check if any replica is assigned to the wrong broker
-		for i := 0; i < realReplicationFactor; i++ {
-			expectedBroker := expectedAssignments[i]
-			actualBroker := actualAssignments[i]
-			if expectedBroker != actualBroker {
-				assignmentsAreEqual = false
+	res, err := req.RequestWith(ctx, s.client)
+	if err != nil {
+		return err
+	}
+
+	typedErr := kerr.TypedErrorForCode(res.ErrorCode)
+	if typedErr != nil {
+		return fmt.Errorf("inner Kafka error: %w", typedErr)
+	}
+	for _, topic := range res.Topics {
+		for _, partition := range topic.Partitions {
+			err = kerr.TypedErrorForCode(partition.ErrorCode)
+			if err != nil {
+				return fmt.Errorf("inner Kafka partition error on partition '%v': %w", partition.Partition, err)
 			}
 		}
 	}
-	if assignmentsAreEqual {
-		// assignments are already exactly as they are supposed to be
-		return nil
-	}
 
-	// 3. Apply
-	// Some partitions have their replicas hosted on the wrong brokers!
-	// Apply our desired replica configuration
-	partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, realPartitionCount)
-	for i := range partitionReassignments {
-		partitionReassignments[i] = kmsg.NewAlterPartitionAssignmentsRequestTopicPartition()
-		partitionReassignments[i].Partition = int32(i)
-		partitionReassignments[i].Replicas = allPartitionAssignments[i]
-	}
+	return nil
+}
 
-	topicReassignment := kmsg.NewAlterPartitionAssignmentsRequestTopic()
-	topicReassignment.Topic = topicName
-	topicReassignment.Partitions = partitionReassignments
+func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (*kmsg.AlterPartitionAssignmentsRequest, *kmsg.CreatePartitionsRequest, error) {
+	brokerByID := brokerMetadataByBrokerID(meta.Brokers)
+	topicMeta := meta.Topics[0]
+	desiredReplicationFactor := s.config.TopicManagement.ReplicationFactor
 
-	reassignRq := kmsg.NewAlterPartitionAssignmentsRequest()
-	reassignRq.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{topicReassignment}
+	if desiredReplicationFactor > len(brokerByID) {
+		return nil, nil, fmt.Errorf("the desired replication factor of '%v' is larger than the available brokers "+
+			"('%v' brokers)", desiredReplicationFactor, len(brokerByID))
+	}
 
-	reassignRes, err := reassignRq.RequestWith(ctx, s.client)
-	if err != nil {
-		// error while sending
-		return fmt.Errorf("topic reassignment request failed: %w", err)
+	// We want to ensure that each brokerID leads at least one partition permanently. Hence let's iterate over brokers.
+	preferredLeaderPartitionsBrokerID := make(map[int32][]kmsg.MetadataResponseTopicPartition)
+	for _, broker := range brokerByID {
+		// Initialize every broker with an empty slice so that brokers which currently lead no partition
+		// still get an entry and are considered further below.
+		preferredLeaderPartitionsBrokerID[broker.NodeID] = make([]kmsg.MetadataResponseTopicPartition, 0)
+		for _, partition := range topicMeta.Partitions {
+			// PreferredLeader = brokerID of the broker that is the desired leader, regardless of who the current leader is
+			preferredLeader := partition.Replicas[0]
+			if broker.NodeID == preferredLeader {
+				preferredLeaderPartitionsBrokerID[broker.NodeID] = append(preferredLeaderPartitionsBrokerID[broker.NodeID], partition)
+			}
+		}
 	}
-	reassignErr := kerr.ErrorForCode(reassignRes.ErrorCode)
-	if reassignErr != nil || (reassignRes.ErrorMessage != nil && *reassignRes.ErrorMessage != "") {
-		// global error
-		return fmt.Errorf(fmt.Sprintf("topic reassignment failed with ErrorMessage=\"%v\": %v",
-			*reassignRes.ErrorMessage,
-			safeUnwrap(reassignErr),
-		))
+
+	// Partitions that use the same brokerID more than once as preferred leader can be reassigned to other brokers.
+	// We collect them to avoid creating new partitions when not needed.
+	reassignablePartitions := make([]kmsg.MetadataResponseTopicPartition, 0)
+	for _, partitions := range preferredLeaderPartitionsBrokerID {
+		if len(partitions) > 1 {
+			reassignablePartitions = append(reassignablePartitions, partitions[1:]...)
+			continue
+		}
 	}
 
-	// errors for individual partitions
-	for _, t := range reassignRes.Topics {
-		for _, p := range t.Partitions {
-			pErr := kerr.ErrorForCode(p.ErrorCode)
-			if pErr != nil || (p.ErrorMessage != nil && *p.ErrorMessage != "") {
-				return fmt.Errorf(fmt.Sprintf("topic reassignment failed on partition %v with ErrorMessage=\"%v\": %v",
-					p.Partition,
-					safeUnwrap(pErr),
-					*p.ErrorMessage),
-				)
+	// Now let's try to reassign (or create) new partitions for those brokers that are not the preferred leader for
+	// any partition.
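+	//
+	// In short, the loop below does the following for every broker (a summary of the logic that follows,
+	// not additional behaviour):
+	//   1. Partitions the broker already leads get a reassignment queued whenever their replica list is
+	//      shorter than the configured replication factor.
+	//   2. A broker that leads no partition first tries to take over one of the surplus partitions
+	//      collected above via a reassignment.
+	//   3. Only if no surplus partition is left is a brand-new partition created for that broker.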
+
+	partitionCount := len(topicMeta.Partitions)
+	partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, 0)
+	createPartitionAssignments := make([]kmsg.CreatePartitionsRequestTopicAssignment, 0)
+
+	for brokerID, partitions := range preferredLeaderPartitionsBrokerID {
+		// Add replicas if number of replicas is smaller than desiredReplicationFactor
+		for _, partition := range partitions {
+			if len(partition.Replicas) < desiredReplicationFactor {
+				req := kmsg.NewAlterPartitionAssignmentsRequestTopicPartition()
+				req.Partition = partition.Partition
+				req.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
+				partitionReassignments = append(partitionReassignments, req)
+			}
+		}
 
-func (s *Service) ensureEnoughPartitions(ctx context.Context, meta *kmsg.MetadataResponse) error {
-	partitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker
-	expectedPartitions := partitionsPerBroker * len(meta.Brokers)
+		// TODO: Consider more than one partition per broker config
+		if len(partitions) != 0 {
+			continue
+		}
 
-	if len(meta.Topics[0].Partitions) >= expectedPartitions {
-		return nil // no need to add more
+		// Let's try to use one of the existing partitions before we decide to create new partitions
+		if len(reassignablePartitions) > 0 {
+			partition := reassignablePartitions[0]
+			req := kmsg.NewAlterPartitionAssignmentsRequestTopicPartition()
+			req.Partition = partition.Partition
+			req.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
+			partitionReassignments = append(partitionReassignments, req)
 
+			reassignablePartitions = reassignablePartitions[1:]
+			continue
+		}
+
+		// Create a new partition for this broker
+		partitionCount++
+		assignmentReq := kmsg.NewCreatePartitionsRequestTopicAssignment()
+		assignmentReq.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
+		createPartitionAssignments = append(createPartitionAssignments, assignmentReq)
 	}
 
-	partitionsToAdd := expectedPartitions - len(meta.Topics[0].Partitions)
-	s.logger.Warn("e2e probe topic does not have enough partitions, partitionCount is less than brokerCount * partitionsPerBroker. will add partitions to the topic...",
-		zap.Int("expected_partition_count", expectedPartitions),
-		zap.Int("actual_partition_count", len(meta.Topics[0].Partitions)),
-		zap.Int("broker_count", len(meta.Brokers)),
-		zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
-		zap.Int("partitions_to_add", partitionsToAdd),
-	)
+	var reassignmentReq *kmsg.AlterPartitionAssignmentsRequest
+	if len(partitionReassignments) > 0 {
+		s.logger.Info("e2e probe topic has to be modified due to missing replicas or wrong preferred leader assignments",
+			zap.Int("partition_count", len(topicMeta.Partitions)),
+			zap.Int("broker_count", len(meta.Brokers)),
+			zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
+			zap.Int("config_replication_factor", s.config.TopicManagement.ReplicationFactor),
+			zap.Int("partitions_to_reassign", len(partitionReassignments)),
+		)
 
-	if !s.config.TopicManagement.Enabled {
-		return fmt.Errorf("the e2e probe topic does not have enough partitions and topic management is disabled")
+		r := kmsg.NewAlterPartitionAssignmentsRequest()
+		reassignmentTopicReq := kmsg.NewAlterPartitionAssignmentsRequestTopic()
+		reassignmentTopicReq.Partitions = partitionReassignments
+		reassignmentTopicReq.Topic = topicMeta.Topic
+		r.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{reassignmentTopicReq}
+		reassignmentReq = &r
 	}
 
-	topic := kmsg.NewCreatePartitionsRequestTopic()
-	topic.Topic = s.config.TopicManagement.Name
-	topic.Count = int32(expectedPartitions)
-
-	// For each partition we're about to add, we need to define its replicas
-	for i := 0; i < partitionsToAdd; i++ {
-		assignment := kmsg.NewCreatePartitionsRequestTopicAssignment()
-		// In order to keep the code as simple as possible, just copy the assignments from the first partition.
-		// After the topic is created, there is another validation step that will take care of bad assignments!
-		assignment.Replicas = meta.Topics[0].Partitions[0].Replicas
-		topic.Assignment = append(topic.Assignment, assignment)
+	var createReq *kmsg.CreatePartitionsRequest
+	if len(createPartitionAssignments) > 0 {
+		s.logger.Info("e2e probe topic does not have enough partitions. Will add partitions to the topic...",
+			zap.Int("actual_partition_count", len(topicMeta.Partitions)),
+			zap.Int("broker_count", len(meta.Brokers)),
+			zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
+			zap.Int("partitions_to_add", len(createPartitionAssignments)),
+		)
+		r := kmsg.NewCreatePartitionsRequest()
+		createPartitionsTopicReq := kmsg.NewCreatePartitionsRequestTopic()
+		createPartitionsTopicReq.Topic = s.config.TopicManagement.Name
+		createPartitionsTopicReq.Assignment = createPartitionAssignments
+		createPartitionsTopicReq.Count = int32(partitionCount)
+		r.Topics = []kmsg.CreatePartitionsRequestTopic{createPartitionsTopicReq}
+		createReq = &r
 	}
 
+	return reassignmentReq, createReq, nil
+}
+
+// calculateAppropriateReplicas returns the best possible brokerIDs that shall be used as replicas.
+// It takes care of the brokers' rack awareness and general distribution among the available brokers.
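+// The approach (as implemented below) is a greedy round-robin over racks: the leader's rack starts out with
+// one replica (the leader itself); afterwards the rack with the fewest replicas chosen so far is picked and one
+// of its remaining brokers is appended, until the requested replication factor is reached. Brokers without a
+// rack are treated as members of a single, unnamed rack.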
+func (s *Service) calculateAppropriateReplicas(meta *kmsg.MetadataResponse, replicationFactor int, leaderBroker kmsg.MetadataResponseBroker) []int32 {
+	brokersWithoutLeader := make([]kmsg.MetadataResponseBroker, 0, len(meta.Brokers)-1)
+	for _, broker := range meta.Brokers {
+		if broker.NodeID == leaderBroker.NodeID {
+			continue
+		}
+		brokersWithoutLeader = append(brokersWithoutLeader, broker)
 	}
+	brokersByRack := brokerMetadataByRackID(brokersWithoutLeader)
 
-	// Send request
-	create := kmsg.NewCreatePartitionsRequest()
-	create.Topics = []kmsg.CreatePartitionsRequestTopic{topic}
-	createPartitionsResponse, err := create.RequestWith(ctx, s.client)
+	replicasPerRack := make(map[string]int)
+	replicas := make([]int32, replicationFactor)
+	replicas[0] = leaderBroker.NodeID
 
-	// Check for errors
-	if err != nil {
-		return fmt.Errorf("request to create more partitions for e2e topic failed: %w", err)
+	for rack := range brokersByRack {
+		replicasPerRack[rack] = 0
 	}
-	nestedErrors := 0
-	for _, topicResponse := range createPartitionsResponse.Topics {
-		tErr := kerr.ErrorForCode(topicResponse.ErrorCode)
-		if tErr != nil || (topicResponse.ErrorMessage != nil && *topicResponse.ErrorMessage != "") {
-			s.logger.Error("error in createPartitionsResponse",
-				zap.String("topic", topicResponse.Topic),
-				zap.Error(tErr),
-			)
-			nestedErrors++
+	replicasPerRack[pointerStrToStr(leaderBroker.Rack)]++
+
+	popBrokerFromRack := func(rackID string) kmsg.MetadataResponseBroker {
+		broker := brokersByRack[rackID][0]
+		if len(brokersByRack[rackID]) == 1 {
+			delete(brokersByRack, rackID)
+		} else {
+			brokersByRack[rackID] = brokersByRack[rackID][1:]
 		}
+		return broker
 	}
-	if nestedErrors > 0 {
-		return fmt.Errorf("request to add more partitions to e2e topic had some nested errors, see the %v log lines above", nestedErrors)
+
+	for i := 1; i < len(replicas); i++ {
+		// Find best rack
+		min := math.MaxInt32
+		bestRack := ""
+		for rack, replicaCount := range replicasPerRack {
+			if replicaCount < min {
+				bestRack = rack
+				min = replicaCount
+			}
+		}
+
+		replicas[i] = popBrokerFromRack(bestRack).NodeID
+		replicasPerRack[bestRack]++
 	}
-	return nil
+	return replicas
 }
 
 func (s *Service) createManagementTopic(ctx context.Context, allMeta *kmsg.MetadataResponse) error {
diff --git a/e2e/topic_test.go b/e2e/topic_test.go
new file mode 100644
index 0000000..f9ba120
--- /dev/null
+++ b/e2e/topic_test.go
@@ -0,0 +1,165 @@
+package e2e
+
+import (
+	"github.com/stretchr/testify/assert"
+	"github.com/twmb/franz-go/pkg/kmsg"
+	"sort"
+	"testing"
+)
+
+func TestCalculateAppropriateReplicas(t *testing.T) {
+	tt := []struct {
+		TestName          string
+		Brokers           []kmsg.MetadataResponseBroker
+		ReplicationFactor int
+		LeaderBroker      kmsg.MetadataResponseBroker
+
+		// Some cases may have more than one possible solution, each entry in the outer array covers one allowed
+		// solution. The compared int32 array order does not matter, except for the very first item as this indicates
+		// the preferred leader. For example if you use {2, 0, 1} as expected result this would also be valid for
+		// the actual result {2, 1, 0} but not for {1, 2, 0} - because '2' must be the first int32.
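+		//
+		// Listing every valid combination matters because Go randomizes map iteration order, so
+		// calculateAppropriateReplicas may legitimately return any of the rack-valid assignments on a given run.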
+		ExpectedResults [][]int32
+	}{
+		{
+			TestName: "3 Brokers, no rack, RF = 3",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 0, Rack: nil},
+				{NodeID: 1, Rack: nil},
+				{NodeID: 2, Rack: nil},
+			},
+			ReplicationFactor: 3,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 2, Rack: nil},
+			ExpectedResults:   [][]int32{{2, 0, 1}},
+		},
+
+		{
+			TestName: "3 Brokers, 3 racks, RF = 3",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 0, Rack: kmsg.StringPtr("a")},
+				{NodeID: 1, Rack: kmsg.StringPtr("b")},
+				{NodeID: 2, Rack: kmsg.StringPtr("c")},
+			},
+			ReplicationFactor: 3,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 2, Rack: kmsg.StringPtr("c")},
+			ExpectedResults:   [][]int32{{2, 0, 1}},
+		},
+
+		{
+			TestName: "3 Brokers, 3 racks, RF = 1",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 0, Rack: kmsg.StringPtr("a")},
+				{NodeID: 1, Rack: kmsg.StringPtr("b")},
+				{NodeID: 2, Rack: kmsg.StringPtr("c")},
+			},
+			ReplicationFactor: 1,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 1, Rack: kmsg.StringPtr("b")},
+			ExpectedResults:   [][]int32{{1}},
+		},
+
+		{
+			TestName: "3 Brokers, 3 racks, RF = 2",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 0, Rack: kmsg.StringPtr("a")},
+				{NodeID: 1, Rack: kmsg.StringPtr("b")},
+				{NodeID: 2, Rack: kmsg.StringPtr("c")},
+			},
+			ReplicationFactor: 2,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 1, Rack: kmsg.StringPtr("b")},
+			ExpectedResults:   [][]int32{{1, 0}, {1, 2}},
+		},
+
+		{
+			TestName: "6 Brokers, 3 racks, RF = 3",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 0, Rack: kmsg.StringPtr("a")},
+				{NodeID: 1, Rack: kmsg.StringPtr("b")},
+				{NodeID: 2, Rack: kmsg.StringPtr("c")},
+				{NodeID: 3, Rack: kmsg.StringPtr("a")},
+				{NodeID: 4, Rack: kmsg.StringPtr("b")},
+				{NodeID: 5, Rack: kmsg.StringPtr("c")},
+			},
+			ReplicationFactor: 3,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 4, Rack: kmsg.StringPtr("b")},
+			ExpectedResults:   [][]int32{{4, 0, 2}, {4, 0, 5}, {4, 3, 2}, {4, 3, 5}},
+		},
+
+		{
+			TestName: "4 Brokers, 2 racks, RF = 3",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 0, Rack: kmsg.StringPtr("a")},
+				{NodeID: 1, Rack: kmsg.StringPtr("b")},
+				{NodeID: 2, Rack: kmsg.StringPtr("a")},
+				{NodeID: 3, Rack: kmsg.StringPtr("b")},
+			},
+			ReplicationFactor: 3,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 0, Rack: kmsg.StringPtr("a")},
+			ExpectedResults:   [][]int32{{0, 1, 2}, {0, 1, 3}, {0, 2, 3}},
+		},
+
+		{
+			TestName: "6 Brokers, 3 racks, RF = 3, lowest node id != 0",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 10, Rack: kmsg.StringPtr("a")},
+				{NodeID: 11, Rack: kmsg.StringPtr("b")},
+				{NodeID: 12, Rack: kmsg.StringPtr("c")},
+				{NodeID: 13, Rack: kmsg.StringPtr("a")},
+				{NodeID: 14, Rack: kmsg.StringPtr("b")},
+				{NodeID: 15, Rack: kmsg.StringPtr("c")},
+			},
+			ReplicationFactor: 3,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 11, Rack: kmsg.StringPtr("b")},
+			ExpectedResults:   [][]int32{{11, 10, 12}, {11, 10, 15}, {11, 12, 13}, {11, 13, 15}},
+		},
+
+		{
+			TestName: "6 Brokers, 3 racks, RF = 5, lowest node id != 0",
+			Brokers: []kmsg.MetadataResponseBroker{
+				{NodeID: 10, Rack: kmsg.StringPtr("a")},
+				{NodeID: 11, Rack: kmsg.StringPtr("b")},
+				{NodeID: 12, Rack: kmsg.StringPtr("c")},
+				{NodeID: 13, Rack: kmsg.StringPtr("a")},
+				{NodeID: 14, Rack: kmsg.StringPtr("b")},
+				{NodeID: 15, Rack: kmsg.StringPtr("c")},
+			},
+			ReplicationFactor: 5,
+			LeaderBroker:      kmsg.MetadataResponseBroker{NodeID: 11, Rack: kmsg.StringPtr("b")},
+			ExpectedResults:   [][]int32{{11, 10, 12, 13, 14}, {11, 10, 13, 14, 15}, {11, 12, 13, 14, 15}, {11, 10, 12, 13, 15}, {11, 10, 12, 14, 15}},
+		},
+	}
+
+	svc := Service{}
+	for _, test := range tt {
+		meta := kmsg.NewMetadataResponse()
+		meta.Brokers = test.Brokers
+		replicaIDs := svc.calculateAppropriateReplicas(&meta, test.ReplicationFactor, test.LeaderBroker)
+
+		matchesAtLeastOneExpectedResult := false
+		for _, possibleResult := range test.ExpectedResults {
+			isValidResult := possibleResult[0] == replicaIDs[0] && doElementsMatch(possibleResult, replicaIDs)
+			if isValidResult {
+				matchesAtLeastOneExpectedResult = true
+				break
+			}
+		}
+		if !matchesAtLeastOneExpectedResult {
+			// Use the first ElementsMatch to print some valid result along with the actual result.
+			assert.ElementsMatch(t, test.ExpectedResults[0], replicaIDs, test.TestName)
+		}
+	}
+}
+
+func doElementsMatch(a, b []int32) bool {
+	if len(a) != len(b) {
+		return false
+	}
+
+	sort.Slice(a, func(i, j int) bool { return a[i] < a[j] })
+	sort.Slice(b, func(i, j int) bool { return b[i] < b[j] })
+	for i, num := range a {
+		if num != b[i] {
+			return false
+		}
+	}
+
+	return true
+}
diff --git a/e2e/utils.go b/e2e/utils.go
index 2d5d94d..1c8cdeb 100644
--- a/e2e/utils.go
+++ b/e2e/utils.go
@@ -66,6 +66,35 @@ func (s *Service) logCommitErrors(r *kmsg.OffsetCommitResponse, err error) strin
 	return lastErrCode
 }
 
+// brokerMetadataByBrokerID returns a map of all broker metadata keyed by their BrokerID
+func brokerMetadataByBrokerID(meta []kmsg.MetadataResponseBroker) map[int32]kmsg.MetadataResponseBroker {
+	res := make(map[int32]kmsg.MetadataResponseBroker)
+	for _, broker := range meta {
+		res[broker.NodeID] = broker
+	}
+	return res
+}
+
+// brokerMetadataByRackID returns a map of all broker metadata keyed by their Rack identifier
+func brokerMetadataByRackID(meta []kmsg.MetadataResponseBroker) map[string][]kmsg.MetadataResponseBroker {
+	res := make(map[string][]kmsg.MetadataResponseBroker)
+	for _, broker := range meta {
+		rackID := ""
+		if broker.Rack != nil {
+			rackID = *broker.Rack
+		}
+		res[rackID] = append(res[rackID], broker)
+	}
+	return res
+}
+
+func pointerStrToStr(str *string) string {
+	if str == nil {
+		return ""
+	}
+	return *str
+}
+
 func safeUnwrap(err error) string {
 	if err == nil {
 		return ""