Skip to content

Commit

Permalink
HubSpot Backport: HBASE-26807 Unify CallQueueTooBigException special …
Browse files Browse the repository at this point in the history
…pause with CallDroppedException (apache#4273)

Signed-off-by: Nick Dimiduk <[email protected]>
  • Loading branch information
bbeaudreault committed Apr 24, 2022
1 parent bd85929 commit b95fe05
Show file tree
Hide file tree
Showing 41 changed files with 925 additions and 362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
Expand All @@ -28,14 +26,16 @@
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallDroppedException extends IOException {
public class CallDroppedException extends HBaseServerException {
public CallDroppedException() {
super();
// For now all call drops are due to server being overloaded.
// We could decouple this if desired.
super(true);
}

// Absence of this constructor prevents proper unwrapping of
// remote exception on the client side
public CallDroppedException(String message) {
super(message);
super(true, message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Returned to clients when their request was dropped because the call queue was too big to
* accept a new call. Clients should retry upon receiving it.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
public class CallQueueTooBigException extends IOException {
public class CallQueueTooBigException extends CallDroppedException {
public CallQueueTooBigException() {
super();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import org.apache.yetus.audience.InterfaceAudience;

/**
* Base class for exceptions thrown by an HBase server. May contain extra info about
* the state of the server when the exception was thrown.
*/
@InterfaceAudience.Public
public class HBaseServerException extends HBaseIOException {
private boolean serverOverloaded;

public HBaseServerException() {
this(false);
}

public HBaseServerException(String message) {
this(false, message);
}

public HBaseServerException(boolean serverOverloaded) {
this.serverOverloaded = serverOverloaded;
}

public HBaseServerException(boolean serverOverloaded, String message) {
super(message);
this.serverOverloaded = serverOverloaded;
}

/**
* @param t throwable to check for server overloaded state
* @return True if the server was considered overloaded when the exception was thrown
*/
public static boolean isServerOverloaded(Throwable t) {
if (t instanceof HBaseServerException) {
return ((HBaseServerException) t).isServerOverloaded();
}
return false;
}

/**
* Necessary for parsing RemoteException on client side
* @param serverOverloaded True if server was overloaded when exception was thrown
*/
public void setServerOverloaded(boolean serverOverloaded) {
this.serverOverloaded = serverOverloaded;
}

/**
* @return True if server was considered overloaded when exception was thrown
*/
public boolean isServerOverloaded() {
return serverOverloaded;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.yetus.audience.InterfaceAudience;

/**
Expand Down Expand Up @@ -50,21 +51,42 @@ public interface AsyncAdminBuilder {
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
* @return this for invocation chaining
* @see #setRetryPauseForCQTBE(long, TimeUnit)
* @see #setRetryPauseForServerOverloaded(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);

/**
* Set the base pause time for retrying when we hit {@code CallQueueTooBigException}. We use an
* exponential policy to generate sleep time when retrying.
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
* We use an exponential policy to generate sleep time from this base when retrying.
* <p/>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually {@code CallQueueTooBigException}
* means the server is overloaded. We just use the normal pause value for
* {@code CallQueueTooBigException} if here you specify a smaller value.
* {@link #setRetryPause(long, TimeUnit)} method, as usually
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
* specify a smaller value.
*
* @see #setRetryPause(long, TimeUnit)
* @deprecated Since 2.5.0, will be removed in 4.0.0. Please use
* {@link #setRetryPauseForServerOverloaded(long, TimeUnit)} instead.
*/
AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
@Deprecated
default AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit) {
return setRetryPauseForServerOverloaded(pause, unit);
}

/**
* Set the base pause time for retrying when {@link HBaseServerException#isServerOverloaded()}.
* We use an exponential policy to generate sleep time when retrying.
* <p/>
* This value should be greater than the normal pause value which could be set with the above
* {@link #setRetryPause(long, TimeUnit)} method, as usually
* {@link HBaseServerException#isServerOverloaded()} means the server is overloaded. We just use
* the normal pause value for {@link HBaseServerException#isServerOverloaded()} if here you
* specify a smaller value.
*
* @see #setRetryPause(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPauseForServerOverloaded(long pause, TimeUnit unit);

/**
* Set the max retry times for an admin operation. Usually it is the max attempt times minus 1.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {

protected long pauseNs;

protected long pauseForCQTBENs;
protected long pauseNsForServerOverloaded;

protected int maxAttempts;

Expand All @@ -43,7 +43,7 @@ abstract class AsyncAdminBuilderBase implements AsyncAdminBuilder {
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
this.pauseNs = connConf.getPauseNs();
this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
this.maxAttempts = connConf.getMaxRetries();
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
Expand All @@ -67,8 +67,8 @@ public AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit) {
}

@Override
public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
this.pauseForCQTBENs = unit.toNanos(timeout);
public AsyncAdminBuilder setRetryPauseForServerOverloaded(long timeout, TimeUnit unit) {
this.pauseNsForServerOverloaded = unit.toNanos(timeout);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ public interface Callable<T> {
private ServerName serverName;

public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority,
long pauseNs, long pauseForCQTBENs, int maxAttempts, long operationTimeoutNs,
long pauseNs, long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
super(retryTimer, conn, priority, pauseNs, pauseNsForServerOverloaded, maxAttempts,
operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseServerException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RetryImmediatelyException;
Expand All @@ -63,9 +63,7 @@
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.io.netty.util.Timer;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
Expand Down Expand Up @@ -104,7 +102,7 @@ class AsyncBatchRpcRetryingCaller<T> {

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;

private final int maxAttempts;

Expand Down Expand Up @@ -150,13 +148,14 @@ public int getPriority() {
}

public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, long pauseForCQTBENs,
int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
TableName tableName, List<? extends Row> actions, long pauseNs,
long pauseNsForServerOverloaded, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
Expand Down Expand Up @@ -466,17 +465,17 @@ private void onError(Map<byte[], RegionRequest> actionsByRegion, int tries, Thro
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
tryResubmit(copiedActions.stream(), tries, error instanceof RetryImmediatelyException,
error instanceof CallQueueTooBigException);
HBaseServerException.isServerOverloaded(error));
}

private void tryResubmit(Stream<Action> actions, int tries, boolean immediately,
boolean isCallQueueTooBig) {
boolean isServerOverloaded) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
long pauseNsToUse = isServerOverloaded ? pauseNsForServerOverloaded : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class AsyncClientScanner {

private final long pauseNs;

private final long pauseForCQTBENs;
private final long pauseNsForServerOverloaded;

private final int maxAttempts;

Expand All @@ -86,7 +86,7 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;

public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseForCQTBENs,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
Expand All @@ -100,7 +100,7 @@ public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableN
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.pauseForCQTBENs = pauseForCQTBENs;
this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
Expand Down Expand Up @@ -170,7 +170,8 @@ private void startScan(OpenScannerResponse resp) {
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts)
.startLogErrorsCnt(startLogErrorsCnt).start(resp.controller, resp.resp),
(hasMore, error) -> {
if (error != null) {
Expand All @@ -191,7 +192,8 @@ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
.priority(scan.getPriority())
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForCQTBE(pauseForCQTBENs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}

Expand Down
Loading

0 comments on commit b95fe05

Please sign in to comment.