Skip to content

Commit

Permalink
Avoid discarding SRE state for IO cause
Browse files Browse the repository at this point in the history
Unwrapping all StatusRuntimeExceptions in in ReferenceCountedChannel
when caused by IOException will discard critical tracing and
retriability. The Retrier evaluations may not see an SRE in the causal
chain, and presume it is invariably an unretriable exception. In
general, IOExceptions as SRE wrappers are unsuitable containers and are
routinely misued either for identification (grpc aware status), or
capture (handleInitError).
  • Loading branch information
werkt committed Jul 4, 2023
1 parent 4034ae0 commit f2aba79
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection;
import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool;
Expand All @@ -28,9 +30,12 @@
import io.netty.util.ReferenceCounted;
import io.reactivex.rxjava3.annotations.CheckReturnValue;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count.
Expand Down Expand Up @@ -80,19 +85,46 @@ public <T> ListenableFuture<T> withChannelFuture(
}

public <T> T withChannelBlocking(Function<Channel, T> source)
throws IOException, InterruptedException {
throws ExecutionException, IOException, InterruptedException {
try {
return withChannel(channel -> Single.just(source.apply(channel))).blockingGet();
} catch (RuntimeException e) {
return withChannelBlockingGet(source);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfInstanceOf(cause, IOException.class);
throwIfInstanceOf(cause, InterruptedException.class);
}
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfUnchecked(cause);
throw e;
}
}

// prevents rxjava silent possible wrap of RuntimeException and misinterpretation
private <T> T withChannelBlockingGet(Function<Channel, T> source)
throws ExecutionException, InterruptedException {
SettableFuture<T> future = SettableFuture.create();
withChannel(channel -> Single.just(source.apply(channel))).subscribe(new SingleObserver<T>() {
@Override
public void onError(Throwable t) {
future.setException(t);
}

@Override
public void onSuccess(T t) {
future.set(t);
}

@Override
public void onSubscribe(Disposable d) {
future.addListener(
() -> {
if (future.isCancelled()) {
d.dispose();
}
},
directExecutor());
}
});
return future.get();
}

@CheckReturnValue
public <T> Single<T> withChannel(Function<Channel, ? extends SingleSource<? extends T>> source) {
return dynamicConnectionPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
digestUtil,
ServerCapabilitiesRequirement.CACHE);
}
} catch (IOException e) {
} catch (AbruptExitException e) {
throw e; // prevent abrupt interception
} catch (Exception e) {
String errorMessage =
"Failed to query remote execution capabilities: "
+ Utils.grpcAwareErrorMessage(e, verboseFailures);
Expand All @@ -603,12 +605,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
try {
remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
} catch (InterruptedException e) {
handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE);
return;
}
if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) {
remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName;
Expand All @@ -630,7 +629,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteVerifyDownloads,
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -695,7 +694,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteVerifyDownloads,
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -741,7 +740,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

private static void handleInitFailure(
CommandEnvironment env, IOException e, Code remoteExecutionCode) {
CommandEnvironment env, Exception e, Code remoteExecutionCode) {
env.getReporter().handle(Event.error(e.getMessage()));
env.getBlazeModuleEnvironment()
.exit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,15 @@ public ServerCapabilities get(String buildRequestId, String commandId)
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
try {
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
return retrier.execute(
() ->
channel.withChannelBlocking(
channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request)));
} catch (StatusRuntimeException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new IOException(e);
}
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
ServerCapabilities caps = retrier.execute(
() ->
channel.withChannelBlocking(
channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request)));
return caps;
}

static class ClientServerCompatibilityStatus {
Expand Down

0 comments on commit f2aba79

Please sign in to comment.