This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Move replicaStats logic into corresponding KafkaCluster instances
kabochya committed May 3, 2019
1 parent e1e7139 commit 050215c
Showing 10 changed files with 137 additions and 286 deletions.
drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java (13 changes: 6 additions & 7 deletions)
@@ -1,7 +1,6 @@
 package com.pinterest.doctorkafka;
 
 import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
-import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.kafka.common.TopicPartition;
@@ -41,9 +40,9 @@ public class KafkaBroker implements Comparable<KafkaBroker> {
   private long reservedBytesOut;
   private Set<TopicPartition> toBeAddedReplicas;
 
-  private ReplicaStatsManager replicaStatsManager;
+  private KafkaCluster kafkaCluster;
 
-  public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager, int brokerId) {
+  public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, KafkaCluster kafkaCluster, int brokerId) {
     assert clusterConfig != null;
     this.zkUrl = clusterConfig.getZkUrl();
     this.brokerId = brokerId;
@@ -57,7 +56,7 @@ public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager r
     this.reservedBytesOut = 0L;
     this.bytesInPerSecLimit = clusterConfig.getNetworkInLimitInBytes();
     this.bytesOutPerSecLimit = clusterConfig.getNetworkOutLimitInBytes();
-    this.replicaStatsManager = replicaStatsManager;
+    this.kafkaCluster = kafkaCluster;
   }
 
   public JsonElement toJson() {
@@ -76,10 +75,10 @@ public JsonElement toJson() {
   public long getMaxBytesIn() {
     long result = 0L;
     for (TopicPartition topicPartition : leaderReplicas) {
-      result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
+      result += kafkaCluster.getMaxBytesIn(topicPartition);
     }
     for (TopicPartition topicPartition : followerReplicas) {
-      result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
+      result += kafkaCluster.getMaxBytesIn(topicPartition);
     }
     return result;
   }
@@ -88,7 +87,7 @@ public long getMaxBytesIn() {
   public long getMaxBytesOut() {
     long result = 0L;
     for (TopicPartition topicPartition : leaderReplicas) {
-      result += replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
+      result += kafkaCluster.getMaxBytesOut(topicPartition);
     }
     return result;
   }
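The KafkaBroker change above is the pattern repeated throughout this commit: callers stop going through ReplicaStatsManager with a (zkUrl, partition) key and instead ask the KafkaCluster they belong to. A minimal wiring sketch, assuming a DoctorKafkaClusterConfig is already in hand (the helper name and brokerId are illustrative, not part of the diff):

```java
import com.pinterest.doctorkafka.KafkaBroker;
import com.pinterest.doctorkafka.KafkaCluster;
import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;

public class WiringSketch {
  static KafkaBroker newBroker(DoctorKafkaClusterConfig clusterConfig) {
    // The cluster no longer takes a ReplicaStatsManager; it owns its own stats.
    KafkaCluster cluster = new KafkaCluster(clusterConfig.getZkUrl(), clusterConfig);
    // The broker receives its owning cluster, so KafkaBroker.getMaxBytesIn() can
    // call cluster.getMaxBytesIn(topicPartition) without threading a zkUrl through.
    return new KafkaBroker(clusterConfig, cluster, 1);
  }
}
```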
drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java (69 changes: 54 additions & 15 deletions)
@@ -2,9 +2,10 @@
 
 
 import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
-import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
 import com.pinterest.doctorkafka.util.OutOfSyncReplica;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingWindowReservoir;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -43,17 +44,24 @@ public class KafkaCluster {
   private static final Logger LOG = LogManager.getLogger(KafkaCluster.class);
   private static final int MAX_NUM_STATS = 5;
   private static final int INVALID_BROKERSTATS_TIME = 240000;
+  /**
+   * The Kafka network traffic stats take ~15 minutes to cool down. We give a
+   * 20-minute cool-down period to avoid inaccurate stats collection.
+   */
+  private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1800 * 1000L;
+  private static final int SLIDING_WINDOW_SIZE = 1440 * 4;
 
   private DoctorKafkaClusterConfig clusterConfig;
   public String zkUrl;
   public ConcurrentMap<Integer, KafkaBroker> brokers;
   private ConcurrentMap<Integer, LinkedList<BrokerStats>> brokerStatsMap;
   public ConcurrentMap<String, Set<TopicPartition>> topicPartitions = new ConcurrentHashMap<>();
-  private ReplicaStatsManager replicaStatsManager;
+  private ConcurrentMap<TopicPartition, Histogram> bytesInHistograms = new ConcurrentHashMap<>();
+  private ConcurrentMap<TopicPartition, Histogram> bytesOutHistograms = new ConcurrentHashMap<>();
+  private ConcurrentMap<TopicPartition, Long> reassignmentTimestamps = new ConcurrentHashMap<>();
 
-  public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager) {
+  public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig) {
     this.zkUrl = zookeeper;
-    this.replicaStatsManager = replicaStatsManager;
     this.brokers = new ConcurrentHashMap<>();
     this.clusterConfig = clusterConfig;
     this.brokerStatsMap = new ConcurrentHashMap<>();
@@ -89,17 +97,30 @@ public void recordBrokerStats(BrokerStats brokerStats) {
 
       if (!brokerStats.getHasFailure()) {
         // only record brokerstat when there is no failure on that broker.
-        KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, replicaStatsManager, i));
+        KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, this, i));
         broker.update(brokerStats);
       }
 
-      if (brokerStats.getLeaderReplicas() != null) {
-        for (AvroTopicPartition atp : brokerStats.getLeaderReplicas()) {
-          String topic = atp.getTopic();
-          TopicPartition tp = new TopicPartition(topic, atp.getPartition());
-          topicPartitions
-              .computeIfAbsent(topic, t -> new HashSet<>())
-              .add(tp);
+      if (brokerStats.getLeaderReplicaStats() != null) {
+        for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) {
+          String topic = replicaStat.getTopic();
+          TopicPartition topicPartition = new TopicPartition(topic, replicaStat.getPartition());
+          topicPartitions.computeIfAbsent(topic, t -> new HashSet<>()).add(topicPartition);
+          // if the replica is involved in reassignment, ignore the stats
+          if (replicaStat.getInReassignment()) {
+            reassignmentTimestamps.compute(topicPartition,
+                (t, v) -> v == null || v < replicaStat.getTimestamp() ? replicaStat.getTimestamp() : v);
+            continue;
+          }
+          long lastReassignment = reassignmentTimestamps.getOrDefault(topicPartition, 0L);
+          if (brokerStats.getTimestamp() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) {
+            continue;
+          }
+          bytesInHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
+          bytesOutHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
 
+          bytesInHistograms.get(topicPartition).update(replicaStat.getBytesIn15MinMeanRate());
+          bytesOutHistograms.get(topicPartition).update(replicaStat.getBytesOut15MinMeanRate());
         }
       }
     } catch (Exception e) {
@@ -122,6 +143,18 @@ public JsonElement toJson() {
     return json;
   }
 
+  public ConcurrentMap<TopicPartition, Histogram> getBytesInHistograms() {
+    return bytesInHistograms;
+  }
+
+  public ConcurrentMap<TopicPartition, Histogram> getBytesOutHistograms() {
+    return bytesOutHistograms;
+  }
+
+  public ConcurrentMap<TopicPartition, Long> getReassignmentTimestamps() {
+    return reassignmentTimestamps;
+  }
+
   /**
    * Get broker by broker id.
    *
@@ -450,25 +483,31 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
     }
   }
 
+  public long getMaxBytesIn(TopicPartition tp) {
+    return bytesInHistograms.get(tp).getSnapshot().getMax();
+  }
+
+  public long getMaxBytesOut(TopicPartition tp) {
+    return bytesOutHistograms.get(tp).getSnapshot().getMax();
+  }
+
   public long getMaxBytesIn() {
     long result = 0L;
     for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
       Set<TopicPartition> topicPartitions = entry.getValue();
       for (TopicPartition tp : topicPartitions) {
-        result += replicaStatsManager.getMaxBytesIn(zkUrl, tp);
+        result += getMaxBytesIn(tp);
       }
     }
     return result;
   }
 
 
   public long getMaxBytesOut() {
     long result = 0L;
     for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
       Set<TopicPartition> topicPartitions = entry.getValue();
       for (TopicPartition tp : topicPartitions) {
-        result += replicaStatsManager.getMaxBytesOut(zkUrl, tp);
+        result += getMaxBytesOut(tp);
       }
     }
     return result;
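KafkaCluster now tracks per-partition traffic with Codahale metrics primitives, and getMaxBytesIn(tp) is just a snapshot lookup on the partition's histogram. Note that REASSIGNMENT_COOLDOWN_WINDOW_IN_MS is 1800 * 1000 ms, i.e. 30 minutes, a margin over the ~15-minute cool-down the comment describes. A self-contained sketch of the reservoir behavior the new fields rely on, using the same window size but made-up sample values:

```java
import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingWindowReservoir;

public class ReservoirSketch {
  public static void main(String[] args) {
    // SlidingWindowReservoir keeps only the most recent N samples, so the snapshot
    // reflects recent traffic rather than the full history. If samples arrive about
    // once a minute, 1440 * 4 covers roughly the last four days.
    Histogram bytesIn = new Histogram(new SlidingWindowReservoir(1440 * 4));
    bytesIn.update(1_000_000L);  // bytes-in 15-min mean rate samples
    bytesIn.update(2_500_000L);
    bytesIn.update(1_200_000L);
    // KafkaCluster.getMaxBytesIn(tp) is exactly this snapshot lookup.
    System.out.println(bytesIn.getSnapshot().getMax());             // 2500000
    System.out.println(bytesIn.getSnapshot().get99thPercentile());  // 2500000.0
  }
}
```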
drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java
@@ -77,7 +77,6 @@ public class KafkaClusterManager implements Runnable {
   private DoctorKafkaConfig drkafkaConfig = null;
   private DoctorKafkaClusterConfig clusterConfig;
   private DoctorKafkaActionReporter actionReporter = null;
-  private ReplicaStatsManager replicaStatsManager;
   private boolean stopped = true;
   private Thread thread = null;
 
@@ -118,7 +117,6 @@ public KafkaClusterManager(String zkUrl, KafkaCluster kafkaCluster,
     if (clusterConfig.enabledDeadbrokerReplacement()) {
       this.brokerReplacer = new BrokerReplacer(drkafkaConfig.getBrokerReplacementCommand());
     }
-    this.replicaStatsManager = replicaStatsManager;
   }
 
   public KafkaCluster getCluster() {
@@ -236,8 +234,8 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker,
 
     for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
       TopicPartition tp = entry.getKey();
-      double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
-      double tpBytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
+      double tpBytesIn = kafkaCluster.getMaxBytesIn(tp);
+      double tpBytesOut = kafkaCluster.getMaxBytesOut(tp);
       double brokerTraffic = (bytesIn - toBeReducedBytesIn - tpBytesIn) +
           (bytesOut - toBeReducedBytesOut - tpBytesOut);
 
@@ -314,7 +312,7 @@ private void generateFollowerReassignmentPlan(KafkaBroker broker) {
 
     for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
       TopicPartition tp = entry.getKey();
-      double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
+      double tpBytesIn = kafkaCluster.getMaxBytesIn(tp);
       if (brokerBytesIn - toBeReducedBytesIn - tpBytesIn < bytesInLimit) {
         // if moving a topic partition out will have the broker be under-utilized, do not
         // move it out.
@@ -489,8 +487,8 @@ private Map<TopicPartition, Double> sortTopicPartitionsByTraffic(List<TopicParti
     Map<TopicPartition, Double> tpTraffic = new HashMap<>();
     for (TopicPartition tp : tps) {
       try {
-        double bytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
-        double bytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
+        double bytesIn = kafkaCluster.getMaxBytesIn(tp);
+        double bytesOut = kafkaCluster.getMaxBytesOut(tp);
         tpTraffic.put(tp, bytesIn + bytesOut);
       } catch (Exception e) {
         LOG.info("Exception in sorting topic partition {}", tp, e);
@@ -614,8 +612,8 @@ private Map<TopicPartition, Integer[]> generateReassignmentPlanForDeadBrokers(
 
     for (OutOfSyncReplica oosReplica : outOfSyncReplicas) {
 
-      double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, oosReplica.topicPartition);
-      double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, oosReplica.topicPartition);
+      double inBoundReq = kafkaCluster.getMaxBytesIn(oosReplica.topicPartition);
+      double outBoundReq = kafkaCluster.getMaxBytesOut(oosReplica.topicPartition);
       int preferredBroker = oosReplica.replicaBrokers.get(0);
 
       Map<Integer, KafkaBroker> replacedNodes;
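sortTopicPartitionsByTraffic appears only in part above; the idea is to rank partitions by their combined peak in/out traffic, now sourced from KafkaCluster. A standalone sketch of that ranking, with a plain map standing in for the cluster's getMaxBytesIn/getMaxBytesOut and invented topics and numbers:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class TrafficRankSketch {
  public static void main(String[] args) {
    // Stand-in for kafkaCluster.getMaxBytesIn(tp) + kafkaCluster.getMaxBytesOut(tp).
    Map<TopicPartition, Double> tpTraffic = new HashMap<>();
    tpTraffic.put(new TopicPartition("events", 0), 3_000_000.0);
    tpTraffic.put(new TopicPartition("events", 1), 9_500_000.0);
    tpTraffic.put(new TopicPartition("logs", 0), 1_200_000.0);

    // Busiest partitions first, the order a reassignment planner wants to visit them.
    List<Map.Entry<TopicPartition, Double>> ranked = new ArrayList<>(tpTraffic.entrySet());
    ranked.sort(Map.Entry.<TopicPartition, Double>comparingByValue().reversed());
    ranked.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
  }
}
```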
drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/ReplicaStatsManager.java
@@ -13,11 +13,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.SlidingWindowReservoir;
 import com.pinterest.doctorkafka.BrokerStats;
 import com.pinterest.doctorkafka.KafkaCluster;
-import com.pinterest.doctorkafka.ReplicaStat;
 import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
 import com.pinterest.doctorkafka.util.KafkaUtils;
 import com.pinterest.doctorkafka.util.ReplicaStatsUtil;
@@ -26,27 +23,8 @@ public class ReplicaStatsManager {
 
   private static final Logger LOG = LogManager.getLogger(ReplicaStatsManager.class);
 
-  private static final int SLIDING_WINDOW_SIZE = 1440 * 4;
-
-  /**
-   * The Kafka network traffic stats take ~15 minutes to cool down. We give a
-   * 20-minute cool-down period to avoid inaccurate stats collection.
-   */
-  private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1800 * 1000L;
-
-  private ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>>
-      bytesInStats = new ConcurrentHashMap<>();
-
-  private ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>>
-      bytesOutStats = new ConcurrentHashMap<>();
-
   private ConcurrentMap<String, KafkaCluster> clusters = new ConcurrentHashMap<>();
 
   private DoctorKafkaConfig config;
 
-  public ConcurrentHashMap<String, ConcurrentHashMap<TopicPartition, Long>>
-      replicaReassignmentTimestamps = new ConcurrentHashMap<>();
-
   /*
    * Getters
    */
@@ -70,36 +48,6 @@ public ReplicaStatsManager(DoctorKafkaConfig config){
     this.clusterZkUrls = config.getClusterZkUrls();
   }
 
-  public void updateReplicaReassignmentTimestamp(String brokerZkUrl,
-      ReplicaStat replicaStat) {
-    if (!replicaReassignmentTimestamps.containsKey(brokerZkUrl)) {
-      replicaReassignmentTimestamps.put(brokerZkUrl, new ConcurrentHashMap<>());
-    }
-    ConcurrentHashMap<TopicPartition, Long> replicaTimestamps =
-        replicaReassignmentTimestamps.get(brokerZkUrl);
-    TopicPartition topicPartition = new TopicPartition(
-        replicaStat.getTopic(), replicaStat.getPartition());
-
-    if (!replicaTimestamps.containsKey(topicPartition) ||
-        replicaTimestamps.get(topicPartition) < replicaStat.getTimestamp()) {
-      replicaTimestamps.put(topicPartition, replicaStat.getTimestamp());
-    }
-  }
-
-  private long getLastReplicaReassignmentTimestamp(String brokerZkUrl,
-      TopicPartition topicPartition) {
-    long result = 0;
-    if (replicaReassignmentTimestamps.containsKey(brokerZkUrl)) {
-      ConcurrentHashMap<TopicPartition, Long> replicaTimestamps =
-          replicaReassignmentTimestamps.get(brokerZkUrl);
-      if (replicaTimestamps.containsKey(topicPartition)) {
-        result = replicaTimestamps.get(topicPartition);
-      }
-    }
-    return result;
-  }
-
-
   /**
    * Record the latest brokerstats, and update DoctorKafka internal data structures.
    */
@@ -110,66 +58,8 @@ public void update(BrokerStats brokerStats) {
       return;
     }
 
-    KafkaCluster cluster = clusters.computeIfAbsent(brokerZkUrl, url -> new KafkaCluster(url, config.getClusterConfigByZkUrl(url), this));
+    KafkaCluster cluster = clusters.computeIfAbsent(brokerZkUrl, url -> new KafkaCluster(url, config.getClusterConfigByZkUrl(url)));
     cluster.recordBrokerStats(brokerStats);
-
-    bytesInStats.putIfAbsent(brokerZkUrl, new ConcurrentHashMap<>());
-    bytesOutStats.putIfAbsent(brokerZkUrl, new ConcurrentHashMap<>());
-
-    if (brokerStats.getLeaderReplicaStats() != null) {
-      ConcurrentMap<TopicPartition, Histogram> bytesInHistograms = bytesInStats.get(brokerZkUrl);
-      ConcurrentMap<TopicPartition, Histogram> bytesOutHistograms = bytesOutStats.get(brokerZkUrl);
-      for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) {
-        if (replicaStat.getInReassignment()) {
-          // if the replica is involved in reassignment, ignore the stats
-          updateReplicaReassignmentTimestamp(brokerZkUrl, replicaStat);
-          continue;
-        }
-        TopicPartition topicPartition = new TopicPartition(
-            replicaStat.getTopic(), replicaStat.getPartition());
-        long lastReassignment = getLastReplicaReassignmentTimestamp(brokerZkUrl, topicPartition);
-        if (brokerStats.getTimestamp() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) {
-          continue;
-        }
-
-        bytesInHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
-        bytesOutHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
-
-        bytesInHistograms.get(topicPartition).update(replicaStat.getBytesIn15MinMeanRate());
-        bytesOutHistograms.get(topicPartition).update(replicaStat.getBytesOut15MinMeanRate());
-      }
-    }
   }
-
-
-  public long getMaxBytesIn(String zkUrl, TopicPartition topicPartition) {
-    try {
-      return bytesInStats.get(zkUrl).get(topicPartition).getSnapshot().getMax();
-    } catch (Exception e) {
-      LOG.error("Failed to get bytesinfo for {}:{}", zkUrl, topicPartition);
-      throw e;
-    }
-  }
-
-  public double get99thPercentilBytesIn(String zkUrl, TopicPartition topicPartition) {
-    return bytesInStats.get(zkUrl).get(topicPartition).getSnapshot().get99thPercentile();
-  }
-
-  public long getMaxBytesOut(String zkUrl, TopicPartition topicPartition) {
-    return bytesOutStats.get(zkUrl).get(topicPartition).getSnapshot().getMax();
-  }
-
-  public double get99thPercentilBytesOut(String zkUrl, TopicPartition topicPartition) {
-    return bytesOutStats.get(zkUrl).get(topicPartition).getSnapshot().get99thPercentile();
-  }
-
-
-  public Map<TopicPartition, Histogram> getTopicsBytesInStats(String zkUrl) {
-    return bytesInStats.get(zkUrl);
-  }
-
-  public Map<TopicPartition, Histogram> getTopicsBytesOutStats(String zkUrl) {
-    return bytesOutStats.get(zkUrl);
-  }
 
   /**
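With the per-zkUrl getters deleted here, code that used to ask ReplicaStatsManager for a cluster's histograms (e.g. getTopicsBytesInStats) reads them off the owning KafkaCluster through the getters added in this commit. A hedged sketch, assuming a monitoring-style caller that already holds the cluster (the method name and use case are illustrative):

```java
import java.util.concurrent.ConcurrentMap;

import com.codahale.metrics.Histogram;
import com.pinterest.doctorkafka.KafkaCluster;
import org.apache.kafka.common.TopicPartition;

public class StatsReadSketch {
  // Formerly replicaStatsManager.getTopicsBytesInStats(zkUrl); the histograms are
  // now reachable from the cluster itself, with no zkUrl key needed.
  static void logPeaks(KafkaCluster cluster) {
    ConcurrentMap<TopicPartition, Histogram> bytesIn = cluster.getBytesInHistograms();
    bytesIn.forEach((tp, histogram) ->
        System.out.println(tp + " peak bytes-in: " + histogram.getSnapshot().getMax()));
  }
}
```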