diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java index ffcb3541..ce1637a2 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java @@ -1,7 +1,6 @@ package com.pinterest.doctorkafka; import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig; -import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; import com.google.common.annotations.VisibleForTesting; import org.apache.kafka.common.TopicPartition; @@ -41,9 +40,9 @@ public class KafkaBroker implements Comparable { private long reservedBytesOut; private Set toBeAddedReplicas; - private ReplicaStatsManager replicaStatsManager; + private KafkaCluster kafkaCluster; - public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager, int brokerId) { + public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, KafkaCluster kafkaCluster, int brokerId) { assert clusterConfig != null; this.zkUrl = clusterConfig.getZkUrl(); this.brokerId = brokerId; @@ -57,7 +56,7 @@ public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager r this.reservedBytesOut = 0L; this.bytesInPerSecLimit = clusterConfig.getNetworkInLimitInBytes(); this.bytesOutPerSecLimit = clusterConfig.getNetworkOutLimitInBytes(); - this.replicaStatsManager = replicaStatsManager; + this.kafkaCluster = kafkaCluster; } public JsonElement toJson() { @@ -76,10 +75,10 @@ public JsonElement toJson() { public long getMaxBytesIn() { long result = 0L; for (TopicPartition topicPartition : leaderReplicas) { - result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition); + result += kafkaCluster.getMaxBytesIn(topicPartition); } for (TopicPartition topicPartition : followerReplicas) { - result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition); + result += kafkaCluster.getMaxBytesIn(topicPartition); } return result; } @@ -88,7 +87,7 @@ public long getMaxBytesIn() { public long getMaxBytesOut() { long result = 0L; for (TopicPartition topicPartition : leaderReplicas) { - result += replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition); + result += kafkaCluster.getMaxBytesOut(topicPartition); } return result; } diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java index 4f90b60b..f3bf21e1 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java @@ -2,9 +2,10 @@ import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig; -import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; import com.pinterest.doctorkafka.util.OutOfSyncReplica; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.SlidingWindowReservoir; import org.apache.kafka.common.TopicPartition; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,17 +44,24 @@ public class KafkaCluster { private static final Logger LOG = LogManager.getLogger(KafkaCluster.class); private static final int MAX_NUM_STATS = 5; private static final int INVALID_BROKERSTATS_TIME = 240000; + /** + * The kafka network traffic stats takes ~15 minutes to cool down. We give a 20 minutes + * cool down period to avoid inaccurate stats collection. + */ + private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1800 * 1000L; + private static final int SLIDING_WINDOW_SIZE = 1440 * 4; private DoctorKafkaClusterConfig clusterConfig; public String zkUrl; public ConcurrentMap brokers; private ConcurrentMap> brokerStatsMap; public ConcurrentMap> topicPartitions = new ConcurrentHashMap<>(); - private ReplicaStatsManager replicaStatsManager; + private ConcurrentMap bytesInHistograms = new ConcurrentHashMap<>(); + private ConcurrentMap bytesOutHistograms = new ConcurrentHashMap<>(); + private ConcurrentMap reassignmentTimestamps = new ConcurrentHashMap<>(); - public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager) { + public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig) { this.zkUrl = zookeeper; - this.replicaStatsManager = replicaStatsManager; this.brokers = new ConcurrentHashMap<>(); this.clusterConfig = clusterConfig; this.brokerStatsMap = new ConcurrentHashMap<>(); @@ -89,17 +97,30 @@ public void recordBrokerStats(BrokerStats brokerStats) { if (!brokerStats.getHasFailure()) { // only record brokerstat when there is no failure on that broker. - KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, replicaStatsManager, i)); + KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, this, i)); broker.update(brokerStats); } - if (brokerStats.getLeaderReplicas() != null) { - for (AvroTopicPartition atp : brokerStats.getLeaderReplicas()) { - String topic = atp.getTopic(); - TopicPartition tp = new TopicPartition(topic, atp.getPartition()); - topicPartitions - .computeIfAbsent(topic, t -> new HashSet<>()) - .add(tp); + if (brokerStats.getLeaderReplicaStats() != null) { + for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) { + String topic = replicaStat.getTopic(); + TopicPartition topicPartition = new TopicPartition(topic, replicaStat.getPartition()); + topicPartitions.computeIfAbsent(topic, t -> new HashSet<>()).add(topicPartition); + // if the replica is involved in reassignment, ignore the stats + if (replicaStat.getInReassignment()){ + reassignmentTimestamps.compute(topicPartition, + (t, v) -> v == null || v < replicaStat.getTimestamp() ? replicaStat.getTimestamp() : v); + continue; + } + long lastReassignment = reassignmentTimestamps.getOrDefault(topicPartition, 0L); + if (brokerStats.getTimestamp() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) { + continue; + } + bytesInHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE))); + bytesOutHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE))); + + bytesInHistograms.get(topicPartition).update(replicaStat.getBytesIn15MinMeanRate()); + bytesOutHistograms.get(topicPartition).update(replicaStat.getBytesOut15MinMeanRate()); } } } catch (Exception e) { @@ -122,6 +143,18 @@ public JsonElement toJson() { return json; } + public ConcurrentMap getBytesInHistograms() { + return bytesInHistograms; + } + + public ConcurrentMap getBytesOutHistograms() { + return bytesOutHistograms; + } + + public ConcurrentMap getReassignmentTimestamps() { + return reassignmentTimestamps; + } + /** * Get broker by broker id. * @@ -450,25 +483,31 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition, } } + public long getMaxBytesIn(TopicPartition tp) { + return bytesInHistograms.get(tp).getSnapshot().getMax(); + } + + public long getMaxBytesOut(TopicPartition tp) { + return bytesOutHistograms.get(tp).getSnapshot().getMax(); + } public long getMaxBytesIn() { long result = 0L; for (Map.Entry> entry : topicPartitions.entrySet()) { Set topicPartitions = entry.getValue(); for (TopicPartition tp : topicPartitions) { - result += replicaStatsManager.getMaxBytesIn(zkUrl, tp); + result += getMaxBytesIn(tp); } } return result; } - public long getMaxBytesOut() { long result = 0L; for (Map.Entry> entry : topicPartitions.entrySet()) { Set topicPartitions = entry.getValue(); for (TopicPartition tp : topicPartitions) { - result += replicaStatsManager.getMaxBytesOut(zkUrl, tp); + result += getMaxBytesOut(tp); } } return result; diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java index be57c9a3..932144ca 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java @@ -77,7 +77,6 @@ public class KafkaClusterManager implements Runnable { private DoctorKafkaConfig drkafkaConfig = null; private DoctorKafkaClusterConfig clusterConfig; private DoctorKafkaActionReporter actionReporter = null; - private ReplicaStatsManager replicaStatsManager; private boolean stopped = true; private Thread thread = null; @@ -118,7 +117,6 @@ public KafkaClusterManager(String zkUrl, KafkaCluster kafkaCluster, if (clusterConfig.enabledDeadbrokerReplacement()) { this.brokerReplacer = new BrokerReplacer(drkafkaConfig.getBrokerReplacementCommand()); } - this.replicaStatsManager = replicaStatsManager; } public KafkaCluster getCluster() { @@ -236,8 +234,8 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, for (Map.Entry entry : tpTraffic.entrySet()) { TopicPartition tp = entry.getKey(); - double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp); - double tpBytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp); + double tpBytesIn = kafkaCluster.getMaxBytesIn(tp); + double tpBytesOut = kafkaCluster.getMaxBytesOut(tp); double brokerTraffic = (bytesIn - toBeReducedBytesIn - tpBytesIn) + (bytesOut - toBeReducedBytesOut - tpBytesOut); @@ -314,7 +312,7 @@ private void generateFollowerReassignmentPlan(KafkaBroker broker) { for (Map.Entry entry : tpTraffic.entrySet()) { TopicPartition tp = entry.getKey(); - double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp); + double tpBytesIn = kafkaCluster.getMaxBytesIn(tp); if (brokerBytesIn - toBeReducedBytesIn - tpBytesIn < bytesInLimit) { // if moving a topic partition out will have the broker be under-utilized, do not // move it out. @@ -489,8 +487,8 @@ private Map sortTopicPartitionsByTraffic(List tpTraffic = new HashMap<>(); for (TopicPartition tp : tps) { try { - double bytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp); - double bytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp); + double bytesIn = kafkaCluster.getMaxBytesIn(tp); + double bytesOut = kafkaCluster.getMaxBytesOut(tp); tpTraffic.put(tp, bytesIn + bytesOut); } catch (Exception e) { LOG.info("Exception in sorting topic partition {}", tp, e); @@ -614,8 +612,8 @@ private Map generateReassignmentPlanForDeadBrokers( for (OutOfSyncReplica oosReplica : outOfSyncReplicas) { - double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, oosReplica.topicPartition); - double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, oosReplica.topicPartition); + double inBoundReq = kafkaCluster.getMaxBytesIn(oosReplica.topicPartition); + double outBoundReq = kafkaCluster.getMaxBytesOut(oosReplica.topicPartition); int preferredBroker = oosReplica.replicaBrokers.get(0); Map replacedNodes; diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/ReplicaStatsManager.java b/drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/ReplicaStatsManager.java index f26d404c..27a30c41 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/ReplicaStatsManager.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/ReplicaStatsManager.java @@ -13,11 +13,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import com.codahale.metrics.Histogram; -import com.codahale.metrics.SlidingWindowReservoir; import com.pinterest.doctorkafka.BrokerStats; import com.pinterest.doctorkafka.KafkaCluster; -import com.pinterest.doctorkafka.ReplicaStat; import com.pinterest.doctorkafka.config.DoctorKafkaConfig; import com.pinterest.doctorkafka.util.KafkaUtils; import com.pinterest.doctorkafka.util.ReplicaStatsUtil; @@ -26,31 +23,9 @@ public class ReplicaStatsManager { private static final Logger LOG = LogManager.getLogger(ReplicaStatsManager.class); - private static final int SLIDING_WINDOW_SIZE = 1440 * 4; - - /** - * The kafka network traffic stats takes ~15 minutes to cool down. We give a 20 minutes - * cool down period to avoid inaccurate stats collection. - */ - private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1800 * 1000L; - - private ConcurrentMap> - bytesInStats = new ConcurrentHashMap<>(); - - private ConcurrentMap> - bytesOutStats = new ConcurrentHashMap<>(); - private ConcurrentMap clusters = new ConcurrentHashMap<>(); - private DoctorKafkaConfig config; - public ConcurrentHashMap> - replicaReassignmentTimestamps = new ConcurrentHashMap<>(); - - /* - * Getters - */ - public ConcurrentMap getClusters() { return clusters; } @@ -70,36 +45,6 @@ public ReplicaStatsManager(DoctorKafkaConfig config){ this.clusterZkUrls = config.getClusterZkUrls(); } - public void updateReplicaReassignmentTimestamp(String brokerZkUrl, - ReplicaStat replicaStat) { - if (!replicaReassignmentTimestamps.containsKey(brokerZkUrl)) { - replicaReassignmentTimestamps.put(brokerZkUrl, new ConcurrentHashMap<>()); - } - ConcurrentHashMap replicaTimestamps = - replicaReassignmentTimestamps.get(brokerZkUrl); - TopicPartition topicPartition = new TopicPartition( - replicaStat.getTopic(), replicaStat.getPartition()); - - if (!replicaTimestamps.containsKey(topicPartition) || - replicaTimestamps.get(topicPartition) < replicaStat.getTimestamp()) { - replicaTimestamps.put(topicPartition, replicaStat.getTimestamp()); - } - } - - private long getLastReplicaReassignmentTimestamp(String brokerZkUrl, - TopicPartition topicPartition) { - long result = 0; - if (replicaReassignmentTimestamps.containsKey(brokerZkUrl)) { - ConcurrentHashMap replicaTimestamps = - replicaReassignmentTimestamps.get(brokerZkUrl); - if (replicaTimestamps.containsKey(topicPartition)) { - result = replicaTimestamps.get(topicPartition); - } - } - return result; - } - - /** * Record the latest brokerstats, and update DocotorKafka internal data structures. */ @@ -110,66 +55,8 @@ public void update(BrokerStats brokerStats) { return; } - KafkaCluster cluster = clusters.computeIfAbsent(brokerZkUrl, url -> new KafkaCluster(url, config.getClusterConfigByZkUrl(url), this)); + KafkaCluster cluster = clusters.computeIfAbsent(brokerZkUrl, url -> new KafkaCluster(url, config.getClusterConfigByZkUrl(url))); cluster.recordBrokerStats(brokerStats); - - bytesInStats.putIfAbsent(brokerZkUrl, new ConcurrentHashMap<>()); - bytesOutStats.putIfAbsent(brokerZkUrl, new ConcurrentHashMap<>()); - - if (brokerStats.getLeaderReplicaStats() != null) { - ConcurrentMap bytesInHistograms = bytesInStats.get(brokerZkUrl); - ConcurrentMap bytesOutHistograms = bytesOutStats.get(brokerZkUrl); - for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) { - if (replicaStat.getInReassignment()) { - // if the replica is involved in reassignment, ignore the stats - updateReplicaReassignmentTimestamp(brokerZkUrl, replicaStat); - continue; - } - TopicPartition topicPartition = new TopicPartition( - replicaStat.getTopic(), replicaStat.getPartition()); - long lastReassignment = getLastReplicaReassignmentTimestamp(brokerZkUrl, topicPartition); - if (brokerStats.getTimestamp() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) { - continue; - } - - bytesInHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE))); - bytesOutHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE))); - - bytesInHistograms.get(topicPartition).update(replicaStat.getBytesIn15MinMeanRate()); - bytesOutHistograms.get(topicPartition).update(replicaStat.getBytesOut15MinMeanRate()); - } - } - } - - - public long getMaxBytesIn(String zkUrl, TopicPartition topicPartition) { - try { - return bytesInStats.get(zkUrl).get(topicPartition).getSnapshot().getMax(); - } catch (Exception e) { - LOG.error("Failed to get bytesinfo for {}:{}", zkUrl, topicPartition); - throw e; - } - } - - public double get99thPercentilBytesIn(String zkUrl, TopicPartition topicPartition) { - return bytesInStats.get(zkUrl).get(topicPartition).getSnapshot().get99thPercentile(); - } - - public long getMaxBytesOut(String zkUrl, TopicPartition topicPartition) { - return bytesOutStats.get(zkUrl).get(topicPartition).getSnapshot().getMax(); - } - - public double get99thPercentilBytesOut(String zkUrl, TopicPartition topicPartition) { - return bytesOutStats.get(zkUrl).get(topicPartition).getSnapshot().get99thPercentile(); - } - - - public Map getTopicsBytesInStats(String zkUrl) { - return bytesInStats.get(zkUrl); - } - - public Map getTopicsBytesOutStats(String zkUrl) { - return bytesOutStats.get(zkUrl); } /** diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/KafkaTopicStatsServlet.java b/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/KafkaTopicStatsServlet.java index e51ea98b..b66b158a 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/KafkaTopicStatsServlet.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/KafkaTopicStatsServlet.java @@ -4,7 +4,6 @@ import com.pinterest.doctorkafka.DoctorKafkaMain; import com.pinterest.doctorkafka.KafkaCluster; import com.pinterest.doctorkafka.KafkaClusterManager; -import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; import com.pinterest.doctorkafka.util.KafkaUtils; import com.pinterest.doctorkafka.errors.ClusterInfoError; @@ -47,9 +46,9 @@ public void renderJSON(PrintWriter writer, Map params) { for (TopicPartition topicPartition : topicPartitions) { double bytesInMax = - DoctorKafkaMain.replicaStatsManager.getMaxBytesIn(cluster.zkUrl, topicPartition) / 1024.0 / 1024.0; + cluster.getMaxBytesIn(topicPartition) / 1024.0 / 1024.0; double bytesOutMax = - DoctorKafkaMain.replicaStatsManager.getMaxBytesOut(cluster.zkUrl, topicPartition) / 1024.0 / 1024.0; + cluster.getMaxBytesOut(topicPartition) / 1024.0 / 1024.0; JsonObject jsonPartition = new JsonObject(); jsonPartition.add("bytesInMax", gson.toJsonTree(bytesInMax)); @@ -108,9 +107,9 @@ private void printTopicPartitionInfo(KafkaCluster cluster, PrintWriter writer, S writer.print(""); double bytesInMax = - DoctorKafkaMain.replicaStatsManager.getMaxBytesIn(cluster.zkUrl, topicPartition) / 1024.0 / 1024.0; + cluster.getMaxBytesIn(topicPartition) / 1024.0 / 1024.0; double bytesOutMax = - DoctorKafkaMain.replicaStatsManager.getMaxBytesOut(cluster.zkUrl, topicPartition) / 1024.0 / 1024.0; + cluster.getMaxBytesOut(topicPartition) / 1024.0 / 1024.0; if (isZero(bytesInMax) && isZero(bytesOutMax)) { zeroTrafficPartitions++; diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/tools/ReplicaStatsRetriever.java b/drkafka/src/main/java/com/pinterest/doctorkafka/tools/ReplicaStatsRetriever.java index 38aef746..dfa8c781 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/tools/ReplicaStatsRetriever.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/tools/ReplicaStatsRetriever.java @@ -1,5 +1,6 @@ package com.pinterest.doctorkafka.tools; +import com.pinterest.doctorkafka.KafkaCluster; import com.pinterest.doctorkafka.config.DoctorKafkaConfig; import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; import com.pinterest.doctorkafka.util.KafkaUtils; @@ -87,10 +88,10 @@ public static void main(String[] args) throws Exception { Map bytesInStats = new TreeMap<>(new KafkaUtils.TopicPartitionComparator()); - bytesInStats.putAll(replicaStatsManager.getTopicsBytesInStats(clusterZk)); + bytesInStats.putAll(replicaStatsManager.getClusters().get(clusterZk).getBytesInHistograms()); Map bytesOutStats = new TreeMap<>(new KafkaUtils.TopicPartitionComparator()); - bytesOutStats.putAll(replicaStatsManager.getTopicsBytesOutStats(clusterZk)); + bytesOutStats.putAll(replicaStatsManager.getClusters().get(clusterZk).getBytesOutHistograms()); for (TopicPartition tp : bytesInStats.keySet()) { long maxBytesIn = bytesInStats.get(tp).getSnapshot().getMax(); @@ -98,10 +99,10 @@ public static void main(String[] args) throws Exception { System.out.println(tp + " : maxBytesIn = " + maxBytesIn + ", maxBytesOut = " + maxBytesOut); } - for (String zkUrl : replicaStatsManager.replicaReassignmentTimestamps.keySet()) { - System.out.println("Reassignment info for " + zkUrl); + for (KafkaCluster cluster : replicaStatsManager.getClusters().values()) { + System.out.println("Reassignment info for " + cluster.name()); Map reassignmentTimestamps = - replicaStatsManager.replicaReassignmentTimestamps.get(zkUrl); + cluster.getReassignmentTimestamps(); for (TopicPartition tp : reassignmentTimestamps.keySet()) { System.out.println(" " + tp + " : " + reassignmentTimestamps.get(tp)); } diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/ClusterInfoServletTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/ClusterInfoServletTest.java index 6ce18812..8bc4f57f 100644 --- a/drkafka/src/test/java/com/pinterest/doctorkafka/ClusterInfoServletTest.java +++ b/drkafka/src/test/java/com/pinterest/doctorkafka/ClusterInfoServletTest.java @@ -1,13 +1,9 @@ package com.pinterest.doctorkafka; import com.pinterest.doctorkafka.config.DoctorKafkaConfig; -import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; import com.pinterest.doctorkafka.servlet.ClusterInfoServlet; -import kafka.cluster.Cluster; -import org.eclipse.jetty.http.HttpStatus; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import org.mockito.internal.matchers.Any; import scala.Console; import javax.servlet.http.HttpServletRequest; @@ -25,9 +21,8 @@ public void clusterInfoJSONResponse() throws Exception { DoctorKafka mockDoctor = mock(DoctorKafka.class); DoctorKafkaMain.doctorKafka = mockDoctor; KafkaClusterManager clusterManager = mock(KafkaClusterManager.class); - ReplicaStatsManager mockReplicaStatManager = mock(ReplicaStatsManager.class); DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); - KafkaCluster cluster = new KafkaCluster(clusterName, config.getClusterConfigByName(clusterName), mockReplicaStatManager); + KafkaCluster cluster = new KafkaCluster(clusterName, config.getClusterConfigByName(clusterName)); when(mockDoctor.getClusterManager(clusterName)).thenReturn(clusterManager); when(clusterManager.getCluster()).thenReturn(cluster); diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaBrokerTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaBrokerTest.java index 2f850d22..544dbdde 100644 --- a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaBrokerTest.java +++ b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaBrokerTest.java @@ -2,12 +2,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.Mockito.*; -import com.pinterest.doctorkafka.KafkaBroker; import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig; import com.pinterest.doctorkafka.config.DoctorKafkaConfig; -import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; import org.junit.jupiter.api.Test; @@ -18,10 +15,10 @@ public void kafkaBrokerComparatorTest() throws Exception { DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); DoctorKafkaClusterConfig clusterConfig = config.getClusterConfigByName("cluster1"); - ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); + KafkaCluster kafkaCluster = new KafkaCluster(clusterConfig.getZkUrl(), clusterConfig); - KafkaBroker a = new KafkaBroker(clusterConfig, mockReplicaStatsManager, 0); - KafkaBroker b = new KafkaBroker(clusterConfig, mockReplicaStatsManager, 1); + KafkaBroker a = new KafkaBroker(clusterConfig, kafkaCluster, 0); + KafkaBroker b = new KafkaBroker(clusterConfig, kafkaCluster, 1); KafkaBroker.KafkaBrokerComparator comparator = new KafkaBroker.KafkaBrokerComparator(); assertEquals(0, comparator.compare(a, b)); diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java index 935ebe3b..a1f23c21 100644 --- a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java +++ b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java @@ -5,7 +5,6 @@ import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig; import com.pinterest.doctorkafka.config.DoctorKafkaConfig; -import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; import com.pinterest.doctorkafka.util.OutOfSyncReplica; import org.apache.kafka.common.Node; @@ -40,12 +39,14 @@ class KafkaClusterTest { private static final String CLUSTER_NAME = "cluster1"; private static DoctorKafkaClusterConfig doctorKafkaClusterConfig; private static String zookeeper_url; + private static KafkaCluster kafkaCluster; @BeforeAll static void setup() throws Exception { DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); doctorKafkaClusterConfig = config.getClusterConfigByName(CLUSTER_NAME); zookeeper_url = doctorKafkaClusterConfig.getZkUrl(); + kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig); } /** @@ -55,14 +56,12 @@ static void setup() throws Exception { * reassignment plans. */ @Test - void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ - ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); + void getAlternativeBrokersDuplicateReassignmentTest() throws Exception { TopicPartition topicPartition = new TopicPartition(TOPIC, 0); - when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(1L); - when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); - - KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); + KafkaCluster spyKafkaCluster = spy(kafkaCluster); + doReturn(1L).when(spyKafkaCluster).getMaxBytesIn(topicPartition); + doReturn(0L).when(spyKafkaCluster).getMaxBytesOut(topicPartition); Node leader = nodes[0]; Node[] replicas = new Node[]{ @@ -80,11 +79,11 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ KafkaBroker[] brokers = new KafkaBroker[]{ - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4) + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 0), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 1), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 2), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 3), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 4) }; Set replicaSet = new HashSet<>(); @@ -105,8 +104,8 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ int beforeSize = brokerQueue.size(); - double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition); - double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition); + double inBoundReq = spyKafkaCluster.getMaxBytesIn(topicPartition); + double outBoundReq = spyKafkaCluster.getMaxBytesOut(topicPartition); int preferredBroker = oosReplica.replicaBrokers.get(0); /* @@ -121,8 +120,8 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ preferredBroker ); - verify(mockReplicaStatsManager, atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); - verify(mockReplicaStatsManager, atLeast(2)).getMaxBytesOut(zookeeper_url, topicPartition); + verify(spyKafkaCluster, atLeast(2)).getMaxBytesIn(topicPartition); + verify(spyKafkaCluster, atLeast(2)).getMaxBytesOut(topicPartition); // There should be a valid reassignment for this scenario assertNotNull(altBrokers); @@ -136,17 +135,11 @@ void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ @Test void testLocalityAwareReassignments() throws Exception { - ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); TopicPartition topicPartition = new TopicPartition(TOPIC, 0); - when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(0L); - when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); - - DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); - DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); - - KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); - + KafkaCluster spyKafkaCluster = spy(kafkaCluster); + doReturn(1L).when(spyKafkaCluster).getMaxBytesIn(topicPartition); + doReturn(0L).when(spyKafkaCluster).getMaxBytesOut(topicPartition); Node leader = nodes[0]; Node[] replicas = new Node[]{ @@ -164,13 +157,13 @@ void testLocalityAwareReassignments() throws Exception { KafkaBroker[] brokers = new KafkaBroker[]{ - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 5), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 6) + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 0), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 1), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 2), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 3), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 4), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 5), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 6) }; for ( KafkaBroker broker : brokers ){ @@ -208,7 +201,6 @@ void testLocalityAwareReassignments() throws Exception { Map> brokerLocalityMap = kafkaCluster.getBrokerQueueByLocality(); - Map> reassignmentToLocalityFailures = new HashMap<>(); for ( int localityId = 0; localityId < testLocalityAssignments.size(); localityId++) { Collection expectedAssignments = testLocalityAssignments.get(localityId); @@ -223,8 +215,8 @@ void testLocalityAwareReassignments() throws Exception { .containsAll(expectedAssignments)); } - double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, oosReplica.topicPartition); - double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, oosReplica.topicPartition); + double inBoundReq = spyKafkaCluster.getMaxBytesIn(oosReplica.topicPartition); + double outBoundReq = spyKafkaCluster.getMaxBytesOut(oosReplica.topicPartition); int preferredBroker = oosReplica.replicaBrokers.get(0); // Test getAlternativeBrokersByLocality, @@ -241,8 +233,8 @@ void testLocalityAwareReassignments() throws Exception { preferredBroker ); - verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); - verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + verify(spyKafkaCluster,atLeast(2)).getMaxBytesIn(topicPartition); + verify(spyKafkaCluster,atLeast(2)).getMaxBytesIn(topicPartition); assertEquals(2, localityReassignments.size()); assertEquals(brokers[5], localityReassignments.get(1)); assertEquals(brokers[4], localityReassignments.get(2)); @@ -265,16 +257,11 @@ void testLocalityAwareReassignments() throws Exception { @Test void testNonLocalityAwareReassignments() throws Exception { - ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); TopicPartition topicPartition = new TopicPartition(TOPIC, 0); - when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(0L); - when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); - - DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); - DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); - - KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); + KafkaCluster spyKafkaCluster = spy(kafkaCluster); + doReturn(1L).when(spyKafkaCluster).getMaxBytesIn(topicPartition); + doReturn(0L).when(spyKafkaCluster).getMaxBytesOut(topicPartition); Node leader = nodes[0]; Node[] replicas = new Node[]{ @@ -292,13 +279,13 @@ void testNonLocalityAwareReassignments() throws Exception { KafkaBroker[] brokers = new KafkaBroker[]{ - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 5), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 6) + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 0), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 1), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 2), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 3), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 4), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 5), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 6) }; for ( KafkaBroker broker : brokers ){ @@ -324,13 +311,12 @@ void testNonLocalityAwareReassignments() throws Exception { Map> brokerLocalityMap = kafkaCluster.getBrokerQueueByLocality(); - Map> reassignmentToLocalityFailures = new HashMap<>(); assertEquals(brokers.length, brokerLocalityMap.get(null).size()); assertTrue(brokerLocalityMap.get(null).containsAll(Arrays.asList(brokers))); - double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, oosReplica.topicPartition); - double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, oosReplica.topicPartition); + double inBoundReq = spyKafkaCluster.getMaxBytesIn(oosReplica.topicPartition); + double outBoundReq = spyKafkaCluster.getMaxBytesOut(oosReplica.topicPartition); int preferredBroker = oosReplica.replicaBrokers.get(0); // Test getAlternativeBrokersByLocality, @@ -346,8 +332,8 @@ void testNonLocalityAwareReassignments() throws Exception { preferredBroker ); - verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); - verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + verify(spyKafkaCluster,atLeast(2)).getMaxBytesIn(topicPartition); + verify(spyKafkaCluster,atLeast(2)).getMaxBytesIn(topicPartition); assertEquals(2, localityReassignments.size()); assertEquals(brokers[3], localityReassignments.get(1)); assertEquals(brokers[6], localityReassignments.get(2)); @@ -355,17 +341,12 @@ void testNonLocalityAwareReassignments() throws Exception { @Test void testLocalityAwareReassignmentsFailure() throws Exception { - ReplicaStatsManager mockReplicaStatsManager = mock(ReplicaStatsManager.class); TopicPartition topicPartition = new TopicPartition(TOPIC, 0); // set to 20MB - when(mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, topicPartition)).thenReturn(20*1024*1024L); - when(mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, topicPartition)).thenReturn(0L); - - DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); - DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); - - KafkaCluster kafkaCluster = new KafkaCluster(zookeeper_url, doctorKafkaClusterConfig, mockReplicaStatsManager); + KafkaCluster spyKafkaCluster = spy(kafkaCluster); + doReturn(20*1024*1024L).when(spyKafkaCluster).getMaxBytesIn(topicPartition); + doReturn(0L).when(spyKafkaCluster).getMaxBytesOut(topicPartition); Node leader = nodes[0]; Node[] replicas = new Node[]{ @@ -383,11 +364,11 @@ void testLocalityAwareReassignmentsFailure() throws Exception { KafkaBroker[] brokers = new KafkaBroker[]{ - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 0), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 1), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 2), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 3), - new KafkaBroker(doctorKafkaClusterConfig, mockReplicaStatsManager, 4) + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 0), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 1), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 2), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 3), + new KafkaBroker(doctorKafkaClusterConfig, spyKafkaCluster, 4) }; for ( KafkaBroker broker : brokers ){ @@ -426,8 +407,8 @@ void testLocalityAwareReassignmentsFailure() throws Exception { kafkaCluster.getBrokerQueueByLocality(); Map> reassignmentToLocalityFailures = new HashMap<>(); - double inBoundReq = mockReplicaStatsManager.getMaxBytesIn(zookeeper_url, oosReplica.topicPartition); - double outBoundReq = mockReplicaStatsManager.getMaxBytesOut(zookeeper_url, oosReplica.topicPartition); + double inBoundReq = spyKafkaCluster.getMaxBytesIn(oosReplica.topicPartition); + double outBoundReq = spyKafkaCluster.getMaxBytesOut(oosReplica.topicPartition); int preferredBroker = oosReplica.replicaBrokers.get(0); // Test getAlternativeBrokersByLocality, @@ -443,10 +424,9 @@ void testLocalityAwareReassignmentsFailure() throws Exception { preferredBroker ); - verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); - verify(mockReplicaStatsManager,atLeast(2)).getMaxBytesIn(zookeeper_url, topicPartition); + verify(spyKafkaCluster,atLeast(2)).getMaxBytesIn(topicPartition); + verify(spyKafkaCluster,atLeast(2)).getMaxBytesIn(topicPartition); assertNull(localityReassignments); - } } \ No newline at end of file diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/ReplicaStatsManagerTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/ReplicaStatsManagerTest.java deleted file mode 100644 index 5a61714f..00000000 --- a/drkafka/src/test/java/com/pinterest/doctorkafka/ReplicaStatsManagerTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.pinterest.doctorkafka; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -import com.pinterest.doctorkafka.config.DoctorKafkaConfig; -import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; - -import org.apache.kafka.common.TopicPartition; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; - -public class ReplicaStatsManagerTest { - private static final String ZKURL = "zk001/cluster1"; - private static final String TOPIC = "nginx_log"; - private List replicaStats = new ArrayList<>(); - - private void initialize() { - ReplicaStat stats1 = new ReplicaStat(1502951705179L, - TOPIC, 21, true, true, true, 9481888L, 9686567L, 9838856L, 34313663L, 31282690L, - 22680562L, 154167421996L, 154253138423L, 8.937965169074621, 72999272025L, 68 ); - - ReplicaStat stats2 = new ReplicaStat(1502951705179L, TOPIC, 51, true, true, false, - 9539926L, 9745859L, 9899080L, 34523697L, 31474171L, 22819389L, 154167145733L, - 154253367479L, 8.992674338022038, 73446100348L, 69); - - replicaStats.add(stats1); - replicaStats.add(stats2); - } - - @Test - public void updateReplicaReassignmentTimestampTest() throws Exception { - initialize(); - DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); - ReplicaStatsManager replicaStatsManager = new ReplicaStatsManager(config); - replicaStatsManager.updateReplicaReassignmentTimestamp(ZKURL, replicaStats.get(0)); - replicaStatsManager.updateReplicaReassignmentTimestamp(ZKURL, replicaStats.get(1)); - - TopicPartition topicPartition = new TopicPartition(TOPIC, 21); - - assertEquals(replicaStatsManager.replicaReassignmentTimestamps.get(ZKURL).size(), 2); - assertEquals( - (long)replicaStatsManager.replicaReassignmentTimestamps.get(ZKURL).get(topicPartition), - 1502951705179L); - } -}