From d8e520aaccf1a7457a631135b1972427223cbeac Mon Sep 17 00:00:00 2001 From: George Gensure Date: Mon, 17 Jul 2023 07:09:39 -0700 Subject: [PATCH] Done operations must be reexecuted Any operation with done == true as reported by the server is not expected to change its result on subsequent waitExecution calls. To properly retry, this action must be reexecuted, if it was truly transient, to achieve a definitive result. Submit a transient status for retry, disallow special behaviors for NOT_FOUND as covered by done observation, and consider method type when handling operation streams. Closes #18943. PiperOrigin-RevId: 548680656 Change-Id: Ib2c9887ead1fbd3de97761db6e8b4077783ad03c --- .../ExperimentalGrpcRemoteExecutor.java | 45 +++++++++++-------- .../ExperimentalGrpcRemoteExecutorTest.java | 10 +++-- .../lib/remote/GrpcRemoteExecutorTest.java | 13 ------ .../remote/GrpcRemoteExecutorTestBase.java | 14 ++++++ 4 files changed, 48 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java index 6a8a3f83a50409..003115278b3fd0 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java @@ -179,7 +179,7 @@ ExecuteResponse execute() throws IOException { try { Iterator operationStream = executeFunction.apply(request); - return handleOperationStream(operationStream); + return handleOperationStream(operationStream, /* waitExecution= */ false); } catch (Throwable e) { // If lastOperation is not null, we know the execution request is accepted by the server. In // this case, we will fallback to WaitExecution() loop when the stream is broken. @@ -199,34 +199,43 @@ ExecuteResponse waitExecution() throws IOException { WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build(); try { Iterator operationStream = waitExecutionFunction.apply(request); - return handleOperationStream(operationStream); + return handleOperationStream(operationStream, /* waitExecution= */ true); + } catch (StatusRuntimeException e) { + throw new IOException(e); } catch (Throwable e) { - // A NOT_FOUND error means Operation was lost on the server, retry Execute(). - // - // However, we only retry Execute() if executeBackoff should retry. Also increase the retry - // counter at the same time (done by nextDelayMillis()). - if (e instanceof StatusRuntimeException) { - StatusRuntimeException sre = (StatusRuntimeException) e; - if (sre.getStatus().getCode() == Code.NOT_FOUND - && executeBackoff.nextDelayMillis(sre) >= 0) { - lastOperation = null; - return null; - } - } + lastOperation = null; throw new IOException(e); } } /** Process a stream of operations from Execute() or WaitExecution(). */ @Nullable - ExecuteResponse handleOperationStream(Iterator operationStream) throws IOException { + ExecuteResponse handleOperationStream( + Iterator operationStream, boolean waitExecution) throws IOException { try { while (operationStream.hasNext()) { Operation operation = operationStream.next(); - ExecuteResponse response = extractResponseOrThrowIfError(operation); - // At this point, we successfully received a response that is not an error. - lastOperation = operation; + // Either done or should be repeated + lastOperation = operation.getDone() ? null : operation; + + ExecuteResponse response; + try { + response = extractResponseOrThrowIfError(operation); + } catch (StatusRuntimeException e) { + // An operation error means Operation has been terminally completed, retry Execute(). + // + // However, we only retry Execute() if executeBackoff should retry. Also increase the + // retry + // counter at the same time (done by nextDelayMillis()). + if (waitExecution + && (retrier.isRetriable(e) || e.getStatus().getCode() == Code.NOT_FOUND) + && executeBackoff.nextDelayMillis(e) >= 0) { + lastOperation = null; + return null; + } + throw e; + } // We don't want to reset executeBackoff since if there is an error: // 1. If happened before we received a first response, we want to ensure the retry diff --git a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java index 1841058df29712..bbfcd8a3a7e5b8 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java @@ -26,7 +26,6 @@ import com.google.devtools.build.lib.remote.common.OperationObserver; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.util.TestUtils; -import com.google.rpc.Code; import io.grpc.Status; import java.io.IOException; import java.util.concurrent.Executors; @@ -90,7 +89,9 @@ public void executeRemotely_executeAndRetryWait_forever() throws Exception { public void executeRemotely_executeAndRetryWait_failForConsecutiveErrors() { executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); for (int i = 0; i < MAX_RETRY_ATTEMPTS * 2; ++i) { - executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.UNAVAILABLE); + executionService + .whenWaitExecution(DUMMY_REQUEST) + .thenError(Status.UNAVAILABLE.asRuntimeException()); } assertThrows( @@ -150,7 +151,10 @@ public void executeRemotely_responseWithoutResult_shouldNotCrash() { public void executeRemotely_retryWaitExecutionWhenUnauthenticated() throws IOException, InterruptedException { executionService.whenExecute(DUMMY_REQUEST).thenAck().finish(); - executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED); + executionService + .whenWaitExecution(DUMMY_REQUEST) + .thenAck() + .thenError(Status.UNAUTHENTICATED.asRuntimeException()); executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); ExecuteResponse response = diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java index 57c13174b01771..b5af2b64a0a5c7 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTest.java @@ -100,17 +100,4 @@ public void executeRemotely_retryWaitExecutionWhenUnauthenticated() assertThat(executionService.getWaitTimes()).isEqualTo(1); assertThat(response).isEqualTo(DUMMY_RESPONSE); } - - @Test - public void executeRemotely_retryExecuteOnNoResult() throws IOException, InterruptedException { - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); - executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); - - ExecuteResponse response = - executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); - - assertThat(executionService.getExecTimes()).isEqualTo(2); - assertThat(executionService.getWaitTimes()).isEqualTo(0); - assertThat(response).isEqualTo(DUMMY_RESPONSE); - } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java index 103d5ef85502b3..9094b5f5d45446 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutorTestBase.java @@ -313,4 +313,18 @@ public void executeRemotely_notifyObserver() throws IOException, InterruptedExce FakeExecutionService.ackOperation(DUMMY_REQUEST), FakeExecutionService.doneOperation(DUMMY_REQUEST, DUMMY_RESPONSE)); } + + @Test + public void executeRemotely_retryExecuteOnNoResultDoneOperation() + throws IOException, InterruptedException { + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE); + executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE); + + ExecuteResponse response = + executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP); + + assertThat(executionService.getExecTimes()).isEqualTo(2); + assertThat(executionService.getWaitTimes()).isEqualTo(0); + assertThat(response).isEqualTo(DUMMY_RESPONSE); + } }