Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
Add simple logic locality-awareness reassignments.
Browse files Browse the repository at this point in the history
Unit test for testing SINGLE oosReplica, reassignments will still proceed partitally even if single oosReplica locality aware reassignment fails.
  • Loading branch information
kabochya committed Apr 26, 2019
1 parent ee5c005 commit 941b142
Show file tree
Hide file tree
Showing 6 changed files with 744 additions and 69 deletions.
72 changes: 67 additions & 5 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;

import com.google.common.annotations.VisibleForTesting;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -28,6 +29,7 @@ public class KafkaBroker implements Comparable<KafkaBroker> {
private int brokerPort = 9092;
private String rackId;
private BrokerStats latestStats;

private Set<TopicPartition> leaderReplicas;
private Set<TopicPartition> followerReplicas;

Expand Down Expand Up @@ -115,7 +117,35 @@ public long lastStatsTimestamp() {
return latestStats == null ? 0 : latestStats.getTimestamp();
}

public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound){
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound &&
bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesIn += inBound;
reservedBytesOut += outBound;
toBeAddedReplicas.add(tp);
return true;
}
return false;
}

/**
* cancels reservation of bandwidth for a TopicPartition
* @param tp TopicPartition to cancel reservation
* @param inBound amount of in-bound traffic to remove from broker
* @param outBound amount of out-bound traffic to remove from broker
* @return if reserved bandwidth has been successfully removed from broker
*/
public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound){
if ( toBeAddedReplicas.contains(tp) ){
reservedBytesIn -= inBound;
reservedBytesOut -= outBound;
toBeAddedReplicas.remove(tp);
return true;
}
return false;
}

@Deprecated
public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) {
reservedBytesIn += inBound;
Expand All @@ -138,6 +168,7 @@ public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
* @param outBound the outbound bandwidth requirements in bytes/second
* @return whether the reservation is successful or not.
*/
@Deprecated
public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) {
if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesOut += outBound;
Expand Down Expand Up @@ -178,6 +209,14 @@ public void clearResourceAllocationCounters() {
this.toBeAddedReplicas.clear();
}

protected void setLeaderReplicas(Set<TopicPartition> leaderReplicas) {
this.leaderReplicas = leaderReplicas;
}

protected void setFollowerReplicas(Set<TopicPartition> followerReplicas) {
this.followerReplicas= followerReplicas;
}

/**
* Record the stats, and update the topic partition list based on the stats
*
Expand All @@ -196,12 +235,21 @@ public synchronized void update(BrokerStats stats) {
rackId = stats.getRackId() != null ? stats.getRackId() : stats.getAvailabilityZone();
}

// TODO: handle null poniter expceiton properly
leaderReplicas = stats.getLeaderReplicas().stream().map(tps ->
new TopicPartition(tps.getTopic(), tps.getPartition())).collect(Collectors.toSet());
if (stats.getLeaderReplicas() != null) {
setLeaderReplicas(stats.getLeaderReplicas()
.stream()
.map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition()))
.collect(Collectors.toSet())
);
}

followerReplicas = stats.getFollowerReplicas().stream().map(tps ->
new TopicPartition(tps.getTopic(), tps.getPartition())).collect(Collectors.toSet());
if (stats.getFollowerReplicas() != null ) {
setFollowerReplicas(stats.getFollowerReplicas()
.stream()
.map(tps -> new TopicPartition(tps.getTopic(), tps.getPartition()))
.collect(Collectors.toSet())
);
}
}

/**
Expand All @@ -220,6 +268,20 @@ public BrokerStats getLatestStats() {
return latestStats;
}

@VisibleForTesting
protected void setLatestStats(BrokerStats brokerStats){
this.latestStats = brokerStats;
}

public String getRackId(){
return rackId;
}

@VisibleForTesting
protected void setRackId(String rackId){
this.rackId = rackId;
}


@Override
public String toString() {
Expand Down
Loading

0 comments on commit 941b142

Please sign in to comment.