Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
add decommission api
Browse files Browse the repository at this point in the history
add email notifications for decommissioning
refactor api code
make broker api return decommission info; refactor getters to comply with naming convention; reorder to group getters/setters
modify UI to expose decommission status
  • Loading branch information
kabochya committed May 9, 2019
1 parent f84afe7 commit b7d0d3d
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.apache.logging.log4j.Logger;

import com.google.common.collect.ImmutableList;
import com.pinterest.doctorkafka.api.MaintenanceApi;

import com.pinterest.doctorkafka.api.DecommissionBrokerApi;
import com.pinterest.doctorkafka.api.ClusterMaintenanceApi;
import com.pinterest.doctorkafka.api.BrokerApi;
import com.pinterest.doctorkafka.api.ClusterApi;
import com.pinterest.doctorkafka.config.DoctorKafkaAppConfig;
Expand Down Expand Up @@ -115,9 +117,10 @@ private void configureServerRuntime(DoctorKafkaAppConfig configuration, DoctorKa

private void registerAPIs(Environment environment, DoctorKafka doctorKafka) {
environment.jersey().setUrlPattern("/api/*");
environment.jersey().register(new BrokerApi());
environment.jersey().register(new BrokerApi(doctorKafka));
environment.jersey().register(new ClusterApi(doctorKafka));
environment.jersey().register(new MaintenanceApi(doctorKafka));
environment.jersey().register(new ClusterMaintenanceApi(doctorKafka));
environment.jersey().register(new DecommissionBrokerApi(doctorKafka));
}

private void startMetricsService() {
Expand Down
258 changes: 157 additions & 101 deletions drkafka/src/main/java/com/pinterest/doctorkafka/KafkaBroker.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonIgnore;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;


public class KafkaBroker implements Comparable<KafkaBroker> {

private static final Logger LOG = LogManager.getLogger(KafkaBroker.class);
Expand All @@ -41,6 +45,7 @@ public class KafkaBroker implements Comparable<KafkaBroker> {
private Set<TopicPartition> toBeAddedReplicas;

private KafkaCluster kafkaCluster;
private AtomicBoolean isDecommissioned = new AtomicBoolean(false);

public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, KafkaCluster kafkaCluster, int brokerId) {
assert clusterConfig != null;
Expand All @@ -59,19 +64,6 @@ public KafkaBroker(DoctorKafkaClusterConfig clusterConfig, KafkaCluster kafkaClu
this.kafkaCluster = kafkaCluster;
}

public JsonElement toJson() {
// Return a JSON representation of a Kafka Broker. Sadly, not everything can be trivially added.
JsonObject json = new JsonObject();
json.add("brokerId", gson.toJsonTree(brokerId));
json.add("brokerName", gson.toJsonTree(brokerName));
json.add("rackId", gson.toJsonTree(rackId));
json.add("bytesInPerSecLimit", gson.toJsonTree(bytesInPerSecLimit));
json.add("bytesOutPerSecLimit", gson.toJsonTree(bytesOutPerSecLimit));
json.add("maxBytesOut", gson.toJsonTree(getMaxBytesOut()));
json.add("maxBytesIn", gson.toJsonTree(getMaxBytesIn()));
return json;
}

public long getMaxBytesIn() {
long result = 0L;
for (TopicPartition topicPartition : leaderReplicas) {
Expand Down Expand Up @@ -100,83 +92,26 @@ public long getReservedBytesOut() {
return reservedBytesOut;
}

public int id() {
@JsonProperty
public int getId() {
return this.brokerId;
}

public String name() {
@JsonProperty
public String getName() {
return brokerName;
}

public int port() {
@JsonProperty
public int getPort() {
return this.brokerPort;
}

public long lastStatsTimestamp() {
public long getLastStatsTimestamp() {
return latestStats == null ? 0 : latestStats.getTimestamp();
}

/**
 * Reserves both in-bound and out-bound bandwidth on this broker for a topic partition.
 * The reservation succeeds only if, in BOTH directions, current max usage plus already
 * reserved bandwidth plus the requested amount stays strictly under the per-second limit;
 * otherwise nothing is reserved.
 *
 * @param tp the topic partition the reservation is for; on success it is also recorded in
 *           toBeAddedReplicas so the partition counts as present on this broker
 * @param inBound in-bound traffic to reserve, in bytes/second
 * @param outBound out-bound traffic to reserve, in bytes/second
 * @return true if both reservations were made, false if either limit would be exceeded
 */
public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound){
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound &&
bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesIn += inBound;
reservedBytesOut += outBound;
toBeAddedReplicas.add(tp);
return true;
}
return false;
}

/**
 * cancels reservation of bandwidth for a TopicPartition. No-op (returns false) when the
 * partition has no pending reservation on this broker. Note: only the partition is tracked
 * (in toBeAddedReplicas), not the reserved amounts, so the caller must pass the same
 * inBound/outBound values that were originally reserved.
 * @param tp TopicPartition to cancel reservation
 * @param inBound amount of in-bound traffic to remove from broker
 * @param outBound amount of out-bound traffic to remove from broker
 * @return if reserved bandwidth has been successfully removed from broker
 */
public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound){
if ( toBeAddedReplicas.contains(tp) ){
reservedBytesIn -= inBound;
reservedBytesOut -= outBound;
toBeAddedReplicas.remove(tp);
return true;
}
return false;
}

/**
 * Reserves only in-bound bandwidth for a topic partition, subject to bytesInPerSecLimit.
 *
 * @param tp the topic partition the reservation is for
 * @param inBound in-bound traffic to reserve, in bytes/second
 * @return true if the reservation was made, false if the limit would be exceeded
 * @deprecated presumably superseded by {@link #reserveBandwidth}, which checks both
 *             directions atomically — confirm before reuse
 */
@Deprecated
public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) {
reservedBytesIn += inBound;
toBeAddedReplicas.add(tp);
return true;
}
return false;
}

/**
 * To reserve out-bound bandwidth, we first need to compute the current network outbound usage.
 * When a partition reassignment happens, the outbound metric of leader replicas will be high.
 * Because of this, using the brokerstats based information will lead to inaccurate decision.
 * One observation is that in-bound traffic is more stable than the out-bound traffic,
 * and read/write ratio for topics usually does not change often. Because of this, we can
 * use the in-bound traffic times read/write ratio to infer the required out-bound bandwidth
 * for topic partitions.
 *
 * @param tp the topic partition for reserving outbound bandwidth
 * @param outBound the outbound bandwidth requirements in bytes/second
 * @return whether the reservation is successful or not.
 * @deprecated presumably superseded by {@link #reserveBandwidth}, which checks both
 *             directions at once — confirm before reuse
 */
@Deprecated
public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) {
if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesOut += outBound;
toBeAddedReplicas.add(tp);
return true;
}
return false;
}

@JsonIgnore
public List<TopicPartition> getLeaderTopicPartitions() {
BrokerStats brokerStats = getLatestStats();
if (brokerStats == null) {
Expand All @@ -189,6 +124,7 @@ public List<TopicPartition> getLeaderTopicPartitions() {
return topicPartitions;
}

@JsonIgnore
public List<TopicPartition> getFollowerTopicPartitions() {
BrokerStats brokerStats = getLatestStats();
if (brokerStats == null) {
Expand All @@ -208,14 +144,86 @@ public void clearResourceAllocationCounters() {
this.toBeAddedReplicas.clear();
}

// Replaces the full set of partitions this broker leads.
// @JsonIgnore keeps the raw replica set out of the Jackson-serialized broker view.
@JsonIgnore
protected void setLeaderReplicas(Set<TopicPartition> leaderReplicas) {
this.leaderReplicas = leaderReplicas;
}

// Replaces the full set of partitions this broker follows.
// @JsonIgnore keeps the raw replica set out of the Jackson-serialized broker view.
@JsonIgnore
protected void setFollowerReplicas(Set<TopicPartition> followerReplicas) {
this.followerReplicas= followerReplicas;
}

/**
 * This is used in partition reassignment. During the partition reassignment, we cannot
 * put two replicas on the same broker. Note that partitions with a pending bandwidth
 * reservation (toBeAddedReplicas) also count as present on this broker.
 *
 * @param tp the topic partition for examining
 * @return whether the broker has a replica for the given topic partition.
 */
public boolean hasTopicPartition(TopicPartition tp) {
return leaderReplicas.contains(tp) || followerReplicas.contains(tp)
|| toBeAddedReplicas.contains(tp);
}

// Most recently recorded BrokerStats; may be null if no stats have arrived yet.
// @JsonIgnore keeps the full stats payload out of the Jackson-serialized broker view.
@JsonIgnore
public BrokerStats getLatestStats() {
return latestStats;
}

// Test hook: injects stats directly, bypassing the normal update path.
@VisibleForTesting
protected void setLatestStats(BrokerStats brokerStats){
this.latestStats = brokerStats;
}

// Rack this broker is assigned to; exposed as "rackId" in the Jackson-serialized view.
@JsonProperty
public String getRackId(){
return rackId;
}

// Test hook: overrides the rack id directly.
@VisibleForTesting
protected void setRackId(String rackId){
this.rackId = rackId;
}

/**
*
* Broker Decommissioning:
* Currently, a decommissioned broker
* WILL:
* 1. Be ignored when checking for dead brokers (and thus no replacement will happen)
* 2. Be ignored during URP reassignments
* 3. Still update stats so it can catch up if decommission is cancelled
* WILL NOT:
* 1. Reassign the partitions to other brokers, you have to do it manually
*
* Note: Decommissioning is ephemeral for now, state is not preserved in ZK, so if a restart happens,
* we will have to do it again
*
*/

/**
 * Decommissions the broker (see the decommissioning notes above). Atomic via
 * AtomicBoolean, so concurrent callers observe a consistent previous state; the
 * state is in-memory only and is lost on restart.
 * @return previous decommission state (true if it was already decommissioned)
 */
public boolean decommission() {
return this.isDecommissioned.getAndSet(true);
}

/**
 * Cancels decommission of the broker. Atomic via AtomicBoolean, so concurrent
 * callers observe a consistent previous state.
 * @return previous decommission state (true if it was decommissioned before this call)
 */
public boolean cancelDecommission() {
return this.isDecommissioned.getAndSet(false);
}

// Current decommission state; exposed as "decommissioned" in the Jackson-serialized view.
@JsonProperty
public boolean isDecommissioned() {
return this.isDecommissioned.get();
}


/**
* Record the stats, and update the topic partition list based on the stats
*
Expand Down Expand Up @@ -251,46 +259,93 @@ public synchronized void update(BrokerStats stats) {
}
}

/**
* This is used in partition reassignment. During the partition reassignment, we cannot
* put two replicas on the same broker.
*
* @param tp the topic partition for examining
* @return whether the broker has a replica for the given topic partition.
*/
public boolean hasTopicPartition(TopicPartition tp) {
return leaderReplicas.contains(tp) || followerReplicas.contains(tp)
|| toBeAddedReplicas.contains(tp);
}

public BrokerStats getLatestStats() {
return latestStats;
/**
 * Reserves both in-bound and out-bound bandwidth on this broker for a topic partition.
 * Succeeds only if, in BOTH directions, current max usage plus already reserved bandwidth
 * plus the requested amount stays strictly under the per-second limit; otherwise nothing
 * is reserved.
 *
 * @param tp the topic partition the reservation is for; on success it is also recorded in
 *           toBeAddedReplicas so the partition counts as present on this broker
 * @param inBound in-bound traffic to reserve, in bytes/second
 * @param outBound out-bound traffic to reserve, in bytes/second
 * @return true if both reservations were made, false if either limit would be exceeded
 */
public boolean reserveBandwidth(TopicPartition tp, double inBound, double outBound){
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound &&
bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesIn += inBound;
reservedBytesOut += outBound;
toBeAddedReplicas.add(tp);
return true;
}
return false;
}

@VisibleForTesting
protected void setLatestStats(BrokerStats brokerStats){
this.latestStats = brokerStats;
/**
 * cancels reservation of bandwidth for a TopicPartition. No-op (returns false) when the
 * partition has no pending reservation on this broker. Note: only the partition is tracked
 * (in toBeAddedReplicas), not the reserved amounts, so the caller must pass the same
 * inBound/outBound values that were originally reserved.
 * @param tp TopicPartition to cancel reservation
 * @param inBound amount of in-bound traffic to remove from broker
 * @param outBound amount of out-bound traffic to remove from broker
 * @return if reserved bandwidth has been successfully removed from broker
 */
public boolean removeReservedBandwidth(TopicPartition tp, double inBound, double outBound){
if ( toBeAddedReplicas.contains(tp) ){
reservedBytesIn -= inBound;
reservedBytesOut -= outBound;
toBeAddedReplicas.remove(tp);
return true;
}
return false;
}

public String getRackId(){
return rackId;
/**
 * Reserves only in-bound bandwidth for a topic partition, subject to bytesInPerSecLimit.
 *
 * @param tp the topic partition the reservation is for
 * @param inBound in-bound traffic to reserve, in bytes/second
 * @return true if the reservation was made, false if the limit would be exceeded
 * @deprecated presumably superseded by {@link #reserveBandwidth}, which checks both
 *             directions atomically — confirm before reuse
 */
@Deprecated
public boolean reserveInBoundBandwidth(TopicPartition tp, double inBound) {
if (bytesInPerSecLimit > getMaxBytesIn() + reservedBytesIn + inBound) {
reservedBytesIn += inBound;
toBeAddedReplicas.add(tp);
return true;
}
return false;
}

@VisibleForTesting
protected void setRackId(String rackId){
this.rackId = rackId;
/**
 * To reserve out-bound bandwidth, we first need to compute the current network outbound usage.
 * When a partition reassignment happens, the outbound metric of leader replicas will be high.
 * Because of this, using the brokerstats based information will lead to inaccurate decision.
 * One observation is that in-bound traffic is more stable than the out-bound traffic,
 * and read/write ratio for topics usually does not change often. Because of this, we can
 * use the in-bound traffic times read/write ratio to infer the required out-bound bandwidth
 * for topic partitions.
 *
 * @param tp the topic partition for reserving outbound bandwidth
 * @param outBound the outbound bandwidth requirements in bytes/second
 * @return whether the reservation is successful or not.
 * @deprecated presumably superseded by {@link #reserveBandwidth}, which checks both
 *             directions at once — confirm before reuse
 */
@Deprecated
public boolean reserveOutBoundBandwidth(TopicPartition tp, double outBound) {
if (bytesOutPerSecLimit > getMaxBytesOut() + reservedBytesOut + outBound) {
reservedBytesOut += outBound;
toBeAddedReplicas.add(tp);
return true;
}
return false;
}


@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("brokerId:" + id());
sb.append("brokerId:" + getId());
sb.append("; rackId = " + rackId);
sb.append("; stats : " + (latestStats == null ? "null" : latestStats));
return sb.toString();
}


/**
 * Legacy Gson representation of this broker, covering only a subset of fields
 * (id, name, rack, bandwidth limits, and current max in/out usage — no
 * decommission state).
 * @return a Gson JsonObject describing this broker
 * @deprecated superseded by Jackson serialization via the @JsonProperty getters
 */
@Deprecated
public JsonElement toJson() {
// Return a JSON representation of a Kafka Broker. Sadly, not everything can be trivially added.
JsonObject json = new JsonObject();
json.add("brokerId", gson.toJsonTree(brokerId));
json.add("brokerName", gson.toJsonTree(brokerName));
json.add("rackId", gson.toJsonTree(rackId));
json.add("bytesInPerSecLimit", gson.toJsonTree(bytesInPerSecLimit));
json.add("bytesOutPerSecLimit", gson.toJsonTree(bytesOutPerSecLimit));
json.add("maxBytesOut", gson.toJsonTree(getMaxBytesOut()));
json.add("maxBytesIn", gson.toJsonTree(getMaxBytesIn()));
return json;
}


public int compareTo(KafkaBroker another) {
double networkUsage = getMaxBytesIn() + getMaxBytesOut()
+ reservedBytesIn + reservedBytesOut;
Expand All @@ -315,4 +370,5 @@ public int compare(KafkaBroker x, KafkaBroker y) {
return (xNetworkUsage < yNetworkUsage) ? -1 : (xNetworkUsage > yNetworkUsage ? 1 : 0);
}
}

}
Loading

0 comments on commit b7d0d3d

Please sign in to comment.