Skip to content
This repository was archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
refactor ReplicaStatsManager static methods (#129)
Browse files Browse the repository at this point in the history
  • Loading branch information
kabochya authored and yuyang08 committed Apr 23, 2019
1 parent d0fcf4b commit ee5c005
Show file tree
Hide file tree
Showing 18 changed files with 172 additions and 148 deletions.
17 changes: 10 additions & 7 deletions drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafka.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class DoctorKafka {

private DoctorKafkaConfig drkafkaConf;

public BrokerStatsProcessor brokerStatsProcessor = null;
private BrokerStatsProcessor brokerStatsProcessor = null;

private DoctorKafkaActionReporter actionReporter = null;

Expand All @@ -35,8 +35,11 @@ public class DoctorKafka {

private DoctorKafkaHeartbeat heartbeat = null;

public DoctorKafka(DoctorKafkaConfig drkafkaConf) {
this.drkafkaConf = drkafkaConf;
private ReplicaStatsManager replicaStatsManager = null;

public DoctorKafka(ReplicaStatsManager replicaStatsManager) {
this.replicaStatsManager = replicaStatsManager;
this.drkafkaConf = replicaStatsManager.getConfig();
this.clusterZkUrls = drkafkaConf.getClusterZkUrls();
this.zookeeperClient = new ZookeeperClient(drkafkaConf.getDoctorKafkaZkurl());
}
Expand All @@ -50,26 +53,26 @@ public void start() {
SecurityProtocol actionReportSecurityProtocol = drkafkaConf.getActionReportProducerSecurityProtocol();

LOG.info("Start rebuilding the replica stats by reading the past 24 hours brokerstats");
ReplicaStatsManager.readPastReplicaStats(brokerstatsZkurl, statsSecurityProtocol,
replicaStatsManager.readPastReplicaStats(brokerstatsZkurl, statsSecurityProtocol,
drkafkaConf.getBrokerStatsTopic(), drkafkaConf.getBrokerStatsBacktrackWindowsInSeconds());
LOG.info("Finish rebuilding the replica stats");

brokerStatsProcessor = new BrokerStatsProcessor(brokerstatsZkurl, statsSecurityProtocol, statsTopic,
drkafkaConf.getBrokerStatsConsumerSslConfigs());
drkafkaConf.getBrokerStatsConsumerSslConfigs(), replicaStatsManager);
brokerStatsProcessor.start();

actionReporter = new DoctorKafkaActionReporter(actionReportZkurl, actionReportSecurityProtocol, actionReportTopic,
drkafkaConf.getActionReportProducerSslConfigs());
for (String clusterZkUrl : clusterZkUrls) {
DoctorKafkaClusterConfig clusterConf = drkafkaConf.getClusterConfigByZkUrl(clusterZkUrl);
KafkaCluster kafkaCluster = ReplicaStatsManager.clusters.get(clusterZkUrl);
KafkaCluster kafkaCluster = replicaStatsManager.getClusters().get(clusterZkUrl);

if (kafkaCluster == null) {
LOG.error("No brokerstats info for cluster {}", clusterZkUrl);
continue;
}
KafkaClusterManager clusterManager = new KafkaClusterManager(
clusterZkUrl, kafkaCluster, clusterConf, drkafkaConf, actionReporter, zookeeperClient);
clusterZkUrl, kafkaCluster, clusterConf, drkafkaConf, actionReporter, zookeeperClient, replicaStatsManager);
clusterManagers.put(clusterConf.getClusterName(), clusterManager);
clusterManager.start();
LOG.info("Starting cluster manager for " + clusterZkUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class DoctorKafkaMain extends Application<DoctorKafkaAppConfig> {

public static DoctorKafka doctorKafka = null;
private static DoctorKafkaWatcher operatorWatcher = null;
public static ReplicaStatsManager replicaStatsManager = null;

@Override
public void initialize(Bootstrap<DoctorKafkaAppConfig> bootstrap) {
Expand All @@ -54,16 +55,16 @@ public void run(DoctorKafkaAppConfig configuration, Environment environment) thr

LOG.info("Configuration path : {}", configuration.getConfig());

ReplicaStatsManager.config = new DoctorKafkaConfig(configuration.getConfig());
replicaStatsManager = new ReplicaStatsManager(new DoctorKafkaConfig(configuration.getConfig()));

if (!ReplicaStatsManager.config.getRestartDisabled()){
operatorWatcher = new DoctorKafkaWatcher(ReplicaStatsManager.config.getRestartIntervalInSeconds());
if (!replicaStatsManager.getConfig().getRestartDisabled()){
operatorWatcher = new DoctorKafkaWatcher(replicaStatsManager.getConfig().getRestartIntervalInSeconds());
operatorWatcher.start();
}

configureServerRuntime(configuration, ReplicaStatsManager.config);
configureServerRuntime(configuration, replicaStatsManager.getConfig());

doctorKafka = new DoctorKafka(ReplicaStatsManager.config);
doctorKafka = new DoctorKafka(replicaStatsManager);

registerAPIs(environment, doctorKafka);
registerServlets(environment);
Expand Down Expand Up @@ -119,8 +120,8 @@ private void registerAPIs(Environment environment, DoctorKafka doctorKafka) {
}

private void startMetricsService() {
int ostrichPort = ReplicaStatsManager.config.getOstrichPort();
String tsdHostPort = ReplicaStatsManager.config.getTsdHostPort();
int ostrichPort = replicaStatsManager.getConfig().getOstrichPort();
String tsdHostPort = replicaStatsManager.getConfig().getTsdHostPort();
if (tsdHostPort == null && ostrichPort == 0) {
LOG.info("OpenTSDB and Ostrich options missing, not starting Ostrich service");
} else if (ostrichPort == 0) {
Expand Down
11 changes: 7 additions & 4 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ public class KafkaBroker implements Comparable<KafkaBroker> {
private long reservedBytesOut;
private Set<TopicPartition> toBeAddedReplicas;

public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, int brokerId) {
private ReplicaStatsManager replicaStatsManager;

public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager, int brokerId) {
assert clusterConfig != null;
this.zkUrl = clusterConfig.getZkUrl();
this.brokerId = brokerId;
Expand All @@ -53,6 +55,7 @@ public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, int brokerId) {
this.reservedBytesOut = 0L;
this.bytesInPerSecLimit = clusterConfig.getNetworkInLimitInBytes();
this.bytesOutPerSecLimit = clusterConfig.getNetworkOutLimitInBytes();
this.replicaStatsManager = replicaStatsManager;
}

public JsonElement toJson() {
Expand All @@ -71,10 +74,10 @@ public JsonElement toJson() {
public long getMaxBytesIn() {
long result = 0L;
for (TopicPartition topicPartition : leaderReplicas) {
result += ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
}
for (TopicPartition topicPartition : followerReplicas) {
result += ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
result += replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
}
return result;
}
Expand All @@ -83,7 +86,7 @@ public long getMaxBytesIn() {
public long getMaxBytesOut() {
long result = 0L;
for (TopicPartition topicPartition : leaderReplicas) {
result += ReplicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
result += replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ public class KafkaCluster {
public ConcurrentMap<Integer, KafkaBroker> brokers;
private ConcurrentMap<Integer, LinkedList<BrokerStats>> brokerStatsMap;
public ConcurrentMap<String, Set<TopicPartition>> topicPartitions = new ConcurrentHashMap<>();
private ReplicaStatsManager replicaStatsManager;

public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig) {
public KafkaCluster(String zookeeper, DoctorKafkaClusterConfig clusterConfig, ReplicaStatsManager replicaStatsManager) {
this.zkUrl = zookeeper;
this.replicaStatsManager = replicaStatsManager;
this.brokers = new ConcurrentHashMap<>();
this.clusterConfig = clusterConfig;
this.brokerStatsMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -84,7 +86,7 @@ public void recordBrokerStats(BrokerStats brokerStats) {

if (!brokerStats.getHasFailure()) {
// only record brokerstat when there is no failure on that broker.
KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, i));
KafkaBroker broker = brokers.computeIfAbsent(brokerId, i -> new KafkaBroker(clusterConfig, replicaStatsManager, i));
broker.update(brokerStats);
}

Expand Down Expand Up @@ -239,8 +241,8 @@ public PriorityQueue<KafkaBroker> getBrokerQueue() {
public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker> brokerQueue,
OutOfSyncReplica oosReplica) {
TopicPartition topicPartition = oosReplica.topicPartition;
double inBoundReq = ReplicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
double outBoundReq = ReplicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
double inBoundReq = replicaStatsManager.getMaxBytesIn(zkUrl, topicPartition);
double outBoundReq = replicaStatsManager.getMaxBytesOut(zkUrl, topicPartition);
int preferredBroker = oosReplica.replicaBrokers.get(0);

boolean success = true;
Expand Down Expand Up @@ -277,7 +279,6 @@ public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker
return success ? result : null;
}


public KafkaBroker getAlternativeBroker(TopicPartition topicPartition,
double tpBytesIn, double tpBytesOut) {
PriorityQueue<KafkaBroker> brokerQueue =
Expand Down Expand Up @@ -309,7 +310,7 @@ public long getMaxBytesIn() {
for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
Set<TopicPartition> topicPartitions = entry.getValue();
for (TopicPartition tp : topicPartitions) {
result += ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
result += replicaStatsManager.getMaxBytesIn(zkUrl, tp);
}
}
return result;
Expand All @@ -321,7 +322,7 @@ public long getMaxBytesOut() {
for (Map.Entry<String, Set<TopicPartition>> entry : topicPartitions.entrySet()) {
Set<TopicPartition> topicPartitions = entry.getValue();
for (TopicPartition tp : topicPartitions) {
result += ReplicaStatsManager.getMaxBytesOut(zkUrl, tp);
result += replicaStatsManager.getMaxBytesOut(zkUrl, tp);
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public class KafkaClusterManager implements Runnable {
private DoctorKafkaConfig drkafkaConfig = null;
private DoctorKafkaClusterConfig clusterConfig;
private DoctorKafkaActionReporter actionReporter = null;
private ReplicaStatsManager replicaStatsManager;
private boolean stopped = true;
private Thread thread = null;

Expand All @@ -101,7 +102,8 @@ public KafkaClusterManager(String zkUrl, KafkaCluster kafkaCluster,
DoctorKafkaClusterConfig clusterConfig,
DoctorKafkaConfig drkafkaConfig,
DoctorKafkaActionReporter actionReporter,
ZookeeperClient zookeeperClient) {
ZookeeperClient zookeeperClient,
ReplicaStatsManager replicaStatsManager) {
assert clusterConfig != null;
this.zkUrl = zkUrl;
this.zkUtils = KafkaUtils.getZkUtils(zkUrl);
Expand All @@ -117,6 +119,7 @@ public KafkaClusterManager(String zkUrl, KafkaCluster kafkaCluster,
if (clusterConfig.enabledDeadbrokerReplacement()) {
this.brokerReplacer = new BrokerReplacer(drkafkaConfig.getBrokerReplacementCommand());
}
this.replicaStatsManager = replicaStatsManager;
}

public KafkaCluster getCluster() {
Expand Down Expand Up @@ -234,8 +237,8 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker,

for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
TopicPartition tp = entry.getKey();
double tpBytesIn = ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
double tpBytesOut = ReplicaStatsManager.getMaxBytesOut(zkUrl, tp);
double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
double tpBytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
double brokerTraffic = (bytesIn - toBeReducedBytesIn - tpBytesIn) +
(bytesOut - toBeReducedBytesOut - tpBytesOut);

Expand Down Expand Up @@ -312,7 +315,7 @@ private void generateFollowerReassignmentPlan(KafkaBroker broker) {

for (Map.Entry<TopicPartition, Double> entry : tpTraffic.entrySet()) {
TopicPartition tp = entry.getKey();
double tpBytesIn = ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
double tpBytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
if (brokerBytesIn - toBeReducedBytesIn - tpBytesIn < bytesInLimit) {
// if moving a topic partition out will have the broker be under-utilized, do not
// move it out.
Expand Down Expand Up @@ -487,8 +490,8 @@ private Map<TopicPartition, Double> sortTopicPartitionsByTraffic(List<TopicParti
Map<TopicPartition, Double> tpTraffic = new HashMap<>();
for (TopicPartition tp : tps) {
try {
double bytesIn = ReplicaStatsManager.getMaxBytesIn(zkUrl, tp);
double bytesOut = ReplicaStatsManager.getMaxBytesOut(zkUrl, tp);
double bytesIn = replicaStatsManager.getMaxBytesIn(zkUrl, tp);
double bytesOut = replicaStatsManager.getMaxBytesOut(zkUrl, tp);
tpTraffic.put(tp, bytesIn + bytesOut);
} catch (Exception e) {
LOG.info("Exception in sorting topic partition {}", tp, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public class BrokerStatsProcessor implements Runnable {
private String topic;
private SecurityProtocol securityProtocol;
private Map<String, String> consumerConfigs;
private ReplicaStatsManager replicaStatsManager;

public BrokerStatsProcessor(String zkUrl, SecurityProtocol securityProtocol,
String topic, Map<String, String> consumerConfigs) {
String topic, Map<String, String> consumerConfigs, ReplicaStatsManager replicaStatsManager) {
this.zkUrl = zkUrl;
this.topic = topic;
this.securityProtocol = securityProtocol;
this.consumerConfigs = consumerConfigs;
this.replicaStatsManager = replicaStatsManager;
}


Expand Down Expand Up @@ -77,7 +79,7 @@ public void run() {
continue;
}

ReplicaStatsManager.update(brokerStats);
replicaStatsManager.update(brokerStats);
OpenTsdbMetricConverter.incr(DoctorKafkaMetrics.BROKERSTATS_MESSAGES, 1,
"zkUrl= " + brokerStats.getZkUrl());
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ public class PastReplicaStatsProcessor implements Runnable {
private long startOffset;
private long endOffset;
private Thread thread;
private ReplicaStatsManager replicaStatsManager;

public PastReplicaStatsProcessor(String zkUrl, SecurityProtocol securityProtocol, TopicPartition topicPartition,
long startOffset, long endOffset) {
long startOffset, long endOffset, ReplicaStatsManager replicaStatsManager) {
this.zkUrl = zkUrl;
this.securityProtocol = securityProtocol;
this.topicPartition = topicPartition;
this.startOffset = startOffset;
this.endOffset = endOffset;
this.replicaStatsManager = replicaStatsManager;
}

public void start() {
Expand Down Expand Up @@ -82,7 +84,7 @@ public void run() {
OpenTsdbMetricConverter.incr(DoctorKafkaMetrics.MESSAGE_DESERIALIZE_ERROR, 1);
continue;
}
ReplicaStatsManager.update(brokerStats);
replicaStatsManager.update(brokerStats);
}
}
} catch (Exception e) {
Expand Down
Loading

0 comments on commit ee5c005

Please sign in to comment.