Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Aug 4, 2020
1 parent f580ffb commit f7ae049
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
Expand All @@ -57,6 +59,7 @@
import org.apache.hbase.thirdparty.com.google.common.base.Strings;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;

Expand Down Expand Up @@ -107,7 +110,7 @@ public class MasterRegistry implements ConnectionRegistry {
private final Object refreshMasters = new Object();
// Refreshed every WAIT_TIME_OUT_MS or unless explicitly invoked.
private static final int WAIT_TIME_OUT_MS = 5 * 60 * 1000; // 5 mins
private final Thread masterAddrRefresherThread;
private final ExecutorService masterAddrRefresher;

/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
Expand Down Expand Up @@ -139,7 +142,9 @@ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unkno
// Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
// by fetching the end points from this list.
populateMasterStubs(parseMasterAddrs(conf));
Runnable masterEndPointRefresher = () -> {
masterAddrRefresher = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("MasterRegistry refresh end-points").setDaemon(true).build());
masterAddrRefresher.submit(() -> {
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
Expand All @@ -159,10 +164,7 @@ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unkno
LOG.debug("Error populating latest list of masters.", e);
}
}
};
masterAddrRefresherThread = Threads.newDaemonThreadFactory(
"MasterRegistry refresh end-points").newThread(masterEndPointRefresher);
masterAddrRefresherThread.start();
});
}

private void populateMasterStubs(Set<ServerName> masters) throws IOException {
Expand Down Expand Up @@ -354,7 +356,9 @@ Set<ServerName> getParsedMasterServers() {

@Override
public void close() {
masterAddrRefresherThread.interrupt();
if (masterAddrRefresher != null) {
masterAddrRefresher.shutdownNow();
}
if (rpcClient != null) {
rpcClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,18 @@ public boolean hasCellBlockSupport() {
}
}

/**
* A dummy RpcChannel implementation that intercepts the GetClusterId() RPC calls and injects
* errors. All other RPCs are ignored.
*/
public static final class RpcChannelImpl implements RpcChannel {

@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
if (!method.getName().equals("GetClusterId")) {
// Master registry internally runs other RPCs to keep the master list up to date. This check
// avoids double counting in such cases.
// On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list
// fresh. We do not want to intercept those RPCs here and double count.
return;
}
// simulate the asynchronous behavior otherwise all logic will perform in the same thread...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.HConstants.META_REPLICAS_NUM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -170,7 +171,8 @@ public void testDynamicMasterConfigurationRefresh() throws Exception {
registry.getParsedMasterServers().size() == 2);
final Set<ServerName> newMasters2 = registry.getParsedMasterServers();
assertEquals(2, newMasters2.size());
assertFalse(newMasters2.contains(activeMaster));
assertNotNull(TEST_UTIL.getMiniHBaseCluster().getMaster());
assertFalse(newMasters2.contains(activeMaster.getServerName()));
}
}
}

0 comments on commit f7ae049

Please sign in to comment.