Skip to content

Commit

Permalink
Cache backup masters in ActiveMasterManager
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Aug 24, 2020
1 parent eee1b42 commit f0c146f
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Server;
Expand All @@ -34,12 +36,14 @@
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;

/**
* Handles everything on master-side related to master election.
* Handles everything on master-side related to master election. Keeps track of
* currently active master and registered backup masters.
*
* <p>Listens and responds to ZooKeeper notifications on the master znode,
* <p>Listens and responds to ZooKeeper notifications on the master znodes,
* both <code>nodeCreated</code> and <code>nodeDeleted</code>.
*
* <p>Contains blocking methods which will hold up backup masters, waiting
Expand All @@ -65,17 +69,22 @@ public class ActiveMasterManager extends ZKListener {
// notifications) and lazily fetched on-demand.
// ServerName is immutable, so we don't need heavy synchronization around it.
volatile ServerName activeMasterServerName;
// Registered backup masters. List is kept up to date based on ZK change notifications to
// backup znode.
private volatile ImmutableList<ServerName> backupMasters;

/**
* @param watcher ZK watcher
* @param sn ServerName
* @param master an instance of a Master.
*/
ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
ActiveMasterManager(ZKWatcher watcher, ServerName sn, Server master)
    throws InterruptedIOException {
  super(watcher);
  // Assign our own fields BEFORE handing 'this' to the watcher. Once registered, listener
  // callbacks may be invoked on this object; nodeDeleted() dereferences 'master', so a
  // callback arriving while the fields are still unset would fail with an NPE.
  this.sn = sn;
  this.master = master;
  // Registration is still kept ahead of the initial backup-master fetch, exactly as in the
  // original ordering, so that the relative order of "listen" and "first read" is unchanged.
  watcher.registerListener(this);
  // Populate the backup masters cache so getBackupMasters() has a value right after
  // construction. Propagates InterruptedIOException to the caller if the fetch is interrupted.
  updateBackupMasters();
}

// will be set after jetty server is started
Expand All @@ -89,8 +98,18 @@ public void nodeCreated(String path) {
}

@Override
public void nodeDeleted(String path) {
public void nodeChildrenChanged(String path) {
  // Only children changes under the backup masters znode are relevant to the cache;
  // every other path is ignored.
  final String backupMastersZNode = watcher.getZNodePaths().backupMasterAddressesZNode;
  if (!path.equals(backupMastersZNode)) {
    return;
  }
  try {
    updateBackupMasters();
  } catch (InterruptedIOException ioe) {
    // The failure is logged only: the previously cached list stays in place.
    // NOTE(review): the interruption is not propagated or re-asserted here; confirm that
    // is the intended behavior for the thread delivering this notification.
    LOG.error("Error updating backup masters", ioe);
  }
}

@Override
public void nodeDeleted(String path) {
// We need to keep track of the cluster's shutdown status while
// we wait on the current master. We consider that, if the cluster
// was already in a "shutdown" state when we started, that this master
Expand All @@ -101,7 +120,6 @@ public void nodeDeleted(String path) {
if(path.equals(watcher.getZNodePaths().clusterStateZNode) && !master.isStopped()) {
clusterShutDown.set(true);
}

handle(path);
}

Expand All @@ -111,6 +129,11 @@ void handle(final String path) {
}
}

/**
 * Refreshes the cached list of backup masters.
 *
 * <p>The result of {@code MasterAddressTracker.getBackupMastersAndRenewWatch} is copied into
 * an immutable list and then stored in the volatile {@code backupMasters} field with a single
 * assignment, so readers always observe a complete list.
 *
 * @throws InterruptedIOException if the underlying fetch reports an interruption
 */
private void updateBackupMasters() throws InterruptedIOException {
  // presumably the helper also re-arms the ZK watch, going by its name; verify in
  // MasterAddressTracker.
  final ImmutableList<ServerName> latest =
      ImmutableList.copyOf(MasterAddressTracker.getBackupMastersAndRenewWatch(watcher));
  backupMasters = latest;
}

/**
* Fetches the active master's ServerName from zookeeper.
*/
Expand Down Expand Up @@ -318,4 +341,11 @@ public void stop() {
e.getMessage()));
}
}

/**
 * Returns the currently cached backup masters.
 *
 * <p>This is a plain read of the cached field; no ZooKeeper call is made from this method.
 * The cache is filled in the constructor and refreshed from {@code nodeChildrenChanged}.
 *
 * @return list of registered backup masters, as of the last successful cache update.
 */
public List<ServerName> getBackupMasters() {
  return this.backupMasters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,8 @@ public HMaster(final Configuration conf) throws IOException {
* Protected to have custom implementations in tests override the default ActiveMaster
* implementation.
*/
protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
protected ActiveMasterManager createActiveMasterManager(
    ZKWatcher zk,
    ServerName sn,
    org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
  // Default implementation: a plain ActiveMasterManager built from the given arguments.
  final ActiveMasterManager manager = new ActiveMasterManager(zk, sn, server);
  return manager;
}

Expand Down Expand Up @@ -2728,51 +2728,8 @@ public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOExcept
return status;
}

List<ServerName> getBackupMasters() throws InterruptedIOException {
// Build Set of backup masters from ZK nodes
List<String> backupMasterStrings;
try {
backupMasterStrings = ZKUtil.listChildrenNoWatch(this.zooKeeper,
this.zooKeeper.getZNodePaths().backupMasterAddressesZNode);
} catch (KeeperException e) {
LOG.warn(this.zooKeeper.prefix("Unable to list backup servers"), e);
backupMasterStrings = null;
}

List<ServerName> backupMasters = Collections.emptyList();
if (backupMasterStrings != null && !backupMasterStrings.isEmpty()) {
backupMasters = new ArrayList<>(backupMasterStrings.size());
for (String s: backupMasterStrings) {
try {
byte [] bytes;
try {
bytes = ZKUtil.getData(this.zooKeeper, ZNodePaths.joinZNode(
this.zooKeeper.getZNodePaths().backupMasterAddressesZNode, s));
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
if (bytes != null) {
ServerName sn;
try {
sn = ProtobufUtil.parseServerNameFrom(bytes);
} catch (DeserializationException e) {
LOG.warn("Failed parse, skipping registering backup server", e);
continue;
}
backupMasters.add(sn);
}
} catch (KeeperException e) {
LOG.warn(this.zooKeeper.prefix("Unable to get information about " +
"backup servers"), e);
}
}
Collections.sort(backupMasters, new Comparator<ServerName>() {
@Override
public int compare(ServerName s1, ServerName s2) {
return s1.getServerName().compareTo(s2.getServerName());
}});
}
return backupMasters;
/**
 * Returns the backup masters as reported by {@code activeMasterManager}; this method simply
 * delegates and performs no lookup of its own.
 *
 * @return the list handed back by {@code ActiveMasterManager#getBackupMasters()}
 */
List<ServerName> getBackupMasters() {
  return this.activeMasterManager.getBackupMasters();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2959,14 +2959,9 @@ public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequ
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
// Backup masters
try {
// TODO: Cache the backup masters to avoid a ZK RPC for each getMasters() call.
for (ServerName backupMaster: master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
} catch (InterruptedIOException e) {
LOG.error("Interrupted during getMasters() RPC.", e);
for (ServerName backupMaster: master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
return resp.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
Expand Down Expand Up @@ -46,7 +47,8 @@ private static class AlwaysStandByMasterManager extends ActiveMasterManager {
private static final Logger LOG =
LoggerFactory.getLogger(AlwaysStandByMasterManager.class);

AlwaysStandByMasterManager(ZKWatcher watcher, ServerName sn, Server master) {
/**
 * Creates the manager by passing all arguments straight through to the
 * {@code ActiveMasterManager} constructor.
 *
 * @throws InterruptedIOException if the superclass constructor throws it
 */
AlwaysStandByMasterManager(
    ZKWatcher watcher,
    ServerName sn,
    Server master) throws InterruptedIOException {
  super(watcher, sn, master);
}

Expand Down Expand Up @@ -94,8 +96,8 @@ public AlwaysStandByHMaster(Configuration conf) throws IOException {
super(conf);
}

protected ActiveMasterManager createActiveMasterManager(
ZKWatcher zk, ServerName sn, org.apache.hadoop.hbase.Server server) {
/**
 * Supplies an {@code AlwaysStandByMasterManager} built from the given arguments.
 *
 * @throws InterruptedIOException if constructing the manager throws it
 */
protected ActiveMasterManager createActiveMasterManager(
    ZKWatcher zk,
    ServerName sn,
    org.apache.hadoop.hbase.Server server) throws InterruptedIOException {
  final ActiveMasterManager standByManager = new AlwaysStandByMasterManager(zk, sn, server);
  return standByManager;
}
}
Loading

0 comments on commit f0c146f

Please sign in to comment.