HBASE-24765: Dynamic master discovery
This patch adds the ability to discover newly added masters
dynamically on the master registry side. The re-fetch is triggered
either periodically (every 5 minutes by default) or by any registry
RPC connection failure.

Updates the client-side connection metrics to maintain a counter
per RPC type so that clients have visibility into counts grouped
by RPC method name.

I didn't add the method to the ZK registry interface since there
is an ongoing design discussion in the splittable meta doc. We can
add it later if needed.
bharathv committed Aug 14, 2020
1 parent 7b099ea commit e46dfce
Showing 10 changed files with 432 additions and 32 deletions.
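For context, here is a minimal sketch (not part of this commit) of how a client might tune the two configuration keys the patch introduces. The key names and defaults are taken from the diff below; the connection setup is generic HBase client code and assumes the connection is already configured to use the master-based connection registry.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class MasterRegistryRefreshConfigExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Refresh the cached master end points every 2 minutes instead of the 5-minute default (300s).
    conf.setLong("hbase.client.master_registry.refresh_interval_secs", 120);
    // Keep any two refreshes (periodic or failure-triggered) at least 30 seconds apart;
    // MasterAddressRefresher requires this value to be smaller than the refresh interval.
    conf.setLong("hbase.client.master_registry.min_secs_between_refreshes", 30);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Normal client operations; the master registry keeps its list of masters fresh in the background.
    }
  }
}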
org/apache/hadoop/hbase/client/MasterAddressRefresher.java (new file)
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;

/**
* Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This
* uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters.
* By default the refresh happens periodically (configured via
* {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via
* {@link #refreshNow()}. To prevent a flood of on-demand refreshes, any two attempts
* should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart.
*/
@InterfaceAudience.Private
public class MasterAddressRefresher implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class);
public static final String PERIODIC_REFRESH_INTERVAL_SECS =
"hbase.client.master_registry.refresh_interval_secs";
private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300;
public static final String MIN_SECS_BETWEEN_REFRESHES =
"hbase.client.master_registry.min_secs_between_refreshes";
private static final long MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60;

private final ExecutorService pool;
private final MasterRegistry registry;
private final long periodicRefreshMs;
private final long timeBetweenRefreshesMs;
private final Object refreshMasters = new Object();

@Override
public void close() {
pool.shutdownNow();
}

/**
* Thread that refreshes the master end points until it is interrupted via {@link #close()}.
* Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}.
*/
private class RefreshThread implements Runnable {
@Override
public void run() {
long lastRpcTs = 0;
while (!Thread.interrupted()) {
try {
// Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't
// have duplicate refreshes because once the thread is past the wait(), notify()s are
// ignored until the thread is back to the waiting state.
synchronized (refreshMasters) {
refreshMasters.wait(periodicRefreshMs);
}
long currentTs = EnvironmentEdgeManager.currentTime();
if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) {
continue;
}
lastRpcTs = currentTs;
LOG.debug("Attempting to refresh master address end points.");
Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get());
registry.populateMasterStubs(newMasters);
LOG.debug("Finished refreshing master end points. {}", newMasters);
} catch (InterruptedException e) {
LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e);
break;
} catch (ExecutionException | IOException e) {
LOG.debug("Error populating latest list of masters.", e);
}
}
}
}

MasterAddressRefresher(Configuration conf, MasterRegistry registry) {
pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("master-registry-refresh-end-points").setDaemon(true).build());
periodicRefreshMs = 1000 * conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS,
PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT);
timeBetweenRefreshesMs = 1000 * conf.getLong(MIN_SECS_BETWEEN_REFRESHES,
MIN_SECS_BETWEEN_REFRESHES_DEFAULT);
Preconditions.checkArgument(periodicRefreshMs > 0);
Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs);
this.registry = registry;
pool.submit(new RefreshThread());
}

/**
* Notifies the refresher thread to refresh the master end points. This does not guarantee a refresh.
* See class comment for details.
*/
void refreshNow() {
synchronized (refreshMasters) {
refreshMasters.notify();
}
}
}
org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -33,11 +33,13 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
@@ -57,10 +59,11 @@

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;

@@ -89,11 +92,14 @@ public class MasterRegistry implements ConnectionRegistry {
private final int hedgedReadFanOut;

// Configured list of masters to probe the meta information from.
private final ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub;

// RPC client used to talk to the masters.
private final RpcClient rpcClient;
private final RpcControllerFactory rpcControllerFactory;
private final int rpcTimeoutMs;

protected final MasterAddressRefresher masterAddressRefresher;

/**
* Parses the list of master addresses from the provided configuration. Supported format is comma
@@ -115,20 +121,27 @@ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unkno
MasterRegistry(Configuration conf) throws IOException {
this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY,
MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT));
int rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
// this through the master registry...
// This is a problem as we will use the cluster id to determine the authentication method
rpcClient = RpcClientFactory.createClient(conf, null);
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
Set<ServerName> masterAddrs = parseMasterAddrs(conf);
// Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters
// by fetching the end points from this list.
populateMasterStubs(parseMasterAddrs(conf));
masterAddressRefresher = new MasterAddressRefresher(conf, this);
}

void populateMasterStubs(Set<ServerName> masters) throws IOException {
Preconditions.checkNotNull(masters);
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
ImmutableMap.builderWithExpectedSize(masterAddrs.size());
ImmutableMap.builderWithExpectedSize(masters.size());
User user = User.getCurrent();
for (ServerName masterAddr : masterAddrs) {
for (ServerName masterAddr : masters) {
builder.put(masterAddr,
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
}
masterAddr2Stub = builder.build();
}
@@ -169,7 +182,13 @@ private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interfac
CompletableFuture<T> future = new CompletableFuture<>();
callable.call(controller, stub, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
IOException failureReason = controller.getFailed();
future.completeExceptionally(failureReason);
if (ClientExceptionsUtil.isConnectionException(failureReason)) {
// RPC has failed, trigger a refresh of master end points. We can have some spurious
// refreshes, but that is okay since the RPC is not expensive and not in a hot path.
masterAddressRefresher.refreshNow();
}
} else {
future.complete(resp);
}
@@ -188,8 +207,9 @@ private IOException badResponse(String debug) {
* been tried and all of them are failed, we will fail the future.
*/
private <T extends Message> void groupCall(CompletableFuture<T> future,
List<ClientMetaService.Interface> masterStubs, int startIndexInclusive, Callable<T> callable,
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs,
int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug,
ConcurrentLinkedQueue<Throwable> errors) {
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size());
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
@@ -210,10 +230,10 @@ private <T extends Message> void groupCall(CompletableFuture<T> future,
RetriesExhaustedException ex = new RetriesExhaustedException("masters",
masterStubs.size(), new ArrayList<>(errors));
future.completeExceptionally(
new MasterRegistryFetchException(masterAddr2Stub.keySet(), ex));
new MasterRegistryFetchException(masterServers, ex));
} else {
groupCall(future, masterStubs, endIndexExclusive, callable, isValidResp, debug,
errors);
groupCall(future, masterServers, masterStubs, endIndexExclusive, callable,
isValidResp, debug, errors);
}
}
} else {
@@ -226,17 +246,20 @@ private <T extends Message> void groupCall(CompletableFuture<T> future,

private <T extends Message> CompletableFuture<T> call(Callable<T> callable,
Predicate<T> isValidResp, String debug) {
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2Stub.values());
ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub;
Set<ServerName> masterServers = masterAddr2StubRef.keySet();
List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values());
Collections.shuffle(masterStubs, ThreadLocalRandom.current());
CompletableFuture<T> future = new CompletableFuture<>();
groupCall(future, masterStubs, 0, callable, isValidResp, debug, new ConcurrentLinkedQueue<>());
groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug,
new ConcurrentLinkedQueue<>());
return future;
}

/**
* Simple helper to transform the result of getMetaRegionLocations() rpc.
*/
private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
List<HRegionLocation> regionLocations = new ArrayList<>();
resp.getMetaLocationsList()
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
@@ -247,7 +270,7 @@ private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsRespo
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
return this.<GetMetaRegionLocationsResponse> call((c, s, d) -> s.getMetaRegionLocations(c,
GetMetaRegionLocationsRequest.getDefaultInstance(), d), r -> r.getMetaLocationsCount() != 0,
"getMetaLocationsCount").thenApply(this::transformMetaRegionLocations);
"getMetaLocationsCount").thenApply(MasterRegistry::transformMetaRegionLocations);
}

@Override
@@ -259,17 +282,41 @@ public CompletableFuture<String> getClusterId() {
.thenApply(GetClusterIdResponse::getClusterId);
}

private ServerName transformServerName(GetActiveMasterResponse resp) {
return ProtobufUtil.toServerName(resp.getServerName());
private static boolean hasActiveMaster(GetMastersResponse resp) {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
return activeMasters.size() == 1;
}

private static ServerName filterActiveMaster(GetMastersResponse resp) {
List<GetMastersResponseEntry> activeMasters =
resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect(
Collectors.toList());
Preconditions.checkState(activeMasters.size() == 1);
return ProtobufUtil.toServerName(activeMasters.get(0).getServerName());
}

@Override
public CompletableFuture<ServerName> getActiveMaster() {
return this
.<GetActiveMasterResponse> call(
(c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
GetActiveMasterResponse::hasServerName, "getActiveMaster()")
.thenApply(this::transformServerName);
.<GetMastersResponse> call(
(c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d),
MasterRegistry::hasActiveMaster, "getMasters()")
.thenApply(MasterRegistry::filterActiveMaster);
}

private static List<ServerName> transformServerNames(GetMastersResponse resp) {
return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName(
s.getServerName())).collect(Collectors.toList());
}

CompletableFuture<List<ServerName>> getMasters() {
return this
.<GetMastersResponse> call((c, s, d) -> s.getMasters(
c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0,
"getMasters()").thenApply(MasterRegistry::transformServerNames);
}

@VisibleForTesting
@@ -279,6 +326,9 @@ Set<ServerName> getParsedMasterServers() {

@Override
public void close() {
if (masterAddressRefresher != null) {
masterAddressRefresher.close();
}
if (rpcClient != null) {
rpcClient.close();
}
org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -58,6 +58,7 @@ public class MetricsConnection implements StatisticTrackable {
/** Set this key to {@code true} to enable metrics collection of client requests. */
public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";

private static final String CNT_BASE = "rpcCount_";
private static final String DRTN_BASE = "rpcCallDurationMs_";
private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
@@ -303,6 +304,8 @@ private static interface NewMetric<T> {
LOAD_FACTOR, CONCURRENCY_LEVEL);
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
@VisibleForTesting protected final ConcurrentMap<String, Counter> rpcCounters =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);

MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
Supplier<ThreadPoolExecutor> metaPool) {
@@ -434,8 +437,7 @@ private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> f
}

/** Update call stats for non-critical-path methods */
private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
final String methodName = method.getService().getName() + "_" + method.getName();
private void updateRpcGeneric(String methodName, CallStats stats) {
getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
@@ -450,6 +452,9 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
if (callsPerServer > 0) {
concurrentCallsPerServerHist.update(callsPerServer);
}
// Update the counter that tracks RPCs by type.
final String methodName = method.getService().getName() + "_" + method.getName();
getMetric(CNT_BASE + methodName, rpcCounters, counterFactory).inc();
// this implementation is tied directly to protobuf implementation details. would be better
// if we could dispatch based on something static, ie, request Message type.
if (method.getService() == ClientService.getDescriptor()) {
@@ -511,7 +516,7 @@ public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
}
}
// Fallback to dynamic registry lookup for DDL methods.
updateRpcGeneric(method, stats);
updateRpcGeneric(methodName, stats);
}

public void incrCacheDroppingExceptions(Object exception) {
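As a small, hypothetical illustration of the metrics change above: every RPC now bumps a counter keyed by the CNT_BASE prefix ("rpcCount_") followed by the protobuf service and method names, so a Get call against ClientService is tracked as "rpcCount_ClientService_Get". The sketch below mirrors that naming scheme with a plain map standing in for the client metrics registry; the service and method names are example values.

import java.util.HashMap;
import java.util.Map;

public class RpcCounterKeyExample {
  public static void main(String[] args) {
    Map<String, Long> rpcCounters = new HashMap<>();
    // Key scheme from MetricsConnection: CNT_BASE + serviceName + "_" + methodName.
    String key = "rpcCount_" + "ClientService" + "_" + "Get";
    rpcCounters.merge(key, 1L, Long::sum);
    System.out.println(key + " = " + rpcCounters.get(key)); // rpcCount_ClientService_Get = 1
  }
}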
(Remaining changed files not shown.)
