Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Add simple logic locality-awareness reassignments #116

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 67 additions & 5 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -28,6 +29,7 @@ public class KafkaBroker implements Comparable<KafkaBroker> {
private int brokerPort = 9092;
private String rackId;
private BrokerStats latestStats;

private Set<TopicPartition> leaderReplicas;
private Set<TopicPartition> followerReplicas;

Expand Down Expand Up @@ -115,7 +117,35 @@ public long lastStatsTimestamp() {
return latestStats == null ? 0 : latestStats.getTimestamp();
}

/**
 * Reserves in-bound and out-bound bandwidth on this broker for a TopicPartition.
 * The reservation is all-or-nothing: nothing is recorded unless both directions fit.
 *
 * @param tp TopicPartition to reserve bandwidth for
 * @param inBound amount of in-bound traffic to reserve
 * @param outBound amount of out-bound traffic to reserve
 * @return true if both reservations were recorded, false if either limit would be reached
 */
public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound) {
  // Each limit must stay strictly above getMaxBytes*() + what is already reserved + this request.
  if (!(bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound)) {
    return false;
  }
  if (!(bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound)) {
    return false;
  }
  reservedBytesIn += inBound;
  reservedBytesOut += outBound;
  toBeAddedReplicas.add(tp);
  return true;
}

/**
 * Cancels a bandwidth reservation previously made for a TopicPartition.
 * Does nothing when the TopicPartition is not among the replicas pending addition.
 *
 * @param tp TopicPartition whose reservation is cancelled
 * @param inBound amount of in-bound traffic to give back
 * @param outBound amount of out-bound traffic to give back
 * @return true if the reservation was found and removed, false otherwise
 */
public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound) {
  if (!toBeAddedReplicas.contains(tp)) {
    return false;
  }
  reservedBytesIn -= inBound;
  reservedBytesOut -= outBound;
  toBeAddedReplicas.remove(tp);
  return true;
}

@Deprecated
public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) {
reservedBytesIn += inBound;
Expand All @@ -138,6 +168,7 @@ public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
* @param outBound the outbound bandwidth requirements in bytes/second
* @return whether the reservation is successful or not.
*/
@Deprecated
public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) {
if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesOut += outBound;
Expand Down Expand Up @@ -178,6 +209,14 @@ public void clearResourceAllocationCounters() {
this.toBeAddedReplicas.clear();
}

/**
 * Replaces the set of topic partitions this broker is recorded as leading.
 *
 * @param leaderReplicas the new leader replica set; stored by reference, not copied
 */
protected void setLeaderReplicas(Set<TopicPartition> leaderReplicas) {
  this.leaderReplicas = leaderReplicas;
}

/**
 * Replaces the set of topic partitions this broker is recorded as following.
 *
 * @param followerReplicas the new follower replica set; stored by reference, not copied
 */
protected void setFollowerReplicas(Set<TopicPartition> followerReplicas) {
  this.followerReplicas = followerReplicas;
}

/**
* Record the stats, and update the topic partition list based on the stats
*
Expand All @@ -196,12 +235,21 @@ public synchronized void update(BrokerStats stats) {
rackId = stats.getRackId() != null ? stats.getRackId() : stats.getAvailabilityZone();
}

// TODO: handle null pointer exception properly
leaderReplicas = stats.getLeaderReplicas().stream().map(tps ->
new TopicPartition(tps.getTopic(), tps.getPartition())).collect(Collectors.toSet());
if (stats.getLeaderReplicas() != null) {
setLeaderReplicas(stats.getLeaderReplicas()
.stream()
.map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition()))
.collect(Collectors.toSet())
);
}

followerReplicas = stats.getFollowerReplicas().stream().map(tps ->
new TopicPartition(tps.getTopic(), tps.getPartition())).collect(Collectors.toSet());
if (stats.getFollowerReplicas() != null ) {
setFollowerReplicas(stats.getFollowerReplicas()
.stream()
.map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition()))
.collect(Collectors.toSet())
);
}
}

/**
Expand All @@ -220,6 +268,20 @@ public BrokerStats getLatestStats() {
return latestStats;
}

/**
 * Overrides the most recent stats recorded for this broker.
 *
 * @param brokerStats the stats to store as the latest ones
 */
@VisibleForTesting
protected void setLatestStats(BrokerStats brokerStats) {
  latestStats = brokerStats;
}

/**
 * @return the rack id currently recorded for this broker
 */
public String getRackId() {
  return this.rackId;
}

/**
 * Overrides the rack id recorded for this broker.
 *
 * @param rackId the rack id to store
 */
@VisibleForTesting
protected void setRackId(String rackId) {
  this.rackId = rackId;
}


@Override
public String toString() {
Expand Down
216 changes: 181 additions & 35 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.google.gson.JsonArray;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -40,6 +42,7 @@ public class KafkaCluster {

private static final Logger LOG = LogManager.getLogger(KafkaCluster.class);
private static final int MAX_NUM_STATS = 5;
private static final int INVALID_BROKERSTATS_TIME = 240000;

private DoctorKafkaClusterConfig clusterConfig;
public String zkUrl;
Expand Down Expand Up @@ -219,66 +222,210 @@ public PriorityQueue<KafkaBroker> getBrokerQueue() {
new PriorityQueue<>(new KafkaBroker.KafkaBrokerComparator());
for (Map.Entry<Integer, KafkaBroker> entry : brokers.entrySet()) {
KafkaBroker broker = entry.getValue();
BrokerStats latestStats = broker.getLatestStats();
if (latestStats == null
|| latestStats.getHasFailure()
|| System.currentTimeMillis() - latestStats.getTimestamp() > 240000) {
if (isInvalidBroker(broker)) {
continue;
}
brokerQueue.add(broker);
}
return brokerQueue;
}

/**
 * Groups the brokers that are valid reassignment targets by their rack id.
 *
 * @return a map from rack id to a priority queue of the valid brokers in that rack,
 *         each queue ordered by {@code KafkaBroker.KafkaBrokerComparator}
 */
public Map<String, PriorityQueue<KafkaBroker>> getBrokerQueueByLocality() {
  Comparator<KafkaBroker> brokerOrdering = new KafkaBroker.KafkaBrokerComparator();
  Map<String, PriorityQueue<KafkaBroker>> queuesByLocality = new HashMap<>();
  for (KafkaBroker candidate : brokers.values()) {
    if (!isInvalidBroker(candidate)) {
      // lazily create the queue the first time a locality is seen
      PriorityQueue<KafkaBroker> localityQueue = queuesByLocality.computeIfAbsent(
          candidate.getRackId(), rack -> new PriorityQueue<>(brokerOrdering));
      localityQueue.add(candidate);
    }
  }
  return queuesByLocality;
}

/**
 * Tells whether a broker must be excluded when picking targets for replica assignment.
 * A broker is excluded when it has no stats, when its latest stats report a failure,
 * or when those stats are older than INVALID_BROKERSTATS_TIME milliseconds.
 *
 * @param broker the broker to check
 * @return true if the broker must not be used for assigning replicas, false if it may be used
 */
protected boolean isInvalidBroker(KafkaBroker broker) {
  BrokerStats stats = broker.getLatestStats();
  if (stats == null) {
    return true;
  }
  if (stats.getHasFailure()) {
    return true;
  }
  long statsAgeMs = System.currentTimeMillis() - stats.getTimestamp();
  return statsAgeMs > INVALID_BROKERSTATS_TIME;
}


/**
* Get the broker Id that has the resource. Here we need to apply the proper placement policy.
*
* @param brokerQueue the list of brokers that are sorted in resource usage
* @param oosReplica out of sync replicas
* @param inBoundReq inbound traffic
* @param outBoundReq outbound traffic
* @param preferredBroker preferred broker id
* @return a BrokerId to KafkaBroker mapping
*/
public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker> brokerQueue,
OutOfSyncReplica oosReplica) {
TopicPartition topicPartition = oosReplica.topicPartition;
double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
int preferredBroker = oosReplica.replicaBrokers.get(0);
public Map<Integer, KafkaBroker> getAlternativeBrokers(
PriorityQueue<KafkaBroker> brokerQueue,
OutOfSyncReplica oosReplica,
double inBoundReq,
double outBoundReq,
int preferredBroker
) {

boolean success = true;
Map<Integer, KafkaBroker> result = new HashMap<>();
List<KafkaBroker> unusableBrokers = new ArrayList<>();
Set<KafkaBroker> unusableBrokers = new HashSet<>();

for (int oosBrokerId : oosReplica.outOfSyncBrokers) {
// we will get the broker with the least network usage
KafkaBroker leastUsedBroker = brokerQueue.poll();
while (leastUsedBroker != null && oosReplica.replicaBrokers.contains(leastUsedBroker.id())) {
unusableBrokers.add(leastUsedBroker);
leastUsedBroker = brokerQueue.poll();
}
if (leastUsedBroker == null) {
LOG.error("Failed to find a usable broker for fixing {}:{}", oosReplica, oosBrokerId);
success = false;
} else {
LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id());
success &= leastUsedBroker.reserveInBoundBandwidth(topicPartition, inBoundReq);
if (preferredBroker == oosBrokerId) {
success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, outBoundReq);
}
if (success) {
result.put(oosBrokerId, leastUsedBroker);
// the broker should not be used again for this topic partition.
unusableBrokers.add(leastUsedBroker);
} else {
LOG.error("Failed to allocate resource to replace {}:{}", oosReplica, oosBrokerId);
success = false;
}
success = findNextBrokerForOosReplica(
brokerQueue,
unusableBrokers,
oosReplica.replicaBrokers,
result,
oosBrokerId,
oosReplica.topicPartition,
inBoundReq,
outBoundReq,
preferredBroker
);

// short circuit if failed to find available broker
if (!success) {
break;
}
}
// push the brokers back to brokerQueue to keep invariant true
brokerQueue.addAll(unusableBrokers);
return success ? result : null;
}

/**
* Similar to getAlternativeBrokers, but locality aware
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a general question: when broker.rack is not set on the kafka cluster side, kafkastats will have null for rackId. In that case, if the user set locality_awareness.enabled to true in the setting, what will be the expected behavior? can we add explanation about this in the comment?

* @param brokerQueueByLocality a map keeping a priority queue of brokers for each locality
* @param oosReplica out of sync replicas
* @param inBoundReq inbound traffic
* @param outBoundReq outbound traffic
* @param preferredBroker preferred broker id
* @return a BrokerId to KafkaBroker mapping
*/
public Map<Integer, KafkaBroker> getAlternativeBrokersByLocality(
    Map<String, PriorityQueue<KafkaBroker>> brokerQueueByLocality,
    OutOfSyncReplica oosReplica,
    double inBoundReq,
    double outBoundReq,
    int preferredBroker
) {
  // Group the out-of-sync broker ids by the rack id of the broker they belong to, so that
  // each replacement is picked from the same locality as the broker it replaces.
  // A broker whose rack id is null ends up under the null key (HashMap permits one); whether
  // a queue exists for that key depends on how brokerQueueByLocality was built.
  Map<String, List<Integer>> oosBrokerIdsByLocality = new HashMap<>();
  for (int oosBrokerId : oosReplica.outOfSyncBrokers) {
    KafkaBroker oosBroker = brokers.get(oosBrokerId);
    if (oosBroker == null) {
      // Without the broker we cannot tell which locality the replacement must come from.
      // No queue has been touched yet, so failing here leaves the caller's state intact.
      LOG.error("Failed to find locality of unknown broker {} for fixing {}",
          oosBrokerId, oosReplica.topicPartition);
      return null;
    }
    oosBrokerIdsByLocality
        .computeIfAbsent(oosBroker.getRackId(), l -> new ArrayList<>())
        .add(oosBrokerId);
  }

  Map<Integer, KafkaBroker> result = new HashMap<>();
  Map<String, Set<KafkaBroker>> unusableBrokersByLocality = new HashMap<>();

  boolean success = true;
  // Affinity: replace every out-of-sync broker with a broker from the same locality.
  for (Map.Entry<String, List<Integer>> oosBrokerIdsOfLocality
      : oosBrokerIdsByLocality.entrySet()) {
    String oosLocality = oosBrokerIdsOfLocality.getKey();
    PriorityQueue<KafkaBroker> localityBrokerQueue = brokerQueueByLocality.get(oosLocality);
    if (localityBrokerQueue == null) {
      // No candidate broker exists in this locality at all; report failure instead of
      // dereferencing the missing queue.
      LOG.error("Failed to find a usable broker in locality {} for fixing {}",
          oosLocality, oosReplica.topicPartition);
      success = false;
      break;
    }
    // Only localities with an existing queue get an entry here, so the restore loop below
    // never looks up a queue that is absent.
    Set<KafkaBroker> unusableBrokers =
        unusableBrokersByLocality.computeIfAbsent(oosLocality, l -> new HashSet<>());
    for (Integer oosBrokerId : oosBrokerIdsOfLocality.getValue()) {
      success = findNextBrokerForOosReplica(
          localityBrokerQueue,
          unusableBrokers,
          oosReplica.replicaBrokers,
          result,
          oosBrokerId,
          oosReplica.topicPartition,
          inBoundReq,
          outBoundReq,
          preferredBroker
      );

      // short circuit if failed to find available broker
      if (!success) {
        break;
      }
    }
    if (!success) {
      break;
    }
  }

  // maintain invariant: brokers polled while searching go back into their locality queue
  for (Map.Entry<String, Set<KafkaBroker>> entry : unusableBrokersByLocality.entrySet()) {
    brokerQueueByLocality.get(entry.getKey()).addAll(entry.getValue());
  }

  return success ? result : null;
}

/**
 * Finds the next broker in the broker queue for migrating a replica and reserves
 * bandwidth on it. In-bound bandwidth is always reserved; out-bound bandwidth is reserved
 * only when the replica being replaced sits on the preferred leader.
 *
 * @param brokerQueue a queue of brokers ordered by utilization; brokers are polled from it
 * @param unusableBrokers receives every polled broker that must not be reused for this
 *                        TopicPartition (already a replica, or just chosen as target)
 * @param replicaBrokers the ids of the brokers that are already used for this replica
 * @param reassignmentMap the replica -> target broker mapping for the next reassignment;
 *                        updated on success
 * @param oosBrokerId the broker id of the current OutOfSync replica
 * @param tp the TopicPartition of the current replica
 * @param inBoundReq inbound traffic that needs to be reserved
 * @param outBoundReq outbound traffic that needs to be reserved
 * @param preferredBroker the preferred leader of the current TopicPartition
 * @return true if we successfully assigned a target broker for migration of this replica,
 *         false otherwise
 */
protected boolean findNextBrokerForOosReplica(
    PriorityQueue<KafkaBroker> brokerQueue,
    Collection<KafkaBroker> unusableBrokers,
    Collection<Integer> replicaBrokers,
    Map<Integer, KafkaBroker> reassignmentMap,
    Integer oosBrokerId,
    TopicPartition tp,
    Double inBoundReq,
    Double outBoundReq,
    Integer preferredBroker
) {
  // Skip brokers that already host a replica of this partition, remembering them so the
  // caller can push them back into the queue afterwards.
  KafkaBroker leastUsedBroker = brokerQueue.poll();
  while (leastUsedBroker != null && replicaBrokers.contains(leastUsedBroker.id())) {
    unusableBrokers.add(leastUsedBroker);
    leastUsedBroker = brokerQueue.poll();
  }
  if (leastUsedBroker == null) {
    LOG.error("Failed to find a usable broker for fixing {}:{}", tp, oosBrokerId);
    return false;
  }

  LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id());
  // Both ids are boxed Integers: compare by value. `==` would compare references and be
  // false for equal ids outside the Integer cache range (-128..127).
  boolean replacesPreferredBroker =
      preferredBroker != null && preferredBroker.equals(oosBrokerId);
  boolean success = leastUsedBroker.reserveBandwidth(
      tp, inBoundReq, replacesPreferredBroker ? outBoundReq : 0.0);
  if (success) {
    reassignmentMap.put(oosBrokerId, leastUsedBroker);
    // the broker should not be used again for this topic partition.
    unusableBrokers.add(leastUsedBroker);
  } else {
    // NOTE(review): the polled broker is not added to unusableBrokers here, so callers
    // will not push it back into the queue — confirm whether that is intended.
    LOG.error("Failed to allocate resource to replace {}:{}", tp, oosBrokerId);
  }
  return success;
}

public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
double tpBytesIn, double tpBytesOut) {
PriorityQueue<KafkaBroker> brokerQueue =
Expand All @@ -293,8 +440,7 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
// we will get the broker with the least network usage
KafkaBroker leastUsedBroker = brokerQueue.poll();
LOG.info("LeastUsedBroker for replacing {} : {}", topicPartition, leastUsedBroker.id());
boolean success = leastUsedBroker.reserveInBoundBandwidth(topicPartition, tpBytesIn);
success &= leastUsedBroker.reserveOutBoundBandwidth(topicPartition, tpBytesOut);
boolean success = leastUsedBroker.reserveBandwidth(topicPartition, tpBytesIn, tpBytesOut);

if (!success) {
LOG.error("Failed to allocate resource to replace {}", topicPartition);
Expand Down
Loading