Skip to content

Commit

Permalink
HBASE-23305: Master based registry implementation
Browse files Browse the repository at this point in the history
Implements a master based registry for clients.

- Parameterized existing client tests to run with multiple
  registry combinations.
- Added unit-test coverage for the new registry implementation.
  • Loading branch information
bharathv committed Dec 26, 2019
1 parent 1da55f3 commit 641a8c5
Show file tree
Hide file tree
Showing 17 changed files with 702 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.REGISTRY_IMPL_CONF_KEY;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand All @@ -27,8 +28,6 @@
@InterfaceAudience.Private
final class AsyncRegistryFactory {

static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";

private AsyncRegistryFactory() {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Function;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse;

/**
 * Master based registry implementation. Makes RPCs to the configured master addresses from config
 * {@value org.apache.hadoop.hbase.HConstants#MASTER_ADDRS_KEY}.
 *
 * It supports hedged reads, which can be enabled by setting
 * {@value org.apache.hadoop.hbase.HConstants#MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY} to true.
 * The fan out size of each hedged request batch is controlled by
 * {@value org.apache.hadoop.hbase.HConstants#HBASE_RPCS_HEDGED_REQS_FANOUT_KEY}.
 *
 * TODO: Handle changes to the configuration dynamically without having to restart the client.
 */
@InterfaceAudience.Private
public class MasterRegistry implements AsyncRegistry {
  private static final String MASTER_ADDRS_CONF_SEPARATOR = ",";

  // Configured list of masters to probe the meta information from.
  private final List<ServerName> masterServers;

  // RPC client used to talk to the masters.
  private final RpcClient rpcClient;
  private final RpcControllerFactory rpcControllerFactory;
  private final int rpcTimeoutMs;

  MasterRegistry(Configuration conf) {
    boolean hedgedReadsEnabled = conf.getBoolean(MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY,
        MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT);
    Configuration finalConf;
    if (!hedgedReadsEnabled) {
      // If hedged reads are disabled, it is equivalent to setting a fan out of 1. We make a copy
      // of the configuration so that other places reusing this reference are not affected.
      finalConf = new Configuration(conf);
      finalConf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, 1);
    } else {
      finalConf = conf;
    }
    // Clamp the (long) configured timeout into the int range expected by the RPC layer.
    rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
        HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
    masterServers = new ArrayList<>();
    parseMasterAddrs(finalConf);
    // Cluster id is not known at construction time; use the default placeholder.
    rpcClient = RpcClientFactory.createClient(finalConf, HConstants.CLUSTER_ID_DEFAULT);
    rpcControllerFactory = RpcControllerFactory.instantiate(finalConf);
  }

  /**
   * @return Stub needed to make RPC using a hedged channel to the master end points.
   */
  private ClientMetaService.Interface getMasterStub() throws IOException {
    return ClientMetaService.newStub(
        rpcClient.createHedgedRpcChannel(masterServers, User.getCurrent(), rpcTimeoutMs));
  }

  /**
   * Parses the list of master addresses from the provided configuration. Supported format is
   * comma separated host[:port] values. If no port number is specified, default master port is
   * assumed.
   * @param conf Configuration to parse from.
   * @throws IllegalArgumentException if no master address could be parsed.
   */
  private void parseMasterAddrs(Configuration conf) {
    String configuredMasters = conf.get(MASTER_ADDRS_KEY, MASTER_ADDRS_DEFAULT);
    for (String masterAddr: configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) {
      HostAndPort masterHostPort =
          HostAndPort.fromString(masterAddr.trim()).withDefaultPort(HConstants.DEFAULT_MASTER_PORT);
      // NON_STARTCODE because the client does not know the start code of a remote master.
      masterServers.add(ServerName.valueOf(masterHostPort.toString(), ServerName.NON_STARTCODE));
    }
    Preconditions.checkArgument(!masterServers.isEmpty(), "At least one master address is needed");
  }

  @VisibleForTesting
  public List<ServerName> getParsedMasterServers() {
    return Collections.unmodifiableList(masterServers);
  }

  /**
   * Returns a call back that can be passed along to the non-blocking rpc call. It is invoked once
   * the rpc finishes and the response is propagated to the passed future.
   * @param future Result future to which the rpc response is propagated.
   * @param isValidResp Checks if the rpc response has a valid result.
   * @param transformResult Transforms the result to a different form as expected by callers.
   * @param hrc RpcController instance for this rpc.
   * @param <T> RPC result type.
   * @param <R> Transformed type of the result.
   * @return A call back that can be embedded in the non-blocking rpc call.
   */
  private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
      Function<T, Boolean> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc) {
    return rpcResult -> {
      if (rpcResult == null) {
        // The RPC failed outright; surface the controller's error together with the list of
        // masters that were attempted. Return immediately so we do not dereference null below.
        future.completeExceptionally(
            new MasterRegistryFetchException(masterServers, hrc.getFailed()));
        return;
      }
      if (!isValidResp.apply(rpcResult)) {
        // Rpc returned ok, but result was malformed. Return so the exceptional completion is not
        // overwritten by a bogus successful completion below.
        future.completeExceptionally(
            new IOException("Invalid result for request. Will be retried"));
        return;
      }
      future.complete(transformResult.apply(rpcResult));
    };
  }

  /**
   * Simple helper to transform the result of getMetaRegionLocations() rpc.
   */
  private RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
    List<HRegionLocation> regionLocations = new ArrayList<>();
    resp.getMetaLocationsList().forEach(
        location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
    return new RegionLocations(regionLocations);
  }

  @Override
  public CompletableFuture<RegionLocations> getMetaRegionLocation() {
    CompletableFuture<RegionLocations> result = new CompletableFuture<>();
    HBaseRpcController hrc = rpcControllerFactory.newController();
    // A response with zero locations is considered malformed and retried.
    RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
        (rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc);
    try {
      getMasterStub().getMetaRegionLocations(
          hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
    } catch (IOException e) {
      result.completeExceptionally(e);
    }
    return result;
  }

  @Override
  public CompletableFuture<String> getClusterId() {
    CompletableFuture<String> result = new CompletableFuture<>();
    HBaseRpcController hrc = rpcControllerFactory.newController();
    RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
        GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc);
    try {
      getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
    } catch (IOException e) {
      result.completeExceptionally(e);
    }
    return result;
  }

  /**
   * Simple helper to transform the result of getActiveMaster() rpc.
   */
  private ServerName transformServerName(GetActiveMasterResponse resp) {
    return ProtobufUtil.toServerName(resp.getServerName());
  }

  @Override
  public CompletableFuture<ServerName> getMasterAddress() {
    CompletableFuture<ServerName> result = new CompletableFuture<>();
    HBaseRpcController hrc = rpcControllerFactory.newController();
    RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
        GetActiveMasterResponse::hasServerName, this::transformServerName, hrc);
    try {
      getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
    } catch (IOException e) {
      result.completeExceptionally(e);
    }
    return result;
  }

  @Override
  public void close() {
    if (rpcClient != null) {
      rpcClient.close();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;

import java.util.List;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.PrettyPrinter;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Thrown when a master-registry RPC fails on the client side. The exception message embeds the
 * pretty-printed list of master addresses the RPC was attempted against, so callers can see at a
 * glance which end points were tried.
 */
@InterfaceAudience.Private
public class MasterRegistryFetchException extends HBaseIOException {
  public MasterRegistryFetchException(List<ServerName> masters, Throwable failure) {
    // Builds the same message the previous String.format produced, via plain concatenation.
    super("Exception making rpc to masters " + PrettyPrinter.toString(masters), failure);
  }
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,14 @@
*/
package org.apache.hadoop.hbase.ipc;

import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

import java.io.Closeable;
import java.io.IOException;

import java.util.List;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;

/**
* Interface for RpcClient implementations so ConnectionManager can handle it.
Expand Down Expand Up @@ -57,12 +55,6 @@ public interface RpcClient extends Closeable {
// The client in 0.99+ does not ping the server.
int PING_CALL_ID = -1;

// Hedged RPC configurations.
// Imposes a maximum limit on the total number of hedged RPCs that can done using a single
// RpcClient instance. This limit is intended to a guardrail for both client and server loads.
String MAX_HEDGED_REQUESTS_PER_CLIENT_CONF = "hbase.ipc.max.parallel.hedged.reqs";
int MAX_HEDGED_REQUESTS_PER_CLIENT_DEFAULT = 5;

/**
* Creates a "channel" that can be used by a blocking protobuf service. Useful setting up
* protobuf blocking stubs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.FutureUtils;
Expand Down Expand Up @@ -70,7 +71,7 @@ public void close() {

@BeforeClass
public static void setUp() {
CONF.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, AsyncRegistryForTest.class,
CONF.setClass(HConstants.REGISTRY_IMPL_CONF_KEY, AsyncRegistryForTest.class,
AsyncRegistry.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,17 @@ public enum OperationStatusCode {
public static final String MASTER_INFO_PORT = "hbase.master.info.port";

/** Configuration key for the list of master host:ports **/
public static final String MASTER_ADDRS_KEY = "hbase.master.addrs";
public static final String MASTER_ADDRS_KEY = "hbase.masters";

public static final String MASTER_ADDRS_DEFAULT = "localhost:" + DEFAULT_MASTER_PORT;

/** Configuration to enable hedged reads on master registry **/
public static final String MASTER_REGISTRY_ENABLE_HEDGED_READS_KEY =
"hbase.client.master_registry.enable_hedged_reads";

/** Default value for enabling hedging reads on master registry **/
public static final boolean MASTER_REGISTRY_ENABLE_HEDGED_READS_DEFAULT = false;

/** Parameter name for the master type being backup (waits for primary to go inactive). */
public static final String MASTER_TYPE_BACKUP = "hbase.master.backup";

Expand Down Expand Up @@ -938,6 +945,10 @@ public enum OperationStatusCode {
*/
public static final long NO_SEQNUM = -1;

/**
* Registry implementation to be used on the client side.
*/
public static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";

/*
* cluster replication constants.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.hadoop.hbase.util;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -196,7 +198,11 @@ private static long humanReadableIntervalToSec(final String humanReadableInterva
* @return Pretty printed string for the collection.
*/
public static String toString(Collection<?> collection) {
  // NOTE: the original span contained diff residue — the old stream-based `return` line was
  // still present above the new body, which is unreachable code and would not compile. Only the
  // intended implementation is kept: elements joined by "," (no space) inside brackets, with
  // nulls rendered as "null" via Objects.toString().
  List<String> stringList = new ArrayList<>(collection.size());
  for (Object o : collection) {
    stringList.add(Objects.toString(o));
  }
  return "[" + String.join(",", stringList) + "]";
}

}
Loading

0 comments on commit 641a8c5

Please sign in to comment.