Commit

Fix partition reassignment and creation
weeco committed Aug 25, 2021
1 parent a3558e0 commit 7b6bd7e
Showing 3 changed files with 371 additions and 163 deletions.
340 changes: 177 additions & 163 deletions e2e/topic.go
@@ -3,6 +3,7 @@ package e2e
 import (
 	"context"
 	"fmt"
+	"math"
 	"time"
 
 	"github.com/twmb/franz-go/pkg/kerr"
@@ -39,215 +40,228 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
 		}
 	}
 
-	// Ensure the topic has enough partitions
-	if err = s.ensureEnoughPartitions(ctx, meta); err != nil {
-		return err
+	alterReq, createReq, err := s.calculatePartitionReassignments(meta)
+	if err != nil {
+		return fmt.Errorf("failed to calculate partition reassignments: %w", err)
 	}
 
-	// Validate assignments
-	if err = s.validatePartitionAssignments(ctx, meta); err != nil {
-		return err
+	err = s.executeAlterPartitionAssignments(ctx, alterReq)
+	if err != nil {
+		return fmt.Errorf("failed to alter partition assignments: %w", err)
 	}
 
+	err = s.executeCreatePartitions(ctx, createReq)
+	if err != nil {
+		return fmt.Errorf("failed to create partitions: %w", err)
+	}
+
 	return nil
 }
 
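A note on the shape of the new flow above: calculatePartitionReassignments returns nil for any request that is not needed, and the execute helpers treat a nil request as a no-op. A minimal sketch of that contract with simplified, illustrative types (not the kmsg types used in the commit):

package main

import "fmt"

type alterRequest struct{ partitions []int32 }

// executeAlter treats a nil request as "nothing to reassign", mirroring how
// executeAlterPartitionAssignments and executeCreatePartitions skip nil requests.
func executeAlter(req *alterRequest) error {
	if req == nil {
		return nil // nothing to do this reconciliation round
	}
	fmt.Println("would send reassignment for partitions:", req.partitions)
	return nil
}

func main() {
	_ = executeAlter(nil)                                   // no-op
	_ = executeAlter(&alterRequest{partitions: []int32{2}}) // would send the request
}
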
-func (s *Service) validatePartitionAssignments(ctx context.Context, meta *kmsg.MetadataResponse) error {
-	if !s.config.TopicManagement.Enabled {
+func (s *Service) executeCreatePartitions(ctx context.Context, req *kmsg.CreatePartitionsRequest) error {
+	if req == nil {
 		return nil
 	}
 
-	// We use a very simple strategy to distribute all partitions and their replicas to the brokers
-	//
-	// For example if we had:
-	// - 5 brokers
-	// - 5 partitions (partitionsPerBroker = 1)
-	// - replicationFactor of 5
-	// then our assignments would look like the table below.
-	// Why does this example use 5 partitions?
-	// Because for end-to-end testing to make sense, we'd like to report the message roundtrip latency for each broker.
-	// That's why we ensure that we always have at least as many partitions as there are brokers, so every broker
-	// can be the leader of at least one partition.
-	//
-	// The numbers after each partition are the brokerIds (0, 1, 2, 3, 4)
-	// The first broker in an assignment array is the leader for that partition,
-	// the following broker ids are hosting the partitions' replicas.
-	//
-	// Partition 0: [0, 1, 2, 3, 4]
-	// Partition 1: [1, 2, 3, 4, 0]
-	// Partition 2: [2, 3, 4, 0, 1]
-	// Partition 3: [3, 4, 0, 1, 2]
-	// Partition 4: [4, 0, 1, 2, 3]
-	//
-	// In addition to being very simple, this also has the benefit that each partitionID neatly corresponds to
-	// its leader's brokerID - at least most of the time.
-	// When a broker suddenly goes offline, or a new one is added to the cluster, etc., the assignments
-	// might be off for a short period of time, but the assignments will be fixed automatically
-	// in the next, periodic, topic validation (configured by 'topicManagement.reconciliationInterval')
-	//
-
-	topicName := s.config.TopicManagement.Name
-	topicMeta := meta.Topics[0]
-	realPartitionCount := len(topicMeta.Partitions)
-	realReplicationFactor := len(topicMeta.Partitions[0].Replicas)
-
-	// 1. Calculate the expected assignments
-	allPartitionAssignments := make([][]int32, realPartitionCount)
-	for i := range allPartitionAssignments {
-		allPartitionAssignments[i] = make([]int32, realReplicationFactor)
+	res, err := req.RequestWith(ctx, s.client)
+	if err != nil {
+		return err
 	}
 
-	// simple helper function that just keeps returning values from 0 to brokerCount forever.
-	brokerIterator := func(start int32) func() int32 {
-		brokerIndex := start
-		return func() int32 {
-			result := brokerIndex                                      // save result we'll return now
-			brokerIndex = (brokerIndex + 1) % int32(len(meta.Brokers)) // prepare for next call: add one, and wrap around
-			return result                                              // return current result
+	for _, topic := range res.Topics {
+		typedErr := kerr.TypedErrorForCode(topic.ErrorCode)
+		if typedErr != nil {
+			return fmt.Errorf("inner Kafka error: %w", typedErr)
 		}
 	}
 
-	startBroker := brokerIterator(0)
-	for _, partitionAssignments := range allPartitionAssignments {
-		// determine the leader broker: first partition will get 0, next one will get 1, ...
-		start := startBroker()
-		// and create an iterator that goes over all brokers, starting at our leader
-		nextReplica := brokerIterator(start)
+	return nil
+}
 
-		for i := range partitionAssignments {
-			partitionAssignments[i] = nextReplica()
-		}
+func (s *Service) executeAlterPartitionAssignments(ctx context.Context, req *kmsg.AlterPartitionAssignmentsRequest) error {
+	if req == nil {
+		return nil
 	}
 
-	// 2. Check
-	// Now that every partition knows which brokers are hosting its replicas, and which broker is the leader (i.e. is hosting the primary "replica"),
-	// we just have to check if the current/real assignments are equal to our desired assignments (and then apply them if not).
-	assignmentsAreEqual := true
-	for partitionId := 0; partitionId < realPartitionCount; partitionId++ {
-		expectedAssignments := allPartitionAssignments[partitionId]
-		actualAssignments := topicMeta.Partitions[partitionId].Replicas
-
-		// Check if any replica is assigned to the wrong broker
-		for i := 0; i < realReplicationFactor; i++ {
-			expectedBroker := expectedAssignments[i]
-			actualBroker := actualAssignments[i]
-			if expectedBroker != actualBroker {
-				assignmentsAreEqual = false
+	res, err := req.RequestWith(ctx, s.client)
+	if err != nil {
+		return err
 	}
 
+	typedErr := kerr.TypedErrorForCode(res.ErrorCode)
+	if typedErr != nil {
+		return fmt.Errorf("inner Kafka error: %w", typedErr)
+	}
+	for _, topic := range res.Topics {
+		for _, partition := range topic.Partitions {
+			err = kerr.TypedErrorForCode(partition.ErrorCode)
+			if err != nil {
+				return fmt.Errorf("inner Kafka partition error on partition '%v': %w", partition.Partition, err)
+			}
 		}
 	}
-	if assignmentsAreEqual {
-		// assignments are already exactly as they are supposed to be
-		return nil
-	}
 
-	// 3. Apply
-	// Some partitions have their replicas hosted on the wrong brokers!
-	// Apply our desired replica configuration
-	partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, realPartitionCount)
-	for i := range partitionReassignments {
-		partitionReassignments[i] = kmsg.NewAlterPartitionAssignmentsRequestTopicPartition()
-		partitionReassignments[i].Partition = int32(i)
-		partitionReassignments[i].Replicas = allPartitionAssignments[i]
-	}
+	return nil
+}
 
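For context on the removed strategy: the old comments above describe a simple round-robin distribution in which partition p is led by broker p and its replicas follow in broker-ID order. A standalone sketch that reproduces the example table from those comments (it assumes contiguous broker IDs 0..n-1, which the real code did not require):

package main

import "fmt"

// roundRobinAssignments reproduces the table from the removed comments above:
// partition p gets replicas [p, p+1, ..., p+rf-1] modulo the broker count,
// so broker p is the preferred leader of partition p.
func roundRobinAssignments(brokerCount, replicationFactor int) [][]int32 {
	assignments := make([][]int32, brokerCount) // one partition per broker
	for p := range assignments {
		replicas := make([]int32, replicationFactor)
		for i := range replicas {
			replicas[i] = int32((p + i) % brokerCount)
		}
		assignments[p] = replicas
	}
	return assignments
}

func main() {
	for p, replicas := range roundRobinAssignments(5, 5) {
		fmt.Printf("Partition %d: %v\n", p, replicas)
	}
	// Partition 0: [0 1 2 3 4]
	// Partition 1: [1 2 3 4 0]
	// ...
	// Partition 4: [4 0 1 2 3]
}
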
-	topicReassignment := kmsg.NewAlterPartitionAssignmentsRequestTopic()
-	topicReassignment.Topic = topicName
-	topicReassignment.Partitions = partitionReassignments
+func (s *Service) calculatePartitionReassignments(meta *kmsg.MetadataResponse) (*kmsg.AlterPartitionAssignmentsRequest, *kmsg.CreatePartitionsRequest, error) {
+	brokerByID := brokerMetadataByBrokerID(meta.Brokers)
+	topicMeta := meta.Topics[0]
+	desiredReplicationFactor := s.config.TopicManagement.ReplicationFactor
 
-	reassignRq := kmsg.NewAlterPartitionAssignmentsRequest()
-	reassignRq.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{topicReassignment}
+	if desiredReplicationFactor > len(brokerByID) {
+		return nil, nil, fmt.Errorf("the desired replication factor of '%v' is larger than the available brokers "+
+			"('%v' brokers)", desiredReplicationFactor, len(brokerByID))
+	}
 
-	reassignRes, err := reassignRq.RequestWith(ctx, s.client)
-	if err != nil {
-		// error while sending
-		return fmt.Errorf("topic reassignment request failed: %w", err)
+	// We want to ensure that each brokerID leads at least one partition permanently. Hence let's iterate over brokers.
+	preferredLeaderPartitionsBrokerID := make(map[int32][]kmsg.MetadataResponseTopicPartition)
+	for _, broker := range brokerByID {
+		for _, partition := range topicMeta.Partitions {
+			// PreferredLeader = the brokerID that is the desired leader for this partition, regardless of who the current leader is
+			preferredLeader := partition.Replicas[0]
+			if broker.NodeID == preferredLeader {
+				preferredLeaderPartitionsBrokerID[broker.NodeID] = append(preferredLeaderPartitionsBrokerID[broker.NodeID], partition)
+			}
+		}
 	}
-	reassignErr := kerr.ErrorForCode(reassignRes.ErrorCode)
-	if reassignErr != nil || (reassignRes.ErrorMessage != nil && *reassignRes.ErrorMessage != "") {
-		// global error
-		return fmt.Errorf(fmt.Sprintf("topic reassignment failed with ErrorMessage=\"%v\": %v",
-			*reassignRes.ErrorMessage,
-			safeUnwrap(reassignErr),
-		))
 
+	// Partitions whose preferred leader already leads more than one partition can be reassigned to other brokers.
+	// We collect them to avoid creating new partitions when not needed.
+	reassignablePartitions := make([]kmsg.MetadataResponseTopicPartition, 0)
+	for _, partitions := range preferredLeaderPartitionsBrokerID {
+		if len(partitions) > 1 {
+			reassignablePartitions = append(reassignablePartitions, partitions[1:]...)
+			continue
+		}
 	}
 
-	// errors for individual partitions
-	for _, t := range reassignRes.Topics {
-		for _, p := range t.Partitions {
-			pErr := kerr.ErrorForCode(p.ErrorCode)
-			if pErr != nil || (p.ErrorMessage != nil && *p.ErrorMessage != "") {
-				return fmt.Errorf(fmt.Sprintf("topic reassignment failed on partition %v with ErrorMessage=\"%v\": %v",
-					p.Partition,
-					safeUnwrap(pErr),
-					*p.ErrorMessage),
-				)
+	// Now let's try to reassign (or create) new partitions for those brokers that are not the preferred leader for
+	// any partition.
 
+	partitionCount := len(topicMeta.Partitions)
+	partitionReassignments := make([]kmsg.AlterPartitionAssignmentsRequestTopicPartition, 0)
+	createPartitionAssignments := make([]kmsg.CreatePartitionsRequestTopicAssignment, 0)
 
+	for brokerID, partitions := range preferredLeaderPartitionsBrokerID {
+		// Add replicas if number of replicas is smaller than desiredReplicationFactor
+		for _, partition := range partitions {
+			if len(partition.Replicas) < desiredReplicationFactor {
+				req := kmsg.NewAlterPartitionAssignmentsRequestTopicPartition()
+				req.Partition = partition.Partition
+				req.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
+				partitionReassignments = append(partitionReassignments, req)
 			}
 		}
-	}
 
-	return nil
-}
+		// TODO: Consider more than one partition per broker config
+		if len(partitions) != 0 {
+			continue
+		}
 
-func (s *Service) ensureEnoughPartitions(ctx context.Context, meta *kmsg.MetadataResponse) error {
-	partitionsPerBroker := s.config.TopicManagement.PartitionsPerBroker
-	expectedPartitions := partitionsPerBroker * len(meta.Brokers)
+		// Let's try to use one of the existing partitions before we decide to create new partitions
+		if len(reassignablePartitions) > 0 {
+			partition := reassignablePartitions[0]
+			req := kmsg.NewAlterPartitionAssignmentsRequestTopicPartition()
+			req.Partition = partition.Partition
+			req.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
+			partitionReassignments = append(partitionReassignments, req)
 
-	if len(meta.Topics[0].Partitions) >= expectedPartitions {
-		return nil // no need to add more
+			reassignablePartitions = reassignablePartitions[1:]
 		}
 
+		// Create a new partition for this broker
+		partitionCount++
+		assignmentReq := kmsg.NewCreatePartitionsRequestTopicAssignment()
+		assignmentReq.Replicas = s.calculateAppropriateReplicas(meta, desiredReplicationFactor, brokerByID[brokerID])
+		createPartitionAssignments = append(createPartitionAssignments, assignmentReq)
+	}
 
-	partitionsToAdd := expectedPartitions - len(meta.Topics[0].Partitions)
-	s.logger.Warn("e2e probe topic does not have enough partitions, partitionCount is less than brokerCount * partitionsPerBroker. will add partitions to the topic...",
-		zap.Int("expected_partition_count", expectedPartitions),
-		zap.Int("actual_partition_count", len(meta.Topics[0].Partitions)),
-		zap.Int("broker_count", len(meta.Brokers)),
-		zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
-		zap.Int("partitions_to_add", partitionsToAdd),
-	)
+	var reassignmentReq *kmsg.AlterPartitionAssignmentsRequest
+	if len(partitionReassignments) > 0 {
+		s.logger.Info("e2e probe topic has to be modified due to missing replicas or wrong preferred leader assignments",
+			zap.Int("partition_count", len(topicMeta.Partitions)),
+			zap.Int("broker_count", len(meta.Brokers)),
+			zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
+			zap.Int("config_replication_factor", s.config.TopicManagement.ReplicationFactor),
+			zap.Int("partitions_to_reassign", len(partitionReassignments)),
+		)
 
+		r := kmsg.NewAlterPartitionAssignmentsRequest()
+		reassignmentTopicReq := kmsg.NewAlterPartitionAssignmentsRequestTopic()
+		reassignmentTopicReq.Partitions = partitionReassignments
+		reassignmentTopicReq.Topic = topicMeta.Topic
+		r.Topics = []kmsg.AlterPartitionAssignmentsRequestTopic{reassignmentTopicReq}
+		reassignmentReq = &r
+	}
 
-	if !s.config.TopicManagement.Enabled {
-		return fmt.Errorf("the e2e probe topic does not have enough partitions and topic management is disabled")
+	var createReq *kmsg.CreatePartitionsRequest
+	if len(createPartitionAssignments) > 0 {
+		s.logger.Info("e2e probe topic does not have enough partitions. Will add partitions to the topic...",
+			zap.Int("actual_partition_count", len(topicMeta.Partitions)),
+			zap.Int("broker_count", len(meta.Brokers)),
+			zap.Int("config_partitions_per_broker", s.config.TopicManagement.PartitionsPerBroker),
+			zap.Int("partitions_to_add", len(createPartitionAssignments)),
+		)
+		r := kmsg.NewCreatePartitionsRequest()
+		createPartitionsTopicReq := kmsg.NewCreatePartitionsRequestTopic()
+		createPartitionsTopicReq.Topic = s.config.TopicManagement.Name
+		createPartitionsTopicReq.Assignment = createPartitionAssignments
+		createPartitionsTopicReq.Count = int32(partitionCount)
+		r.Topics = []kmsg.CreatePartitionsRequestTopic{createPartitionsTopicReq}
+		createReq = &r
 	}
 
-	topic := kmsg.NewCreatePartitionsRequestTopic()
-	topic.Topic = s.config.TopicManagement.Name
-	topic.Count = int32(expectedPartitions)
-
-	// For each partition we're about to add, we need to define its replicas
-	for i := 0; i < partitionsToAdd; i++ {
-		assignment := kmsg.NewCreatePartitionsRequestTopicAssignment()
-		// In order to keep the code as simple as possible, just copy the assignments from the first partition.
-		// After the topic is created, there is another validation step that will take care of bad assignments!
-		assignment.Replicas = meta.Topics[0].Partitions[0].Replicas
-		topic.Assignment = append(topic.Assignment, assignment)
+	return reassignmentReq, createReq, nil
 }
 
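To follow the new reassignment logic in isolation: calculatePartitionReassignments groups partitions by their preferred leader (the first broker in the replica list) and treats brokers that lead nothing as candidates for a surplus partition or a newly created one. A minimal sketch of the grouping step, with plain ints instead of kmsg types (names here are illustrative, not from the commit):

package main

import "fmt"

// groupByPreferredLeader mirrors the first loop of calculatePartitionReassignments:
// a partition's preferred leader is the first broker in its replica list, regardless
// of which broker currently happens to lead it.
func groupByPreferredLeader(replicasByPartition [][]int32, brokerIDs []int32) map[int32][]int {
	byLeader := make(map[int32][]int, len(brokerIDs))
	for _, id := range brokerIDs {
		byLeader[id] = []int{} // brokers that lead nothing still get an (empty) entry
	}
	for partition, replicas := range replicasByPartition {
		preferredLeader := replicas[0]
		byLeader[preferredLeader] = append(byLeader[preferredLeader], partition)
	}
	return byLeader
}

func main() {
	// Partitions 0 and 2 both prefer broker 0, so one of them is surplus and could be
	// reassigned; broker 2 leads nothing and is the candidate to receive it (or a new partition).
	replicasByPartition := [][]int32{{0, 1}, {1, 2}, {0, 2}}
	fmt.Println(groupByPreferredLeader(replicasByPartition, []int32{0, 1, 2}))
	// map[0:[0 2] 1:[1] 2:[]]
}
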
+// calculateAppropriateReplicas returns the best possible brokerIDs that shall be used as replicas.
+// It takes care of the brokers' rack awareness and general distribution among the available brokers.
+func (s *Service) calculateAppropriateReplicas(meta *kmsg.MetadataResponse, replicationFactor int, leaderBroker kmsg.MetadataResponseBroker) []int32 {
+	brokersWithoutLeader := make([]kmsg.MetadataResponseBroker, 0, len(meta.Brokers)-1)
+	for _, broker := range meta.Brokers {
+		if broker.NodeID == leaderBroker.NodeID {
+			continue
+		}
+		brokersWithoutLeader = append(brokersWithoutLeader, broker)
+	}
+	brokersByRack := brokerMetadataByRackID(brokersWithoutLeader)
 
-	// Send request
-	create := kmsg.NewCreatePartitionsRequest()
-	create.Topics = []kmsg.CreatePartitionsRequestTopic{topic}
-	createPartitionsResponse, err := create.RequestWith(ctx, s.client)
+	replicasPerRack := make(map[string]int)
+	replicas := make([]int32, replicationFactor)
+	replicas[0] = leaderBroker.NodeID
 
-	// Check for errors
-	if err != nil {
-		return fmt.Errorf("request to create more partitions for e2e topic failed: %w", err)
+	for rack := range brokersByRack {
+		replicasPerRack[rack] = 0
 	}
-	nestedErrors := 0
-	for _, topicResponse := range createPartitionsResponse.Topics {
-		tErr := kerr.ErrorForCode(topicResponse.ErrorCode)
-		if tErr != nil || (topicResponse.ErrorMessage != nil && *topicResponse.ErrorMessage != "") {
-			s.logger.Error("error in createPartitionsResponse",
-				zap.String("topic", topicResponse.Topic),
-				zap.Error(tErr),
-			)
-			nestedErrors++
+	replicasPerRack[pointerStrToStr(leaderBroker.Rack)]++
 
+	popBrokerFromRack := func(rackID string) kmsg.MetadataResponseBroker {
+		broker := brokersByRack[rackID][0]
+		if len(brokersByRack[rackID]) == 1 {
+			delete(brokersByRack, rackID)
+		} else {
+			brokersByRack[rackID] = brokersByRack[rackID][1:]
 		}
+		return broker
 	}
-	if nestedErrors > 0 {
-		return fmt.Errorf("request to add more partitions to e2e topic had some nested errors, see the %v log lines above", nestedErrors)
 
+	for i := 1; i < len(replicas); i++ {
+		// Find best rack
+		min := math.MaxInt32
+		bestRack := ""
+		for rack, replicaCount := range replicasPerRack {
+			if replicaCount < min {
+				bestRack = rack
+				min = replicaCount
+			}
 		}
 
+		replicas[i] = popBrokerFromRack(bestRack).NodeID
+		replicasPerRack[bestRack]++
+	}
 
-	return nil
+	return replicas
 }
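A compact, self-contained sketch of the rack-aware idea behind calculateAppropriateReplicas: the leader comes first, and every further replica is drawn from whichever rack currently holds the fewest replicas of the partition (function and parameter names below are illustrative, not from the commit):

package main

import (
	"fmt"
	"math"
)

// pickReplicas sketches the idea behind calculateAppropriateReplicas: the leader is
// replica 0, and every further replica is taken from whichever rack currently hosts
// the fewest replicas of this partition, so replicas spread across racks.
func pickReplicas(leaderID int32, leaderRack string, brokersByRack map[string][]int32, rf int) []int32 {
	replicas := []int32{leaderID}

	replicasPerRack := map[string]int{leaderRack: 1}
	for rack := range brokersByRack {
		if _, ok := replicasPerRack[rack]; !ok {
			replicasPerRack[rack] = 0
		}
	}

	for len(replicas) < rf {
		// Pick the rack with the fewest replicas that still has brokers left
		// (ties are broken arbitrarily, as with the map iteration in the original).
		bestRack, min := "", math.MaxInt32
		for rack, count := range replicasPerRack {
			if count < min && len(brokersByRack[rack]) > 0 {
				bestRack, min = rack, count
			}
		}
		// Pop one broker from that rack and use it as the next replica.
		replicas = append(replicas, brokersByRack[bestRack][0])
		brokersByRack[bestRack] = brokersByRack[bestRack][1:]
		replicasPerRack[bestRack]++
	}
	return replicas
}

func main() {
	brokersByRack := map[string][]int32{"a": {1}, "b": {2}} // the leader (broker 0) sits in rack "a"
	fmt.Println(pickReplicas(0, "a", brokersByRack, 3))     // [0 2 1] - rack "b" is used before a second broker from "a"
}
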

func (s *Service) createManagementTopic(ctx context.Context, allMeta *kmsg.MetadataResponse) error {

0 comments on commit 7b6bd7e
