Skip to content

Commit

Permalink
Measure communications timeouts from first failure
Browse files Browse the repository at this point in the history
Backoff fails queries early because it measures duration from the
end of the last successful request.  In cases where there is a large
gap between request, a query may fail after a single attempt.
Instead the failure duration is measured from the end of the first
failed request in a sequence of failures.  With this change the
silding failure window is no longer possible, so all of the code
related to it has been removed.
  • Loading branch information
dain committed Feb 13, 2018
1 parent 467ef9c commit c4092b0
Show file tree
Hide file tree
Showing 17 changed files with 146 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class QueryManagerConfig

private int queryManagerExecutorPoolSize = 5;

private Duration remoteTaskMinErrorDuration = new Duration(2, TimeUnit.MINUTES);
private Duration remoteTaskMaxErrorDuration = new Duration(5, TimeUnit.MINUTES);
private int remoteTaskMaxCallbackThreads = 1000;

Expand Down Expand Up @@ -211,17 +210,16 @@ public QueryManagerConfig setQueryManagerExecutorPoolSize(int queryManagerExecut
return this;
}

@NotNull
@MinDuration("1s")
@Deprecated
public Duration getRemoteTaskMinErrorDuration()
{
return remoteTaskMinErrorDuration;
return remoteTaskMaxErrorDuration;
}

@Deprecated
@Config("query.remote-task.min-error-duration")
public QueryManagerConfig setRemoteTaskMinErrorDuration(Duration remoteTaskMinErrorDuration)
{
this.remoteTaskMinErrorDuration = remoteTaskMinErrorDuration;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class ExchangeClient
private final long bufferCapacity;
private final DataSize maxResponseSize;
private final int concurrentRequestMultiplier;
private final Duration minErrorDuration;
private final Duration maxErrorDuration;
private final HttpClient httpClient;
private final ScheduledExecutorService scheduler;
Expand Down Expand Up @@ -99,7 +98,6 @@ public ExchangeClient(
DataSize bufferCapacity,
DataSize maxResponseSize,
int concurrentRequestMultiplier,
Duration minErrorDuration,
Duration maxErrorDuration,
HttpClient httpClient,
ScheduledExecutorService scheduler,
Expand All @@ -109,7 +107,6 @@ public ExchangeClient(
this.bufferCapacity = bufferCapacity.toBytes();
this.maxResponseSize = maxResponseSize;
this.concurrentRequestMultiplier = concurrentRequestMultiplier;
this.minErrorDuration = minErrorDuration;
this.maxErrorDuration = maxErrorDuration;
this.httpClient = httpClient;
this.scheduler = scheduler;
Expand Down Expand Up @@ -157,7 +154,6 @@ public synchronized void addLocation(URI location)
HttpPageBufferClient client = new HttpPageBufferClient(
httpClient,
maxResponseSize,
minErrorDuration,
maxErrorDuration,
location,
new ExchangeClientCallback(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,16 @@ public ExchangeClientConfig setConcurrentRequestMultiplier(int concurrentRequest
return this;
}

@NotNull
@MinDuration("1ms")
@Deprecated
public Duration getMinErrorDuration()
{
return minErrorDuration;
return maxErrorDuration;
}

@Deprecated
@Config("exchange.min-error-duration")
public ExchangeClientConfig setMinErrorDuration(Duration minErrorDuration)
{
this.minErrorDuration = minErrorDuration;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public class ExchangeClientFactory
{
private final DataSize maxBufferedBytes;
private final int concurrentRequestMultiplier;
private final Duration minErrorDuration;
private final Duration maxErrorDuration;
private final HttpClient httpClient;
private final DataSize maxResponseSize;
Expand All @@ -57,7 +56,6 @@ public ExchangeClientFactory(
config.getMaxBufferSize(),
config.getMaxResponseSize(),
config.getConcurrentRequestMultiplier(),
config.getMinErrorDuration(),
config.getMaxErrorDuration(),
config.getPageBufferClientMaxCallbackThreads(),
httpClient,
Expand All @@ -68,15 +66,13 @@ public ExchangeClientFactory(
DataSize maxBufferedBytes,
DataSize maxResponseSize,
int concurrentRequestMultiplier,
Duration minErrorDuration,
Duration maxErrorDuration,
int pageBufferClientMaxCallbackThreads,
HttpClient httpClient,
ScheduledExecutorService scheduler)
{
this.maxBufferedBytes = requireNonNull(maxBufferedBytes, "maxBufferedBytes is null");
this.concurrentRequestMultiplier = concurrentRequestMultiplier;
this.minErrorDuration = requireNonNull(minErrorDuration, "minErrorDuration is null");
this.maxErrorDuration = requireNonNull(maxErrorDuration, "maxErrorDuration is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");

Expand Down Expand Up @@ -116,7 +112,6 @@ public ExchangeClient get(LocalMemoryContext systemMemoryContext)
maxBufferedBytes,
maxResponseSize,
concurrentRequestMultiplier,
minErrorDuration,
maxErrorDuration,
httpClient,
scheduler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,18 @@ public interface ClientCallback
public HttpPageBufferClient(
HttpClient httpClient,
DataSize maxResponseSize,
Duration minErrorDuration,
Duration maxErrorDuration,
URI location,
ClientCallback clientCallback,
ScheduledExecutorService scheduler,
Executor pageBufferClientCallbackExecutor)
{
this(httpClient, maxResponseSize, minErrorDuration, maxErrorDuration, location, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor);
this(httpClient, maxResponseSize, maxErrorDuration, location, clientCallback, scheduler, Ticker.systemTicker(), pageBufferClientCallbackExecutor);
}

public HttpPageBufferClient(
HttpClient httpClient,
DataSize maxResponseSize,
Duration minErrorDuration,
Duration maxErrorDuration,
URI location,
ClientCallback clientCallback,
Expand All @@ -172,10 +170,9 @@ public HttpPageBufferClient(
this.clientCallback = requireNonNull(clientCallback, "clientCallback is null");
this.scheduler = requireNonNull(scheduler, "scheduler is null");
this.pageBufferClientCallbackExecutor = requireNonNull(pageBufferClientCallbackExecutor, "pageBufferClientCallbackExecutor is null");
requireNonNull(minErrorDuration, "minErrorDuration is null");
requireNonNull(maxErrorDuration, "maxErrorDuration is null");
requireNonNull(ticker, "ticker is null");
this.backoff = new Backoff(minErrorDuration, maxErrorDuration, ticker);
this.backoff = new Backoff(maxErrorDuration, ticker);
}

public synchronized PageBufferClientStatus getStatus()
Expand Down Expand Up @@ -373,11 +370,12 @@ public void onFailure(Throwable t)

t = rewriteException(t);
if (!(t instanceof PrestoException) && backoff.failure()) {
String message = format("%s (%s - %s failures, time since last success %s)",
String message = format("%s (%s - %s failures, failure duration %s, total failed request time %s)",
WORKER_NODE_ERROR,
uri,
backoff.getFailureCount(),
backoff.getTimeSinceLastSuccess().convertTo(SECONDS));
backoff.getFailureDuration().convertTo(SECONDS),
backoff.getFailureRequestTimeTotal().convertTo(SECONDS));
t = new PageTransportTimeoutException(fromUri(uri), message, t);
}
handleFailure(t, resultFuture);
Expand Down Expand Up @@ -414,10 +412,11 @@ public void onFailure(Throwable t)

log.error("Request to delete %s failed %s", location, t);
if (!(t instanceof PrestoException) && backoff.failure()) {
String message = format("Error closing remote buffer (%s - %s failures, time since last success %s)",
String message = format("Error closing remote buffer (%s - %s failures, failure duration %s, total failed request time %s)",
location,
backoff.getFailureCount(),
backoff.getTimeSinceLastSuccess().convertTo(SECONDS));
backoff.getFailureDuration().convertTo(SECONDS),
backoff.getFailureRequestTimeTotal().convertTo(SECONDS));
t = new PrestoException(REMOTE_BUFFER_CLOSE_FAILED, message, t);
}
handleFailure(t, resultFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
Expand All @@ -62,7 +61,6 @@ public class HttpRemoteTaskFactory
private final JsonCodec<TaskStatus> taskStatusCodec;
private final JsonCodec<TaskInfo> taskInfoCodec;
private final JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec;
private final Duration minErrorDuration;
private final Duration maxErrorDuration;
private final Duration taskStatusRefreshMaxWait;
private final Duration taskInfoUpdateInterval;
Expand All @@ -88,8 +86,6 @@ public HttpRemoteTaskFactory(QueryManagerConfig config,
this.taskStatusCodec = taskStatusCodec;
this.taskInfoCodec = taskInfoCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
checkArgument(config.getRemoteTaskMaxErrorDuration().compareTo(config.getRemoteTaskMinErrorDuration()) >= 0, "max error duration is less than min error duration");
this.minErrorDuration = config.getRemoteTaskMinErrorDuration();
this.maxErrorDuration = config.getRemoteTaskMaxErrorDuration();
this.taskStatusRefreshMaxWait = taskConfig.getStatusRefreshMaxWait();
this.taskInfoUpdateInterval = taskConfig.getInfoUpdateInterval();
Expand Down Expand Up @@ -138,7 +134,6 @@ public RemoteTask createRemoteTask(Session session,
executor,
updateScheduledExecutor,
errorScheduledExecutor,
minErrorDuration,
maxErrorDuration,
taskStatusRefreshMaxWait,
taskInfoUpdateInterval,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand All @@ -38,71 +40,70 @@ public class Backoff
.add(new Duration(500, MILLISECONDS))
.build();

private final long minFailureIntervalNanos;
private final long maxFailureIntervalNanos;
private final Ticker ticker;
private final long[] backoffDelayIntervalsNanos;
private final long createTime;

private long lastSuccessTime;
private long firstRequestAfterSuccessTime;
private long firstFailureTime;
private long lastFailureTime;
private long failureCount;
private long failureRequestTimeTotal;

public Backoff(Duration minFailureInterval, Duration maxFailureInterval)
private long lastRequestStart;

public Backoff(Duration maxFailureInterval)
{
this(minFailureInterval, maxFailureInterval, Ticker.systemTicker(), DEFAULT_BACKOFF_DELAY_INTERVALS);
this(maxFailureInterval, Ticker.systemTicker());
}

public Backoff(Duration minFailureInterval, Duration maxFailureInterval, Ticker ticker)
public Backoff(Duration maxFailureInterval, Ticker ticker)
{
this(minFailureInterval, maxFailureInterval, ticker, DEFAULT_BACKOFF_DELAY_INTERVALS);
this(maxFailureInterval, ticker, DEFAULT_BACKOFF_DELAY_INTERVALS);
}

@VisibleForTesting
public Backoff(Duration minFailureInterval, Duration maxFailureInterval, Ticker ticker, List<Duration> backoffDelayIntervals)
public Backoff(Duration maxFailureInterval, Ticker ticker, List<Duration> backoffDelayIntervals)
{
requireNonNull(minFailureInterval, "minFailureInterval is null");
requireNonNull(maxFailureInterval, "maxFailureInterval is null");
requireNonNull(ticker, "ticker is null");
requireNonNull(backoffDelayIntervals, "backoffDelayIntervals is null");
checkArgument(!backoffDelayIntervals.isEmpty(), "backoffDelayIntervals must contain at least one entry");
checkArgument(maxFailureInterval.compareTo(minFailureInterval) >= 0, "maxFailureInterval is less than minFailureInterval");

this.minFailureIntervalNanos = minFailureInterval.roundTo(NANOSECONDS);
this.maxFailureIntervalNanos = maxFailureInterval.roundTo(NANOSECONDS);
this.ticker = ticker;
this.backoffDelayIntervalsNanos = backoffDelayIntervals.stream()
.mapToLong(duration -> duration.roundTo(NANOSECONDS))
.toArray();

this.lastSuccessTime = this.ticker.read();
this.firstRequestAfterSuccessTime = Long.MIN_VALUE;
this.createTime = this.ticker.read();
}

public synchronized long getFailureCount()
{
return failureCount;
}

public synchronized Duration getTimeSinceLastSuccess()
public synchronized Duration getFailureDuration()
{
long lastSuccessfulRequest = this.lastSuccessTime;
long value = ticker.read() - lastSuccessfulRequest;
return new Duration(value, NANOSECONDS).convertToMostSuccinctTimeUnit();
if (firstFailureTime == 0) {
return new Duration(0, MILLISECONDS);
}
long value = ticker.read() - firstFailureTime;
return new Duration(value, NANOSECONDS);
}

public synchronized Duration getFailureRequestTimeTotal()
{
return new Duration(max(0, failureRequestTimeTotal), NANOSECONDS);
}

public synchronized void startRequest()
{
if (firstRequestAfterSuccessTime < lastSuccessTime) {
firstRequestAfterSuccessTime = ticker.read();
}
lastRequestStart = ticker.read();
}

public synchronized void success()
{
lastSuccessTime = ticker.read();
lastRequestStart = 0;
firstFailureTime = 0;
failureCount = 0;
lastFailureTime = 0;
}
Expand All @@ -112,35 +113,28 @@ public synchronized void success()
*/
public synchronized boolean failure()
{
long lastSuccessfulRequest = this.lastSuccessTime;
long now = ticker.read();

lastFailureTime = now;

failureCount++;

long failureInterval;
if (lastSuccessfulRequest - createTime > maxFailureIntervalNanos) {
failureInterval = maxFailureIntervalNanos;
}
else {
failureInterval = Math.max(lastSuccessfulRequest - createTime, minFailureIntervalNanos);
if (lastRequestStart != 0) {
failureRequestTimeTotal += now - lastRequestStart;
lastRequestStart = 0;
}
long failureDuration;
if (firstRequestAfterSuccessTime < lastSuccessTime) {
// If user didn't call startRequest(), use the time of the last success
failureDuration = now - lastSuccessfulRequest;
}
else {
// Otherwise only count the time since the first request that started failing
failureDuration = now - firstRequestAfterSuccessTime;

if (firstFailureTime == 0) {
firstFailureTime = now;
// can not fail on first failure
return false;
}
return failureDuration >= failureInterval;

long failureDuration = now - firstFailureTime;
return failureDuration >= maxFailureIntervalNanos;
}

public synchronized long getBackoffDelayNanos()
{
int failureCount = (int) Math.min(backoffDelayIntervalsNanos.length, this.failureCount);
int failureCount = (int) min(backoffDelayIntervalsNanos.length, this.failureCount);
if (failureCount == 0) {
return 0;
}
Expand All @@ -149,6 +143,6 @@ public synchronized long getBackoffDelayNanos()

// calculate expected delay from now
long nanosSinceLastFailure = ticker.read() - lastFailureTime;
return Math.max(0, currentDelay - nanosSinceLastFailure);
return max(0, currentDelay - nanosSinceLastFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ public ContinuousTaskStatusFetcher(
JsonCodec<TaskStatus> taskStatusCodec,
Executor executor,
HttpClient httpClient,
Duration minErrorDuration,
Duration maxErrorDuration,
ScheduledExecutorService errorScheduledExecutor,
RemoteTaskStats stats)
Expand All @@ -98,7 +97,7 @@ public ContinuousTaskStatusFetcher(
this.executor = requireNonNull(executor, "executor is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");

this.errorTracker = new RequestErrorTracker(taskId, initialTaskStatus.getSelf(), minErrorDuration, maxErrorDuration, errorScheduledExecutor, "getting task status");
this.errorTracker = new RequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
this.stats = requireNonNull(stats, "stats is null");
}

Expand Down
Loading

0 comments on commit c4092b0

Please sign in to comment.