diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java index 5e146668..ffcb3541 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java @@ -3,6 +3,7 @@ import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig; import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; +import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.common.TopicPartition; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,6 +29,7 @@ public class KafkaBroker implements Comparable { private int brokerPort = 9092; private String rackId; private BrokerStats latestStats; + private Set leaderReplicas; private Set followerReplicas; @@ -115,7 +117,35 @@ public long lastStatsTimestamp() { return latestStats == null ? 0 : latestStats.getTimestamp(); } + public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound){ + if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound && + bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { + reservedBytesIn += inBound; + reservedBytesOut += outBound; + toBeAddedReplicas.add(tp); + return true; + } + return false; + } + /** + * cancels reservation of bandwidth for a TopicPartition + * @param tp TopicPartition to cancel reservation + * @param inBound amount of in-bound traffic to remove from broker + * @param outBound amount of out-bound traffic to remove from broker + * @return if reserved bandwidth has been successfully removed from broker + */ + public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound){ + if ( toBeAddedReplicas.contains(tp) ){ + reservedBytesIn -= inBound; + reservedBytesOut -= outBound; + toBeAddedReplicas.remove(tp); + return true; + } + return false; + } + + @Deprecated public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) { if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) { reservedBytesIn += inBound; @@ -138,6 +168,7 @@ public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) { * @param outBound the outbound bandwidth requirements in bytes/second * @return whether the reservation is successful or not. */ + @Deprecated public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) { if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { reservedBytesOut += outBound; @@ -178,6 +209,14 @@ public void clearResourceAllocationCounters() { this.toBeAddedReplicas.clear(); } + protected void setLeaderReplicas(Set leaderReplicas) { + this.leaderReplicas = leaderReplicas; + } + + protected void setFollowerReplicas(Set followerReplicas) { + this.followerReplicas= followerReplicas; + } + /** * Record the stats, and update the topic partition list based on the stats * @@ -196,12 +235,21 @@ public synchronized void update(BrokerStats stats) { rackId = stats.getRackId() != null ? stats.getRackId() : stats.getAvailabilityZone(); } - // TODO: handle null poniter expceiton properly - leaderReplicas = stats.getLeaderReplicas().stream().map(tps -> - new TopicPartition(tps.getTopic(), tps.getPartition())).collect(Collectors.toSet()); + if (stats.getLeaderReplicas() != null) { + setLeaderReplicas(stats.getLeaderReplicas() + .stream() + .map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition())) + .collect(Collectors.toSet()) + ); + } - followerReplicas = stats.getFollowerReplicas().stream().map(tps -> - new TopicPartition(tps.getTopic(), tps.getPartition())).collect(Collectors.toSet()); + if (stats.getFollowerReplicas() != null ) { + setFollowerReplicas(stats.getFollowerReplicas() + .stream() + .map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition())) + .collect(Collectors.toSet()) + ); + } } /** @@ -220,6 +268,20 @@ public BrokerStats getLatestStats() { return latestStats; } + @VisibleForTesting + protected void setLatestStats(BrokerStats brokerStats){ + this.latestStats = brokerStats; + } + + public String getRackId(){ + return rackId; + } + + @VisibleForTesting + protected void setRackId(String rackId){ + this.rackId = rackId; + } + @Override public String toString() { diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java index 47a4e5fd..4f90b60b 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java @@ -13,6 +13,8 @@ import com.google.gson.JsonArray; import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -40,6 +42,7 @@ public class KafkaCluster { private static final Logger LOG = LogManager.getLogger(KafkaCluster.class); private static final int MAX_NUM_STATS = 5; + private static final int INVALID_BROKERSTATS_TIME = 240000; private DoctorKafkaClusterConfig clusterConfig; public String zkUrl; @@ -219,10 +222,7 @@ public PriorityQueue getBrokerQueue() { new PriorityQueue<>(new KafkaBroker.KafkaBrokerComparator()); for (Map.Entry entry : brokers.entrySet()) { KafkaBroker broker = entry.getValue(); - BrokerStats latestStats = broker.getLatestStats(); - if (latestStats == null - || latestStats.getHasFailure() - || System.currentTimeMillis() - latestStats.getTimestamp() > 240000) { + if (isInvalidBroker(broker)) { continue; } brokerQueue.add(broker); @@ -230,48 +230,79 @@ public PriorityQueue getBrokerQueue() { return brokerQueue; } + /** + * + * @return a priority queue of brokers for each locality in the cluster ordered by network stats + */ + public Map> getBrokerQueueByLocality(){ + Map> brokerLocalityMap = new HashMap<>(); + Comparator comparator = new KafkaBroker.KafkaBrokerComparator(); + for ( Map.Entry entry : brokers.entrySet() ){ + KafkaBroker broker = entry.getValue(); + if (isInvalidBroker(broker)){ + continue; + } + // add broker to locality queue + // init queue if queue not present in brokerMap for a locality + brokerLocalityMap + .computeIfAbsent(broker.getRackId(), i -> new PriorityQueue<>(comparator)) + .add(broker); + } + return brokerLocalityMap; + } + + /** + * checks if the broker is invalid for assigning replicas + * @param broker the broker that we want to check + * @return true if the broker is invalid for assigning replicas, false if it is valid + */ + protected boolean isInvalidBroker(KafkaBroker broker) { + BrokerStats latestStats = broker.getLatestStats(); + return latestStats== null || + latestStats.getHasFailure() || + System.currentTimeMillis() - latestStats.getTimestamp() > INVALID_BROKERSTATS_TIME; + } + /** * Get the broker Id that has the resource. Here we need to apply the proper placement policy. * * @param brokerQueue the list of brokers that are sorted in resource usage * @param oosReplica out of sync replicas + * @param inBoundReq inbound traffic + * @param outBoundReq outbound traffc + * @param preferredBroker preferred broker id * @return a BrokerId to KafkaBroker mapping */ - public Map getAlternativeBrokers(PriorityQueue brokerQueue, - OutOfSyncReplica oosReplica) { - TopicPartition topicPartition = oosReplica.topicPartition; - double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition); - double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition); - int preferredBroker = oosReplica.replicaBrokers.get(0); + public Map getAlternativeBrokers( + PriorityQueue brokerQueue, + OutOfSyncReplica oosReplica, + double inBoundReq, + double outBoundReq, + int preferredBroker + ) { boolean success = true; Map result = new HashMap<>(); - List unusableBrokers = new ArrayList<>(); + Set unusableBrokers = new HashSet<>(); + for (int oosBrokerId : oosReplica.outOfSyncBrokers) { // we will get the broker with the least network usage - KafkaBroker leastUsedBroker = brokerQueue.poll(); - while (leastUsedBroker != null && oosReplica.replicaBrokers.contains(leastUsedBroker.id())) { - unusableBrokers.add(leastUsedBroker); - leastUsedBroker = brokerQueue.poll(); - } - if (leastUsedBroker == null) { - LOG.error("Failed to find a usable broker for fixing {}:{}", oosReplica, oosBrokerId); - success = false; - } else { - LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id()); - success &= leastUsedBroker.reserveInBoundBandwidth(topicPartition, inBoundReq); - if (preferredBroker == oosBrokerId) { - success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, outBoundReq); - } - if (success) { - result.put(oosBrokerId, leastUsedBroker); - // the broker should not be used again for this topic partition. - unusableBrokers.add(leastUsedBroker); - } else { - LOG.error("Failed to allocate resource to replace {}:{}", oosReplica, oosBrokerId); - success = false; - } + success = findNextBrokerForOosReplica( + brokerQueue, + unusableBrokers, + oosReplica.replicaBrokers, + result, + oosBrokerId, + oosReplica.topicPartition, + inBoundReq, + outBoundReq, + preferredBroker + ); + + // short circuit if failed to find available broker + if (!success) { + break; } } // push the brokers back to brokerQueue to keep invariant true @@ -279,6 +310,122 @@ public Map getAlternativeBrokers(PriorityQueue getAlternativeBrokersByLocality( + Map> brokerQueueByLocality, + OutOfSyncReplica oosReplica, + double inBoundReq, + double outBoundReq, + int preferredBroker + ) { + + Map> oosBrokerIdsByLocality = new HashMap<>(); + for ( int oosBrokerId : oosReplica.outOfSyncBrokers) { + String brokerLocality = brokers.get(oosBrokerId).getRackId(); + oosBrokerIdsByLocality + .computeIfAbsent(brokerLocality, l -> new ArrayList<>()) + .add(oosBrokerId); + } + + Map result = new HashMap<>(); + Map> unusableBrokersByLocality = new HashMap<>(); + + boolean success = true; + // Affinity + for ( Map.Entry> oosBrokerIdsOfLocality : oosBrokerIdsByLocality.entrySet()) { + String oosLocality = oosBrokerIdsOfLocality.getKey(); + List oosBrokerIds = oosBrokerIdsOfLocality.getValue(); + PriorityQueue localityBrokerQueue = brokerQueueByLocality.get(oosLocality); + Set unusableBrokers = + unusableBrokersByLocality.computeIfAbsent(oosLocality, l -> new HashSet<>()); + for( Integer oosBrokerId : oosBrokerIds){ + success = findNextBrokerForOosReplica( + localityBrokerQueue, + unusableBrokers, + oosReplica.replicaBrokers, + result, + oosBrokerId, + oosReplica.topicPartition, + inBoundReq, + outBoundReq, + preferredBroker + ); + + // short circuit if failed to find available broker + if (!success) { + break; + } + } + if (!success) { + break; + } + } + + // maintain invariant + for(Map.Entry> entry : unusableBrokersByLocality.entrySet()){ + brokerQueueByLocality.get(entry.getKey()).addAll(entry.getValue()); + } + + return success ? result : null; + } + + /** + * Finds the next broker in the broker queue for migrating a replica + * @param brokerQueue a queue of brokers ordered by utilization + * @param unusableBrokers the brokers that should not be used for reassignment + * @param replicaBrokers the ids of the brokers that are already used for this replica + * @param reassignmentMap the replica -> target broker mapping for the next reassignment + * @param oosBrokerId the broker id of the current OutOfSync replica + * @param tp the TopicPartition of the current replica + * @param inBoundReq inbound traffic that needs to be reserved + * @param outBoundReq outbound traffic that needs to be reserved + * @param preferredBroker the preferred leader of the current TopicPartition + * @return true if we successfully assigned a target broker for migration of this replica false otherwise + */ + protected boolean findNextBrokerForOosReplica( + PriorityQueue brokerQueue, + Collection unusableBrokers, + Collection replicaBrokers, + Map reassignmentMap, + Integer oosBrokerId, + TopicPartition tp, + Double inBoundReq, + Double outBoundReq, + Integer preferredBroker + ){ + boolean success; + KafkaBroker leastUsedBroker = brokerQueue.poll(); + while (leastUsedBroker != null && replicaBrokers.contains(leastUsedBroker.id())) { + unusableBrokers.add(leastUsedBroker); + leastUsedBroker = brokerQueue.poll(); + } + if (leastUsedBroker == null) { + LOG.error("Failed to find a usable broker for fixing {}:{}", tp, oosBrokerId); + success = false; + } else { + LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id()); + success = preferredBroker == oosBrokerId ? + leastUsedBroker.reserveBandwidth(tp, inBoundReq, outBoundReq) : + leastUsedBroker.reserveBandwidth(tp, inBoundReq, 0); + if (success) { + reassignmentMap.put(oosBrokerId, leastUsedBroker); + // the broker should not be used again for this topic partition. + unusableBrokers.add(leastUsedBroker); + } else { + LOG.error("Failed to allocate resource to replace {}:{}", tp, oosBrokerId); + } + } + return success; + } + public KafkaBroker getAlternativeBroker(TopicPartition topicPartition, double tpBytesIn, double tpBytesOut) { PriorityQueue brokerQueue = @@ -293,8 +440,7 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition, // we will get the broker with the least network usage KafkaBroker leastUsedBroker = brokerQueue.poll(); LOG.info("LeastUsedBroker for replacing {} : {}", topicPartition, leastUsedBroker.id()); - boolean success = leastUsedBroker.reserveInBoundBandwidth(topicPartition, tpBytesIn); - success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, tpBytesOut); + boolean success = leastUsedBroker.reserveBandwidth(topicPartition, tpBytesIn, tpBytesOut); if (!success) { LOG.error("Failed to allocate resource to replace {}", topicPartition); diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java index ceee6496..be57c9a3 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java @@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger; import org.apache.zookeeper.data.ACL; -import scala.Option; import scala.collection.JavaConverters; import scala.collection.Seq; import com.google.gson.Gson; @@ -260,7 +259,7 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, preferredLeader = new PreferredReplicaElectionInfo(tp, preferredBrokerId); preferredLeaders.put(tp, preferredLeader); toBeReducedBytesOut += tpBytesOut; - another.reserveOutBoundBandwidth(tp, tpBytesOut); + another.reserveBandwidth(tp, 0, tpBytesOut); continue; } } else if (brokerTraffic < averageBytesIn + averageBytesOut) { @@ -596,18 +595,48 @@ public UnderReplicatedReason getUnderReplicatedReason(String brokerHost, /** - * Generate reassignment plan for dead brokers + * Generate reassignment plan for dead brokers, + * current reassignment will fail (reassignments are all-or-none). */ private Map generateReassignmentPlanForDeadBrokers( List outOfSyncReplicas) { Map replicasMap = new HashMap<>(); boolean success = true; + boolean isLocalityAware = clusterConfig.enabledRackAwareness(); + + Map> brokerQueueByLocality = null; + PriorityQueue brokerQueue = null; + if(isLocalityAware){ + brokerQueueByLocality = kafkaCluster.getBrokerQueueByLocality(); + } else { + brokerQueue = kafkaCluster.getBrokerQueue(); + } - PriorityQueue brokerQueue = kafkaCluster.getBrokerQueue(); for (OutOfSyncReplica oosReplica : outOfSyncReplicas) { - Map replacedNodes = - kafkaCluster.getAlternativeBrokers(brokerQueue, oosReplica); + + double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, oosReplica.topicPartition); + double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, oosReplica.topicPartition); + int preferredBroker = oosReplica.replicaBrokers.get(0); + + Map replacedNodes; + replacedNodes = isLocalityAware + ? kafkaCluster.getAlternativeBrokersByLocality( + brokerQueueByLocality, + oosReplica, + inBoundReq, + outBoundReq, + preferredBroker + ) + : kafkaCluster.getAlternativeBrokers( + brokerQueue, + oosReplica, + inBoundReq, + outBoundReq, + preferredBroker + ); if (replacedNodes == null) { + // current reassignment task fail immediately + // if failed to reassign for one partition success = false; for (int oosBrokerId : oosReplica.outOfSyncBrokers) { KafkaBroker broker = kafkaCluster.getBroker(oosBrokerId); @@ -625,6 +654,12 @@ private Map generateReassignmentPlanForDeadBrokers( replicasMap.put(oosReplica.topicPartition, newReplicas); } } + + // clean up if there are partial success reassignments + if ( !success && replicasMap.size() > 0){ + kafkaCluster.clearResourceAllocationCounters(); + } + return success ? replicasMap : null; } diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java b/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java index 1347c727..f1a4aa94 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/config/DoctorKafkaClusterConfig.java @@ -34,6 +34,7 @@ public class DoctorKafkaClusterConfig { private static final String SECURITY_PROTOCOL = "security.protocol"; private static final String NOTIFICATION_EMAIL = "notification.email"; private static final String NOTIFICATION_PAGER = "notificatino.pager"; + private static final String ENABLE_RACK_AWARENESS = "rack_awareness.enabled"; private static final int DEFAULT_DEADBROKER_REPLACEMENT_NO_STATS_SECONDS = 1200; private static final int DEFAULT_UNDER_REPLICTED_ALERT_IN_SECS = 7200; @@ -135,4 +136,12 @@ public String getNotificationEmail() { public String getNotificationPager() { return clusterConfiguration.getString(NOTIFICATION_PAGER, ""); } + + public boolean enabledRackAwareness(){ + boolean result = false; + if (clusterConfiguration.containsKey(ENABLE_RACK_AWARENESS)){ + result = clusterConfiguration.getBoolean(ENABLE_RACK_AWARENESS); + } + return result; + } } diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java index 8de74357..935ebe3b 100644 --- a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java +++ b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java @@ -11,15 +11,42 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import java.util.Set; +import java.util.stream.Collectors; class KafkaClusterTest { private static final String TOPIC = "test_topic"; - private static final String ZOOKEEPER_URL = "zk001/cluster1"; + private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, 0); + private static final String[] LOCALITIES = new String[]{"LOCALITY_0","LOCALITY_1","LOCALITY_2"}; + private static final Node[] nodes = new Node[]{ + new Node(0, "test00", 9092), + new Node(1, "test01", 9092), + new Node(2, "test02", 9092), + new Node(3, "test03", 9092), + new Node(4, "test04", 9092), + new Node(5, "test05", 9092), + new Node(6, "test06", 9092) + }; + private static final String CLUSTER_NAME = "cluster1"; + private static DoctorKafkaClusterConfig doctorKafkaClusterConfig; + private static String zookeeper_url; + + @BeforeAll + static void setup() throws Exception { + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + doctorKafkaClusterConfig = config.getClusterConfigByName(CLUSTER_NAME); + zookeeper_url = doctorKafkaClusterConfig.getZkUrl(); + } /** * This test assures that getAlternativeBrokers does not return a many-to-one reassignment @@ -32,21 +59,10 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); TopicPartition topicPartition = new TopicPartition(TOPIC, 0); - when(mockReplicaStatsManager.getMaxBytesIn(ZOOKEEPER_URL, topicPartition)).thenReturn(0L); - when(mockReplicaStatsManager.getMaxBytesOut(ZOOKEEPER_URL, topicPartition)).thenReturn(0L); - - DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); - DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); - - KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig, mockReplicaStatsManager); + when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(1L); + when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); - Node[] nodes = new Node[]{ - new Node(0, "test00", 9092), - new Node(1, "test01", 9092), - new Node(2, "test02", 9092), - new Node(3, "test03", 9092), - new Node(4, "test04", 9092) - }; + KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); Node leader = nodes[0]; Node[] replicas = new Node[]{ @@ -64,33 +80,49 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ KafkaBroker[] brokers = new KafkaBroker[]{ - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4) }; + Set replicaSet = new HashSet<>(); + replicaSet.add(topicPartition); + + for ( Node node : replicas ) { + brokers[node.id()].setFollowerReplicas(replicaSet); + } + + brokers[leader.id()].setLeaderReplicas(replicaSet); + // setting the reservedBytesIn for priority queue comparisons - brokers[4].reserveInBoundBandwidth(topicPartition, 10); - brokers[4].reserveOutBoundBandwidth(topicPartition, 10); + brokers[4].reserveBandwidth(TOPIC_PARTITION, 10, 10); PriorityQueue brokerQueue = new PriorityQueue<>(); - for ( KafkaBroker broker : brokers){ - brokerQueue.add(broker); - } + brokerQueue.addAll(Arrays.asList((brokers))); int beforeSize = brokerQueue.size(); + double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition); + double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition); + int preferredBroker = oosReplica.replicaBrokers.get(0); + /* * getMaxBytesIn() and getMaxBytesOut() will return 0 for each broker, * so the ordering of the brokers will not change after reassignment */ - Map altBrokers = kafkaCluster.getAlternativeBrokers(brokerQueue, oosReplica); + Map altBrokers = kafkaCluster.getAlternativeBrokers( + brokerQueue, + oosReplica, + inBoundReq, + outBoundReq, + preferredBroker + ); - verify(mockReplicaStatsManager).getMaxBytesIn(ZOOKEEPER_URL, topicPartition); - verify(mockReplicaStatsManager).getMaxBytesOut(ZOOKEEPER_URL, topicPartition); + verify(mockReplicaStatsManager, atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + verify(mockReplicaStatsManager, atLeast(2)).getMaxBytesOut(zookeeper_url, topicPartition); // There should be a valid reassignment for this scenario assertNotNull(altBrokers); @@ -102,4 +134,319 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ assertEquals(beforeSize, brokerQueue.size()); } + @Test + void testLocalityAwareReassignments() throws Exception { + ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); + TopicPartition topicPartition = new TopicPartition(TOPIC, 0); + + when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(0L); + when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); + + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); + + + Node leader = nodes[0]; + Node[] replicas = new Node[]{ + nodes[0], + nodes[1], + nodes[2], + }; + + Node[] isrs = new Node[]{ + nodes[0] + }; + PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs); + OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo); + oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2}); + + + KafkaBroker[] brokers = new KafkaBroker[]{ + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 5), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 6) + }; + + for ( KafkaBroker broker : brokers ){ + BrokerStats bs = new BrokerStats(); + bs.setTimestamp(System.currentTimeMillis()); + broker.setLatestStats(bs); + kafkaCluster.brokers.put(broker.id(), broker); + } + + Set replicaSet = new HashSet<>(); + replicaSet.add(topicPartition); + + for ( Node node : replicas ) { + brokers[node.id()].setFollowerReplicas(replicaSet); + } + + brokers[leader.id()].setLeaderReplicas(replicaSet); + + // a list of assignments of localities to brokers with the same size as LOCALITIES + List> testLocalityAssignments = Arrays.asList( + Arrays.asList(0, 3), + Arrays.asList(1, 2, 4, 5), + Arrays.asList(6) + ); + + for (int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + for(int brokerId : testLocalityAssignments.get(localityId)){ + brokers[brokerId].setRackId(LOCALITIES[localityId]); + } + } + + // setting the reservedBytesIn for priority queue comparisons + brokers[4].reserveBandwidth(TOPIC_PARTITION, 10, 10); + brokers[5].reserveBandwidth(TOPIC_PARTITION, 5, 5); + + Map> brokerLocalityMap = + kafkaCluster.getBrokerQueueByLocality(); + Map> reassignmentToLocalityFailures = new HashMap<>(); + + for ( int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + Collection expectedAssignments = testLocalityAssignments.get(localityId); + Collection actualAssignments = brokerLocalityMap.get(LOCALITIES[localityId]); + // size check + assertEquals(expectedAssignments.size(),actualAssignments.size()); + // element check + assertTrue(actualAssignments + .stream() + .map(broker -> broker.id()) + .collect(Collectors.toList()) + .containsAll(expectedAssignments)); + } + + double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, oosReplica.topicPartition); + double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, oosReplica.topicPartition); + int preferredBroker = oosReplica.replicaBrokers.get(0); + + // Test getAlternativeBrokersByLocality, + // brokers 1 & 2 are out of sync + // brokers 3 & 6 should be skipped even though they have the least utilization since they are not + // within the same locality as brokers 1 & 2 + + Map localityReassignments = + kafkaCluster.getAlternativeBrokersByLocality( + brokerLocalityMap, + oosReplica, + inBoundReq, + outBoundReq, + preferredBroker + ); + + verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + assertEquals(2, localityReassignments.size()); + assertEquals(brokers[5], localityReassignments.get(1)); + assertEquals(brokers[4], localityReassignments.get(2)); + + // check if the invariants are maintained after reassignment + for ( int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + Collection expectedAssignments = testLocalityAssignments.get(localityId); + Collection actualAssignments = brokerLocalityMap.get(LOCALITIES[localityId]); + // size check + assertEquals(expectedAssignments.size(),actualAssignments.size()); + // element check + assertTrue(actualAssignments + .stream() + .map(broker -> broker.id()) + .collect(Collectors.toList()) + .containsAll(expectedAssignments)); + } + + } + + @Test + void testNonLocalityAwareReassignments() throws Exception { + ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); + TopicPartition topicPartition = new TopicPartition(TOPIC, 0); + + when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(0L); + when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); + + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); + + Node leader = nodes[0]; + Node[] replicas = new Node[]{ + nodes[0], + nodes[1], + nodes[2] + }; + + Node[] isrs = new Node[]{ + nodes[0] + }; + PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs); + OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo); + oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2}); + + + KafkaBroker[] brokers = new KafkaBroker[]{ + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 5), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 6) + }; + + for ( KafkaBroker broker : brokers ){ + BrokerStats bs = new BrokerStats(); + bs.setTimestamp(System.currentTimeMillis()); + broker.setLatestStats(bs); + kafkaCluster.brokers.put(broker.id(), broker); + } + + Set replicaSet = new HashSet<>(); + replicaSet.add(topicPartition); + + for ( Node node : replicas ) { + brokers[node.id()].setFollowerReplicas(replicaSet); + } + + brokers[leader.id()].setLeaderReplicas(replicaSet); + + // setting the reservedBytesIn for priority queue comparisons + brokers[4].reserveBandwidth(TOPIC_PARTITION, 10, 10); + brokers[5].reserveBandwidth(TOPIC_PARTITION, 5, 5); + brokers[6].reserveBandwidth(TOPIC_PARTITION, 2, 2); + + Map> brokerLocalityMap = + kafkaCluster.getBrokerQueueByLocality(); + Map> reassignmentToLocalityFailures = new HashMap<>(); + + assertEquals(brokers.length, brokerLocalityMap.get(null).size()); + assertTrue(brokerLocalityMap.get(null).containsAll(Arrays.asList(brokers))); + + double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, oosReplica.topicPartition); + double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, oosReplica.topicPartition); + int preferredBroker = oosReplica.replicaBrokers.get(0); + + // Test getAlternativeBrokersByLocality, + // brokers 1 & 2 are out of sync + // brokers 3 & 6 are used because they have the lowest utilization + + Map localityReassignments = + kafkaCluster.getAlternativeBrokersByLocality( + brokerLocalityMap, + oosReplica, + inBoundReq, + outBoundReq, + preferredBroker + ); + + verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + assertEquals(2, localityReassignments.size()); + assertEquals(brokers[3], localityReassignments.get(1)); + assertEquals(brokers[6], localityReassignments.get(2)); + } + + @Test + void testLocalityAwareReassignmentsFailure() throws Exception { + ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); + TopicPartition topicPartition = new TopicPartition(TOPIC, 0); + + // set to 20MB + when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(20*1024*1024L); + when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); + + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); + + Node leader = nodes[0]; + Node[] replicas = new Node[]{ + nodes[0], + nodes[1], + nodes[2], + }; + + Node[] isrs = new Node[]{ + nodes[0] + }; + PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs); + OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo); + oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2}); + + + KafkaBroker[] brokers = new KafkaBroker[]{ + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), + new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4) + }; + + for ( KafkaBroker broker : brokers ){ + BrokerStats bs = new BrokerStats(); + bs.setTimestamp(System.currentTimeMillis()); + broker.setLatestStats(bs); + kafkaCluster.brokers.put(broker.id(), broker); + } + + Set replicaSet = new HashSet<>(); + replicaSet.add(topicPartition); + + for ( Node node : replicas ) { + brokers[node.id()].setFollowerReplicas(replicaSet); + } + + brokers[leader.id()].setLeaderReplicas(replicaSet); + + // a list of assignments of localities to brokers with the same size as LOCALITIES + List> testLocalityAssignments = Arrays.asList( + Arrays.asList(0), + Arrays.asList(1, 3), + Arrays.asList(2, 4) + ); + + for (int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { + for(int brokerId : testLocalityAssignments.get(localityId)){ + brokers[brokerId].setRackId(LOCALITIES[localityId]); + } + } + + // setting the reservedBytesIn for priority queue comparisons + brokers[4].reserveBandwidth(TOPIC_PARTITION, 20*1024*1024, 10); + + Map> brokerLocalityMap = + kafkaCluster.getBrokerQueueByLocality(); + Map> reassignmentToLocalityFailures = new HashMap<>(); + + double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, oosReplica.topicPartition); + double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, oosReplica.topicPartition); + int preferredBroker = oosReplica.replicaBrokers.get(0); + + // Test getAlternativeBrokersByLocality, + // brokers 1 & 2 are out of sync + // assignment fails since not enough bandwidth to migrate from 2 -> 4 + + Map localityReassignments = + kafkaCluster.getAlternativeBrokersByLocality( + brokerLocalityMap, + oosReplica, + inBoundReq, + outBoundReq, + preferredBroker + ); + + verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + assertNull(localityReassignments); + + } + } \ No newline at end of file