From babe941b2b4c077fa4bbc30097c087c37c60f115 Mon Sep 17 00:00:00 2001 From: kabochya Date: Sun, 12 May 2019 11:23:11 -0700 Subject: [PATCH] add decommission api (#137) add email notifications for decommissioning refactor api code make broker api return decommission info, refactored getters to comply with convention, reordering to group getter/setters modify UI to expose decommission status --- .../doctorkafka/DoctorKafkaMain.java | 15 +- .../pinterest/doctorkafka/KafkaBroker.java | 258 +++++++++++------- .../pinterest/doctorkafka/KafkaCluster.java | 13 +- .../doctorkafka/KafkaClusterManager.java | 71 +++-- .../api/{BrokerApi.java => BrokersApi.java} | 16 +- .../api/BrokersDecommissionApi.java | 52 ++++ .../api/{ClusterApi.java => ClustersApi.java} | 12 +- ...ceApi.java => ClustersMaintenanceApi.java} | 27 +- .../doctorkafka/api/DoctorKafkaApi.java | 44 +++ .../doctorkafka/notification/Email.java | 14 +- .../servlet/ClusterInfoServlet.java | 18 +- .../DoctorKafkaBrokerStatsServlet.java | 54 ++-- .../pinterest/doctorkafka/util/ApiUtils.java | 11 + .../doctorkafka/util/ReassignmentInfo.java | 2 +- .../doctorkafka/KafkaClusterTest.java | 10 +- 15 files changed, 412 insertions(+), 205 deletions(-) rename drkafka/src/main/java/com/pinterest/doctorkafka/api/{BrokerApi.java => BrokersApi.java} (50%) create mode 100644 drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java rename drkafka/src/main/java/com/pinterest/doctorkafka/api/{ClusterApi.java => ClustersApi.java} (68%) rename drkafka/src/main/java/com/pinterest/doctorkafka/api/{MaintenanceApi.java => ClustersMaintenanceApi.java} (60%) create mode 100644 drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java create mode 100644 drkafka/src/main/java/com/pinterest/doctorkafka/util/ApiUtils.java diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java b/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java index 1890ed3c..526190ef 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/DoctorKafkaMain.java @@ -8,9 +8,11 @@ import org.apache.logging.log4j.Logger; import com.google.common.collect.ImmutableList; -import com.pinterest.doctorkafka.api.MaintenanceApi; -import com.pinterest.doctorkafka.api.BrokerApi; -import com.pinterest.doctorkafka.api.ClusterApi; + +import com.pinterest.doctorkafka.api.BrokersDecommissionApi; +import com.pinterest.doctorkafka.api.ClustersMaintenanceApi; +import com.pinterest.doctorkafka.api.BrokersApi; +import com.pinterest.doctorkafka.api.ClustersApi; import com.pinterest.doctorkafka.config.DoctorKafkaAppConfig; import com.pinterest.doctorkafka.config.DoctorKafkaConfig; import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager; @@ -115,9 +117,10 @@ private void configureServerRuntime(DoctorKafkaAppConfig configuration, DoctorKa private void registerAPIs(Environment environment, DoctorKafka doctorKafka) { environment.jersey().setUrlPattern("/api/*"); - environment.jersey().register(new BrokerApi()); - environment.jersey().register(new ClusterApi(doctorKafka)); - environment.jersey().register(new MaintenanceApi(doctorKafka)); + environment.jersey().register(new BrokersApi(doctorKafka)); + environment.jersey().register(new ClustersApi(doctorKafka)); + environment.jersey().register(new ClustersMaintenanceApi(doctorKafka)); + environment.jersey().register(new BrokersDecommissionApi(doctorKafka)); } private void startMetricsService() { diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java index ce1637a2..06227263 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java @@ -9,14 +9,18 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; import com.google.gson.JsonObject; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonIgnore; import java.util.ArrayList; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; + public class KafkaBroker implements Comparable { private static final Logger LOG = LogManager.getLogger(KafkaBroker.class); @@ -41,6 +45,7 @@ public class KafkaBroker implements Comparable { private Set toBeAddedReplicas; private KafkaCluster kafkaCluster; + private AtomicBoolean isDecommissioned = new AtomicBoolean(false); public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, KafkaCluster kafkaCluster, int brokerId) { assert clusterConfig != null; @@ -59,19 +64,6 @@ public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, KafkaCluster kafkaClu this.kafkaCluster = kafkaCluster; } - public JsonElement toJson() { - // Return a JSON representation of a Kafka Broker. Sadly, not everything can be trivially added. - JsonObject json = new JsonObject(); - json.add("brokerId", gson.toJsonTree(brokerId)); - json.add("brokerName", gson.toJsonTree(brokerName)); - json.add("rackId", gson.toJsonTree(rackId)); - json.add("bytesInPerSecLimit", gson.toJsonTree(bytesInPerSecLimit)); - json.add("bytesOutPerSecLimit", gson.toJsonTree(bytesOutPerSecLimit)); - json.add("maxBytesOut", gson.toJsonTree(getMaxBytesOut())); - json.add("maxBytesIn", gson.toJsonTree(getMaxBytesIn())); - return json; - } - public long getMaxBytesIn() { long result = 0L; for (TopicPartition topicPartition : leaderReplicas) { @@ -100,83 +92,26 @@ public long getReservedBytesOut() { return reservedBytesOut; } - public int id() { + @JsonProperty + public int getId() { return this.brokerId; } - public String name() { + @JsonProperty + public String getName() { return brokerName; } - public int port() { + @JsonProperty + public int getPort() { return this.brokerPort; } - public long lastStatsTimestamp() { + public long getLastStatsTimestamp() { return latestStats == null ? 0 : latestStats.getTimestamp(); } - public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound){ - if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound && - bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { - reservedBytesIn += inBound; - reservedBytesOut += outBound; - toBeAddedReplicas.add(tp); - return true; - } - return false; - } - - /** - * cancels reservation of bandwidth for a TopicPartition - * @param tp TopicPartition to cancel reservation - * @param inBound amount of in-bound traffic to remove from broker - * @param outBound amount of out-bound traffic to remove from broker - * @return if reserved bandwidth has been successfully removed from broker - */ - public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound){ - if ( toBeAddedReplicas.contains(tp) ){ - reservedBytesIn -= inBound; - reservedBytesOut -= outBound; - toBeAddedReplicas.remove(tp); - return true; - } - return false; - } - - @Deprecated - public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) { - if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) { - reservedBytesIn += inBound; - toBeAddedReplicas.add(tp); - return true; - } - return false; - } - - /** - * To reserve out-bound bandwidth, we first need to compute the current network outbound usage. - * When a partition reassignment happens, the outbound metric of leader prelicas will be high. - * Because of this, using the brokerstats based information will lead to inaccurate decision. - * One observation is that in-bound traffic is more stable than the out-bound traffic, - * and read/write ratio for topics usually does not change often. Because of this, we can - * use the in-bound traffic times read/write ratio to infer the required out-bound bandwidth - * for topic partitions. - * - * @param tp the topic partition for reserving outbound bandwidth - * @param outBound the outbound bandwidth requirements in bytes/second - * @return whether the reservation is successful or not. - */ - @Deprecated - public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) { - if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { - reservedBytesOut += outBound; - toBeAddedReplicas.add(tp); - return true; - } - return false; - } - + @JsonIgnore public List getLeaderTopicPartitions() { BrokerStats brokerStats = getLatestStats(); if (brokerStats == null) { @@ -189,6 +124,7 @@ public List getLeaderTopicPartitions() { return topicPartitions; } + @JsonIgnore public List getFollowerTopicPartitions() { BrokerStats brokerStats = getLatestStats(); if (brokerStats == null) { @@ -208,14 +144,86 @@ public void clearResourceAllocationCounters() { this.toBeAddedReplicas.clear(); } + @JsonIgnore protected void setLeaderReplicas(Set leaderReplicas) { this.leaderReplicas = leaderReplicas; } + @JsonIgnore protected void setFollowerReplicas(Set followerReplicas) { this.followerReplicas= followerReplicas; } + /** + * This is used in partition reassignment. During the partition reassignment, we cannot + * put two replicas on the same broker. + * + * @param tp the topic partition for examining + * @return whether the broker has a replica for the given topic partition. + */ + public boolean hasTopicPartition(TopicPartition tp) { + return leaderReplicas.contains(tp) || followerReplicas.contains(tp) + || toBeAddedReplicas.contains(tp); + } + + @JsonIgnore + public BrokerStats getLatestStats() { + return latestStats; + } + + @VisibleForTesting + protected void setLatestStats(BrokerStats brokerStats){ + this.latestStats = brokerStats; + } + + @JsonProperty + public String getRackId(){ + return rackId; + } + + @VisibleForTesting + protected void setRackId(String rackId){ + this.rackId = rackId; + } + + /** + * + * Broker Decommissioning: + * Currently, a decommissioned broker + * WILL: + * 1. Be ignored when checking for dead brokers (and thus no replacement will happen) + * 2. Be ignored during URP reassignments + * 3. Still update stats so it can catch up if decommission is cancelled + * WILL NOT: + * 1. Reassign the partitions to other brokers, you have to do it manually + * + * Note: Decommissioning is ephemeral for now, state is not preserved in ZK, so if a restart happens, + * we will have to do it again + * + */ + + /** + * Decommissions the broker + * @return previous decommission state + */ + public boolean decommission() { + return this.isDecommissioned.getAndSet(true); + } + + /** + * Cancels decommission of the broker + * @return previous decommission state + */ + public boolean cancelDecommission() { + return this.isDecommissioned.getAndSet(false); + } + + @JsonProperty + public boolean isDecommissioned() { + return this.isDecommissioned.get(); + } + + /** * Record the stats, and update the topic partition list based on the stats * @@ -251,46 +259,93 @@ public synchronized void update(BrokerStats stats) { } } - /** - * This is used in partition reassignment. During the partition reassignment, we cannot - * put two replicas on the same broker. - * - * @param tp the topic partition for examining - * @return whether the broker has a replica for the given topic partition. - */ - public boolean hasTopicPartition(TopicPartition tp) { - return leaderReplicas.contains(tp) || followerReplicas.contains(tp) - || toBeAddedReplicas.contains(tp); - } - public BrokerStats getLatestStats() { - return latestStats; + public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound){ + if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound && + bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { + reservedBytesIn += inBound; + reservedBytesOut += outBound; + toBeAddedReplicas.add(tp); + return true; + } + return false; } - @VisibleForTesting - protected void setLatestStats(BrokerStats brokerStats){ - this.latestStats = brokerStats; + /** + * cancels reservation of bandwidth for a TopicPartition + * @param tp TopicPartition to cancel reservation + * @param inBound amount of in-bound traffic to remove from broker + * @param outBound amount of out-bound traffic to remove from broker + * @return if reserved bandwidth has been successfully removed from broker + */ + public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound){ + if ( toBeAddedReplicas.contains(tp) ){ + reservedBytesIn -= inBound; + reservedBytesOut -= outBound; + toBeAddedReplicas.remove(tp); + return true; + } + return false; } - public String getRackId(){ - return rackId; + @Deprecated + public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) { + if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) { + reservedBytesIn += inBound; + toBeAddedReplicas.add(tp); + return true; + } + return false; } - @VisibleForTesting - protected void setRackId(String rackId){ - this.rackId = rackId; + /** + * To reserve out-bound bandwidth, we first need to compute the current network outbound usage. + * When a partition reassignment happens, the outbound metric of leader prelicas will be high. + * Because of this, using the brokerstats based information will lead to inaccurate decision. + * One observation is that in-bound traffic is more stable than the out-bound traffic, + * and read/write ratio for topics usually does not change often. Because of this, we can + * use the in-bound traffic times read/write ratio to infer the required out-bound bandwidth + * for topic partitions. + * + * @param tp the topic partition for reserving outbound bandwidth + * @param outBound the outbound bandwidth requirements in bytes/second + * @return whether the reservation is successful or not. + */ + @Deprecated + public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) { + if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) { + reservedBytesOut += outBound; + toBeAddedReplicas.add(tp); + return true; + } + return false; } - @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("brokerId:" + id()); + sb.append("brokerId:" + getId()); sb.append("; rackId = " + rackId); sb.append("; stats : " + (latestStats == null ? "null" : latestStats)); return sb.toString(); } + + @Deprecated + public JsonElement toJson() { + // Return a JSON representation of a Kafka Broker. Sadly, not everything can be trivially added. + JsonObject json = new JsonObject(); + json.add("brokerId", gson.toJsonTree(brokerId)); + json.add("brokerName", gson.toJsonTree(brokerName)); + json.add("rackId", gson.toJsonTree(rackId)); + json.add("bytesInPerSecLimit", gson.toJsonTree(bytesInPerSecLimit)); + json.add("bytesOutPerSecLimit", gson.toJsonTree(bytesOutPerSecLimit)); + json.add("maxBytesOut", gson.toJsonTree(getMaxBytesOut())); + json.add("maxBytesIn", gson.toJsonTree(getMaxBytesIn())); + return json; + } + + public int compareTo(KafkaBroker another) { double networkUsage = getMaxBytesIn() + getMaxBytesOut() + reservedBytesIn + reservedBytesOut; @@ -315,4 +370,5 @@ public int compare(KafkaBroker x, KafkaBroker y) { return (xNetworkUsage < yNetworkUsage) ? -1 : (xNetworkUsage > yNetworkUsage ? 1 : 0); } } + } diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java index f3bf21e1..2939235a 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaCluster.java @@ -219,7 +219,7 @@ public List getHighTrafficBrokers() { continue; } LOG.debug("High traffic broker: {} : [{}, {}]", - broker.name(), broker.getMaxBytesIn(), broker.getMaxBytesOut()); + broker.getName(), broker.getMaxBytesIn(), broker.getMaxBytesOut()); result.add(broker); } } @@ -239,7 +239,7 @@ public List getLowTrafficBrokers() { double brokerBytesOut = broker.getMaxBytesOut(); if (brokerBytesIn < averageBytesIn && brokerBytesOut < averageBytesOut) { LOG.info("Low traffic broker {} : [{}, {}]", - broker.name(), broker.getMaxBytesIn(), broker.getMaxBytesOut()); + broker.getName(), broker.getMaxBytesIn(), broker.getMaxBytesOut()); result.add(broker); } } catch (Exception e) { @@ -293,7 +293,8 @@ protected boolean isInvalidBroker(KafkaBroker broker) { BrokerStats latestStats = broker.getLatestStats(); return latestStats== null || latestStats.getHasFailure() || - System.currentTimeMillis() - latestStats.getTimestamp() > INVALID_BROKERSTATS_TIME; + System.currentTimeMillis() - latestStats.getTimestamp() > INVALID_BROKERSTATS_TIME || + broker.isDecommissioned(); } @@ -436,7 +437,7 @@ protected boolean findNextBrokerForOosReplica( ){ boolean success; KafkaBroker leastUsedBroker = brokerQueue.poll(); - while (leastUsedBroker != null && replicaBrokers.contains(leastUsedBroker.id())) { + while (leastUsedBroker != null && replicaBrokers.contains(leastUsedBroker.getId())) { unusableBrokers.add(leastUsedBroker); leastUsedBroker = brokerQueue.poll(); } @@ -444,7 +445,7 @@ protected boolean findNextBrokerForOosReplica( LOG.error("Failed to find a usable broker for fixing {}:{}", tp, oosBrokerId); success = false; } else { - LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.id()); + LOG.info("LeastUsedBroker for replacing {} : {}", oosBrokerId, leastUsedBroker.getId()); success = preferredBroker == oosBrokerId ? leastUsedBroker.reserveBandwidth(tp, inBoundReq, outBoundReq) : leastUsedBroker.reserveBandwidth(tp, inBoundReq, 0); @@ -472,7 +473,7 @@ public KafkaBroker getAlternativeBroker(TopicPartition topicPartition, } // we will get the broker with the least network usage KafkaBroker leastUsedBroker = brokerQueue.poll(); - LOG.info("LeastUsedBroker for replacing {} : {}", topicPartition, leastUsedBroker.id()); + LOG.info("LeastUsedBroker for replacing {} : {}", topicPartition, leastUsedBroker.getId()); boolean success = leastUsedBroker.reserveBandwidth(topicPartition, tpBytesIn, tpBytesOut); if (!success) { diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java index ef970c1f..c775cf94 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/KafkaClusterManager.java @@ -219,9 +219,9 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, List leaderReplicas, double averageBytesIn, double averageBytesOut) { - LOG.info("Start generating leader reassignment plan for {}", broker.name()); + LOG.info("Start generating leader reassignment plan for {}", broker.getName()); if (leaderReplicas == null) { - LOG.info("broker {} does not have leader partition", broker.id()); + LOG.info("broker {} does not have leader partition", broker.getId()); return; } @@ -246,8 +246,8 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, // if the preferred leader is not the current leader, // check if applying preferred leader election is feasible int preferredBrokerId = replicasList.get(0); - if (preferredBrokerId != broker.id()) { - LOG.info("Partition {}: {}, broker :{}", tp.partition(), replicasList, broker.name()); + if (preferredBrokerId != broker.getId()) { + LOG.info("Partition {}: {}, broker :{}", tp.partition(), replicasList, broker.getName()); KafkaBroker another = kafkaCluster.getBroker(preferredBrokerId); // we only need to check if the outbound bandwidth for preferredBroker as // there will be no in-bound traffic change @@ -268,14 +268,14 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, // invariant: is preferred leader, and moving this replica out will be helpful. KafkaBroker alterBroker = kafkaCluster.getAlternativeBroker(tp, tpBytesIn, tpBytesOut); if (alterBroker != null) { - LOG.info("Alternative broker for {} : {} -> {}", tp, broker.name(), alterBroker.name()); + LOG.info("Alternative broker for {} : {} -> {}", tp, broker.getName(), alterBroker.getName()); LOG.info(" tpBytesIn:{}, tpBytesOut:{}", tpBytesIn, tpBytesOut); LOG.info(" to be added: in: {}, out: {}", alterBroker.getReservedBytesIn(), alterBroker.getReservedBytesOut()); ReassignmentInfo reassign = new ReassignmentInfo(tp, broker, alterBroker); reassignmentMap.put(tp, reassign); - LOG.info(" {} : {} -> {}", tp, reassign.source.name(), reassign.dest.name()); + LOG.info(" {} : {} -> {}", tp, reassign.source.getName(), reassign.dest.getName()); toBeReducedBytesIn += tpBytesIn; toBeReducedBytesOut += tpBytesOut; @@ -284,14 +284,14 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, break; } } else { - LOG.info("Could not find an alternative broker for {}:{} ", broker.name(), tp); + LOG.info("Could not find an alternative broker for {}:{} ", broker.getName(), tp); reassignmentFailures.add(new MutablePair<>(broker, tp)); } } } catch (Exception e) { - LOG.info("Failure in generating leader assignment plan for {}", broker.name(), e); + LOG.info("Failure in generating leader assignment plan for {}", broker.getName(), e); } - LOG.info("End generating leader reassignment plan for {}", broker.name()); + LOG.info("End generating leader reassignment plan for {}", broker.getName()); } @@ -299,7 +299,7 @@ private void generateLeadersReassignmentPlan(KafkaBroker broker, * Reassign the follower partitions */ private void generateFollowerReassignmentPlan(KafkaBroker broker) { - LOG.info("Begin generating follower reassignment plan for {}", broker.name()); + LOG.info("Begin generating follower reassignment plan for {}", broker.getName()); List topicPartitions = broker.getFollowerTopicPartitions(); Map tpTraffic = sortTopicPartitionsByTraffic(topicPartitions); @@ -320,17 +320,17 @@ private void generateFollowerReassignmentPlan(KafkaBroker broker) { } KafkaBroker alterBroker = kafkaCluster.getAlternativeBroker(tp, tpBytesIn, 0); if (alterBroker != null) { - LOG.info(" Alternative broker for {} : {} -> {}, bytesIn: {}", tp, broker.name(), - alterBroker.name(), tpBytesIn); + LOG.info(" Alternative broker for {} : {} -> {}, bytesIn: {}", tp, broker.getName(), + alterBroker.getName(), tpBytesIn); ReassignmentInfo reassign = new ReassignmentInfo(tp, broker, alterBroker); reassignmentMap.put(tp, reassign); - LOG.info(" {} : {} -> {}", tp, reassign.source.name(), reassign.dest.name()); + LOG.info(" {} : {} -> {}", tp, reassign.source.getName(), reassign.dest.getName()); toBeReducedBytesIn += tpBytesIn; if (broker.getMaxBytesIn() - toBeReducedBytesIn <= bytesInLimit) { break; } } else { - LOG.info("Could not find an alternative broker for {}:{}", broker.name(), tp); + LOG.info("Could not find an alternative broker for {}:{}", broker.getName(), tp); reassignmentFailures.add(new MutablePair<>(broker, tp)); } } @@ -353,7 +353,7 @@ public List getHighTrafficBroker() { Collections.reverse(highTrafficBrokers); for (KafkaBroker broker : highTrafficBrokers) { LOG.info("high traffic borker: {} : [{}, {}]", - broker.name(), broker.getMaxBytesIn(), broker.getMaxBytesOut()); + broker.getName(), broker.getMaxBytesIn(), broker.getMaxBytesOut()); } return highTrafficBrokers; } @@ -378,14 +378,14 @@ public String getWorkloadBalancingPlanInJson(List highTrafficBroker try { if (broker.getMaxBytesOut() > clusterConfig.getNetworkOutLimitInBytes()) { // need to move some leader partitions out, or switch preferred leaders - List leaderReplicas = leaderTopicPartitions.get(broker.id()); + List leaderReplicas = leaderTopicPartitions.get(broker.getId()); generateLeadersReassignmentPlan(broker, leaderReplicas, averageBytesIn, averageBytesOut); } else if (broker.getMaxBytesIn() > clusterConfig.getNetworkInLimitInBytes()) { // move some followers out may be sufficient generateFollowerReassignmentPlan(broker); } } catch (Exception e) { - LOG.info("Exception in generating assignment plan for {}", broker.name(), e); + LOG.info("Exception in generating assignment plan for {}", broker.getName(), e); } } @@ -435,14 +435,14 @@ public String getWorkloadBalancingPlanInJson(List highTrafficBroker Node[] replicas = partitionInfo.replicas(); Integer[] newReplicas = new Integer[partitionInfo.replicas().length]; for (int i = 0; i < replicas.length; i++) { - if (replicas[i].id() == reassign.source.id()) { - newReplicas[i] = reassign.dest.id(); + if (replicas[i].id() == reassign.source.getId()) { + newReplicas[i] = reassign.dest.getId(); } else { newReplicas[i] = replicas[i].id(); } } assignmentPlan.put(tp, newReplicas); - sourceBrokerId.add(reassign.source.id()); + sourceBrokerId.add(reassign.source.getId()); } if (assignmentPlan.size() > 0) { scala.collection.Map> proposedAssignment = @@ -580,7 +580,7 @@ public UnderReplicatedReason getUnderReplicatedReason(String brokerHost, } else { KafkaBroker leaderBroker = kafkaCluster.getBroker(leaderId); // Leader might be bad as well - if (leaderBroker != null && isDeadBroker(leaderBroker.name(), kafkaPort, leaderId, tp)) { + if (leaderBroker != null && isDeadBroker(leaderBroker.getName(), kafkaPort, leaderId, tp)) { reason = UnderReplicatedReason.LEADER_FAILURE; } else if (isNetworkSaturated(leaderId)) { reason = UnderReplicatedReason.LEADER_NETWORK_SATURATION; @@ -646,7 +646,7 @@ private Map generateReassignmentPlanForDeadBrokers( Integer[] newReplicas = new Integer[replicas.size()]; for (int i = 0; i < replicas.size(); i++) { int brokerId = replicas.get(i); - newReplicas[i] = replacedNodes.containsKey(brokerId) ? replacedNodes.get(brokerId).id() + newReplicas[i] = replacedNodes.containsKey(brokerId) ? replacedNodes.get(brokerId).getId() : brokerId; } replicasMap.put(oosReplica.topicPartition, newReplicas); @@ -710,7 +710,7 @@ public void handleUnderReplicatedPartitions(List initialUrps, } else if (downBrokers.contains(leaderId)) { reason = UnderReplicatedReason.LEADER_FAILURE; } else { - reason = getUnderReplicatedReason(broker.name(), broker.port(), oosBrokerId, leaderId, + reason = getUnderReplicatedReason(broker.getName(), broker.getPort(), oosBrokerId, leaderId, oosReplica.topicPartition); if (reason == UnderReplicatedReason.FOLLOWER_FAILURE) { downBrokers.add(oosBrokerId); @@ -943,7 +943,10 @@ private boolean checkAndReplaceDeadBrokers() { KafkaBroker toBeReplaced = null; for (Map.Entry brokerEntry : kafkaCluster.brokers.entrySet()) { KafkaBroker broker = brokerEntry.getValue(); - double lastUpdateTime = (now - broker.lastStatsTimestamp()) / 1000.0; + if (broker.isDecommissioned()) { + continue; + } + double lastUpdateTime = (now - broker.getLastStatsTimestamp()) / 1000.0; // call broker replacement script to replace dead brokers if (lastUpdateTime > clusterConfig.getBrokerReplacementNoStatsSeconds()) { toBeReplaced = broker; @@ -952,7 +955,7 @@ private boolean checkAndReplaceDeadBrokers() { } if (toBeReplaced != null) { - String brokerName= toBeReplaced.name(); + String brokerName= toBeReplaced.getName(); String clusterName = clusterConfig.getClusterName(); try { @@ -997,6 +1000,24 @@ public void disableMaintenanceMode() { Email.notifyOnMaintenanceMode(drkafkaConfig.getNotificationEmails(), clusterConfig.getClusterName(), maintenanceMode.get()); } + + public void decommissionBroker(Integer brokerId) { + boolean prevState = kafkaCluster.getBroker(brokerId).decommission(); + + // only notify if state changed + if (prevState == false) { + Email.notifyOnDecommissioningBroker(drkafkaConfig.getNotificationEmails(), kafkaCluster.name(), String.valueOf(brokerId)); + } + } + + public void cancelDecommissionBroker(Integer brokerId) { + boolean prevState = kafkaCluster.getBroker(brokerId).cancelDecommission(); + + // only notify if state changed + if (prevState == true) { + Email.notifyOnCancelledDecommissioningBroker(drkafkaConfig.getNotificationEmails(), kafkaCluster.name(), String.valueOf(brokerId)); + } + } /** * KafkaClusterManager periodically check the health of the cluster. If it finds diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokerApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersApi.java similarity index 50% rename from drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokerApi.java rename to drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersApi.java index 83f00821..1c2e1a0f 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokerApi.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersApi.java @@ -10,18 +10,24 @@ import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; +import com.pinterest.doctorkafka.DoctorKafka; import com.pinterest.doctorkafka.DoctorKafkaMain; +import com.pinterest.doctorkafka.KafkaBroker; import com.pinterest.doctorkafka.KafkaClusterManager; -@Path("/cluster/{clusterName}/broker") +@Path("/clusters/{clusterName}/brokers") @Produces({ MediaType.APPLICATION_JSON }) @Consumes({ MediaType.APPLICATION_JSON }) -public class BrokerApi { +public class BrokersApi extends DoctorKafkaApi { + + public BrokersApi(DoctorKafka drkafka){ + super(drkafka); + } @GET - public List getBrokerList(@PathParam("clusterName") String clusterName) { - KafkaClusterManager clusterManager = DoctorKafkaMain.doctorKafka.getClusterManager(clusterName); - return clusterManager.getAllBrokers().stream().map(b -> b.name()).collect(Collectors.toList()); + public List getBrokerList(@PathParam("clusterName") String clusterName) { + KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName); + return clusterManager.getAllBrokers(); } } diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java new file mode 100644 index 00000000..0f762f53 --- /dev/null +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/BrokersDecommissionApi.java @@ -0,0 +1,52 @@ +package com.pinterest.doctorkafka.api; + +import com.pinterest.doctorkafka.DoctorKafka; +import com.pinterest.doctorkafka.util.ApiUtils; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; + +@Path("/clusters/{clusterName}/brokers/{brokerId}/admin/decommission") +@Produces({MediaType.APPLICATION_JSON }) +@Consumes({MediaType.APPLICATION_JSON }) +public class BrokersDecommissionApi extends DoctorKafkaApi { + + private static final Logger LOG = LogManager.getLogger(BrokersDecommissionApi.class); + + public BrokersDecommissionApi(DoctorKafka drkafka) { + super(drkafka); + } + + @GET + public boolean isBrokerDecommissioned(@PathParam("clusterName") String clusterName, @PathParam("brokerId") String brokerId) { + return checkAndGetBroker(clusterName, brokerId).isDecommissioned(); + } + + @PUT + public void decommissionBroker(@Context HttpServletRequest ctx, + @PathParam("clusterName") String clusterName, + @PathParam("brokerId") String brokerIdStr) { + checkAndGetClusterManager(clusterName).decommissionBroker(Integer.parseInt(brokerIdStr)); + ApiUtils.logAPIAction(LOG, ctx, "Decommissioned for broker:" + brokerIdStr + " on cluster "+ clusterName); + } + + @DELETE + public void cancelDecommissionBroker(@Context HttpServletRequest ctx, + @PathParam("clusterName") String clusterName, + @PathParam("brokerId") String brokerIdStr) { + checkAndGetClusterManager(clusterName).cancelDecommissionBroker(Integer.parseInt(brokerIdStr)); + ApiUtils.logAPIAction(LOG, ctx, "Decommissioned cancelled for broker:" + brokerIdStr + " on cluster "+ clusterName); + } + +} diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClusterApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersApi.java similarity index 68% rename from drkafka/src/main/java/com/pinterest/doctorkafka/api/ClusterApi.java rename to drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersApi.java index 790bafca..09d3a995 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClusterApi.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersApi.java @@ -10,20 +10,18 @@ import com.pinterest.doctorkafka.DoctorKafka; -@Path("/cluster") +@Path("/clusters") @Produces({ MediaType.APPLICATION_JSON }) @Consumes({ MediaType.APPLICATION_JSON }) -public class ClusterApi { +public class ClustersApi extends DoctorKafkaApi { - private DoctorKafka drKafka; - - public ClusterApi(DoctorKafka drKafka) { - this.drKafka = drKafka; + public ClustersApi(DoctorKafka drKafka) { + super(drKafka); } @GET public List getClusterNames() { - return drKafka.getClusterNames(); + return getDrkafka().getClusterNames(); } } diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/MaintenanceApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java similarity index 60% rename from drkafka/src/main/java/com/pinterest/doctorkafka/api/MaintenanceApi.java rename to drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java index 5ff12d27..7150906a 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/api/MaintenanceApi.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/ClustersMaintenanceApi.java @@ -4,7 +4,6 @@ import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.GET; -import javax.ws.rs.NotFoundException; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -17,17 +16,17 @@ import com.pinterest.doctorkafka.DoctorKafka; import com.pinterest.doctorkafka.KafkaClusterManager; +import com.pinterest.doctorkafka.util.ApiUtils; -@Path("/cluster/{clusterName}/admin/maintenance") +@Path("/clusters/{clusterName}/admin/maintenance") @Produces({ MediaType.APPLICATION_JSON }) @Consumes({ MediaType.APPLICATION_JSON }) -public class MaintenanceApi { +public class ClustersMaintenanceApi extends DoctorKafkaApi { - private static final Logger LOG = LogManager.getLogger(MaintenanceApi.class); - private DoctorKafka drKafka; + private static final Logger LOG = LogManager.getLogger(ClustersMaintenanceApi.class); - public MaintenanceApi(DoctorKafka drKafka) { - this.drKafka = drKafka; + public ClustersMaintenanceApi(DoctorKafka drKafka) { + super(drKafka); } @GET @@ -41,8 +40,7 @@ public void enableMaintenance(@Context HttpServletRequest ctx, @PathParam("clusterName") String clusterName) { KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName); clusterManager.enableMaintenanceMode(); - LOG.info("Enabled maintenance mode for cluster:" + clusterName + " by user:" - + ctx.getRemoteUser() + " from ip:" + ctx.getRemoteHost()); + ApiUtils.logAPIAction(LOG, ctx, "Enabled maintenance mode for cluster:" + clusterName); } @DELETE @@ -50,16 +48,7 @@ public void disableMaintenance(@Context HttpServletRequest ctx, @PathParam("clusterName") String clusterName) { KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName); clusterManager.disableMaintenanceMode(); - LOG.info("Dsiabled maintenance mode for cluster:" + clusterName + " by user:" - + ctx.getRemoteUser() + " from ip:" + ctx.getRemoteHost()); - } - - private KafkaClusterManager checkAndGetClusterManager(String clusterName) { - KafkaClusterManager clusterManager = drKafka.getClusterManager(clusterName); - if (clusterManager == null) { - throw new NotFoundException("Unknown clustername:" + clusterName); - } - return clusterManager; + ApiUtils.logAPIAction(LOG, ctx, "Disabled maintenance mode for cluster:" + clusterName); } } \ No newline at end of file diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java b/drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java new file mode 100644 index 00000000..37738e21 --- /dev/null +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/api/DoctorKafkaApi.java @@ -0,0 +1,44 @@ +package com.pinterest.doctorkafka.api; + +import com.pinterest.doctorkafka.DoctorKafka; +import com.pinterest.doctorkafka.KafkaBroker; +import com.pinterest.doctorkafka.KafkaClusterManager; + +import javax.ws.rs.NotFoundException; + +public abstract class DoctorKafkaApi { + private DoctorKafka drkafka; + + public DoctorKafkaApi() { + this.drkafka = null; + } + + public DoctorKafkaApi(DoctorKafka drkafka) { + this.drkafka = drkafka; + } + + protected DoctorKafka getDrkafka() { + return drkafka; + } + + protected KafkaClusterManager checkAndGetClusterManager(String clusterName) { + KafkaClusterManager clusterManager = drkafka.getClusterManager(clusterName); + if (clusterManager == null) { + throw new NotFoundException("Unknown clustername:" + clusterName); + } + return clusterManager; + } + + protected KafkaBroker checkAndGetBroker(String clusterName, String brokerId) { + KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName); + Integer id = Integer.parseInt(brokerId); + KafkaBroker broker = clusterManager.getCluster().getBroker(id); + if (broker == null) { + throw new NotFoundException("Unknown brokerId: " + brokerId); + } + return broker; + } + + + +} diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java b/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java index 6ee893d4..72e404ae 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/notification/Email.java @@ -89,6 +89,18 @@ public static void notifyOnMaintenanceMode(String[] emails, String clusterName, sendTo(emails, title, content); } + public static void notifyOnDecommissioningBroker(String[] emails, String clusterName, String brokerId) { + String title = "Decommissioning broker " + brokerId + " on " + clusterName; + String content = "Broker:" + brokerId + " Cluster:" + clusterName + " is getting decommissioned"; + sendTo(emails, title, content); + } + + public static void notifyOnCancelledDecommissioningBroker(String[] emails, String clusterName, String brokerId) { + String title = "Cancelled decommissioning broker " + brokerId + " on " + clusterName; + String content = "Broker:" + brokerId + " Cluster:" + clusterName + " decommission cancelled"; + sendTo(emails, title, content); + } + public static void alertOnNoStatsBrokers(String[] emails, String clusterName, List noStatsBrokers) { @@ -129,7 +141,7 @@ public static void alertOnFailureInHandlingUrps(String[] emails, reassignmentFailures.stream().forEach(pair -> { KafkaBroker broker = pair.getKey(); TopicPartition topicPartition = pair.getValue(); - sb.append("Broker : " + broker.name() + ", " + topicPartition); + sb.append("Broker : " + broker.getName() + ", " + topicPartition); }); } if (downBrokers != null && !downBrokers.isEmpty()) { diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/ClusterInfoServlet.java b/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/ClusterInfoServlet.java index f26cdd7b..3bf685c6 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/ClusterInfoServlet.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/ClusterInfoServlet.java @@ -90,14 +90,14 @@ public void renderHTML(PrintWriter writer, Map params) { writer.print(String.format("
overloaded brokers (%d) : ", overloadedBrokers.size())); for (KafkaBroker broker : overloadedBrokers) { - writer.print(broker.name() + ","); + writer.print(broker.getName() + ","); } writer.print("
"); writer.print(String.format("
under-utilized brokers (%d): ", underutilized.size())); for (KafkaBroker broker : underutilized) { - writer.print(broker.name() + ","); + writer.print(broker.getName() + ","); } writer.print("
"); } @@ -105,8 +105,9 @@ public void renderHTML(PrintWriter writer, Map params) { writer.print(""); writer.print(""); String thStr = String.format( - "", - "BrokerId", "BrokerName", "MaxIn (Mb/s)", "MaxOut (Mb/s)", "#Partitions", "Last Update"); + "", + "BrokerId", "BrokerName", "MaxIn (Mb/s)", "MaxOut (Mb/s)", "#Partitions", "Last Update", + "Decommissioned"); writer.print(thStr + ""); writer.print(""); @@ -119,7 +120,7 @@ public void renderHTML(PrintWriter writer, Map params) { KafkaBroker broker = brokerEntry.getValue(); double maxMbInPerSec = broker.getMaxBytesIn() / 1024.0 / 1024.0; double maxMbOutPerSec = broker.getMaxBytesOut() / 1024.0 / 1024.0; - double lastUpdateTime = (now - broker.lastStatsTimestamp()) / 1000.0; + double lastUpdateTime = (now - broker.getLastStatsTimestamp()) / 1000.0; String lastUpateTimeHtml = lastUpdateTime < 600 @@ -128,11 +129,12 @@ public void renderHTML(PrintWriter writer, Map params) { int partitionCount = broker.getLatestStats().getNumReplicas(); String html = String.format( - " %s", + " %s ", brokerEntry.getKey(), "" + broker.name() + "", - maxMbInPerSec, maxMbOutPerSec, partitionCount, lastUpateTimeHtml); + + "&brokerid=" + broker.getId() + "\">" + broker.getName() + "", + maxMbInPerSec, maxMbOutPerSec, partitionCount, lastUpateTimeHtml, + broker.isDecommissioned()); writer.print(html); writer.print(""); diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/DoctorKafkaBrokerStatsServlet.java b/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/DoctorKafkaBrokerStatsServlet.java index 96b81b54..4748b219 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/DoctorKafkaBrokerStatsServlet.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/servlet/DoctorKafkaBrokerStatsServlet.java @@ -28,26 +28,34 @@ public class DoctorKafkaBrokerStatsServlet extends DoctorKafkaServlet { private static final Logger LOG = LogManager.getLogger(DoctorKafkaBrokerStatsServlet.class); private static final Gson gson = (new GsonBuilder()).serializeSpecialFloatingPointValues().create(); - public BrokerStats getLatestStats(String clusterName, int brokerId) - throws ClusterInfoError { + public KafkaBroker getBroker(String clusterName, int brokerId) throws ClusterInfoError { + KafkaClusterManager clusterMananger = + DoctorKafkaMain.doctorKafka.getClusterManager(clusterName); + if (clusterMananger == null) { + throw new ClusterInfoError("Failed to find cluster manager for {}", clusterName); + } + KafkaBroker broker = clusterMananger.getCluster().getBroker(brokerId); + if (broker == null) { + throw new ClusterInfoError( + "Failed to find broker {} in cluster {}", + Integer.toString(brokerId), + clusterName + ); + } - try { - KafkaClusterManager clusterMananger = - DoctorKafkaMain.doctorKafka.getClusterManager(clusterName); - if (clusterMananger == null) { - throw new ClusterInfoError("Failed to find cluster manager for {}", clusterName); - } - KafkaCluster cluster = clusterMananger.getCluster(); - KafkaBroker broker = cluster.brokers.get(brokerId); - BrokerStats latestStats = broker.getLatestStats(); - if (latestStats == null) { - throw new ClusterInfoError("Failed to find Broker {} for {} ", Integer.toString(brokerId), clusterName); - } - return latestStats; - } catch (Exception e) { - LOG.error("Unexpected exception : ", e); - throw new ClusterInfoError("Unexpected exception: {} ", e.toString()); + return broker; + } + + public BrokerStats getLatestStats(String clusterName, KafkaBroker broker) + throws ClusterInfoError { + BrokerStats latestStats = broker.getLatestStats(); + if (latestStats == null) { + throw new ClusterInfoError("Failed to get latest stats from broker {} in cluster {}", + Integer.toString(broker.getId()), + clusterName + ); } + return latestStats; } @Override @@ -83,8 +91,7 @@ public void renderHTML(PrintWriter writer, Map params) { writer.print(""); try { - BrokerStats latestStats = getLatestStats(clusterName, brokerId); - generateBrokerStatsHtml(writer, latestStats); + generateBrokerHtml(writer, clusterName, brokerId); writer.print("
%s %s %s %s %s %s%s %s %s %s %s %s %s
%d %s %.2f %.2f %d%d %s %.2f %.2f %d %s
"); writer.print(" "); writer.print(" "); @@ -95,11 +102,16 @@ public void renderHTML(PrintWriter writer, Map params) { printFooter(writer); } - private void generateBrokerStatsHtml(PrintWriter writer, BrokerStats stats) { + private void generateBrokerHtml(PrintWriter writer, String clusterName, int brokerId) + throws ClusterInfoError{ + KafkaBroker broker = getBroker(clusterName, brokerId); + BrokerStats stats = getLatestStats(clusterName, broker); + writer.print(" " + new Date(stats.getTimestamp()) + ""); writer.print(""); writer.print(""); printHtmlTableRow(writer, "BrokerId", stats.getId()); + printHtmlTableRow(writer, "IsDecommissioned", broker.isDecommissioned()); printHtmlTableRow(writer, "Name", stats.getName()); printHtmlTableRow(writer, "HasFailure", stats.getHasFailure()); printHtmlTableRow(writer, "KafkaVersion", stats.getKafkaVersion()); diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/util/ApiUtils.java b/drkafka/src/main/java/com/pinterest/doctorkafka/util/ApiUtils.java new file mode 100644 index 00000000..8b2e5b3d --- /dev/null +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/util/ApiUtils.java @@ -0,0 +1,11 @@ +package com.pinterest.doctorkafka.util; + +import org.apache.logging.log4j.Logger; + +import javax.servlet.http.HttpServletRequest; + +public class ApiUtils { + public static void logAPIAction(Logger LOG, HttpServletRequest ctx, String message) { + LOG.info("User from:" + ctx.getRemoteUser() + " from ip:" + ctx.getRemoteHost() + " " + message); + } +} diff --git a/drkafka/src/main/java/com/pinterest/doctorkafka/util/ReassignmentInfo.java b/drkafka/src/main/java/com/pinterest/doctorkafka/util/ReassignmentInfo.java index aa4330c4..4d5a32f5 100644 --- a/drkafka/src/main/java/com/pinterest/doctorkafka/util/ReassignmentInfo.java +++ b/drkafka/src/main/java/com/pinterest/doctorkafka/util/ReassignmentInfo.java @@ -19,7 +19,7 @@ public ReassignmentInfo(TopicPartition tp, KafkaBroker src, KafkaBroker dest) { @Override public String toString() { String result = topicPartition.toString() + ": "; - result += source.name() + " -> " + dest.name(); + result += source.getName() + " -> " + dest.getName(); return result; } } diff --git a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java index a1f23c21..cb531f00 100644 --- a/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java +++ b/drkafka/src/test/java/com/pinterest/doctorkafka/KafkaClusterTest.java @@ -170,7 +170,7 @@ void testLocalityAwareReassignments() throws Exception { BrokerStats bs = new BrokerStats(); bs.setTimestamp(System.currentTimeMillis()); broker.setLatestStats(bs); - kafkaCluster.brokers.put(broker.id(), broker); + kafkaCluster.brokers.put(broker.getId(), broker); } Set replicaSet = new HashSet<>(); @@ -210,7 +210,7 @@ void testLocalityAwareReassignments() throws Exception { // element check assertTrue(actualAssignments .stream() - .map(broker -> broker.id()) + .map(broker -> broker.getId()) .collect(Collectors.toList()) .containsAll(expectedAssignments)); } @@ -248,7 +248,7 @@ void testLocalityAwareReassignments() throws Exception { // element check assertTrue(actualAssignments .stream() - .map(broker -> broker.id()) + .map(broker -> broker.getId()) .collect(Collectors.toList()) .containsAll(expectedAssignments)); } @@ -292,7 +292,7 @@ void testNonLocalityAwareReassignments() throws Exception { BrokerStats bs = new BrokerStats(); bs.setTimestamp(System.currentTimeMillis()); broker.setLatestStats(bs); - kafkaCluster.brokers.put(broker.id(), broker); + kafkaCluster.brokers.put(broker.getId(), broker); } Set replicaSet = new HashSet<>(); @@ -375,7 +375,7 @@ void testLocalityAwareReassignmentsFailure() throws Exception { BrokerStats bs = new BrokerStats(); bs.setTimestamp(System.currentTimeMillis()); broker.setLatestStats(bs); - kafkaCluster.brokers.put(broker.id(), broker); + kafkaCluster.brokers.put(broker.getId(), broker); } Set replicaSet = new HashSet<>();