Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid discarding SRE state for IO cause #18836

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory;
import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection;
import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool;
Expand All @@ -28,9 +30,12 @@
import io.netty.util.ReferenceCounted;
import io.reactivex.rxjava3.annotations.CheckReturnValue;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.core.SingleSource;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

/**
* A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count.
Expand Down Expand Up @@ -80,19 +85,46 @@ public <T> ListenableFuture<T> withChannelFuture(
}

public <T> T withChannelBlocking(Function<Channel, T> source)
throws IOException, InterruptedException {
throws ExecutionException, IOException, InterruptedException {
try {
return withChannel(channel -> Single.just(source.apply(channel))).blockingGet();
} catch (RuntimeException e) {
return withChannelBlockingGet(source);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
throwIfInstanceOf(cause, IOException.class);
throwIfInstanceOf(cause, InterruptedException.class);
}
Throwables.throwIfInstanceOf(cause, IOException.class);
Throwables.throwIfUnchecked(cause);
throw e;
}
}

// prevents rxjava silent possible wrap of RuntimeException and misinterpretation
private <T> T withChannelBlockingGet(Function<Channel, T> source)
throws ExecutionException, InterruptedException {
SettableFuture<T> future = SettableFuture.create();
withChannel(channel -> Single.just(source.apply(channel))).subscribe(new SingleObserver<T>() {
@Override
public void onError(Throwable t) {
future.setException(t);
}

@Override
public void onSuccess(T t) {
future.set(t);
}

@Override
public void onSubscribe(Disposable d) {
future.addListener(
() -> {
if (future.isCancelled()) {
d.dispose();
}
},
directExecutor());
}
});
return future.get();
}

@CheckReturnValue
public <T> Single<T> withChannel(Function<Channel, ? extends SingleSource<? extends T>> source) {
return dynamicConnectionPool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
digestUtil,
ServerCapabilitiesRequirement.CACHE);
}
} catch (IOException e) {
} catch (AbruptExitException e) {
throw e; // prevent abrupt interception
} catch (Exception e) {
String errorMessage =
"Failed to query remote execution capabilities: "
+ Utils.grpcAwareErrorMessage(e, verboseFailures);
Expand All @@ -603,12 +605,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) {
try {
remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
} catch (InterruptedException e) {
handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE);
return;
}
if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) {
remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName;
Expand All @@ -630,7 +629,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteVerifyDownloads,
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -695,7 +694,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions.remoteVerifyDownloads,
digestUtil,
cacheClient);
} catch (IOException e) {
} catch (Exception e) {
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
return;
}
Expand Down Expand Up @@ -741,7 +740,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

private static void handleInitFailure(
CommandEnvironment env, IOException e, Code remoteExecutionCode) {
CommandEnvironment env, Exception e, Code remoteExecutionCode) {
env.getReporter().handle(Event.error(e.getMessage()));
env.getBlazeModuleEnvironment()
.exit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,21 +73,15 @@ public ServerCapabilities get(String buildRequestId, String commandId)
RequestMetadata metadata =
TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null);
RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata);
try {
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
return retrier.execute(
() ->
channel.withChannelBlocking(
channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request)));
} catch (StatusRuntimeException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new IOException(e);
}
GetCapabilitiesRequest request =
instanceName == null
? GetCapabilitiesRequest.getDefaultInstance()
: GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build();
ServerCapabilities caps = retrier.execute(
() ->
channel.withChannelBlocking(
channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request)));
return caps;
}

static class ClientServerCompatibilityStatus {
Expand Down