This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Move ReplicaStatsManager logic into corresponding KafkaCluster instances #133

Merged 1 commit on May 3, 2019
13 changes: 6 additions & 7 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java
@@ -1,7 +1,6 @@
 package com.pinterest.doctorkafka;
 
 import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
-import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.kafka.common.TopicPartition;
@@ -41,9 +40,9 @@ public class KafkaBroker implements Comparable<KafkaBroker> {
   private long reservedBytesOut;
   private Set<TopicPartition> toBeAddedReplicas;
 
-  private ReplicaStatsManager replicaStatsManager;
+  private KafkaCluster kafkaCluster;
 
-  public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager, int brokerId) {
+  public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, KafkaCluster kafkaCluster, int brokerId) {
     assert clusterConfig != null;
     this.zkUrl = clusterConfig.getZkUrl();
     this.brokerId = brokerId;
@@ -57,7 +56,7 @@ public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager r
     this.reservedBytesOut = 0L;
     this.bytesInPerSecLimit = clusterConfig.getNetworkInLimitInBytes();
     this.bytesOutPerSecLimit = clusterConfig.getNetworkOutLimitInBytes();
-    this.replicaStatsManager = replicaStatsManager;
+    this.kafkaCluster = kafkaCluster;
   }
 
   public JsonElement toJson() {
@@ -76,10 +75,10 @@ public JsonElement toJson() {
   public long getMaxBytesIn() {
     long result = 0L;
     for (TopicPartition topicPartition : leaderReplicas) {
-      result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
+      result += kafkaCluster.getMaxBytesIn(topicPartition);
     }
     for (TopicPartition topicPartition : followerReplicas) {
-      result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
+      result += kafkaCluster.getMaxBytesIn(topicPartition);
     }
     return result;
   }
@@ -88,7 +87,7 @@ public long getMaxBytesIn() {
   public long getMaxBytesOut() {
     long result = 0L;
     for (TopicPartition topicPartition : leaderReplicas) {
-      result += replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
+      result += kafkaCluster.getMaxBytesOut(topicPartition);
     }
     return result;
   }
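The KafkaBroker changes above replace a process-global lookup, replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition), with a cluster-local one, kafkaCluster.getMaxBytesIn(topicPartition): the broker now asks its owning cluster, so the zkUrl key disappears from every call site. Below is a minimal, self-contained sketch of that shape; all class and method names in it are stand-ins for illustration, not DoctorKafka code.

```java
import java.util.HashMap;
import java.util.Map;

class TrafficSketch {

  // Stand-in for KafkaCluster: tracks a per-partition peak bytes-in value.
  static class Cluster {
    private final Map<String, Long> maxBytesIn = new HashMap<>();

    void record(String topicPartition, long bytes) {
      maxBytesIn.merge(topicPartition, bytes, Math::max);
    }

    long getMaxBytesIn(String topicPartition) {
      return maxBytesIn.getOrDefault(topicPartition, 0L);
    }
  }

  // Stand-in for KafkaBroker: sums per-partition peaks through its owning cluster.
  static class Broker {
    private final Cluster cluster; // was: a ReplicaStatsManager plus a zkUrl
    private final String[] leaderReplicas;

    Broker(Cluster cluster, String... leaderReplicas) {
      this.cluster = cluster;
      this.leaderReplicas = leaderReplicas;
    }

    long getMaxBytesIn() {
      long result = 0L;
      for (String tp : leaderReplicas) {
        result += cluster.getMaxBytesIn(tp); // no zkUrl argument any more
      }
      return result;
    }
  }

  public static void main(String[] args) {
    Cluster cluster = new Cluster();
    cluster.record("topic_a-0", 1_000L);
    cluster.record("topic_a-1", 2_500L);
    Broker broker = new Broker(cluster, "topic_a-0", "topic_a-1");
    System.out.println(broker.getMaxBytesIn()); // prints 3500
  }
}
```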
69 changes: 54 additions & 15 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java
@@ -2,9 +2,10 @@
 
 
 import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
-import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
 import com.pinterest.doctorkafka.util.OutOfSyncReplica;
 
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingWindowReservoir;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -43,17 +44,24 @@ public class KafkaCluster {
   private static final Logger LOG = LogManager.getLogger(KafkaCluster.class);
   private static final int MAX_NUM_STATS = 5;
   private static final int INVALID_BROKERSTATS_TIME = 240000;
+  /**
+   * The Kafka network traffic stats take ~15 minutes to cool down. We give a 30-minute
+   * cool-down period to avoid inaccurate stats collection.
+   */
+  private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1800 * 1000L;
+  private static final int SLIDING_WINDOW_SIZE = 1440 * 4;
 
   private DoctorKafkaClusterConfig clusterConfig;
   public String zkUrl;
   public ConcurrentMap<Integer, KafkaBroker> brokers;
   private ConcurrentMap<Integer, LinkedList<BrokerStats>> brokerStatsMap;
   public ConcurrentMap<String, Set<TopicPartition>> topicPartitions = new ConcurrentHashMap<>();
-  private ReplicaStatsManager replicaStatsManager;
+  private ConcurrentMap<TopicPartition, Histogram> bytesInHistograms = new ConcurrentHashMap<>();
+  private ConcurrentMap<TopicPartition, Histogram> bytesOutHistograms = new ConcurrentHashMap<>();
+  private ConcurrentMap<TopicPartition, Long> reassignmentTimestamps = new ConcurrentHashMap<>();
 
-  public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager) {
+  public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig) {
     this.zkUrl = zookeeper;
-    this.replicaStatsManager = replicaStatsManager;
     this.brokers = new ConcurrentHashMap<>();
     this.clusterConfig = clusterConfig;
     this.brokerStatsMap = new ConcurrentHashMap<>();
@@ -89,17 +97,30 @@ public void recordBrokerStats(BrokerStats brokerStats) {
 
     if (!brokerStats.getHasFailure()) {
       // only record brokerstat when there is no failure on that broker.
-      KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, replicaStatsManager, i));
+      KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, this, i));
       broker.update(brokerStats);
     }
 
-    if (brokerStats.getLeaderReplicas() != null) {
-      for (AvroTopicPartition atp : brokerStats.getLeaderReplicas()) {
-        String topic = atp.getTopic();
-        TopicPartition tp = new TopicPartition(topic, atp.getPartition());
-        topicPartitions
-            .computeIfAbsent(topic, t -> new HashSet<>())
-            .add(tp);
+    if (brokerStats.getLeaderReplicaStats() != null) {
+      for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) {
+        String topic = replicaStat.getTopic();
+        TopicPartition topicPartition = new TopicPartition(topic, replicaStat.getPartition());
+        topicPartitions.computeIfAbsent(topic, t -> new HashSet<>()).add(topicPartition);
+        // if the replica is involved in reassignment, ignore the stats
+        if (replicaStat.getInReassignment()) {
+          reassignmentTimestamps.compute(topicPartition,
+              (t, v) -> v == null || v < replicaStat.getTimestamp() ? replicaStat.getTimestamp() : v);
+          continue;
+        }
+        long lastReassignment = reassignmentTimestamps.getOrDefault(topicPartition, 0L);
+        if (brokerStats.getTimestamp() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) {
+          continue;
+        }
+        bytesInHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
+        bytesOutHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
+
+        bytesInHistograms.get(topicPartition).update(replicaStat.getBytesIn15MinMeanRate());
+        bytesOutHistograms.get(topicPartition).update(replicaStat.getBytesOut15MinMeanRate());
       }
     }
   } catch (Exception e) {
@@ -122,6 +143,18 @@ public JsonElement toJson() {
     return json;
   }
 
+  public ConcurrentMap<TopicPartition, Histogram> getBytesInHistograms() {
+    return bytesInHistograms;
+  }
+
+  public ConcurrentMap<TopicPartition, Histogram> getBytesOutHistograms() {
+    return bytesOutHistograms;
+  }
+
+  public ConcurrentMap<TopicPartition, Long> getReassignmentTimestamps() {
+    return reassignmentTimestamps;
+  }
+
   /**
    * Get broker by broker id.
    *
@@ -450,25 +483,31 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
     }
   }
 
+  public long getMaxBytesIn(TopicPartition tp) {
+    return bytesInHistograms.get(tp).getSnapshot().getMax();
+  }
+
+  public long getMaxBytesOut(TopicPartition tp) {
+    return bytesOutHistograms.get(tp).getSnapshot().getMax();
+  }
+
   public long getMaxBytesIn() {
     long result = 0L;
     for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
       Set<TopicPartition> topicPartitions = entry.getValue();
       for (TopicPartition tp : topicPartitions) {
-        result += replicaStatsManager.getMaxBytesIn(zkUrl, tp);
+        result += getMaxBytesIn(tp);
      }
    }
    return result;
  }
 
 
   public long getMaxBytesOut() {
     long result = 0L;
     for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
       Set<TopicPartition> topicPartitions = entry.getValue();
       for (TopicPartition tp : topicPartitions) {
-        result += replicaStatsManager.getMaxBytesOut(zkUrl, tp);
+        result += getMaxBytesOut(tp);
       }
     }
     return result;
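Two things move into KafkaCluster here: the per-partition Dropwizard histograms (one bytes-in and one bytes-out Histogram over a SlidingWindowReservoir of 1440 * 4 samples) and the reassignment cooldown, which skips samples recorded within REASSIGNMENT_COOLDOWN_WINDOW_IN_MS (30 minutes) of a partition's last reassignment so reassignment traffic does not distort the peaks. A small sketch of just the histogram mechanics, assuming samples arrive as long-valued 15-minute mean rates as in the diff; the Dropwizard calls are the same ones the diff uses, while the keys and values are made up:

```java
import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingWindowReservoir;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class HistogramSketch {

  // Same size as the PR's SLIDING_WINDOW_SIZE: only the most recent
  // 5760 samples are retained, so old traffic ages out of the snapshot.
  private static final int SLIDING_WINDOW_SIZE = 1440 * 4;

  private final ConcurrentMap<String, Histogram> bytesInHistograms = new ConcurrentHashMap<>();

  void recordSample(String topicPartition, long bytesIn15MinMeanRate) {
    bytesInHistograms
        .computeIfAbsent(topicPartition,
            k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)))
        .update(bytesIn15MinMeanRate);
  }

  long getMaxBytesIn(String topicPartition) {
    // Peak over the retained window; like the PR's getMaxBytesIn(tp), this
    // throws a NullPointerException if no sample was ever recorded for tp.
    return bytesInHistograms.get(topicPartition).getSnapshot().getMax();
  }

  public static void main(String[] args) {
    HistogramSketch sketch = new HistogramSketch();
    sketch.recordSample("topic_a-0", 1_000L);
    sketch.recordSample("topic_a-0", 4_200L);
    System.out.println(sketch.getMaxBytesIn("topic_a-0")); // prints 4200
  }
}
```

One consequence of the move: the lookups are now keyed only by partition inside each cluster, so the possible NullPointerException shifts from ReplicaStatsManager's zkUrl-keyed maps to KafkaCluster's partition-keyed maps, and getMaxBytesIn(tp) still assumes at least one sample exists.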
drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java
@@ -77,7 +77,6 @@ public class KafkaClusterManager implements Runnable {
   private DoctorKafkaConfig drkafkaConfig = null;
   private DoctorKafkaClusterConfig clusterConfig;
   private DoctorKafkaActionReporter actionReporter = null;
-  private ReplicaStatsManager replicaStatsManager;
   private boolean stopped = true;
   private Thread thread = null;
 
@@ -118,7 +117,6 @@ public KafkaClusterManager(String zkUrl, KafkaCluster kafkaCluster,
     if (clusterConfig.enabledDeadbrokerReplacement()) {
       this.brokerReplacer = new BrokerReplacer(drkafkaConfig.getBrokerReplacementCommand());
     }
-    this.replicaStatsManager = replicaStatsManager;
   }
 
   public KafkaCluster getCluster() {
@@ -236,8 +234,8 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker,
 
     for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
       TopicPartition tp = entry.getKey();
-      double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
-      double tpBytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
+      double tpBytesIn = kafkaCluster.getMaxBytesIn(tp);
+      double tpBytesOut = kafkaCluster.getMaxBytesOut(tp);
       double brokerTraffic = (bytesIn - toBeReducedBytesIn - tpBytesIn) +
           (bytesOut - toBeReducedBytesOut - tpBytesOut);
 
@@ -314,7 +312,7 @@ private void generateFollowerReassignmentPlan(KafkaBroker broker) {
 
     for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
       TopicPartition tp = entry.getKey();
-      double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
+      double tpBytesIn = kafkaCluster.getMaxBytesIn(tp);
       if (brokerBytesIn - toBeReducedBytesIn - tpBytesIn < bytesInLimit) {
         // if moving a topic partition out will have the broker be under-utilized, do not
         // move it out.
@@ -489,8 +487,8 @@ private Map<TopicPartition, Double> sortTopicPartitionsByTraffic(List<TopicParti
     Map<TopicPartition, Double> tpTraffic = new HashMap<>();
     for (TopicPartition tp : tps) {
       try {
-        double bytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
-        double bytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
+        double bytesIn = kafkaCluster.getMaxBytesIn(tp);
+        double bytesOut = kafkaCluster.getMaxBytesOut(tp);
         tpTraffic.put(tp, bytesIn + bytesOut);
       } catch (Exception e) {
         LOG.info("Exception in sorting topic partition {}", tp, e);
@@ -614,8 +612,8 @@ private Map<TopicPartition, Integer[]> generateReassignmentPlanForDeadBrokers(
 
     for (OutOfSyncReplica oosReplica : outOfSyncReplicas) {
 
-      double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, oosReplica.topicPartition);
-      double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, oosReplica.topicPartition);
+      double inBoundReq = kafkaCluster.getMaxBytesIn(oosReplica.topicPartition);
+      double outBoundReq = kafkaCluster.getMaxBytesOut(oosReplica.topicPartition);
       int preferredBroker = oosReplica.replicaBrokers.get(0);
 
       Map<Integer, KafkaBroker> replacedNodes;
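Every traffic read in the planner now goes through the kafkaCluster field rather than replicaStatsManager plus zkUrl. The pattern is clearest in sortTopicPartitionsByTraffic, which ranks partitions by getMaxBytesIn(tp) + getMaxBytesOut(tp); here is a simplified, hypothetical rendering of that ranking, where the Cluster interface stands in for KafkaCluster:

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class PlanSketch {

  // Stand-in for the two KafkaCluster lookups the planner uses.
  interface Cluster {
    long getMaxBytesIn(String topicPartition);
    long getMaxBytesOut(String topicPartition);
  }

  // Rank partitions by total peak traffic, heaviest first, mirroring the
  // role sortTopicPartitionsByTraffic plays in plan generation.
  static List<String> sortByTraffic(Cluster cluster, List<String> topicPartitions) {
    return topicPartitions.stream()
        .sorted(Comparator.comparingLong(
            (String tp) -> cluster.getMaxBytesIn(tp) + cluster.getMaxBytesOut(tp)).reversed())
        .collect(Collectors.toList());
  }
}
```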
drkafka/src/main/java/com/pinterest/doctorkafka/replicastats/ReplicaStatsManager.java
@@ -13,11 +13,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.SlidingWindowReservoir;
 import com.pinterest.doctorkafka.BrokerStats;
 import com.pinterest.doctorkafka.KafkaCluster;
-import com.pinterest.doctorkafka.ReplicaStat;
 import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
 import com.pinterest.doctorkafka.util.KafkaUtils;
 import com.pinterest.doctorkafka.util.ReplicaStatsUtil;
@@ -26,31 +23,9 @@ public class ReplicaStatsManager {
 
   private static final Logger LOG = LogManager.getLogger(ReplicaStatsManager.class);
 
-  private static final int SLIDING_WINDOW_SIZE = 1440 * 4;
-
-  /**
-   * The Kafka network traffic stats take ~15 minutes to cool down. We give a 30-minute
-   * cool-down period to avoid inaccurate stats collection.
-   */
-  private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1800 * 1000L;
-
-  private ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>>
-      bytesInStats = new ConcurrentHashMap<>();
-
-  private ConcurrentMap<String, ConcurrentMap<TopicPartition, Histogram>>
-      bytesOutStats = new ConcurrentHashMap<>();
-
   private ConcurrentMap<String, KafkaCluster> clusters = new ConcurrentHashMap<>();
 
   private DoctorKafkaConfig config;
-
-  public ConcurrentHashMap<String, ConcurrentHashMap<TopicPartition, Long>>
-      replicaReassignmentTimestamps = new ConcurrentHashMap<>();
-
-  /*
-   * Getters
-   */
-
   public ConcurrentMap<String, KafkaCluster> getClusters() {
     return clusters;
   }
@@ -70,36 +45,6 @@ public ReplicaStatsManager(DoctorKafkaConfig config){
     this.clusterZkUrls = config.getClusterZkUrls();
   }
 
-  public void updateReplicaReassignmentTimestamp(String brokerZkUrl,
-                                                 ReplicaStat replicaStat) {
-    if (!replicaReassignmentTimestamps.containsKey(brokerZkUrl)) {
-      replicaReassignmentTimestamps.put(brokerZkUrl, new ConcurrentHashMap<>());
-    }
-    ConcurrentHashMap<TopicPartition, Long> replicaTimestamps =
-        replicaReassignmentTimestamps.get(brokerZkUrl);
-    TopicPartition topicPartition = new TopicPartition(
-        replicaStat.getTopic(), replicaStat.getPartition());
-
-    if (!replicaTimestamps.containsKey(topicPartition) ||
-        replicaTimestamps.get(topicPartition) < replicaStat.getTimestamp()) {
-      replicaTimestamps.put(topicPartition, replicaStat.getTimestamp());
-    }
-  }
-
-  private long getLastReplicaReassignmentTimestamp(String brokerZkUrl,
-                                                   TopicPartition topicPartition) {
-    long result = 0;
-    if (replicaReassignmentTimestamps.containsKey(brokerZkUrl)) {
-      ConcurrentHashMap<TopicPartition, Long> replicaTimestamps =
-          replicaReassignmentTimestamps.get(brokerZkUrl);
-      if (replicaTimestamps.containsKey(topicPartition)) {
-        result = replicaTimestamps.get(topicPartition);
-      }
-    }
-    return result;
-  }
-
 
   /**
    * Record the latest brokerstats, and update DoctorKafka internal data structures.
   */
@@ -110,66 +55,8 @@ public void update(BrokerStats brokerStats) {
       return;
     }
 
-    KafkaCluster cluster = clusters.computeIfAbsent(brokerZkUrl, url -> new KafkaCluster(url, config.getClusterConfigByZkUrl(url), this));
+    KafkaCluster cluster = clusters.computeIfAbsent(brokerZkUrl, url -> new KafkaCluster(url, config.getClusterConfigByZkUrl(url)));
     cluster.recordBrokerStats(brokerStats);
-
-    bytesInStats.putIfAbsent(brokerZkUrl, new ConcurrentHashMap<>());
-    bytesOutStats.putIfAbsent(brokerZkUrl, new ConcurrentHashMap<>());
-
-    if (brokerStats.getLeaderReplicaStats() != null) {
-      ConcurrentMap<TopicPartition, Histogram> bytesInHistograms = bytesInStats.get(brokerZkUrl);
-      ConcurrentMap<TopicPartition, Histogram> bytesOutHistograms = bytesOutStats.get(brokerZkUrl);
-      for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) {
-        if (replicaStat.getInReassignment()) {
-          // if the replica is involved in reassignment, ignore the stats
-          updateReplicaReassignmentTimestamp(brokerZkUrl, replicaStat);
-          continue;
-        }
-        TopicPartition topicPartition = new TopicPartition(
-            replicaStat.getTopic(), replicaStat.getPartition());
-        long lastReassignment = getLastReplicaReassignmentTimestamp(brokerZkUrl, topicPartition);
-        if (brokerStats.getTimestamp() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) {
-          continue;
-        }
-
-        bytesInHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
-        bytesOutHistograms.computeIfAbsent(topicPartition, k -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
-
-        bytesInHistograms.get(topicPartition).update(replicaStat.getBytesIn15MinMeanRate());
-        bytesOutHistograms.get(topicPartition).update(replicaStat.getBytesOut15MinMeanRate());
-      }
-    }
   }
 
-
-  public long getMaxBytesIn(String zkUrl, TopicPartition topicPartition) {
-    try {
-      return bytesInStats.get(zkUrl).get(topicPartition).getSnapshot().getMax();
-    } catch (Exception e) {
-      LOG.error("Failed to get bytesinfo for {}:{}", zkUrl, topicPartition);
-      throw e;
-    }
-  }
-
-  public double get99thPercentilBytesIn(String zkUrl, TopicPartition topicPartition) {
-    return bytesInStats.get(zkUrl).get(topicPartition).getSnapshot().get99thPercentile();
-  }
-
-  public long getMaxBytesOut(String zkUrl, TopicPartition topicPartition) {
-    return bytesOutStats.get(zkUrl).get(topicPartition).getSnapshot().getMax();
-  }
-
-  public double get99thPercentilBytesOut(String zkUrl, TopicPartition topicPartition) {
-    return bytesOutStats.get(zkUrl).get(topicPartition).getSnapshot().get99thPercentile();
-  }
-
-
-  public Map<TopicPartition, Histogram> getTopicsBytesInStats(String zkUrl) {
-    return bytesInStats.get(zkUrl);
-  }
-
-  public Map<TopicPartition, Histogram> getTopicsBytesOutStats(String zkUrl) {
-    return bytesOutStats.get(zkUrl);
-  }
 
   /**
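With the histograms and reassignment bookkeeping gone, ReplicaStatsManager's update() reduces to routing: resolve (or lazily create) the KafkaCluster for the incoming zkUrl and hand it the BrokerStats. A hypothetical condensed sketch of that remaining responsibility, with the payload and cluster types simplified to stand-ins:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class StatsRouterSketch {

  // Stand-in for KafkaCluster's recordBrokerStats(BrokerStats).
  interface Cluster {
    void recordBrokerStats(String brokerStats);
  }

  private final ConcurrentMap<String, Cluster> clusters = new ConcurrentHashMap<>();
  private final Function<String, Cluster> clusterFactory;

  StatsRouterSketch(Function<String, Cluster> clusterFactory) {
    this.clusterFactory = clusterFactory;
  }

  // Mirrors the slimmed-down update(): drop stats we cannot attribute to a
  // managed cluster, otherwise route them to the owning cluster instance.
  void update(String zkUrl, String brokerStats) {
    if (zkUrl == null) {
      return;
    }
    clusters.computeIfAbsent(zkUrl, clusterFactory).recordBrokerStats(brokerStats);
  }
}
```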