HBASE-24765: Dynamic master discovery
This patch adds the ability to discover newly added masters
dynamically on the master registry side. The re-fetch is triggered
either every 5 minutes or by any registry RPC failure.

I didn't add the method to the ZK registry interface since there is
an ongoing design discussion in the splittable meta doc. We can add
it later if needed.
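The refresh mechanism described above boils down to a wait/notify loop: a daemon thread waits on a monitor with a 5-minute timeout, and any registry RPC failure notifies that monitor to force an early refresh. Below is a minimal, self-contained sketch of that pattern; the class and member names are illustrative, not the actual HBase ones.

// Minimal sketch of the wait/notify refresh pattern in this patch (illustrative names).
public class EndpointRefresher {
  private static final long WAIT_TIME_OUT_MS = 5 * 60 * 1000; // periodic refresh interval
  private final Object refreshLock = new Object();
  private final Thread refresherThread;

  public EndpointRefresher(Runnable refreshAction) {
    refresherThread = new Thread(() -> {
      while (!Thread.interrupted()) {
        try {
          synchronized (refreshLock) {
            // Wakes up after the timeout, or earlier when refreshNow() notifies us.
            refreshLock.wait(WAIT_TIME_OUT_MS);
          }
          refreshAction.run();
        } catch (InterruptedException e) {
          break; // shutting down
        }
      }
    }, "endpoint-refresher");
    refresherThread.setDaemon(true);
    refresherThread.start();
  }

  /** Called on a registry RPC failure to trigger an immediate refresh. */
  public void refreshNow() {
    synchronized (refreshLock) {
      refreshLock.notify();
    }
  }

  public void close() {
    refresherThread.interrupt();
  }
}

Spurious wake-ups just cost an extra fetch, which is why the patch tolerates them rather than tracking refresh generations.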
bharathv committed Aug 4, 2020
1 parent d2f5a5f commit f580ffb
Showing 6 changed files with 197 additions and 26 deletions.
@@ -30,9 +30,11 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -45,7 +47,10 @@
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.DNS.ServerType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
@@ -57,10 +62,11 @@

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;

@@ -77,6 +83,8 @@
@InterfaceAudience.Private
public class MasterRegistry implements ConnectionRegistry {

private static final Logger LOG = LoggerFactory.getLogger(MasterRegistry.class);

/** Configuration key that controls the fan out of requests **/
public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY =
"hbase.client.master_registry.hedged.fanout";
@@ -89,11 +97,17 @@ public class MasterRegistry implements ConnectionRegistry {
private final int hedgedReadFanOut;

// Configured list of masters to probe the meta information from.
private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;

// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;
// For synchronizing on refreshing the master end-points
private final Object refreshMasters = new Object();
// Refreshed every WAIT_TIME_OUT_MS or when a refresh is explicitly triggered.
private static final int WAIT_TIME_OUT_MS = 5 * 60 * 1000; // 5 mins
private final Thread masterAddrRefresherThread;

/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
@@ -115,20 +129,50 @@ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unkno
MasterRegistry(Configuration conf) throws IOException {
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
// this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
Set<ServerName> masterAddrs = parseMasterAddrs(conf);
// Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
// by fetching the end points from this list.
populateMasterStubs(parseMasterAddrs(conf));
Runnable masterEndPointRefresher = () -> {
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
// have duplicate refreshes because once the thread is past the wait(), notify()s are
// ignored until the thread is back to the waiting state.
synchronized (refreshMasters) {
refreshMasters.wait(WAIT_TIME_OUT_MS);
}
LOG.debug("Attempting to refresh master address end points.");
Set<ServerName> newMasters = new HashSet<>(getMasters().get());
populateMasterStubs(newMasters);
LOG.debug("Finished refreshing master end points. {}", newMasters);
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
break;
} catch (ExecutionException | IOException e) {
LOG.debug("Error populating latest list of masters.", e);
}
}
};
masterAddrRefresherThread = Threads.newDaemonThreadFactory(
"MasterRegistry refresh end-points").newThread(masterEndPointRefresher);
masterAddrRefresherThread.start();
}

private void populateMasterStubs(Set<ServerName> masters) throws IOException {
Preconditions.checkNotNull(masters);
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
ImmutableMap.builderWithExpectedSize(masterAddrs.size());
ImmutableMap.builderWithExpectedSize(masters.size());
User user = User.getCurrent();
for (ServerName masterAddr : masterAddrs) {
for (ServerName masterAddr : masters) {
builder.put(masterAddr,
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
masterAddr2Stub = builder.build();
}
@@ -170,6 +214,11 @@ private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interfac
callable.call(controller, stub, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
// RPC has failed, trigger a refresh of master end points. We can have some spurious
// refreshes, but that is okay since the RPC is not expensive and not in a hot path.
synchronized (refreshMasters) {
refreshMasters.notify();
}
} else {
future.complete(resp);
}
@@ -188,8 +237,9 @@ private IOException badResponse(String debug) {
* been tried and all of them are failed, we will fail the future.
*/
private <T extends Message> void groupCall(CompletableFuture<T> future,
List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
ConcurrentLinkedQueue<Throwable> errors) {
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
@@ -210,10 +260,10 @@ private <T extends Message> void groupCall(CompletableFuture<T> future,
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
masterStubs.size(), new ArrayList<>(errors));
future.completeExceptionally(
new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
new MasterRegistryFetchException(masterServers, ex));
} else {
groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
errors);
groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
isValidResp, debug, errors);
}
}
} else {
@@ -226,17 +276,19 @@ private <T extends Message> void groupCall(CompletableFuture<T> future,

private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
Predicate<T> isValidResp, String debug) {
Set<ServerName> masterServers = masterAddr2Stub.keySet();
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
CompletableFuture<T> future = new CompletableFuture<>();
groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
new ConcurrentLinkedQueue<>());
return future;
}

/**
* Simple helper to transform the result of getMetaRegionLocations() rpc.
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList()
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
@@ -247,7 +299,7 @@ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsRespo
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
"getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
"getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
}

@Override
@@ -259,17 +311,40 @@ public CompletableFuture<String> getClusterId() {
.thenApply(GetClusterIdResponse::getClusterId);
}

private ServerName transformServerName(GetActiveMasterResponse resp) {
return ProtobufUtil.toServerName(resp.getServerName());
private static boolean hasActiveMaster(GetMastersResponse resp) {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
return activeMasters.size() == 1;
}

private static ServerName filterActiveMaster(GetMastersResponse resp) {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
Preconditions.checkState(activeMasters.size() == 1);
return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
}

@Override
public CompletableFuture<ServerName> getActiveMaster() {
return this
.<GetActiveMasterResponse> call(
(c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
GetActiveMasterResponse::hasServerName, "getActiveMaster()")
.thenApply(this::transformServerName);
.<GetMastersResponse> call(
(c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()")
.thenApply(MasterRegistry::filterActiveMaster);
}

private static List<ServerName> transformServerNames(GetMastersResponse resp) {
return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
s.getServerName())).collect(Collectors.toList());
}

public CompletableFuture<List<ServerName>> getMasters() {
return this
.<GetMastersResponse> call((c, s, d) -> s.getMasters(
c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
"getMasters()").thenApply(MasterRegistry::transformServerNames);
}

@VisibleForTesting
@@ -279,6 +354,7 @@ Set<ServerName> getParsedMasterServers() {

@Override
public void close() {
masterAddrRefresherThread.interrupt();
if (rpcClient != null) {
rpcClient.close();
}
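For context on the groupCall() changes above: the registry shuffles the master stubs, contacts hedgedReadFanOut of them at a time, completes the future with the first valid response, and falls through to the next batch only when an entire batch fails. The following is a simplified, self-contained sketch of that fan-out strategy; it is not the actual MasterRegistry code, and the generic class and method names are made up for illustration.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;

// Simplified hedged fan-out: query 'fanOut' endpoints at a time, accept the first
// valid response, and move to the next batch only when every call in a batch fails.
final class HedgedCaller<S, T> {
  private final int fanOut;

  HedgedCaller(int fanOut) {
    this.fanOut = Math.max(1, fanOut);
  }

  CompletableFuture<T> call(List<S> stubs, Function<S, CompletableFuture<T>> rpc,
      Predicate<T> isValid) {
    CompletableFuture<T> result = new CompletableFuture<>();
    groupCall(result, stubs, 0, rpc, isValid);
    return result;
  }

  private void groupCall(CompletableFuture<T> result, List<S> stubs, int start,
      Function<S, CompletableFuture<T>> rpc, Predicate<T> isValid) {
    int end = Math.min(start + fanOut, stubs.size());
    AtomicInteger remaining = new AtomicInteger(end - start);
    for (int i = start; i < end; i++) {
      rpc.apply(stubs.get(i)).whenComplete((resp, err) -> {
        if (err == null && isValid.test(resp)) {
          result.complete(resp); // first valid response wins; later completes are no-ops
        } else if (remaining.decrementAndGet() == 0) {
          if (end == stubs.size()) {
            result.completeExceptionally(new RuntimeException("all endpoints failed"));
          } else {
            groupCall(result, stubs, end, rpc, isValid); // hedge to the next batch
          }
        }
      });
    }
  }
}

In the patch, the same structure additionally carries the masterServers set alongside the stubs so that the failure exception can name the servers that were tried.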
@@ -121,6 +121,11 @@ public static final class RpcChannelImpl implements RpcChannel {
@Override
public void callMethod(MethodDescriptor method, RpcController controller, Message request,
Message responsePrototype, RpcCallback<Message> done) {
if (!method.getName().equals("GetClusterId")) {
// Master registry internally runs other RPCs to keep the master list up to date. This check
// avoids double counting in such cases.
return;
}
// simulate the asynchronous behavior otherwise all logic will perform in the same thread...
EXECUTOR.execute(() -> {
int index = CALLED.getAndIncrement();
@@ -129,7 +134,7 @@ public void callMethod(MethodDescriptor method, RpcController controller, Messag
} else if (GOOD_RESP_INDEXS.contains(index)) {
done.run(RESP);
} else {
((HBaseRpcController) controller).setFailed("inject error");
controller.setFailed("inject error");
done.run(null);
}
});
22 changes: 20 additions & 2 deletions hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto
@@ -1261,6 +1261,17 @@ message GetActiveMasterResponse {
optional ServerName server_name = 1;
}

/** Request and response to get the current list of all registered master servers */
message GetMastersRequest {
}
message GetMastersResponseEntry {
required ServerName server_name = 1;
required bool is_active = 2;
}
message GetMastersResponse {
repeated GetMastersResponseEntry master_servers = 1;
}

/** Request and response to get the current list of meta region locations */
message GetMetaRegionLocationsRequest {
}
@@ -1270,7 +1281,8 @@ message GetMetaRegionLocationsResponse {
}

/**
* Implements all the RPCs needed by clients to look up cluster meta information needed for connection establishment.
* Implements all the RPCs needed by clients to look up cluster meta information needed for
* connection establishment.
*/
service ClientMetaService {
/**
@@ -1279,10 +1291,16 @@ service ClientMetaService {
rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse);

/**
* Get active master server name for this cluster.
* Get active master server name for this cluster. Retained for clients and masters that are
* out of sync during rolling upgrades. Newer clients use the GetMasters RPC instead.
*/
rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse);

/**
* Get the list of registered master servers in this cluster.
*/
rpc GetMasters(GetMastersRequest) returns(GetMastersResponse);

/**
* Get current meta replicas' region locations.
*/
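On the client side, a GetMasters response is split on the is_active flag. A short sketch of that handling follows, using the generated shaded protobuf classes; it mirrors the filterActiveMaster/transformServerNames helpers in the MasterRegistry diff rather than adding anything new, and the helper class name is made up.

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;

final class GetMastersResponses {
  /** The single entry flagged is_active, if the cluster currently has an active master. */
  static Optional<ServerName> activeMaster(GetMastersResponse resp) {
    return resp.getMasterServersList().stream()
        .filter(GetMastersResponseEntry::getIsActive)
        .map(entry -> ProtobufUtil.toServerName(entry.getServerName()))
        .findFirst();
  }

  /** Every registered master, active or backup. */
  static List<ServerName> allMasters(GetMastersResponse resp) {
    return resp.getMasterServersList().stream()
        .map(entry -> ProtobufUtil.toServerName(entry.getServerName()))
        .collect(Collectors.toList());
  }
}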
@@ -2722,7 +2722,7 @@ public ClusterMetrics getClusterMetrics(EnumSet<Option> options) throws IOExcept
return status;
}

private List<ServerName> getBackupMasters() throws InterruptedIOException {
List<ServerName> getBackupMasters() throws InterruptedIOException {
// Build Set of backup masters from ZK nodes
List<String> backupMasterStrings;
try {
@@ -21,6 +21,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -206,6 +207,9 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
@@ -2938,6 +2942,27 @@ public GetActiveMasterResponse getActiveMaster(RpcController rpcController,
return resp.build();
}

@Override
public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request)
throws ServiceException {
GetMastersResponse.Builder resp = GetMastersResponse.newBuilder();
// Active master
Optional<ServerName> serverName = master.getActiveMaster();
serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder()
.setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build()));
// Backup masters
try {
// TODO: Cache the backup masters to avoid a ZK RPC for each getMasters() call.
for (ServerName backupMaster: master.getBackupMasters()) {
resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName(
ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build());
}
} catch (InterruptedIOException e) {
LOG.error("Interrupted during getMasters() RPC.", e);
}
return resp.build();
}

@Override
public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController,
GetMetaRegionLocationsRequest request) throws ServiceException {