Commit

Address some more comments from Nick.

bharathv committed Jan 3, 2020
1 parent 01a785b commit ccbbf61
Showing 4 changed files with 36 additions and 24 deletions.
@@ -134,21 +134,23 @@ public Set<ServerName> getParsedMasterServers() {
* @param isValidResp Checks if the rpc response has a valid result.
* @param transformResult Transforms the result to a different form as expected by callers.
* @param hrc RpcController instance for this rpc.
* @param debug Debug message passed along to the caller in case of exceptions.
* @param <T> RPC result type.
* @param <R> Transformed type of the result.
* @return A call back that can be embedded in the non-blocking rpc call.
*/
private <T, R> RpcCallback<T> getRpcCallBack(CompletableFuture<R> future,
Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc) {
Predicate<T> isValidResp, Function<T, R> transformResult, HBaseRpcController hrc,
final String debug) {
return rpcResult -> {
if (rpcResult == null) {
future.completeExceptionally(
new MasterRegistryFetchException(masterServers, hrc.getFailed()));
}
if (!isValidResp.test(rpcResult)) {
// Rpc returned ok, but result was malformed.
future.completeExceptionally(
new IOException("Invalid result for request. Will be retried"));
future.completeExceptionally(new IOException(
String.format("Invalid result for request %s. Will be retried", debug)));

}
future.complete(transformResult.apply(rpcResult));
@@ -170,7 +172,8 @@ public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetMetaRegionLocationsResponse> callback = getRpcCallBack(result,
(rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc);
(rpcResp) -> rpcResp.getMetaLocationsCount() != 0, this::transformMetaRegionLocations, hrc,
"getMetaRegionLocations()");
try {
getMasterStub().getMetaRegionLocations(
hrc, GetMetaRegionLocationsRequest.getDefaultInstance(), callback);
@@ -185,7 +188,8 @@ public CompletableFuture<String> getClusterId() {
CompletableFuture<String> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetClusterIdResponse> callback = getRpcCallBack(result,
GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc);
GetClusterIdResponse::hasClusterId, GetClusterIdResponse::getClusterId, hrc,
"getClusterId()");
try {
getMasterStub().getClusterId(hrc, GetClusterIdRequest.getDefaultInstance(), callback);
} catch (IOException e) {
@@ -203,7 +207,8 @@ public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> result = new CompletableFuture<>();
HBaseRpcController hrc = rpcControllerFactory.newController();
RpcCallback<GetActiveMasterResponse> callback = getRpcCallBack(result,
GetActiveMasterResponse::hasServerName, this::transformServerName, hrc);
GetActiveMasterResponse::hasServerName, this::transformServerName, hrc,
"getActiveMaster()");
try {
getMasterStub().getActiveMaster(hrc, GetActiveMasterRequest.getDefaultInstance(), callback);
} catch (IOException e) {
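
For context, a minimal caller-side sketch (hypothetical code, not part of this commit) of what the new debug tag buys: when a master returns a malformed response, the future returned by these registry methods now fails with an IOException whose message names the originating call.

import java.util.concurrent.CompletableFuture;

public class RegistryCallerSketch {
  // clusterIdFuture stands in for the future returned by getClusterId() above.
  static void logOutcome(CompletableFuture<String> clusterIdFuture) {
    clusterIdFuture.whenComplete((clusterId, err) -> {
      if (err != null) {
        // e.g. "Invalid result for request getClusterId(). Will be retried"
        System.out.println("registry call failed: " + err.getMessage());
      } else {
        System.out.println("cluster id: " + clusterId);
      }
    });
  }
}
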
@@ -533,13 +533,16 @@ public RpcChannel createRpcChannel(ServerName sn, User user, int rpcTimeout)
@Override
public RpcChannel createHedgedRpcChannel(Set<ServerName> sns, User user, int rpcTimeout)
throws UnknownHostException {
int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
final int hedgedRpcFanOut = conf.getInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY,
HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT);
Set<InetSocketAddress> addresses = new HashSet<>();
for (ServerName sn: sns) {
addresses.add(createAddr(sn));
}
return new HedgedRpcChannel(this, addresses, user, rpcTimeout, hedgedRpcFanOut);
Preconditions.checkState(this instanceof NettyRpcClient,
"Hedging only supported for non-blocking connection implementations.");
return new HedgedRpcChannel((NettyRpcClient) this, addresses, user, rpcTimeout,
hedgedRpcFanOut);
}

private static class AbstractRpcChannel {
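
As a usage note, here is a minimal sketch (assuming only the HConstants keys referenced above) of how a client configuration might raise the hedged-request fan-out that createHedgedRpcChannel() reads:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class HedgedFanOutConfigSketch {
  static Configuration withFanOut(int fanOut) {
    Configuration conf = HBaseConfiguration.create();
    // If unset, createHedgedRpcChannel() falls back to HBASE_RPCS_HEDGED_REQS_FANOUT_DEFAULT.
    conf.setInt(HConstants.HBASE_RPCS_HEDGED_REQS_FANOUT_KEY, fanOut);
    return conf;
  }
}

Note that with the Preconditions check added above, only NettyRpcClient-backed connections can create a hedged channel; a blocking client fails fast with an IllegalStateException instead of silently serializing the fan-out.
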
@@ -80,7 +80,12 @@
class HedgedRpcChannel implements RpcChannel {
private static final Logger LOG = LoggerFactory.getLogger(HedgedRpcChannel.class);

private final AbstractRpcClient<?> rpcClient;
/**
* Currently hedging is only supported for non-blocking connection implementation types because
* the channel implementation inherently relies on the connection implementation being async.
* Refer to the comments in doCallMethod().
*/
private final NettyRpcClient rpcClient;
// List of service addresses to hedge the requests to.
private final List<InetSocketAddress> addrs;
private final User ticket;
@@ -197,7 +202,7 @@ public String toString() {
}
}

public HedgedRpcChannel(AbstractRpcClient<?> rpcClient, Set<InetSocketAddress> addrs,
public HedgedRpcChannel(NettyRpcClient rpcClient, Set<InetSocketAddress> addrs,
User ticket, int rpcTimeout, int fanOutSize) {
this.rpcClient = rpcClient;
this.addrs = new ArrayList<>(Preconditions.checkNotNull(addrs));
@@ -237,16 +242,13 @@ private void doCallMethod(Descriptors.MethodDescriptor method, HBaseRpcController
// a blocking implementation (ex: BlockingRpcConnection). That essentially serializes all
// the write calls. Handling blocking connection means that this should be run in a separate
// thread and hence more code complexity. Is it ok to handle only non-blocking connections?
// Should we have a check in the constructor if the underlying connection is a blocking
// impl and then log some warning?
batchRpcCtx.addCallInFlight(rpcClient.callMethod(method, rpcController, request,
responsePrototype, ticket, address,
new BatchRpcCtxCallBack(batchRpcCtx, rpcController)));
}
if (batchRpcCtx.waitForResults()) {
return;
}
// TODO: Sleep between batches?
// Entire batch has failed, lets try the next batch.
LOG.debug("Failed request {}, {}.", method.getName(), batchRpcCtx);
i = batchEnd;
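
To illustrate the batching strategy described in the doCallMethod() comments above (and why it leans on a non-blocking client), here is a minimal, self-contained sketch of the same idea, independent of the HBase types: fire fanOut asynchronous calls at a time, complete on the first success, and only fall through to the next batch once the whole batch has failed.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public final class HedgedBatchSketch {

  // Completes with the first successful result; fails only after every target has failed.
  static <T, R> CompletableFuture<R> call(List<T> targets, int fanOut,
      Function<T, CompletableFuture<R>> asyncCall) {
    CompletableFuture<R> result = new CompletableFuture<>();
    runBatch(targets, 0, fanOut, asyncCall, result);
    return result;
  }

  private static <T, R> void runBatch(List<T> targets, int start, int fanOut,
      Function<T, CompletableFuture<R>> asyncCall, CompletableFuture<R> result) {
    if (start >= targets.size()) {
      result.completeExceptionally(new RuntimeException("All targets failed"));
      return;
    }
    int end = Math.min(start + fanOut, targets.size());
    List<CompletableFuture<R>> batch = new ArrayList<>();
    for (T target : targets.subList(start, end)) {
      // Each call is non-blocking; the first success completes the caller's future.
      batch.add(asyncCall.apply(target)
          .whenComplete((r, err) -> {
            if (err == null) {
              result.complete(r);
            }
          }));
    }
    // Wait (asynchronously) for the whole batch; if nothing succeeded, hedge to the next batch.
    CompletableFuture.allOf(batch.toArray(new CompletableFuture<?>[0]))
        .whenComplete((ignored, err) -> {
          if (!result.isDone()) {
            runBatch(targets, end, fanOut, asyncCall, result);
          }
        });
  }
}

The channel above does the equivalent with BatchRpcCtx and the client's callMethod(); because each fan-out call must return immediately, a blocking connection implementation would serialize the batch and defeat the hedging, which is what the removed comment was asking about and what the new constructor-level restriction now enforces.
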
@@ -30,12 +30,10 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -46,6 +44,15 @@
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@@ -56,14 +63,6 @@
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

/**
* Some basic ipc tests.
@@ -234,7 +233,6 @@ public void testRpcMaxRequestSize() throws IOException, ServiceException {
/**
* Tests that the RpcServer creates & dispatches CallRunner object to scheduler with non-null
* remoteAddress set to its Call Object
* @throws ServiceException
*/
@Test
public void testRpcServerForNotNullRemoteAddressInCallObject()
@@ -371,6 +369,8 @@ public void testAsyncEcho() throws IOException {
*/
@Test
public void testHedgedAsyncEcho() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
// Create a mix of running and failing servers.
@@ -420,6 +420,8 @@ public void testHedgedAsyncEcho() throws Exception {

@Test
public void testHedgedAsyncTimeouts() throws Exception {
// Hedging is not supported for blocking connection types.
Assume.assumeFalse(this instanceof TestBlockingIPC);
List<RpcServer> rpcServers = new ArrayList<>();
List<InetSocketAddress> addresses = new ArrayList<>();
final int numServers = 3;
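
For completeness, a small JUnit 4 sketch (hypothetical test names) of the Assume pattern used in the two hedged tests above: when the assumption does not hold, JUnit reports the test as skipped rather than failed, so the blocking-IPC subclass simply opts out of the hedging tests.

import org.junit.Assume;
import org.junit.Test;

public class HedgedSkipSketch {
  // Stand-in for the "this instanceof TestBlockingIPC" check in the abstract test class.
  protected boolean usesBlockingClient() {
    return true;
  }

  @Test
  public void testHedgedEcho() {
    // Skips (rather than fails) the test when the client under test is blocking.
    Assume.assumeFalse(usesBlockingClient());
    // ... hedged assertions run only for non-blocking clients ...
  }
}
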
