From 88b29a839296812b513be8bda8f5c1140ef694f8 Mon Sep 17 00:00:00 2001 From: kabochya Date: Sat, 16 Mar 2019 16:10:21 -0700 Subject: [PATCH] Fix #18 invalid reassignment due to duplicated broker ids (#106) Fix the invalid assignment issue that is reported in https://github.com/pinterest/doctorkafka/issues/18, and add a unit test for validation --- drkafka/pom.xml | 23 +--- .../pinterest/doctorkafka/KafkaCluster.java | 8 +- .../doctorkafka/KafkaClusterTest.java | 127 ++++++++++++++++++ 3 files changed, 135 insertions(+), 23 deletions(-) create mode 100644 drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java diff --git a/drkafka/pom.xml b/drkafka/pom.xml index 4fe77451..2d985a16 100644 --- a/drkafka/pom.xml +++ b/drkafka/pom.xml @@ -80,16 +80,11 @@ commons-validator 1.6 + org.junit.jupiter - junit-jupiter-api - 5.0.1 - test - - - org.junit.jupiter - junit-jupiter-engine - 5.3.1 + junit-jupiter + 5.4.0 test @@ -298,18 +293,6 @@ maven-surefire-plugin 2.22.1 - - - org.junit.platform - junit-platform-surefire-provider - 1.0.3 - - - org.junit.jupiter - junit-jupiter-engine - 5.3.1 - - org.codehaus.mojo diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java index 9f1c83a1..4811995a 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java @@ -251,8 +251,8 @@ public Map getAlternativeBrokers(PriorityQueue result = new HashMap<>(); + List unusableBrokers = new ArrayList<>(); for (int oosBrokerId : oosReplica.outOfSyncBrokers) { - List unusableBrokers = new ArrayList<>(); // we will get the broker with the least network usage KafkaBroker leastUsedBroker = brokerQueue.poll(); while (leastUsedBroker != null && oosReplica.replicaBrokers.contains(leastUsedBroker.id())) { @@ -270,14 +270,16 @@ public Map getAlternativeBrokers(PriorityQueue brokerQueue.add(broker)); - 
brokerQueue.add(leastUsedBroker); } + // push the brokers back to brokerQueue to keep invariant true + brokerQueue.addAll(unusableBrokers); return success ? result : null; } diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java new file mode 100644 index 00000000..5f7e0a57 --- /dev/null +++ b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java @@ -0,0 +1,127 @@ +package com.pinterest.doctorkafka; + +import static org.junit.jupiter.api.Assertions.*; + +import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig; +import com.pinterest.doctorkafka.config.DoctorKafkaConfig; +import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; +import com.pinterest.doctorkafka.util.OutOfSyncReplica; + +import com.codahale.metrics.Histogram; +import com.codahale.metrics.UniformReservoir; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +class KafkaClusterTest { + private static final String TOPIC = "test_topic"; + private static final String ZOOKEEPER_URL = "zk001/cluster1"; + + /** + * This test assures that getAlternativeBrokers does not return a many-to-one reassignment + * We want to make sure that multiple out-of-sync replicas of the same topic partition will not + * map to the same replacement broker, or duplicate reassignments will happen leading to invalid + * reassignment plans. 
+ */ + @Test + void getAlternativeBrokersDuplicateReassignmentTest() throws Exception{ + DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties"); + DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1"); + + // create histogram maps to mock network stats per topic partition + ConcurrentMap> oldBytesInStats = ReplicaStatsManager.bytesInStats; + ConcurrentMap> oldBytesOutStats = ReplicaStatsManager.bytesOutStats; + + TopicPartition topicPartition = new TopicPartition(TOPIC, 0); + + ConcurrentMap> testBytesInStats = new ConcurrentHashMap<>(); + ConcurrentMap> testBytesOutStats = new ConcurrentHashMap<>(); + ConcurrentMap testBytesInHistograms = new ConcurrentHashMap<>(); + ConcurrentMap testBytesOutHistograms = new ConcurrentHashMap<>(); + Histogram inHist = new Histogram(new UniformReservoir()); + Histogram outHist = new Histogram(new UniformReservoir()); + + inHist.update(0); + outHist.update(0); + + testBytesInHistograms.put(topicPartition, inHist); + testBytesOutHistograms.put(topicPartition, outHist); + + testBytesInStats.put(ZOOKEEPER_URL, testBytesInHistograms); + testBytesOutStats.put(ZOOKEEPER_URL, testBytesOutHistograms); + + ReplicaStatsManager.bytesInStats = testBytesInStats; + ReplicaStatsManager.bytesOutStats = testBytesOutStats; + + KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig); + + Node[] nodes = new Node[]{ + new Node(0, "test00", 9092), + new Node(1, "test01", 9092), + new Node(2, "test02", 9092), + new Node(3, "test03", 9092), + new Node(4, "test04", 9092) + }; + + Node leader = nodes[0]; + Node[] replicas = new Node[]{ + nodes[0], + nodes[1], + nodes[2] + }; + + Node[] isrs = new Node[]{ + nodes[0] + }; + PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs); + OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo); + oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2}); + + + KafkaBroker[] 
brokers = new KafkaBroker[]{ + new KafkaBroker(doctorKafkaClusterConfig, 0), + new KafkaBroker(doctorKafkaClusterConfig, 1), + new KafkaBroker(doctorKafkaClusterConfig, 2), + new KafkaBroker(doctorKafkaClusterConfig, 3), + new KafkaBroker(doctorKafkaClusterConfig, 4) + }; + + // setting the reservedBytesIn for priority queue comparisons + brokers[4].reserveInBoundBandwidth(topicPartition, 10); + brokers[4].reserveOutBoundBandwidth(topicPartition, 10); + + PriorityQueue brokerQueue = new PriorityQueue<>(); + + for ( KafkaBroker broker : brokers){ + brokerQueue.add(broker); + } + + int beforeSize = brokerQueue.size(); + + /* + * getMaxBytesIn() and getMaxBytesOut() will return 0 for each broker, + * so the ordering of the brokers will not change after reassignment + */ + Map altBrokers = kafkaCluster.getAlternativeBrokers(brokerQueue, oosReplica); + + // There should be a valid reassignment for this scenario + assertNotNull(altBrokers); + // Broker 0 should not be reassigned since it is still in sync + assertNull(altBrokers.get(0)); + // The reassignment of broker 1 and 2 should be different brokers + assertNotEquals(altBrokers.get(1), altBrokers.get(2)); + // The broker queue should contain the same amount of brokers + assertEquals(beforeSize, brokerQueue.size()); + + ReplicaStatsManager.bytesInStats = oldBytesInStats; + ReplicaStatsManager.bytesOutStats = oldBytesOutStats; + } + +} \ No newline at end of file