From f2aba7935c9f7123d0a5fee31f51e54f4fdc0ffd Mon Sep 17 00:00:00 2001 From: George Gensure Date: Sun, 25 Jun 2023 23:12:55 -0400 Subject: [PATCH] Avoid discarding SRE state for IO cause Unwrapping all StatusRuntimeExceptions in ReferenceCountedChannel when caused by IOException will discard critical tracing and retriability. The Retrier evaluations may not see an SRE in the causal chain, and presume it is invariably an unretriable exception. In general, IOExceptions as SRE wrappers are unsuitable containers and are routinely misused either for identification (grpc aware status), or capture (handleInitFailure). --- .../lib/remote/ReferenceCountedChannel.java | 46 ++++++++++++++++--- .../build/lib/remote/RemoteModule.java | 15 +++--- .../lib/remote/RemoteServerCapabilities.java | 24 ++++------ 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java index 8e4e95afd56810..a37f393292877c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java @@ -16,7 +16,9 @@ import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection; import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool; @@ -28,9 +30,12 @@ import io.netty.util.ReferenceCounted; import io.reactivex.rxjava3.annotations.CheckReturnValue; import io.reactivex.rxjava3.core.Single; +import 
io.reactivex.rxjava3.core.SingleObserver; import io.reactivex.rxjava3.core.SingleSource; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.functions.Function; import java.io.IOException; +import java.util.concurrent.ExecutionException; /** * A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count. @@ -80,19 +85,46 @@ public ListenableFuture withChannelFuture( } public T withChannelBlocking(Function source) - throws IOException, InterruptedException { + throws ExecutionException, IOException, InterruptedException { try { - return withChannel(channel -> Single.just(source.apply(channel))).blockingGet(); - } catch (RuntimeException e) { + return withChannelBlockingGet(source); + } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause != null) { - throwIfInstanceOf(cause, IOException.class); - throwIfInstanceOf(cause, InterruptedException.class); - } + Throwables.throwIfInstanceOf(cause, IOException.class); + Throwables.throwIfUnchecked(cause); throw e; } } + // prevents rxjava silent possible wrap of RuntimeException and misinterpretation + private T withChannelBlockingGet(Function source) + throws ExecutionException, InterruptedException { + SettableFuture future = SettableFuture.create(); + withChannel(channel -> Single.just(source.apply(channel))).subscribe(new SingleObserver() { + @Override + public void onError(Throwable t) { + future.setException(t); + } + + @Override + public void onSuccess(T t) { + future.set(t); + } + + @Override + public void onSubscribe(Disposable d) { + future.addListener( + () -> { + if (future.isCancelled()) { + d.dispose(); + } + }, + directExecutor()); + } + }); + return future.get(); + } + @CheckReturnValue public Single withChannel(Function> source) { return dynamicConnectionPool diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 
2c3930733d33a5..a03163a2d0ecb6 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -578,7 +578,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { digestUtil, ServerCapabilitiesRequirement.CACHE); } - } catch (IOException e) { + } catch (AbruptExitException e) { + throw e; // prevent abrupt interception + } catch (Exception e) { String errorMessage = "Failed to query remote execution capabilities: " + Utils.grpcAwareErrorMessage(e, verboseFailures); @@ -603,12 +605,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { if (Strings.isNullOrEmpty(remoteBytestreamUriPrefix)) { try { remoteBytestreamUriPrefix = cacheChannel.withChannelBlocking(Channel::authority); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; - } catch (InterruptedException e) { - handleInitFailure(env, new IOException(e), Code.CACHE_INIT_FAILURE); - return; } if (!Strings.isNullOrEmpty(remoteOptions.remoteInstanceName)) { remoteBytestreamUriPrefix += "/" + remoteOptions.remoteInstanceName; @@ -630,7 +629,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions.remoteVerifyDownloads, digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -695,7 +694,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { remoteOptions.remoteVerifyDownloads, digestUtil, cacheClient); - } catch (IOException e) { + } catch (Exception e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; } @@ -741,7 +740,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { } private static void handleInitFailure( - CommandEnvironment env, IOException e, Code remoteExecutionCode) { + CommandEnvironment env, 
Exception e, Code remoteExecutionCode) { env.getReporter().handle(Event.error(e.getMessage())); env.getBlazeModuleEnvironment() .exit( diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java index 65624b09392c6e..1783fbfbcfc6ec 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteServerCapabilities.java @@ -73,21 +73,15 @@ public ServerCapabilities get(String buildRequestId, String commandId) RequestMetadata metadata = TracingMetadataUtils.buildMetadata(buildRequestId, commandId, "capabilities", null); RemoteActionExecutionContext context = RemoteActionExecutionContext.create(metadata); - try { - GetCapabilitiesRequest request = - instanceName == null - ? GetCapabilitiesRequest.getDefaultInstance() - : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); - return retrier.execute( - () -> - channel.withChannelBlocking( - channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request))); - } catch (StatusRuntimeException e) { - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - throw new IOException(e); - } + GetCapabilitiesRequest request = + instanceName == null + ? GetCapabilitiesRequest.getDefaultInstance() + : GetCapabilitiesRequest.newBuilder().setInstanceName(instanceName).build(); + ServerCapabilities caps = retrier.execute( + () -> + channel.withChannelBlocking( + channel -> capabilitiesBlockingStub(context, channel).getCapabilities(request))); + return caps; } static class ClientServerCompatibilityStatus {