Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GrpcRetryer now respects DEADLINE_EXCEEDED as non-retryable #654

Merged
merged 1 commit into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
import io.temporal.failure.CanceledFailure;
import io.temporal.failure.TerminatedFailure;
import io.temporal.failure.TimeoutFailure;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.CheckedExceptionWrapper;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import io.temporal.api.workflowservice.v1.StartWorkflowExecutionResponse;
import io.temporal.api.workflowservice.v1.TerminateWorkflowExecutionRequest;
import io.temporal.internal.common.SignalWithStartWorkflowExecutionParameters;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import io.temporal.failure.FailureConverter;
import io.temporal.failure.TemporalFailure;
import io.temporal.internal.common.OptionsUtils;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.internal.replay.FailureWrapperException;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.internal.worker.ActivityTaskHandler.Result;
import io.temporal.internal.worker.activity.ActivityWorkerHelper;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.grpc.StatusRuntimeException;
import io.temporal.internal.common.InternalUtils;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.serviceclient.BackoffThrottler;
import io.temporal.internal.BackoffThrottler;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import io.temporal.internal.common.WorkflowExecutionUtils;
import io.temporal.internal.logging.LoggerTag;
import io.temporal.internal.metrics.MetricsType;
import io.temporal.serviceclient.GrpcRetryer;
import io.temporal.internal.retryer.GrpcRetryer;
import io.temporal.serviceclient.MetricsTag;
import io.temporal.serviceclient.RpcRetryOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.workflow;

import static org.junit.Assert.*;

import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.temporal.client.WorkflowServiceException;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.shared.TestWorkflows;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

public class GrpcRetryerFunctionalTest {

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setTestTimeoutSeconds(10)
.setWorkflowTypes(FiveSecondsSleepingWorkflow.class)
.build();

private static ScheduledExecutorService scheduledExecutor;

@BeforeClass
public static void beforeClass() {
scheduledExecutor = Executors.newScheduledThreadPool(1);
}

@AfterClass
public static void afterClass() throws Exception {
scheduledExecutor.shutdownNow();
}

/**
* This test verifies that if GRPC Deadline in GRPC Context is reached even before the request, we
* return fast and with the correct DEADLINE_EXCEEDED error and we don't try to retry the
* DEADLINE_EXCEEDED.
*/
@Test(timeout = 2000)
public void contextDeadlineExpiredAtStart() {
TestWorkflows.NoArgsWorkflow workflow =
testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class);

AtomicReference<Exception> exRef = new AtomicReference<>();
Context.current()
.withDeadlineAfter(-1, TimeUnit.MILLISECONDS, scheduledExecutor)
// create a new deadline that already expired
.run(
() -> {
try {
workflow.execute();
fail();
} catch (Exception e) {
exRef.set(e);
}
});

WorkflowServiceException exception = (WorkflowServiceException) exRef.get();
Throwable cause = exception.getCause();

assertTrue(cause instanceof StatusRuntimeException);
assertEquals(
Status.DEADLINE_EXCEEDED.getCode(), ((StatusRuntimeException) cause).getStatus().getCode());
}

/**
* This test verifies that if GRPC Deadline reached in the middle of workflow execution, we return
* fast and with the correct DEADLINE_EXCEEDED error and we don't try to retry the
* DEADLINE_EXCEEDED.
*/
@Test(timeout = 2500)
public void contextDeadlineExpiredBeforeWorkflowFinish() {
TestWorkflows.NoArgsWorkflow workflow =
testWorkflowRule.newWorkflowStub(TestWorkflows.NoArgsWorkflow.class);

AtomicReference<Exception> exRef = new AtomicReference<>();
Context.current()
.withDeadlineAfter(500, TimeUnit.MILLISECONDS, scheduledExecutor)
// create a deadline that expires before workflow finishes
.run(
() -> {
try {
workflow.execute();
fail();
} catch (Exception e) {
exRef.set(e);
}
});

WorkflowServiceException exception = (WorkflowServiceException) exRef.get();
Throwable cause = exception.getCause();

assertTrue(cause instanceof StatusRuntimeException);
assertEquals(
Status.DEADLINE_EXCEEDED.getCode(), ((StatusRuntimeException) cause).getStatus().getCode());
}

public static class FiveSecondsSleepingWorkflow implements TestWorkflows.NoArgsWorkflow {

@Override
public void execute() {
try {
Thread.sleep(5000);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.serviceclient;
package io.temporal.internal;

import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -62,7 +62,8 @@
*/
public final class AsyncBackoffThrottler {

private static final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);
private static final ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "async-backoff-throttler"));

private final Duration initialSleep;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* permissions and limitations under the License.
*/

package io.temporal.serviceclient;
package io.temporal.internal;

import java.time.Duration;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package io.temporal.internal.retryer;

import io.grpc.StatusRuntimeException;
import io.temporal.internal.AsyncBackoffThrottler;
import io.temporal.serviceclient.RpcRetryOptions;
import java.time.Clock;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GrpcAsyncRetryer {
private static final Logger log = LoggerFactory.getLogger(GrpcAsyncRetryer.class);

private final Clock clock;

public GrpcAsyncRetryer(Clock clock) {
this.clock = clock;
}

public <R> CompletableFuture<R> retry(
RpcRetryOptions options, Supplier<CompletableFuture<R>> function) {
long startTime = clock.millis();
AsyncBackoffThrottler throttler =
new AsyncBackoffThrottler(
options.getInitialInterval(),
options.getMaximumInterval(),
options.getBackoffCoefficient());
CompletableFuture<R> resultCF = new CompletableFuture<>();
retry(options, function, 1, startTime, throttler, resultCF);
return resultCF;
}

private <R> void retry(
RpcRetryOptions options,
Supplier<CompletableFuture<R>> function,
int attempt,
long startTime,
AsyncBackoffThrottler throttler,
CompletableFuture<R> resultCF) {
options.validate();
throttler
.throttle()
.thenAccept(
(ignore) -> {
// try-catch is because get() call might throw.
CompletableFuture<R> result;

try {
result = function.get();
} catch (Throwable e) {
throttler.failure();
// function isn't supposed to throw exceptions, it should always return a
// CompletableFuture even if it's a failed one.
// But if this happens - process the same way as it would be an exception from
// completable future
failOrRetry(options, function, attempt, startTime, throttler, e, resultCF);
return;
}
if (result == null) {
resultCF.complete(null);
return;
}

result.whenComplete(
(r, e) -> {
if (e == null) {
throttler.success();
resultCF.complete(r);
} else {
throttler.failure();
failOrRetry(options, function, attempt, startTime, throttler, e, resultCF);
}
});
});
}

private <R> void failOrRetry(
RpcRetryOptions options,
Supplier<CompletableFuture<R>> function,
int attempt,
long startTime,
AsyncBackoffThrottler throttler,
Throwable lastException,
CompletableFuture<R> resultCF) {

// If exception is thrown from CompletionStage/CompletableFuture methods like compose or handle
// - it gets wrapped into CompletionException, so here we need to unwrap it. We can get not
// wrapped raw exception here too if CompletableFuture was explicitly filled with this exception
// using CompletableFuture.completeExceptionally
lastException = unwrapCompletionException(lastException);

// Do not retry if it's not StatusRuntimeException
if (!(lastException instanceof StatusRuntimeException)) {
resultCF.completeExceptionally(lastException);
return;
}

StatusRuntimeException exception = (StatusRuntimeException) lastException;

RuntimeException finalException =
GrpcRetryerUtils.createFinalExceptionIfNotRetryable(exception, options);
if (finalException != null) {
resultCF.completeExceptionally(finalException);
return;
}

if (GrpcRetryerUtils.ranOutOfRetries(options, startTime, clock.millis(), attempt)) {
resultCF.completeExceptionally(exception);
return;
}

log.debug("Retrying after failure", lastException);
retry(options, function, attempt + 1, startTime, throttler, resultCF);
}

private static Throwable unwrapCompletionException(Throwable e) {
return e instanceof CompletionException ? e.getCause() : e;
}
}
Loading