diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java index 5943b30f..da1cdea5 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java @@ -3,6 +3,7 @@ import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig; import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; +import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.common.TopicPartition; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -112,7 +113,35 @@ public long lastStatsTimestamp() { return latestStats == null ? 0 : latestStats.getTimestamp(); } + public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound) { + if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound && + bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { + reservedBytesIn += inBound; + reservedBytesOut += outBound; + toBeAddedReplicas.add(tp); + return true; + } + return false; + } + + /** + * Cancels a bandwidth reservation for a TopicPartition + * @param tp TopicPartition whose reservation should be cancelled + * @param inBound amount of in-bound traffic to release on the broker + * @param outBound amount of out-bound traffic to release on the broker + * @return whether the reserved bandwidth was successfully removed from the broker + */ + public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound) { + if (toBeAddedReplicas.contains(tp)) { + reservedBytesIn -= inBound; + reservedBytesOut -= outBound; + toBeAddedReplicas.remove(tp); + return true; + } + return false; + } + @Deprecated public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) { if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) { reservedBytesIn += inBound; @@ -135,6 +164,7 @@ public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) { * @param outBound the outbound bandwidth requirements in bytes/second * @return whether the reservation is successful or not.
*/ + @Deprecated public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) { if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { reservedBytesOut += outBound; @@ -217,6 +247,20 @@ public BrokerStats getLatestStats() { return latestStats; } + @VisibleForTesting + protected void setLatestStats(BrokerStats brokerStats) { + this.latestStats = brokerStats; + } + + public String getRackId() { + return rackId; + } + + @VisibleForTesting + protected void setRackId(String rackId) { + this.rackId = rackId; + } + @Override public String toString() { diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java index 8c689b42..5f38ceb2 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java @@ -13,6 +13,8 @@ import com.google.gson.JsonArray; import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -40,6 +42,7 @@ public class KafkaCluster { private static final Logger LOG = LogManager.getLogger(KafkaCluster.class); private static final int MAX_NUM_STATS = 5; + private static final int INVALID_BROKERSTATS_TIME = 240000; private DoctorKafkaClusterConfig clusterConfig; public String zkUrl; @@ -217,10 +220,7 @@ public PriorityQueue<KafkaBroker> getBrokerQueue() { new PriorityQueue<>(new KafkaBroker.KafkaBrokerComparator()); for (Map.Entry<Integer, KafkaBroker> entry : brokers.entrySet()) { KafkaBroker broker = entry.getValue(); - BrokerStats latestStats = broker.getLatestStats(); - if (latestStats == null - || latestStats.getHasFailure() - || System.currentTimeMillis() - latestStats.getTimestamp() > 240000) { + if (isInvalidBroker(broker)) { continue; } brokerQueue.add(broker); @@ -228,6 +228,39 @@ public PriorityQueue<KafkaBroker> getBrokerQueue() { return brokerQueue; } + /** + * Groups the valid brokers of the cluster into one priority queue per locality + * @return a priority queue of brokers for each locality in the cluster ordered by network stats + */ + public Map<String, PriorityQueue<KafkaBroker>> getBrokerQueueByLocality() { + Map<String, PriorityQueue<KafkaBroker>> brokerLocalityMap = new HashMap<>(); + Comparator<KafkaBroker> comparator = new KafkaBroker.KafkaBrokerComparator(); + for (Map.Entry<Integer, KafkaBroker> entry : brokers.entrySet()) { + KafkaBroker broker = entry.getValue(); + if (isInvalidBroker(broker)) { + continue; + } + // add broker to locality queue + // init queue if queue not present in brokerMap for a locality + brokerLocalityMap + .computeIfAbsent(broker.getRackId(), i -> new PriorityQueue<>(comparator)) + .add(broker); + } + return brokerLocalityMap; + } + + /** + * Checks whether the broker is invalid for assigning replicas + * @param broker the broker that we want to check + * @return true if the broker is invalid for assigning replicas, false otherwise + */ + protected boolean isInvalidBroker(KafkaBroker broker) { + BrokerStats latestStats = broker.getLatestStats(); + return latestStats == null || + latestStats.getHasFailure() || + System.currentTimeMillis() - latestStats.getTimestamp() > INVALID_BROKERSTATS_TIME; + } + /** * Get the broker Id that has the resource. Here we need to apply the proper placement policy.
@@ -245,38 +278,155 @@ public Map getAlternativeBrokers(PriorityQueue result = new HashMap<>(); - List unusableBrokers = new ArrayList<>(); + Set unusableBrokers = new HashSet<>(); + for (int oosBrokerId : oosReplica.outOfSyncBrokers) { // we will get the broker with the least network usage - KafkaBroker leastUsedBroker = brokerQueue.poll(); - while (leastUsedBroker != null && oosReplica.replicaBrokers.contains(leastUsedBroker.id())) { - unusableBrokers.add(leastUsedBroker); - leastUsedBroker = brokerQueue.poll(); - } - if (leastUsedBroker == null) { - LOG.error("Failed to find a usable broker for fixing {}:{}", oosReplica, oosBrokerId); - success = false; - } else { - LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id()); - success &= leastUsedBroker.reserveInBoundBandwidth(topicPartition, inBoundReq); - if (preferredBroker == oosBrokerId) { - success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, outBoundReq); + success &= findNextBrokerForOosReplica( + brokerQueue, + unusableBrokers, + oosReplica.replicaBrokers, + result, + oosBrokerId, + topicPartition, + inBoundReq, + outBoundReq, + preferredBroker + ); + } + // push the brokers back to brokerQueue to keep invariant true + brokerQueue.addAll(unusableBrokers); + return success ? result : null; + } + + /** + * Similar to getAlternativeBrokers, but locality aware + * @param brokerQueueByLocality a map keeping a priority queue of brokers for each locality + * @param oosReplica out of sync replicas + * @return a BrokerId to KafkaBroker mapping + */ + public Map getAlternativeBrokersByLocality( + Map> brokerQueueByLocality, + OutOfSyncReplica oosReplica, + Map> localityAwareReassignmentFailures + ) { + + Map> oosBrokerIdsByLocality = new HashMap<>(); + for ( int oosBrokerId : oosReplica.outOfSyncBrokers) { + String brokerLocality = brokers.get(oosBrokerId).getRackId(); + oosBrokerIdsByLocality + .computeIfAbsent(brokerLocality, l -> new ArrayList<>()) + .add(oosBrokerId); + } + + TopicPartition topicPartition = oosReplica.topicPartition; + double inBoundReq = ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition); + double outBoundReq = ReplicaStatsManager.getMaxBytesOut(zkUrl, topicPartition); + int preferredBroker = oosReplica.replicaBrokers.get(0); + + boolean success = true; + Map result = new HashMap<>(); + Map> unusableBrokersByLocality = new HashMap<>(); + Map failureAssignments = new HashMap<>(); + + // Affinity + for ( Map.Entry> oosBrokerIdsOfLocality : oosBrokerIdsByLocality.entrySet()) { + String oosLocality = oosBrokerIdsOfLocality.getKey(); + List oosBrokerIds = oosBrokerIdsOfLocality.getValue(); + PriorityQueue localityBrokerQueue = brokerQueueByLocality.get(oosLocality); + Set unusableBrokers = + unusableBrokersByLocality.computeIfAbsent(oosLocality, l -> new HashSet<>()); + for( Integer oosBrokerId : oosBrokerIds){ + boolean broker_success = findNextBrokerForOosReplica( + localityBrokerQueue, + unusableBrokers, + oosReplica.replicaBrokers, + result, + oosBrokerId, + topicPartition, + inBoundReq, + outBoundReq, + preferredBroker + ); + if ( !broker_success ){ + failureAssignments.put(oosBrokerId, oosLocality); } - if (success) { - result.put(oosBrokerId, leastUsedBroker); - // the broker should not be used again for this topic partition. 
- unusableBrokers.add(leastUsedBroker); - } else { - LOG.error("Failed to allocate resource to replace {}:{}", oosReplica, oosBrokerId); - success = false; + success &= broker_success; + } + } + + // release reserved bandwidth if failed + if (!success) { + // keep track of all oosReplicas that failed and alert + localityAwareReassignmentFailures.put(oosReplica, failureAssignments); + for (Map.Entry<Integer, KafkaBroker> entry : result.entrySet()) { + Integer oosBrokerId = entry.getKey(); + KafkaBroker broker = entry.getValue(); + boolean removed = oosBrokerId == preferredBroker ? + broker.removeReservedBandwidth(topicPartition, inBoundReq, outBoundReq) : + broker.removeReservedBandwidth(topicPartition, inBoundReq, 0); + if (!removed) { + LOG.error("Cannot remove non-existing TopicPartition {} from broker {}", topicPartition, broker.id()); + } + } + } } - // push the brokers back to brokerQueue to keep invariant true - brokerQueue.addAll(unusableBrokers); + + // push the polled brokers back to their locality queues to keep the invariant true + for (Map.Entry<String, Set<KafkaBroker>> entry : unusableBrokersByLocality.entrySet()) { + brokerQueueByLocality.get(entry.getKey()).addAll(entry.getValue()); + } + return success ? result : null; } + /** + * Finds the next broker in the broker queue for migrating a replica + * @param brokerQueue a queue of brokers ordered by utilization + * @param unusableBrokers the brokers that should not be used for reassignment + * @param replicaBrokers the ids of the brokers that are already used for this replica + * @param reassignmentMap the replica -> target broker mapping for the next reassignment + * @param oosBrokerId the broker id of the current OutOfSync replica + * @param tp the TopicPartition of the current replica + * @param inBoundReq inbound traffic that needs to be reserved + * @param outBoundReq outbound traffic that needs to be reserved + * @param preferredBroker the preferred leader of the current TopicPartition + * @return true if we successfully assigned a target broker for migration of this replica, false otherwise + */ + protected boolean findNextBrokerForOosReplica( + PriorityQueue<KafkaBroker> brokerQueue, + Collection<KafkaBroker> unusableBrokers, + Collection<Integer> replicaBrokers, + Map<Integer, KafkaBroker> reassignmentMap, + Integer oosBrokerId, + TopicPartition tp, + Double inBoundReq, + Double outBoundReq, + Integer preferredBroker + ) { + boolean success; + KafkaBroker leastUsedBroker = brokerQueue.poll(); + while (leastUsedBroker != null && replicaBrokers.contains(leastUsedBroker.id())) { + unusableBrokers.add(leastUsedBroker); + leastUsedBroker = brokerQueue.poll(); + } + if (leastUsedBroker == null) { + LOG.error("Failed to find a usable broker for fixing {}:{}", tp, oosBrokerId); + success = false; + } else { + LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id()); + success = preferredBroker == oosBrokerId ? + leastUsedBroker.reserveBandwidth(tp, inBoundReq, outBoundReq) : + leastUsedBroker.reserveBandwidth(tp, inBoundReq, 0); + if (success) { + reassignmentMap.put(oosBrokerId, leastUsedBroker); + // the broker should not be used again for this topic partition.
+ unusableBrokers.add(leastUsedBroker); + } else { + LOG.error("Failed to allocate resource to replace {}:{}", tp, oosBrokerId); + } + } + return success; + } public KafkaBroker getAlternativeBroker(TopicPartition topicPartition, double tpBytesIn, double tpBytesOut) { @@ -292,8 +442,7 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition, // we will get the broker with the least network usage KafkaBroker leastUsedBroker = brokerQueue.poll(); LOG.info("LeastUsedBroker for replacing {} : {}", topicPartition, leastUsedBroker.id()); - boolean success = leastUsedBroker.reserveInBoundBandwidth(topicPartition, tpBytesIn); - success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, tpBytesOut); + boolean success = leastUsedBroker.reserveBandwidth(topicPartition, tpBytesIn, tpBytesOut); if (!success) { LOG.error("Failed to allocate resource to replace {}", topicPartition); diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java index a150fa65..6257a6d0 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger; import org.apache.zookeeper.data.ACL; -import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; import com.google.gson.Gson; @@ -257,7 +256,7 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, preferredLeader = new PreferredReplicaElectionInfo(tp, preferredBrokerId); preferredLeaders.put(tp, preferredLeader); toBeReducedBytesOut += tpBytesOut; - another.reserveOutBoundBandwidth(tp, tpBytesOut); + another.reserveBandwidth(tp, 0, tpBytesOut); continue; } } else if (brokerTraffic < averageBytesIn + averageBytesOut) { @@ -594,17 +593,37 @@ public UnderReplicatedReason getUnderReplicatedReason(String brokerHost, /** * Generate reassignment plan for dead brokers + * Failure handling differs between locality-aware and non-locality-aware reassignment: + * - Locality-aware: topicPartitions that could be assigned to another broker in the same + * locality are still reassigned, and an alert is sent for the partitions that failed (due to + * insufficient capacity in that locality). + * - Non-locality-aware: the current reassignment fails as a whole (reassignments are all-or-none). */ private Map generateReassignmentPlanForDeadBrokers( List<OutOfSyncReplica> outOfSyncReplicas) { Map replicasMap = new HashMap<>(); boolean success = true; + boolean isLocalityAware = clusterConfig.enabledRackAwareness(); + + Map<String, PriorityQueue<KafkaBroker>> brokerQueueByLocality = null; + PriorityQueue<KafkaBroker> brokerQueue = null; + Map<OutOfSyncReplica, Map<Integer, String>> reassignmentToLocalityFailures = new HashMap<>(); + if (isLocalityAware) { + brokerQueueByLocality = kafkaCluster.getBrokerQueueByLocality(); + } else { + brokerQueue = kafkaCluster.getBrokerQueue(); + } - PriorityQueue<KafkaBroker> brokerQueue = kafkaCluster.getBrokerQueue(); for (OutOfSyncReplica oosReplica : outOfSyncReplicas) { - Map<Integer, KafkaBroker> replacedNodes = - kafkaCluster.getAlternativeBrokers(brokerQueue, oosReplica); - if (replacedNodes == null) { + Map<Integer, KafkaBroker> replacedNodes; + replacedNodes = isLocalityAware + ?
kafkaCluster.getAlternativeBrokersByLocality( + brokerQueueByLocality, + oosReplica, + reassignmentToLocalityFailures + ) + : kafkaCluster.getAlternativeBrokers(brokerQueue, oosReplica); + if (replacedNodes == null && !isLocalityAware) { success = false; for (int oosBrokerId : oosReplica.outOfSyncBrokers) { KafkaBroker broker = kafkaCluster.getBroker(oosBrokerId); @@ -622,6 +641,25 @@ private Map generateReassignmentPlanForDeadBrokers( replicasMap.put(oosReplica.topicPartition, newReplicas); } } + + // clean up if there are partial success reassignments + if ( !success && replicasMap.size() > 0){ + kafkaCluster.clearResourceAllocationCounters(); + } + + // notify if locality-aware reassignment failed on some topic partitions + // since we are doing partial reassignments + if ( isLocalityAware && + !reassignmentToLocalityFailures.isEmpty() && + !replicasMap.isEmpty() + ){ + Email.alertPartialLocalityAssignmentFailure( + drkafkaConfig.getAlertEmails(), + clusterConfig.getClusterName(), + reassignmentToLocalityFailures + ); + } + return success ? replicasMap : null; } diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java b/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java index 1347c727..f1a4aa94 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java @@ -34,6 +34,7 @@ public class DoctorKafkaClusterConfig { private static final String SECURITY_PROTOCOL = "security.protocol"; private static final String NOTIFICATION_EMAIL = "notification.email"; private static final String NOTIFICATION_PAGER = "notificatino.pager"; + private static final String ENABLE_RACK_AWARENESS = "rack_awareness.enabled"; private static final int DEFAULT_DEADBROKER_REPLACEMENT_NO_STATS_SECONDS = 1200; private static final int DEFAULT_UNDER_REPLICTED_ALERT_IN_SECS = 7200; @@ -135,4 +136,12 @@ public String getNotificationEmail() { public String getNotificationPager() { return clusterConfiguration.getString(NOTIFICATION_PAGER, ""); } + + public boolean enabledRackAwareness(){ + boolean result = false; + if (clusterConfiguration.containsKey(ENABLE_RACK_AWARENESS)){ + result = clusterConfiguration.getBoolean(ENABLE_RACK_AWARENESS); + } + return result; + } } diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java b/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java index 6ee893d4..4c2d6490 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java @@ -1,6 +1,7 @@ package com.pinterest.doctorkafka.notification; import com.pinterest.doctorkafka.KafkaBroker; +import com.pinterest.doctorkafka.util.OutOfSyncReplica; import kafka.cluster.Broker; import org.apache.commons.lang3.tuple.MutablePair; @@ -17,6 +18,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; public class Email { @@ -30,6 +32,7 @@ public class Email { private static final Map prolongedUrpEmails = new ConcurrentHashMap<>(); private static final Map noStatsBrokerEmails = new ConcurrentHashMap<>(); private static final Map slowBrokerReplacementEmail = new ConcurrentHashMap<>(); + private static final Map partialLocalityReassignmentFailureEmails = new ConcurrentHashMap<>(); public static void sendTo(String[] emails, String title, 
String content) { @@ -180,4 +183,35 @@ public static void alertOnProlongedBrokerReplacement(String[] emails, + " has not finished after " + replacementTimeInSeconds + " seconds"); sendTo(emails, title, sb.toString()); } + + public static void alertPartialLocalityAssignmentFailure( + String[] emails, + String clusterName, + Map> reassignmentToLocalityFailures + ){ + if ( partialLocalityReassignmentFailureEmails.containsKey(clusterName) && + System.currentTimeMillis() - partialLocalityReassignmentFailureEmails.get(clusterName) < COOLOFF_INTERVAL){ + return; + } + + partialLocalityReassignmentFailureEmails.put(clusterName, System.currentTimeMillis()); + String title = "Partial Failure during locality-aware reassignment : " + clusterName; + StringBuilder sb = new StringBuilder(); + sb.append("Failed to reassign the following OutOfSyncReplicas due to insufficient rack capacity: \n"); + for (Map.Entry> entry : reassignmentToLocalityFailures.entrySet()){ + OutOfSyncReplica oosReplica = entry.getKey(); + sb.append(oosReplica + " (out of sync brokers:"); + sb.append(oosReplica.outOfSyncBrokers + .stream() + .map(id -> id.toString()) + .collect(Collectors.joining(",")) + ); + sb.append("), brokers failed: "); + for ( Map.Entry entry2 : entry.getValue().entrySet()){ + sb.append(entry2.getKey() + "(" + entry2.getValue() + ") "); + } + sb.append("\n"); + } + sendTo(emails, title, sb.toString()); + } } diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java index 5f7e0a57..4ffd1c02 100644 --- a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java +++ b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java @@ -12,34 +12,44 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; class KafkaClusterTest { private static final String TOPIC = "test_topic"; + private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, 0); private static final String ZOOKEEPER_URL = "zk001/cluster1"; + private static final String[] LOCALITIES = new String[]{"LOCALITY_0","LOCALITY_1","LOCALITY_2"}; + private static final Node[] nodes = new Node[]{ + new Node(0, "test00", 9092), + new Node(1, "test01", 9092), + new Node(2, "test02", 9092), + new Node(3, "test03", 9092), + new Node(4, "test04", 9092), + new Node(5, "test05", 9092), + new Node(6, "test06", 9092) + }; - /** - * This test assures that getAlternativeBrokers does not return a many-to-one reassignment - * We want to make sure that multiple out-of-sync replicas of the same topic partition will not - * map to the same replacement broker, or duplicate reassigments will happen leading to invalid - * reassignment plans. 
- */ - @Test - void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ - DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); - DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + private ConcurrentMap> oldBytesInStats; + private ConcurrentMap> oldBytesOutStats; - // create histogram maps to mock network stats per topic partition - ConcurrentMap> oldBytesInStats = ReplicaStatsManager.bytesInStats; - ConcurrentMap> oldBytesOutStats = ReplicaStatsManager.bytesOutStats; + @BeforeEach + void setUpReplicaStatsManager() { + oldBytesInStats = ReplicaStatsManager.bytesInStats; + oldBytesOutStats = ReplicaStatsManager.bytesOutStats; - TopicPartition topicPartition = new TopicPartition(TOPIC, 0); + // create histogram maps to mock network stats per topic partition ConcurrentMap> testBytesInStats = new ConcurrentHashMap<>(); ConcurrentMap> testBytesOutStats = new ConcurrentHashMap<>(); @@ -48,27 +58,37 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ Histogram inHist = new Histogram(new UniformReservoir()); Histogram outHist = new Histogram(new UniformReservoir()); - inHist.update(0); - outHist.update(0); + inHist.update(20*1024*1024); + outHist.update(1); - testBytesInHistograms.put(topicPartition, inHist); - testBytesOutHistograms.put(topicPartition, outHist); + testBytesInHistograms.put(TOPIC_PARTITION, inHist); + testBytesOutHistograms.put(TOPIC_PARTITION, outHist); testBytesInStats.put(ZOOKEEPER_URL, testBytesInHistograms); testBytesOutStats.put(ZOOKEEPER_URL, testBytesOutHistograms); ReplicaStatsManager.bytesInStats = testBytesInStats; ReplicaStatsManager.bytesOutStats = testBytesOutStats; + } - KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig); + @AfterEach + void tearDownReplicaStatsManager() { + ReplicaStatsManager.bytesInStats = oldBytesInStats; + ReplicaStatsManager.bytesOutStats = oldBytesOutStats; + } - Node[] nodes = new Node[]{ - new Node(0, "test00", 9092), - new Node(1, "test01", 9092), - new Node(2, "test02", 9092), - new Node(3, "test03", 9092), - new Node(4, "test04", 9092) - }; + /** + * This test ensures that getAlternativeBrokers does not return a many-to-one reassignment. + * We want to make sure that multiple out-of-sync replicas of the same topic partition will not + * map to the same replacement broker, or duplicate reassignments will happen, leading to invalid + reassignment plans.
+ */ + @Test + void testGetAlternativeBrokersDuplicateReassignment() throws Exception{ + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig); Node leader = nodes[0]; Node[] replicas = new Node[]{ @@ -94,8 +114,7 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ }; // setting the reservedBytesIn for priority queue comparisons - brokers[4].reserveInBoundBandwidth(topicPartition, 10); - brokers[4].reserveOutBoundBandwidth(topicPartition, 10); + brokers[4].reserveBandwidth(TOPIC_PARTITION, 10, 10); PriorityQueue brokerQueue = new PriorityQueue<>(); @@ -119,9 +138,242 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ assertNotEquals(altBrokers.get(1), altBrokers.get(2)); // The broker queue should contain the same amount of brokers assertEquals(beforeSize, brokerQueue.size()); + } + + @Test + void testLocalityAwareReassignments() throws Exception { + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig); + + + Node leader = nodes[0]; + Node[] replicas = new Node[]{ + nodes[0], + nodes[1], + nodes[2], + }; + + Node[] isrs = new Node[]{ + nodes[0] + }; + PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs); + OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo); + oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2}); + + + KafkaBroker[] brokers = new KafkaBroker[]{ + new KafkaBroker(doctorKafkaClusterConfig, 0), + new KafkaBroker(doctorKafkaClusterConfig, 1), + new KafkaBroker(doctorKafkaClusterConfig, 2), + new KafkaBroker(doctorKafkaClusterConfig, 3), + new KafkaBroker(doctorKafkaClusterConfig, 4), + new KafkaBroker(doctorKafkaClusterConfig, 5), + new KafkaBroker(doctorKafkaClusterConfig, 6) + }; + + for ( KafkaBroker broker : brokers ){ + BrokerStats bs = new BrokerStats(); + bs.setTimestamp(System.currentTimeMillis()); + broker.setLatestStats(bs); + kafkaCluster.brokers.put(broker.id(), broker); + } + + // a list of assignments of localities to brokers with the same size as LOCALITIES + List> testLocalityAssignments = Arrays.asList( + Arrays.asList(0, 3), + Arrays.asList(1, 2, 4, 5), + Arrays.asList(6) + ); + + for (int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + for(int brokerId : testLocalityAssignments.get(localityId)){ + brokers[brokerId].setRackId(LOCALITIES[localityId]); + } + } + + // setting the reservedBytesIn for priority queue comparisons + brokers[4].reserveBandwidth(TOPIC_PARTITION, 10, 10); + brokers[5].reserveBandwidth(TOPIC_PARTITION, 5, 5); + + Map> brokerLocalityMap = + kafkaCluster.getBrokerQueueByLocality(); + Map> reassignmentToLocalityFailures = new HashMap<>(); + + for ( int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + Collection expectedAssignments = testLocalityAssignments.get(localityId); + Collection actualAssignments = brokerLocalityMap.get(LOCALITIES[localityId]); + // size check + assertEquals(expectedAssignments.size(),actualAssignments.size()); + // element check + assertTrue(actualAssignments + .stream() + .map(broker -> broker.id()) + 
.collect(Collectors.toList()) + .containsAll(expectedAssignments)); + } + + // Test getAlternativeBrokersByLocality, + // brokers 1 & 2 are out of sync + // brokers 3 & 6 should be skipped even though they have the least utilization since they are not + // within the same locality as brokers 1 & 2 + + Map localityReassignments = + kafkaCluster.getAlternativeBrokersByLocality(brokerLocalityMap, oosReplica, reassignmentToLocalityFailures); + + assertEquals(2, localityReassignments.size()); + assertEquals(brokers[5], localityReassignments.get(1)); + assertEquals(brokers[4], localityReassignments.get(2)); + + // check if the invariants are maintained after reassignment + for ( int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + Collection expectedAssignments = testLocalityAssignments.get(localityId); + Collection actualAssignments = brokerLocalityMap.get(LOCALITIES[localityId]); + // size check + assertEquals(expectedAssignments.size(),actualAssignments.size()); + // element check + assertTrue(actualAssignments + .stream() + .map(broker -> broker.id()) + .collect(Collectors.toList()) + .containsAll(expectedAssignments)); + } + + } + + @Test + void testNonLocalityAwareReassignments() throws Exception { + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig); + + Node leader = nodes[0]; + Node[] replicas = new Node[]{ + nodes[0], + nodes[1], + nodes[2] + }; + + Node[] isrs = new Node[]{ + nodes[0] + }; + PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs); + OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo); + oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2}); + + + KafkaBroker[] brokers = new KafkaBroker[]{ + new KafkaBroker(doctorKafkaClusterConfig, 0), + new KafkaBroker(doctorKafkaClusterConfig, 1), + new KafkaBroker(doctorKafkaClusterConfig, 2), + new KafkaBroker(doctorKafkaClusterConfig, 3), + new KafkaBroker(doctorKafkaClusterConfig, 4), + new KafkaBroker(doctorKafkaClusterConfig, 5), + new KafkaBroker(doctorKafkaClusterConfig, 6) + }; + + for ( KafkaBroker broker : brokers ){ + BrokerStats bs = new BrokerStats(); + bs.setTimestamp(System.currentTimeMillis()); + broker.setLatestStats(bs); + kafkaCluster.brokers.put(broker.id(), broker); + } + + // setting the reservedBytesIn for priority queue comparisons + brokers[4].reserveBandwidth(TOPIC_PARTITION, 10, 10); + brokers[5].reserveBandwidth(TOPIC_PARTITION, 5, 5); + brokers[6].reserveBandwidth(TOPIC_PARTITION, 2, 2); + + Map> brokerLocalityMap = + kafkaCluster.getBrokerQueueByLocality(); + Map> reassignmentToLocalityFailures = new HashMap<>(); + + assertEquals(brokers.length, brokerLocalityMap.get(null).size()); + assertTrue(brokerLocalityMap.get(null).containsAll(Arrays.asList(brokers))); + + // Test getAlternativeBrokersByLocality, + // brokers 1 & 2 are out of sync + // brokers 3 & 6 are used because they have the lowest utilization + + Map localityReassignments = + kafkaCluster.getAlternativeBrokersByLocality(brokerLocalityMap, oosReplica, reassignmentToLocalityFailures); + + assertEquals(2, localityReassignments.size()); + assertEquals(brokers[3], localityReassignments.get(1)); + assertEquals(brokers[6], localityReassignments.get(2)); + } + + @Test + void testLocalityAwareReassignmentsFailure() throws Exception { + DoctorKafkaConfig 
config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig); + + Node leader = nodes[0]; + Node[] replicas = new Node[]{ + nodes[0], + nodes[1], + nodes[2], + }; + + Node[] isrs = new Node[]{ + nodes[0] + }; + PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs); + OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo); + oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2}); + + + KafkaBroker[] brokers = new KafkaBroker[]{ + new KafkaBroker(doctorKafkaClusterConfig, 0), + new KafkaBroker(doctorKafkaClusterConfig, 1), + new KafkaBroker(doctorKafkaClusterConfig, 2), + new KafkaBroker(doctorKafkaClusterConfig, 3), + new KafkaBroker(doctorKafkaClusterConfig, 4) + }; + + for ( KafkaBroker broker : brokers ){ + BrokerStats bs = new BrokerStats(); + bs.setTimestamp(System.currentTimeMillis()); + broker.setLatestStats(bs); + kafkaCluster.brokers.put(broker.id(), broker); + } + + // a list of assignments of localities to brokers with the same size as LOCALITIES + List> testLocalityAssignments = Arrays.asList( + Arrays.asList(0), + Arrays.asList(1, 3), + Arrays.asList(2, 4) + ); + + for (int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + for(int brokerId : testLocalityAssignments.get(localityId)){ + brokers[brokerId].setRackId(LOCALITIES[localityId]); + } + } + + // setting the reservedBytesIn for priority queue comparisons + brokers[4].reserveBandwidth(TOPIC_PARTITION, 20*1024*1024, 10); + + Map> brokerLocalityMap = + kafkaCluster.getBrokerQueueByLocality(); + Map> reassignmentToLocalityFailures = new HashMap<>(); + + // Test getAlternativeBrokersByLocality, + // brokers 1 & 2 are out of sync + // assignment fails since not enough bandwidth to migrate from 2 -> 4 + + Map localityReassignments = + kafkaCluster.getAlternativeBrokersByLocality(brokerLocalityMap, oosReplica, reassignmentToLocalityFailures); + + assertNull(localityReassignments); + assertEquals(1, reassignmentToLocalityFailures.size()); + assertEquals(1, reassignmentToLocalityFailures.get(oosReplica).size()); + assertEquals(brokers[2].getRackId(), reassignmentToLocalityFailures.get(oosReplica).get(2)); - ReplicaStatsManager.bytesInStats = oldBytesInStats; - ReplicaStatsManager.bytesOutStats = oldBytesOutStats; } } \ No newline at end of file
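Editor's note: the sketch below is illustrative only and is not part of the patch. Under simplified assumptions (a hypothetical Broker class and groupByLocality helper rather than DoctorKafka's KafkaBroker/KafkaCluster), it shows the grouping pattern that getBrokerQueueByLocality() introduces above: valid brokers are bucketed into one priority queue per rack via computeIfAbsent, ordered by network usage so the least-loaded broker of a locality is polled first.

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class LocalityQueueSketch {

  // Hypothetical stand-in for KafkaBroker: just an id, a rack id, and current network usage.
  static final class Broker {
    final int id;
    final String rackId;
    final double bytesInPerSec;

    Broker(int id, String rackId, double bytesInPerSec) {
      this.id = id;
      this.rackId = rackId;
      this.bytesInPerSec = bytesInPerSec;
    }
  }

  // Mirrors the shape of getBrokerQueueByLocality(): one priority queue per locality,
  // created lazily with computeIfAbsent and ordered so the least-loaded broker comes out first.
  static Map<String, PriorityQueue<Broker>> groupByLocality(List<Broker> brokers) {
    Comparator<Broker> byUsage = Comparator.comparingDouble(b -> b.bytesInPerSec);
    Map<String, PriorityQueue<Broker>> queues = new HashMap<>();
    for (Broker broker : brokers) {
      queues.computeIfAbsent(broker.rackId, rack -> new PriorityQueue<>(byUsage)).add(broker);
    }
    return queues;
  }

  public static void main(String[] args) {
    List<Broker> brokers = Arrays.asList(
        new Broker(0, "rack_a", 5.0),
        new Broker(1, "rack_a", 2.0),
        new Broker(2, "rack_b", 7.0));
    Map<String, PriorityQueue<Broker>> queues = groupByLocality(brokers);
    // Prints 1: broker 1 is polled first for rack_a because it has the lowest usage.
    System.out.println(queues.get("rack_a").poll().id);
  }
}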
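Editor's note: a second illustrative sketch (a hypothetical BandwidthLedger class, not DoctorKafka code) of the reserve/rollback pairing this patch relies on: a reservation succeeds only while projected usage stays under the per-broker limit, and releasing it undoes the bookkeeping so a partially built locality-aware plan can return brokers to a clean state, as the cleanup loop in getAlternativeBrokersByLocality does with reserveBandwidth/removeReservedBandwidth.

import java.util.HashSet;
import java.util.Set;

public class BandwidthLedgerSketch {

  // Hypothetical, trimmed-down ledger standing in for a KafkaBroker's reservation bookkeeping.
  static final class BandwidthLedger {
    private final double bytesInPerSecLimit;
    private double reservedBytesIn;
    private final Set<String> reservedPartitions = new HashSet<>();

    BandwidthLedger(double bytesInPerSecLimit) {
      this.bytesInPerSecLimit = bytesInPerSecLimit;
    }

    // Reserve only if the projected usage stays under the limit (analogous to reserveBandwidth).
    boolean reserve(String topicPartition, double inBound) {
      if (bytesInPerSecLimit > reservedBytesIn + inBound) {
        reservedBytesIn += inBound;
        reservedPartitions.add(topicPartition);
        return true;
      }
      return false;
    }

    // Undo an earlier reservation (analogous to removeReservedBandwidth).
    boolean release(String topicPartition, double inBound) {
      if (reservedPartitions.remove(topicPartition)) {
        reservedBytesIn -= inBound;
        return true;
      }
      return false; // nothing was reserved for this partition
    }
  }

  public static void main(String[] args) {
    BandwidthLedger ledger = new BandwidthLedger(10.0);
    boolean reserved = ledger.reserve("test_topic-0", 6.0);
    // If a later reservation in the same plan fails, roll back what was already reserved,
    // mirroring the rollback loop in getAlternativeBrokersByLocality.
    if (reserved && !ledger.reserve("test_topic-1", 6.0)) {
      ledger.release("test_topic-0", 6.0);
    }
  }
}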