
Fix #18 invalid reassignment due to duplicated broker ids (#106)
Fix the invalid reassignment issue reported in #18, and add a unit test for validation
kabochya authored and yuyang08 committed Mar 16, 2019
1 parent e4953ea commit 88b29a8
Showing 3 changed files with 135 additions and 23 deletions.
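The root cause, in miniature: before this fix, the assignment loop polled the least-used broker from the priority queue and pushed it straight back before handling the next out-of-sync replica, so the same broker could be handed out twice for one partition. A minimal standalone sketch of that failure mode (toy broker ids, not code from this repo):

import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class DuplicateAssignmentSketch {
  public static void main(String[] args) {
    // Candidate replacement brokers 3 and 4; broker 3 carries the least traffic.
    PriorityQueue<Integer> brokerQueue = new PriorityQueue<>();
    brokerQueue.add(3);
    brokerQueue.add(4);

    // Out-of-sync replicas live on brokers 1 and 2; each needs a replacement.
    Map<Integer, Integer> assignment = new HashMap<>();
    for (int oosBrokerId : new int[]{1, 2}) {
      Integer leastUsed = brokerQueue.poll();
      assignment.put(oosBrokerId, leastUsed);
      brokerQueue.add(leastUsed); // returned immediately, so the next poll() hands out broker 3 again
    }

    System.out.println(assignment); // {1=3, 2=3} -- a many-to-one, invalid reassignment
  }
}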
23 changes: 3 additions & 20 deletions drkafka/pom.xml
@@ -80,16 +80,11 @@
       <artifactId>commons-validator</artifactId>
       <version>1.6</version>
     </dependency>
+    <!--Use aggregated junit-jupiter to fix previous mismatch of multiple junit dependencies-->
     <dependency>
       <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-api</artifactId>
-      <version>5.0.1</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.junit.jupiter</groupId>
-      <artifactId>junit-jupiter-engine</artifactId>
-      <version>5.3.1</version>
+      <artifactId>junit-jupiter</artifactId>
+      <version>5.4.0</version>
       <scope>test</scope>
     </dependency>
     <dependency>
@@ -298,18 +293,6 @@
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>2.22.1</version> <!-- Cannot use 2.20 - See http://junit.org/junit5/docs/current/user-guide/#running-tests-build-maven -->
-        <dependencies>
-          <dependency>
-            <groupId>org.junit.platform</groupId>
-            <artifactId>junit-platform-surefire-provider</artifactId>
-            <version>1.0.3</version>
-          </dependency>
-          <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <version>5.3.1</version>
-          </dependency>
-        </dependencies>
       </plugin>
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
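For context on the pom change: the aggregated junit-jupiter artifact (introduced with JUnit 5.4.0) transitively provides junit-jupiter-api and junit-jupiter-engine, and Surefire 2.22.x runs JUnit Platform tests natively, which is why both the duplicated Jupiter dependencies and the junit-platform-surefire-provider block can be dropped. A hypothetical smoke test (illustrative class name) compiles and runs against the single dependency:

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;

class AggregatedJupiterSmokeTest {
  @Test
  void apiAndEngineResolveFromSingleArtifact() {
    // @Test and Assertions come from junit-jupiter-api, which the aggregated
    // junit-jupiter artifact pulls in transitively along with the engine.
    assertEquals(2, 1 + 1);
  }
}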
8 changes: 5 additions & 3 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java
@@ -251,8 +251,8 @@ public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker

     boolean success = true;
     Map<Integer, KafkaBroker> result = new HashMap<>();
+    List<KafkaBroker> unusableBrokers = new ArrayList<>();
     for (int oosBrokerId : oosReplica.outOfSyncBrokers) {
-      List<KafkaBroker> unusableBrokers = new ArrayList<>();
       // we will get the broker with the least network usage
       KafkaBroker leastUsedBroker = brokerQueue.poll();
       while (leastUsedBroker != null && oosReplica.replicaBrokers.contains(leastUsedBroker.id())) {
@@ -270,14 +270,16 @@
         }
         if (success) {
           result.put(oosBrokerId, leastUsedBroker);
+          // the broker should not be used again for this topic partition.
+          unusableBrokers.add(leastUsedBroker);
         } else {
           LOG.error("Failed to allocate resource to replace {}:{}", oosReplica, oosBrokerId);
           success = false;
         }
-        unusableBrokers.stream().forEach(broker -> brokerQueue.add(broker));
-        brokerQueue.add(leastUsedBroker);
       }
     }
+    // push the brokers back to brokerQueue to keep invariant true
+    brokerQueue.addAll(unusableBrokers);
     return success ? result : null;
   }
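Reduced to the same toy data as the sketch above, the fix holds each assigned broker in unusableBrokers until every replica of the partition has a replacement, then returns them to the queue in one step, which also preserves the queue-size invariant that the new test checks:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class FixedAssignmentSketch {
  public static void main(String[] args) {
    PriorityQueue<Integer> brokerQueue = new PriorityQueue<>();
    brokerQueue.add(3);
    brokerQueue.add(4);

    Map<Integer, Integer> assignment = new HashMap<>();
    List<Integer> unusableBrokers = new ArrayList<>();
    for (int oosBrokerId : new int[]{1, 2}) {
      Integer leastUsed = brokerQueue.poll();
      assignment.put(oosBrokerId, leastUsed);
      unusableBrokers.add(leastUsed); // held out of the queue for this partition
    }
    brokerQueue.addAll(unusableBrokers); // queue size restored after the loop

    System.out.println(assignment); // {1=3, 2=4} -- one distinct broker per replica
  }
}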
127 changes: 127 additions & 0 deletions drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java
@@ -0,0 +1,127 @@
package com.pinterest.doctorkafka;

import static org.junit.jupiter.api.Assertions.*;

import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
import com.pinterest.doctorkafka.util.OutOfSyncReplica;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.UniformReservoir;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class KafkaClusterTest {
private static final String TOPIC = "test_topic";
private static final String ZOOKEEPER_URL = "zk001/cluster1";

/**
 * This test ensures that getAlternativeBrokers does not return a many-to-one reassignment.
 * Multiple out-of-sync replicas of the same topic partition must not map to the same
 * replacement broker; otherwise duplicate reassignments will occur, producing invalid
 * reassignment plans.
 */
@Test
void getAlternativeBrokersDuplicateReassignmentTest() throws Exception {
DoctorKafkaConfig config = new DoctorKafkaConfig("./config/doctorkafka.properties");
DoctorKafkaClusterConfig doctorKafkaClusterConfig = config.getClusterConfigByName("cluster1");

// create histogram maps to mock network stats per topic partition
ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>> oldBytesInStats = ReplicaStatsManager.bytesInStats;
ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>> oldBytesOutStats = ReplicaStatsManager.bytesOutStats;

TopicPartition topicPartition = new TopicPartition(TOPIC, 0);

ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>> testBytesInStats = new ConcurrentHashMap<>();
ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>> testBytesOutStats = new ConcurrentHashMap<>();
ConcurrentMap<TopicPartition, Histogram> testBytesInHistograms = new ConcurrentHashMap<>();
ConcurrentMap<TopicPartition, Histogram> testBytesOutHistograms = new ConcurrentHashMap<>();
Histogram inHist = new Histogram(new UniformReservoir());
Histogram outHist = new Histogram(new UniformReservoir());

inHist.update(0);
outHist.update(0);

testBytesInHistograms.put(topicPartition, inHist);
testBytesOutHistograms.put(topicPartition, outHist);

testBytesInStats.put(ZOOKEEPER_URL, testBytesInHistograms);
testBytesOutStats.put(ZOOKEEPER_URL, testBytesOutHistograms);

ReplicaStatsManager.bytesInStats = testBytesInStats;
ReplicaStatsManager.bytesOutStats = testBytesOutStats;

KafkaCluster kafkaCluster = new KafkaCluster(ZOOKEEPER_URL, doctorKafkaClusterConfig);

Node[] nodes = new Node[]{
new Node(0, "test00", 9092),
new Node(1, "test01", 9092),
new Node(2, "test02", 9092),
new Node(3, "test03", 9092),
new Node(4, "test04", 9092)
};

Node leader = nodes[0];
Node[] replicas = new Node[]{
nodes[0],
nodes[1],
nodes[2]
};

Node[] isrs = new Node[]{
nodes[0]
};
PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, leader, replicas, isrs);
OutOfSyncReplica oosReplica = new OutOfSyncReplica(partitionInfo);
oosReplica.replicaBrokers = Arrays.asList(new Integer[]{0,1,2});

KafkaBroker[] brokers = new KafkaBroker[]{
new KafkaBroker(doctorKafkaClusterConfig, 0),
new KafkaBroker(doctorKafkaClusterConfig, 1),
new KafkaBroker(doctorKafkaClusterConfig, 2),
new KafkaBroker(doctorKafkaClusterConfig, 3),
new KafkaBroker(doctorKafkaClusterConfig, 4)
};

// reserve in- and out-bound bandwidth on broker 4 so it sorts last in the priority queue
brokers[4].reserveInBoundBandwidth(topicPartition, 10);
brokers[4].reserveOutBoundBandwidth(topicPartition, 10);

PriorityQueue<KafkaBroker> brokerQueue = new PriorityQueue<>();

for (KafkaBroker broker : brokers) {
brokerQueue.add(broker);
}

int beforeSize = brokerQueue.size();

/*
* getMaxBytesIn() and getMaxBytesOut() will return 0 for each broker,
* so the ordering of the brokers will not change after reassignment
*/
Map<Integer, KafkaBroker> altBrokers = kafkaCluster.getAlternativeBrokers(brokerQueue, oosReplica);

// There should be a valid reassignment for this scenario
assertNotNull(altBrokers);
// Broker 0 should not be reassigned since it is still in sync
assertNull(altBrokers.get(0));
// The reassignment of broker 1 and 2 should be different brokers
assertNotEquals(altBrokers.get(1), altBrokers.get(2));
// The broker queue should contain the same amount of brokers
assertEquals(beforeSize, brokerQueue.size());

ReplicaStatsManager.bytesInStats = oldBytesInStats;
ReplicaStatsManager.bytesOutStats = oldBytesOutStats;
}

}
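Assuming the standard drkafka module layout, the new test should run under the Surefire setup above with a hypothetical invocation such as: mvn -pl drkafka test -Dtest=KafkaClusterTest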
