Skip to content

Commit

Permalink
HBASE-22267 Implement client push back for async client
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Apr 20, 2019
1 parent 353f922 commit 8ac87ab
Show file tree
Hide file tree
Showing 11 changed files with 472 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
import org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -134,6 +136,10 @@ public void addAction(HRegionLocation loc, Action action) {
() -> new RegionRequest(loc)).actions.add(action);
}

/**
 * Stores the given region request in {@code actionsByRegion} under the given region name.
 *
 * @param regionName the region name used as the key
 * @param regionReq the request to associate with {@code regionName}
 */
public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
  this.actionsByRegion.put(regionName, regionReq);
}

public int getPriority() {
return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream())
.mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
Expand Down Expand Up @@ -298,6 +304,8 @@ private void onComplete(Action action, RegionRequest regionReq, int tries, Serve

private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
ServerName serverName, MultiResponse resp) {
ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
serverName, resp);
List<Action> failedActions = new ArrayList<>();
MutableBoolean retryImmediately = new MutableBoolean(false);
actionsByRegion.forEach((rn, regionReq) -> {
Expand Down Expand Up @@ -333,55 +341,88 @@ private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int tries,
}
}

private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) {
private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
long remainingNs;
if (operationTimeoutNs > 0) {
remainingNs = remainingTimeNs();
if (remainingNs <= 0) {
failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream())
.flatMap(r -> r.actions.stream()), tries);
failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
tries);
return;
}
} else {
remainingNs = Long.MAX_VALUE;
}
actionsByServer.forEach((sn, serverReq) -> {
ClientService.Interface stub;
try {
stub = conn.getRegionServerStub(sn);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, sn);
return;
}
ClientProtos.MultiRequest req;
List<CellScannable> cells = new ArrayList<>();
// Map from a created RegionAction to the original index for a RowMutations within
// the original list of actions. This will be used to process the results when there
// is RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
try {
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, sn);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
calcPriority(serverReq.getPriority(), tableName));
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
ClientService.Interface stub;
try {
stub = conn.getRegionServerStub(serverName);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
}
ClientProtos.MultiRequest req;
List<CellScannable> cells = new ArrayList<>();
// Map from a created RegionAction to the original index for a RowMutations within
// the original list of actions. This will be used to process the results when there
// is RowMutations in the action list.
Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
try {
req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
} catch (IOException e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
calcPriority(serverReq.getPriority(), tableName));
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
}
stub.multi(controller, req, resp -> {
if (controller.failed()) {
onError(serverReq.actionsByRegion, tries, controller.getFailed(), serverName);
} else {
try {
onComplete(serverReq.actionsByRegion, tries, serverName, ResponseConverter.getResults(req,
rowMutationsIndexMap, resp, controller.cellScanner()));
} catch (Exception e) {
onError(serverReq.actionsByRegion, tries, e, serverName);
return;
}
}
stub.multi(controller, req, resp -> {
if (controller.failed()) {
onError(serverReq.actionsByRegion, tries, controller.getFailed(), sn);
});
}

// We will make use of the ServerStatisticTracker to determine whether we need to delay a bit,
// based on the load of the region server and the region.
private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int tries) {
Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
if (!optStats.isPresent()) {
actionsByServer.forEach((serverName, serverReq) -> {
metrics.ifPresent(MetricsConnection::incrNormalRunners);
sendToServer(serverName, serverReq, tries);
});
return;
}
ServerStatisticTracker stats = optStats.get();
ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
actionsByServer.forEach((serverName, serverReq) -> {
ServerStatistics serverStats = stats.getStats(serverName);
Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
long backoff = backoffPolicy.getBackoffTime(serverName, regionName, serverStats);
groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
.setRegionRequest(regionName, regionReq);
});
groupByBackoff.forEach((backoff, sr) -> {
if (backoff > 0) {
metrics.ifPresent(m -> m.incrDelayRunnersAndUpdateDelayInterval(backoff));
retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), backoff,
TimeUnit.MILLISECONDS);
} else {
try {
onComplete(serverReq.actionsByRegion, tries, sn, ResponseConverter.getResults(req,
rowMutationsIndexMap, resp, controller.cellScanner()));
} catch (Exception e) {
onError(serverReq.actionsByRegion, tries, e, sn);
return;
}
metrics.ifPresent(MetricsConnection::incrNormalRunners);
sendToServer(serverName, sr, tries);
}
});
});
Expand Down Expand Up @@ -454,7 +495,7 @@ private void groupAndSend(Stream<Action> actions, int tries) {
}))
.toArray(CompletableFuture[]::new)), (v, r) -> {
if (!actionsByServer.isEmpty()) {
send(actionsByServer, tries);
sendOrDelay(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
tryResubmit(locateFailed.stream(), tries, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
Expand Down Expand Up @@ -101,6 +103,9 @@ class AsyncConnectionImpl implements AsyncConnection {
private final AtomicReference<CompletableFuture<MasterService.Interface>> masterStubMakeFuture =
new AtomicReference<>();

private final Optional<ServerStatisticTracker> stats;
private final ClientBackoffPolicy backoffPolicy;

private ChoreService authService;

private volatile boolean closed = false;
Expand Down Expand Up @@ -133,6 +138,8 @@ public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, String cl
} else {
nonceGenerator = NO_NONCE_GENERATOR;
}
this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
}

private void spawnRenewalChore(final UserGroupInformation user) {
Expand Down Expand Up @@ -233,6 +240,14 @@ void clearMasterStubCache(MasterService.Interface stub) {
masterStub.compareAndSet(stub, null);
}

/**
 * Returns the {@code stats} field of this connection.
 *
 * @return the optional {@link ServerStatisticTracker} held by this connection; it is empty when
 *         {@code ServerStatisticTracker.create(conf)} returned {@code null} at construction
 */
Optional<ServerStatisticTracker> getStatisticsTracker() {
  return this.stats;
}

/**
 * Returns the {@code backoffPolicy} field of this connection.
 *
 * @return the {@link ClientBackoffPolicy} created from the configuration at construction
 */
ClientBackoffPolicy getBackoffPolicy() {
  return this.backoffPolicy;
}

@Override
public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, connConf) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -55,9 +56,6 @@

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;

/**
* The context, and return value, for a single submit/submitAll call.
* Note on how this class (one AP submit) works. Initially, all requests are split into groups
Expand Down Expand Up @@ -614,8 +612,8 @@ private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName serv
traceText = "AsyncProcess.clientBackoff.sendMultiAction";
runnable = runner;
if (asyncProcess.connection.getConnectionMetrics() != null) {
asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
asyncProcess.connection.getConnectionMetrics()
.incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
}
} else {
if (asyncProcess.connection.getConnectionMetrics() != null) {
Expand Down Expand Up @@ -802,19 +800,16 @@ private void logNoResubmit(ServerName oldServer, int numAttempt,
* @param responses - the response, if any
* @param numAttempt - the attempt
*/
private void receiveMultiAction(MultiAction multiAction,
ServerName server, MultiResponse responses, int numAttempt) {
private void receiveMultiAction(MultiAction multiAction, ServerName server,
MultiResponse responses, int numAttempt) {
assert responses != null;

Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
updateStats(server, results);

updateStats(server, responses);
// Success or partial success
// Analyze detailed results. We can still have individual failures to be redo.
// two specific throwables are managed:
// - DoNotRetryIOException: we continue to retry for other actions
// - RegionMovedException: we update the cache with the new region location

Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
List<Action> toReplay = new ArrayList<>();
Throwable lastException = null;
int failureCount = 0;
Expand Down Expand Up @@ -926,26 +921,9 @@ private void cleanServerCache(ServerName server, Throwable regionException) {
}

@VisibleForTesting
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
if (!stats && !metrics) {
return;
}
for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : results.entrySet()) {
byte[] regionName = regionStats.getKey();
ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
if (stat == null) {
LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
+ ", region=" + Bytes.toStringBinary(regionName));
continue;
}
RegionLoadStats regionLoadstats = ProtobufUtil.createRegionLoadStats(stat);
ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), server,
regionName, regionLoadstats);
ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
server, regionName, regionLoadstats);
}
/**
 * Forwards the response to {@code ConnectionUtils.updateStats}, wrapping the connection's
 * statistics tracker and connection metrics (either of which may be {@code null}) in
 * {@link Optional}s.
 *
 * @param server the server the response came from
 * @param resp the multi response to pass on
 */
protected void updateStats(ServerName server, MultiResponse resp) {
  // Read the tracker first, then the metrics, matching the original argument order.
  Optional<ServerStatisticTracker> tracker =
    Optional.ofNullable(asyncProcess.connection.getStatisticsTracker());
  Optional<MetricsConnection> connMetrics =
    Optional.ofNullable(asyncProcess.connection.getConnectionMetrics());
  ConnectionUtils.updateStats(tracker, connMetrics, server, resp);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
Expand Down Expand Up @@ -672,4 +674,25 @@ static <T> CompletableFuture<T> getOrFetch(AtomicReference<T> cacheRef,
}
}
}

/**
 * Pushes the per-region load statistics carried in a multi response to the statistics tracker
 * and/or the connection metrics, whichever of the two is present.
 * <p>
 * Does nothing when both optionals are empty. A region result without load statistics is logged
 * at error level and skipped.
 *
 * @param optStats the optional statistics tracker to update
 * @param optMetrics the optional connection metrics to update
 * @param serverName the server the response came from
 * @param resp the multi response whose region results are inspected
 */
static void updateStats(Optional<ServerStatisticTracker> optStats,
    Optional<MetricsConnection> optMetrics, ServerName serverName, MultiResponse resp) {
  boolean trackStats = optStats.isPresent();
  boolean trackMetrics = optMetrics.isPresent();
  if (!(trackStats || trackMetrics)) {
    // Nobody is interested in the statistics, so there is nothing to do.
    return;
  }
  resp.getResults().forEach((regionName, regionResult) -> {
    ClientProtos.RegionLoadStats protoStat = regionResult.getStat();
    if (protoStat != null) {
      RegionLoadStats loadStats = ProtobufUtil.createRegionLoadStats(protoStat);
      if (trackStats) {
        ResultStatsUtil.updateStats(optStats.get(), serverName, regionName, loadStats);
      }
      if (trackMetrics) {
        ResultStatsUtil.updateStats(optMetrics.get(), serverName, regionName, loadStats);
      }
    } else {
      LOG.error("No ClientProtos.RegionLoadStats found for server={}, region={}", serverName,
        Bytes.toStringBinary(regionName));
    }
  });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,9 @@ public void incrNormalRunners() {
this.runnerStats.incrNormalRunners();
}

/** Increment the number of delay runner counts. */
public void incrDelayRunners() {
/** Increment the number of delay runner counts and update delay interval of delay runner. */
public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
this.runnerStats.incrDelayRunners();
}

/** Update delay interval of delay runner. */
public void updateDelayInterval(long interval) {
this.runnerStats.updateDelayInterval(interval);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ public CompletableFuture<Boolean> thenMutate(RowMutations mutation) {
preCheck();
return RawAsyncTableImpl.this
.<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
.action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> mutateRow(controller,
loc, stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
resp -> resp.getExists()))
Expand All @@ -373,7 +373,7 @@ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {

// We need the MultiRequest when constructing the org.apache.hadoop.hbase.client.MultiResponse,
// so a separate method is used here rather than changing the abstraction of the call method.
private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
Converter<MultiRequest, byte[], RowMutations> reqConvert,
Function<Result, RESP> respConverter) {
Expand All @@ -391,6 +391,8 @@ public void run(MultiResponse resp) {
try {
org.apache.hadoop.hbase.client.MultiResponse multiResp =
ResponseConverter.getResults(req, resp, controller.cellScanner());
ConnectionUtils.updateStats(conn.getStatisticsTracker(), conn.getConnectionMetrics(),
loc.getServerName(), multiResp);
Throwable ex = multiResp.getException(regionName);
if (ex != null) {
future.completeExceptionally(ex instanceof IOException ? ex
Expand All @@ -415,8 +417,8 @@ public void run(MultiResponse resp) {
@Override
public CompletableFuture<Void> mutateRow(RowMutations mutation) {
return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Void> mutateRow(controller, loc, stub,
mutation, (rn, rm) -> {
.action((controller, loc, stub) -> this.<Void> mutateRow(controller, loc, stub, mutation,
(rn, rm) -> {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
Expand Down
Loading

0 comments on commit 8ac87ab

Please sign in to comment.