Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
Add simple locality-awareness logic for reassignments.
Browse files Browse the repository at this point in the history
Unit test for a SINGLE oosReplica; reassignments will still proceed partially even if a single oosReplica's locality-aware reassignment fails.
  • Loading branch information
kabochya committed Apr 15, 2019
1 parent 31b870f commit 2aa0ff4
Show file tree
Hide file tree
Showing 6 changed files with 591 additions and 65 deletions.
44 changes: 44 additions & 0 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -112,7 +113,35 @@ public long lastStatsTimestamp() {
return latestStats == null ? 0 : latestStats.getTimestamp();
}

/**
 * Attempts to reserve both in-bound and out-bound bandwidth on this broker for a
 * TopicPartition. The reservation only succeeds when the projected traffic in both
 * directions stays below the configured per-second limits.
 *
 * @param tp the TopicPartition the bandwidth is reserved for
 * @param inBound amount of in-bound traffic (bytes/sec) to reserve
 * @param outBound amount of out-bound traffic (bytes/sec) to reserve
 * @return true if the reservation was made, false if either limit would be exceeded
 */
public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound){
  double projectedBytesIn = getMaxBytesIn() + reservedBytesIn + inBound;
  double projectedBytesOut = getMaxBytesOut() + reservedBytesOut + outBound;
  // reject the reservation if either direction would reach its limit
  if (projectedBytesIn >= bytesInPerSecLimit || projectedBytesOut >= bytesOutPerSecLimit) {
    return false;
  }
  reservedBytesIn += inBound;
  reservedBytesOut += outBound;
  toBeAddedReplicas.add(tp);
  return true;
}

/**
 * Cancels a previously made bandwidth reservation for a TopicPartition.
 *
 * @param tp TopicPartition to cancel reservation
 * @param inBound amount of in-bound traffic to remove from broker
 * @param outBound amount of out-bound traffic to remove from broker
 * @return if reserved bandwidth has been successfully removed from broker
 */
public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound){
  // nothing to release if no reservation exists for this partition
  if (!toBeAddedReplicas.contains(tp)) {
    return false;
  }
  toBeAddedReplicas.remove(tp);
  reservedBytesIn -= inBound;
  reservedBytesOut -= outBound;
  return true;
}

@Deprecated
public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) {
reservedBytesIn += inBound;
Expand All @@ -135,6 +164,7 @@ public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
* @param outBound the outbound bandwidth requirements in bytes/second
* @return whether the reservation is successful or not.
*/
@Deprecated
public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) {
if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesOut += outBound;
Expand Down Expand Up @@ -217,6 +247,20 @@ public BrokerStats getLatestStats() {
return latestStats;
}

// Test hook: overrides the most recent stats snapshot held for this broker.
@VisibleForTesting
protected void setLatestStats(BrokerStats brokerStats){
  this.latestStats = brokerStats;
}

// Returns this broker's rack id (its locality), used for locality-aware reassignment.
public String getRackId(){
  return rackId;
}

// Test hook: overrides the rack id reported for this broker.
@VisibleForTesting
protected void setRackId(String rackId){
  this.rackId = rackId;
}


@Override
public String toString() {
Expand Down
207 changes: 178 additions & 29 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.google.gson.JsonArray;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -40,6 +42,7 @@ public class KafkaCluster {

private static final Logger LOG = LogManager.getLogger(KafkaCluster.class);
private static final int MAX_NUM_STATS = 5;
private static final int INVALID_BROKERSTATS_TIME = 240000;

private DoctorKafkaClusterConfig clusterConfig;
public String zkUrl;
Expand Down Expand Up @@ -217,17 +220,47 @@ public PriorityQueue<KafkaBroker> getBrokerQueue() {
new PriorityQueue<>(new KafkaBroker.KafkaBrokerComparator());
for (Map.Entry<Integer, KafkaBroker> entry : brokers.entrySet()) {
KafkaBroker broker = entry.getValue();
BrokerStats latestStats = broker.getLatestStats();
if (latestStats == null
|| latestStats.getHasFailure()
|| System.currentTimeMillis() - latestStats.getTimestamp() > 240000) {
if (isInvalidBroker(broker)) {
continue;
}
brokerQueue.add(broker);
}
return brokerQueue;
}

/**
 * Groups the cluster's usable brokers by their locality (rack id).
 *
 * @return a priority queue of brokers for each locality in the cluster ordered by network stats
 */
public Map<String, PriorityQueue<KafkaBroker>> getBrokerQueueByLocality(){
  Map<String, PriorityQueue<KafkaBroker>> brokerLocalityMap = new HashMap<>();
  Comparator<KafkaBroker> comparator = new KafkaBroker.KafkaBrokerComparator();
  for (KafkaBroker broker : brokers.values()) {
    // skip brokers whose stats are missing, failed, or stale
    if (isInvalidBroker(broker)) {
      continue;
    }
    // lazily create the per-locality queue, then enqueue the broker
    String locality = broker.getRackId();
    PriorityQueue<KafkaBroker> localityQueue = brokerLocalityMap.get(locality);
    if (localityQueue == null) {
      localityQueue = new PriorityQueue<>(comparator);
      brokerLocalityMap.put(locality, localityQueue);
    }
    localityQueue.add(broker);
  }
  return brokerLocalityMap;
}

/**
 * checks if the broker is invalid for assigning replicas
 * @param broker the broker that we want to check
 * @return true if the broker is invalid for assigning replicas, false if it is valid
 */
protected boolean isInvalidBroker(KafkaBroker broker) {
  BrokerStats stats = broker.getLatestStats();
  // no stats or a reported failure makes the broker unusable
  if (stats == null || stats.getHasFailure()) {
    return true;
  }
  // stats older than the staleness window are treated as invalid too
  long statsAgeMs = System.currentTimeMillis() - stats.getTimestamp();
  return statsAgeMs > INVALID_BROKERSTATS_TIME;
}


/**
* Get the broker Id that has the resource. Here we need to apply the proper placement policy.
Expand All @@ -245,38 +278,155 @@ public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker

boolean success = true;
Map<Integer, KafkaBroker> result = new HashMap<>();
List<KafkaBroker> unusableBrokers = new ArrayList<>();
Set<KafkaBroker> unusableBrokers = new HashSet<>();

for (int oosBrokerId : oosReplica.outOfSyncBrokers) {
// we will get the broker with the least network usage
KafkaBroker leastUsedBroker = brokerQueue.poll();
while (leastUsedBroker != null && oosReplica.replicaBrokers.contains(leastUsedBroker.id())) {
unusableBrokers.add(leastUsedBroker);
leastUsedBroker = brokerQueue.poll();
}
if (leastUsedBroker == null) {
LOG.error("Failed to find a usable broker for fixing {}:{}", oosReplica, oosBrokerId);
success = false;
} else {
LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id());
success &= leastUsedBroker.reserveInBoundBandwidth(topicPartition, inBoundReq);
if (preferredBroker == oosBrokerId) {
success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, outBoundReq);
success &= findNextBrokerForOosReplica(
brokerQueue,
unusableBrokers,
oosReplica.replicaBrokers,
result,
oosBrokerId,
topicPartition,
inBoundReq,
outBoundReq,
preferredBroker
);
}
// push the brokers back to brokerQueue to keep invariant true
brokerQueue.addAll(unusableBrokers);
return success ? result : null;
}

/**
* Similar to getAlternativeBrokers, but locality aware
* @param brokerQueueByLocality a map keeping a priority queue of brokers for each locality
* @param oosReplica out of sync replicas
* @return a BrokerId to KafkaBroker mapping
*/
public Map<Integer, KafkaBroker> getAlternativeBrokersByLocality(
Map<String, PriorityQueue<KafkaBroker>> brokerQueueByLocality,
OutOfSyncReplica oosReplica,
Map<OutOfSyncReplica, Map<Integer, String>> localityAwareReassignmentFailures
) {

Map<String, List<Integer>> oosBrokerIdsByLocality = new HashMap<>();
for ( int oosBrokerId : oosReplica.outOfSyncBrokers) {
String brokerLocality = brokers.get(oosBrokerId).getRackId();
oosBrokerIdsByLocality
.computeIfAbsent(brokerLocality, l -> new ArrayList<>())
.add(oosBrokerId);
}

TopicPartition topicPartition = oosReplica.topicPartition;
double inBoundReq = ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
double outBoundReq = ReplicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
int preferredBroker = oosReplica.replicaBrokers.get(0);

boolean success = true;
Map<Integer, KafkaBroker> result = new HashMap<>();
Map<String, Set<KafkaBroker>> unusableBrokersByLocality = new HashMap<>();
Map<Integer, String> failureAssignments = new HashMap<>();

// Affinity
for ( Map.Entry<String, List<Integer>> oosBrokerIdsOfLocality : oosBrokerIdsByLocality.entrySet()) {
String oosLocality = oosBrokerIdsOfLocality.getKey();
List<Integer> oosBrokerIds = oosBrokerIdsOfLocality.getValue();
PriorityQueue<KafkaBroker> localityBrokerQueue = brokerQueueByLocality.get(oosLocality);
Set<KafkaBroker> unusableBrokers =
unusableBrokersByLocality.computeIfAbsent(oosLocality, l -> new HashSet<>());
for( Integer oosBrokerId : oosBrokerIds){
boolean broker_success = findNextBrokerForOosReplica(
localityBrokerQueue,
unusableBrokers,
oosReplica.replicaBrokers,
result,
oosBrokerId,
topicPartition,
inBoundReq,
outBoundReq,
preferredBroker
);
if ( !broker_success ){
failureAssignments.put(oosBrokerId, oosLocality);
}
if (success) {
result.put(oosBrokerId, leastUsedBroker);
// the broker should not be used again for this topic partition.
unusableBrokers.add(leastUsedBroker);
} else {
LOG.error("Failed to allocate resource to replace {}:{}", oosReplica, oosBrokerId);
success = false;
success &= broker_success;
}
}

// release reserved bandwidth if failed
if ( !success ){
// keep track of all oosReplicas that failed and alert
localityAwareReassignmentFailures.put(oosReplica, failureAssignments);
for ( Map.Entry<Integer, KafkaBroker> entry : result.entrySet()){
Integer oosBrokerId = entry.getKey();
KafkaBroker broker = entry.getValue();
boolean removed = oosBrokerId == preferredBroker ?
broker.removeReservedBandwidth(topicPartition, inBoundReq, outBoundReq) :
broker.removeReservedBandwidth(topicPartition, inBoundReq, 0);
if (!removed){
LOG.error("Can not remove non-existing TopicPartition ", topicPartition, " from broker ", broker.id());
}
}
}
// push the brokers back to brokerQueue to keep invariant true
brokerQueue.addAll(unusableBrokers);

// maintain invariant
for(Map.Entry<String, Set<KafkaBroker>> entry : unusableBrokersByLocality.entrySet()){
brokerQueueByLocality.get(entry.getKey()).addAll(entry.getValue());
}

return success ? result : null;
}

/**
 * Finds the next broker in the broker queue for migrating a replica
 * @param brokerQueue a queue of brokers ordered by utilization
 * @param unusableBrokers the brokers that should not be used for reassignment
 * @param replicaBrokers the ids of the brokers that are already used for this replica
 * @param reassignmentMap the replica -> target broker mapping for the next reassignment
 * @param oosBrokerId the broker id of the current OutOfSync replica
 * @param tp the TopicPartition of the current replica
 * @param inBoundReq inbound traffic that needs to be reserved
 * @param outBoundReq outbound traffic that needs to be reserved
 * @param preferredBroker the preferred leader of the current TopicPartition
 * @return true if we successfully assigned a target broker for migration of this replica false otherwise
 */
protected boolean findNextBrokerForOosReplica(
    PriorityQueue<KafkaBroker> brokerQueue,
    Collection<KafkaBroker> unusableBrokers,
    Collection<Integer> replicaBrokers,
    Map<Integer, KafkaBroker> reassignmentMap,
    Integer oosBrokerId,
    TopicPartition tp,
    Double inBoundReq,
    Double outBoundReq,
    Integer preferredBroker
){
  boolean success;
  // pop brokers until we find one that does not already host a replica of this partition
  KafkaBroker leastUsedBroker = brokerQueue.poll();
  while (leastUsedBroker != null && replicaBrokers.contains(leastUsedBroker.id())) {
    unusableBrokers.add(leastUsedBroker);
    leastUsedBroker = brokerQueue.poll();
  }
  if (leastUsedBroker == null) {
    LOG.error("Failed to find a usable broker for fixing {}:{}", tp, oosBrokerId);
    success = false;
  } else {
    LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id());
    // FIX: compare boxed Integers with equals(); "==" is an identity check on Integer
    // objects and silently fails for broker ids outside the [-128, 127] cache range.
    // Only the preferred leader needs out-bound bandwidth reserved as well.
    success = preferredBroker.equals(oosBrokerId) ?
        leastUsedBroker.reserveBandwidth(tp, inBoundReq, outBoundReq) :
        leastUsedBroker.reserveBandwidth(tp, inBoundReq, 0);
    if (success) {
      reassignmentMap.put(oosBrokerId, leastUsedBroker);
      // the broker should not be used again for this topic partition.
      unusableBrokers.add(leastUsedBroker);
    } else {
      LOG.error("Failed to allocate resource to replace {}:{}", tp, oosBrokerId);
    }
  }
  return success;
}

public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
double tpBytesIn, double tpBytesOut) {
Expand All @@ -292,8 +442,7 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
// we will get the broker with the least network usage
KafkaBroker leastUsedBroker = brokerQueue.poll();
LOG.info("LeastUsedBroker for replacing {} : {}", topicPartition, leastUsedBroker.id());
boolean success = leastUsedBroker.reserveInBoundBandwidth(topicPartition, tpBytesIn);
success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, tpBytesOut);
boolean success = leastUsedBroker.reserveBandwidth(topicPartition, tpBytesIn, tpBytesOut);

if (!success) {
LOG.error("Failed to allocate resource to replace {}", topicPartition);
Expand Down
Loading

0 comments on commit 2aa0ff4

Please sign in to comment.