Skip to content

Commit

Permalink
Done operations must be reexecuted
Browse files Browse the repository at this point in the history
Any operation with done == true as reported by the server is not expected to change its result on subsequent waitExecution calls. To properly retry, this action must be reexecuted, if it was truly transient, to achieve a definitive result. Submit a transient status for retry, disallow special behaviors for NOT_FOUND as covered by done observation, and consider method type when handling operation streams.

Closes #18943.

PiperOrigin-RevId: 548680656
Change-Id: Ib2c9887ead1fbd3de97761db6e8b4077783ad03c
  • Loading branch information
werkt committed Jul 24, 2023
1 parent ca67dee commit d8e520a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ ExecuteResponse execute() throws IOException {

try {
Iterator<Operation> operationStream = executeFunction.apply(request);
return handleOperationStream(operationStream);
return handleOperationStream(operationStream, /* waitExecution= */ false);
} catch (Throwable e) {
// If lastOperation is not null, we know the execution request is accepted by the server. In
// this case, we will fallback to WaitExecution() loop when the stream is broken.
Expand All @@ -199,34 +199,43 @@ ExecuteResponse waitExecution() throws IOException {
WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build();
try {
Iterator<Operation> operationStream = waitExecutionFunction.apply(request);
return handleOperationStream(operationStream);
return handleOperationStream(operationStream, /* waitExecution= */ true);
} catch (StatusRuntimeException e) {
throw new IOException(e);
} catch (Throwable e) {
// A NOT_FOUND error means Operation was lost on the server, retry Execute().
//
// However, we only retry Execute() if executeBackoff should retry. Also increase the retry
// counter at the same time (done by nextDelayMillis()).
if (e instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) e;
if (sre.getStatus().getCode() == Code.NOT_FOUND
&& executeBackoff.nextDelayMillis(sre) >= 0) {
lastOperation = null;
return null;
}
}
lastOperation = null;
throw new IOException(e);
}
}

/** Process a stream of operations from Execute() or WaitExecution(). */
@Nullable
ExecuteResponse handleOperationStream(Iterator<Operation> operationStream) throws IOException {
ExecuteResponse handleOperationStream(
Iterator<Operation> operationStream, boolean waitExecution) throws IOException {
try {
while (operationStream.hasNext()) {
Operation operation = operationStream.next();
ExecuteResponse response = extractResponseOrThrowIfError(operation);

// At this point, we successfully received a response that is not an error.
lastOperation = operation;
// Either done or should be repeated
lastOperation = operation.getDone() ? null : operation;

ExecuteResponse response;
try {
response = extractResponseOrThrowIfError(operation);
} catch (StatusRuntimeException e) {
// An operation error means Operation has been terminally completed, retry Execute().
//
// However, we only retry Execute() if executeBackoff should retry. Also increase the
// retry
// counter at the same time (done by nextDelayMillis()).
if (waitExecution
&& (retrier.isRetriable(e) || e.getStatus().getCode() == Code.NOT_FOUND)
&& executeBackoff.nextDelayMillis(e) >= 0) {
lastOperation = null;
return null;
}
throw e;
}

// We don't want to reset executeBackoff since if there is an error:
// 1. If happened before we received a first response, we want to ensure the retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.google.devtools.build.lib.remote.common.OperationObserver;
import com.google.devtools.build.lib.remote.common.RemoteExecutionClient;
import com.google.devtools.build.lib.remote.util.TestUtils;
import com.google.rpc.Code;
import io.grpc.Status;
import java.io.IOException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -90,7 +89,9 @@ public void executeRemotely_executeAndRetryWait_forever() throws Exception {
public void executeRemotely_executeAndRetryWait_failForConsecutiveErrors() {
executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
for (int i = 0; i < MAX_RETRY_ATTEMPTS * 2; ++i) {
executionService.whenWaitExecution(DUMMY_REQUEST).thenError(Code.UNAVAILABLE);
executionService
.whenWaitExecution(DUMMY_REQUEST)
.thenError(Status.UNAVAILABLE.asRuntimeException());
}

assertThrows(
Expand Down Expand Up @@ -150,7 +151,10 @@ public void executeRemotely_responseWithoutResult_shouldNotCrash() {
public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
throws IOException, InterruptedException {
executionService.whenExecute(DUMMY_REQUEST).thenAck().finish();
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenError(Code.UNAUTHENTICATED);
executionService
.whenWaitExecution(DUMMY_REQUEST)
.thenAck()
.thenError(Status.UNAUTHENTICATED.asRuntimeException());
executionService.whenWaitExecution(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,17 +100,4 @@ public void executeRemotely_retryWaitExecutionWhenUnauthenticated()
assertThat(executionService.getWaitTimes()).isEqualTo(1);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}

@Test
public void executeRemotely_retryExecuteOnNoResult() throws IOException, InterruptedException {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(2);
assertThat(executionService.getWaitTimes()).isEqualTo(0);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -313,4 +313,18 @@ public void executeRemotely_notifyObserver() throws IOException, InterruptedExce
FakeExecutionService.ackOperation(DUMMY_REQUEST),
FakeExecutionService.doneOperation(DUMMY_REQUEST, DUMMY_RESPONSE));
}

@Test
public void executeRemotely_retryExecuteOnNoResultDoneOperation()
throws IOException, InterruptedException {
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenError(Code.UNAVAILABLE);
executionService.whenExecute(DUMMY_REQUEST).thenAck().thenDone(DUMMY_RESPONSE);

ExecuteResponse response =
executor.executeRemotely(context, DUMMY_REQUEST, OperationObserver.NO_OP);

assertThat(executionService.getExecTimes()).isEqualTo(2);
assertThat(executionService.getWaitTimes()).isEqualTo(0);
assertThat(response).isEqualTo(DUMMY_RESPONSE);
}
}

0 comments on commit d8e520a

Please sign in to comment.