diff --git a/src/main/java/com/google/devtools/build/lib/remote/BUILD b/src/main/java/com/google/devtools/build/lib/remote/BUILD index 3f1b39f731533a..741a246f7edf19 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/BUILD @@ -42,7 +42,6 @@ java_library( ":ExecutionStatusException", ":ReferenceCountedChannel", ":Retrier", - "//src/main/java/com/google/devtools/build/lib:build-request-options", "//src/main/java/com/google/devtools/build/lib:runtime", "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/actions:artifacts", @@ -75,6 +74,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/disk", "//src/main/java/com/google/devtools/build/lib/remote/downloader", + "//src/main/java/com/google/devtools/build/lib/remote/grpc", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/logging", "//src/main/java/com/google/devtools/build/lib/remote/merkletree", @@ -98,6 +98,7 @@ java_library( "//third_party:guava", "//third_party:jsr305", "//third_party:netty", + "//third_party:rxjava3", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", "//third_party/protobuf:protobuf_java_util", @@ -128,10 +129,9 @@ java_library( name = "ReferenceCountedChannel", srcs = [ "ReferenceCountedChannel.java", - "ReferenceCountedChannelPool.java", ], deps = [ - "//third_party:guava", + "//src/main/java/com/google/devtools/build/lib/remote/grpc", "//third_party:netty", "//third_party/grpc:grpc-jar", ], diff --git a/src/main/java/com/google/devtools/build/lib/remote/GoogleChannelConnectionFactory.java b/src/main/java/com/google/devtools/build/lib/remote/GoogleChannelConnectionFactory.java new file mode 100644 index 00000000000000..2901e2f6417543 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/GoogleChannelConnectionFactory.java @@ -0,0 +1,60 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote; + +import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; +import io.grpc.ClientInterceptor; +import io.reactivex.rxjava3.core.Single; +import java.util.List; + +/** + * A {@link ChannelConnectionFactory} which creates {@link ChannelConnection} using {@link + * ChannelFactory}. + */ +public class GoogleChannelConnectionFactory implements ChannelConnectionFactory { + private final ChannelFactory channelFactory; + private final String target; + private final String proxy; + private final AuthAndTLSOptions options; + private final List interceptors; + private final int maxConcurrency; + + public GoogleChannelConnectionFactory( + ChannelFactory channelFactory, + String target, + String proxy, + AuthAndTLSOptions options, + List interceptors, + int maxConcurrency) { + this.channelFactory = channelFactory; + this.target = target; + this.proxy = proxy; + this.options = options; + this.interceptors = interceptors; + this.maxConcurrency = maxConcurrency; + } + + @Override + public Single create() { + return Single.fromCallable( + () -> + new ChannelConnection(channelFactory.newChannel(target, proxy, options, interceptors))); + } + + @Override + public int maxConcurrency() { + return maxConcurrency; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java index 6a9d09b69a7c8f..8d9bcab28a3316 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannel.java @@ -13,84 +13,101 @@ // limitations under the License. package com.google.devtools.build.lib.remote; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory.ChannelConnection; +import com.google.devtools.build.lib.remote.grpc.DynamicConnectionPool; +import com.google.devtools.build.lib.remote.grpc.SharedConnectionFactory.SharedConnection; import io.grpc.CallOptions; +import io.grpc.Channel; import io.grpc.ClientCall; -import io.grpc.ManagedChannel; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; import io.grpc.MethodDescriptor; +import io.grpc.Status; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; -import java.util.concurrent.TimeUnit; +import java.io.IOException; /** - * A wrapper around a {@link io.grpc.ManagedChannel} exposing a reference count. When instantiated - * the reference count is 1. {@link ManagedChannel#shutdown()} will be called on the wrapped channel - * when the reference count reaches 0. + * A wrapper around a {@link DynamicConnectionPool} exposing {@link Channel} and a reference count. + * When instantiated the reference count is 1. {@link DynamicConnectionPool#close()} will be called + * on the wrapped channel when the reference count reaches 0. * *

See {@link ReferenceCounted} for more information about reference counting. */ -public class ReferenceCountedChannel extends ManagedChannel implements ReferenceCounted { - - private final ManagedChannel channel; - private final AbstractReferenceCounted referenceCounted; - - public ReferenceCountedChannel(ManagedChannel channel) { - this( - channel, - new AbstractReferenceCounted() { - @Override - protected void deallocate() { - channel.shutdown(); +public class ReferenceCountedChannel extends Channel implements ReferenceCounted { + private final DynamicConnectionPool dynamicConnectionPool; + private final AbstractReferenceCounted referenceCounted = + new AbstractReferenceCounted() { + @Override + protected void deallocate() { + try { + dynamicConnectionPool.close(); + } catch (IOException e) { + throw new AssertionError(e.getMessage(), e); } + } - @Override - public ReferenceCounted touch(Object o) { - return this; - } - }); - } - - protected ReferenceCountedChannel( - ManagedChannel channel, AbstractReferenceCounted referenceCounted) { - this.channel = channel; - this.referenceCounted = referenceCounted; - } + @Override + public ReferenceCounted touch(Object o) { + return this; + } + }; - @Override - public ManagedChannel shutdown() { - throw new UnsupportedOperationException("Don't call shutdown() directly, but use release() " - + "instead."); + public ReferenceCountedChannel(ChannelConnectionFactory connectionFactory) { + this.dynamicConnectionPool = + new DynamicConnectionPool(connectionFactory, connectionFactory.maxConcurrency()); } - @Override public boolean isShutdown() { - return channel.isShutdown(); - } - - @Override - public boolean isTerminated() { - return channel.isTerminated(); - } - - @Override - public ManagedChannel shutdownNow() { - throw new UnsupportedOperationException("Don't call shutdownNow() directly, but use release() " - + "instead."); - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException { - return channel.awaitTermination(timeout, timeUnit); + return dynamicConnectionPool.isClosed(); + } + + /** A {@link ClientCall} which call {@link SharedConnection#close()} after the RPC is closed. */ + static class ConnectionCleanupCall + extends ForwardingClientCall.SimpleForwardingClientCall { + private final SharedConnection connection; + + protected ConnectionCleanupCall(ClientCall delegate, SharedConnection connection) { + super(delegate); + this.connection = connection; + } + + @Override + public void start(Listener responseListener, Metadata headers) { + super.start( + new ForwardingClientCallListener.SimpleForwardingClientCallListener( + responseListener) { + @Override + public void onClose(Status status, Metadata trailers) { + super.onClose(status, trailers); + + try { + connection.close(); + } catch (IOException e) { + throw new AssertionError(e.getMessage(), e); + } + } + }, + headers); + } } @Override public ClientCall newCall( MethodDescriptor methodDescriptor, CallOptions callOptions) { - return channel.newCall(methodDescriptor, callOptions); + SharedConnection sharedConnection = dynamicConnectionPool.create().blockingGet(); + ChannelConnection connection = (ChannelConnection) sharedConnection.getUnderlyingConnection(); + return new ConnectionCleanupCall<>( + connection.getChannel().newCall(methodDescriptor, callOptions), sharedConnection); } @Override public String authority() { - return channel.authority(); + SharedConnection sharedConnection = dynamicConnectionPool.create().blockingGet(); + ChannelConnection connection = (ChannelConnection) sharedConnection.getUnderlyingConnection(); + return connection.getChannel().authority(); } @Override @@ -131,4 +148,4 @@ public boolean release() { public boolean release(int decrement) { return referenceCounted.release(decrement); } -} \ No newline at end of file +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPool.java b/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPool.java deleted file mode 100644 index 0a9efa0d742342..00000000000000 --- a/src/main/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPool.java +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2020 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import com.google.common.collect.ImmutableList; -import io.grpc.CallOptions; -import io.grpc.ClientCall; -import io.grpc.ManagedChannel; -import io.grpc.MethodDescriptor; -import io.netty.util.AbstractReferenceCounted; -import io.netty.util.ReferenceCounted; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * A wrapper around a {@link io.grpc.ManagedChannel} exposing a reference count and performing a - * round-robin load balance across a list of channels. When instantiated the reference count is 1. - * {@link ManagedChannel#shutdown()} will be called on the wrapped channel when the reference count - * reaches 0. - * - *

See {@link ReferenceCounted} for more information about reference counting. - */ -public class ReferenceCountedChannelPool extends ReferenceCountedChannel { - - private final AtomicInteger indexTicker = new AtomicInteger(); - private final ImmutableList channels; - - public ReferenceCountedChannelPool(ImmutableList channels) { - super( - channels.get(0), - new AbstractReferenceCounted() { - @Override - protected void deallocate() { - for (ManagedChannel channel : channels) { - channel.shutdown(); - } - } - - @Override - public ReferenceCounted touch(Object o) { - return null; - } - }); - this.channels = channels; - } - - @Override - public boolean isShutdown() { - for (ManagedChannel channel : channels) { - if (!channel.isShutdown()) { - return false; - } - } - return true; - } - - @Override - public boolean isTerminated() { - for (ManagedChannel channel : channels) { - if (!channel.isTerminated()) { - return false; - } - } - return true; - } - - @Override - public boolean awaitTermination(long timeout, TimeUnit timeUnit) throws InterruptedException { - long endTimeNanos = System.nanoTime() + timeUnit.toNanos(timeout); - for (ManagedChannel channel : channels) { - long awaitTimeNanos = endTimeNanos - System.nanoTime(); - if (awaitTimeNanos <= 0) { - break; - } - channel.awaitTermination(awaitTimeNanos, TimeUnit.NANOSECONDS); - } - return isTerminated(); - } - - @Override - public ClientCall newCall( - MethodDescriptor methodDescriptor, CallOptions callOptions) { - return getNextChannel().newCall(methodDescriptor, callOptions); - } - - @Override - public String authority() { - // Assume all channels have the same authority. - return channels.get(0).authority(); - } - - /** - * Performs a simple round robin on the list of {@link ManagedChannel}s in the {@code channels} - * list. - * - * @see Suggestion from - * gRPC team. - * @return A {@link ManagedChannel} that can be used for a single RPC call. - */ - private ManagedChannel getNextChannel() { - int index = getChannelIndex(channels.size(), indexTicker.getAndIncrement()); - return channels.get(index); - } - - public static int getChannelIndex(int poolSize, int affinity) { - int index = affinity % poolSize; - return Math.abs(index); - } -} diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java index aef605b4faef9d..57741a8f28e26e 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java @@ -18,7 +18,6 @@ import com.google.common.base.Ascii; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.disk.DiskAndRemoteCacheClient; import com.google.devtools.build.lib.remote.disk.DiskCacheClient; @@ -27,13 +26,9 @@ import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; -import io.grpc.ClientInterceptor; -import io.grpc.ManagedChannel; import io.netty.channel.unix.DomainSocketAddress; import java.io.IOException; import java.net.URI; -import java.util.ArrayList; -import java.util.List; import javax.annotation.Nullable; /** @@ -57,21 +52,6 @@ public static RemoteCacheClient createDiskAndRemoteClient( return new DiskAndRemoteCacheClient(diskCacheClient, remoteCacheClient, options); } - public static ReferenceCountedChannel createGrpcChannelPool( - ChannelFactory channelFactory, - int poolSize, - String target, - String proxyUri, - AuthAndTLSOptions authOptions, - @Nullable List interceptors) - throws IOException { - List channels = new ArrayList<>(); - for (int i = 0; i < poolSize; i++) { - channels.add(channelFactory.newChannel(target, proxyUri, authOptions, interceptors)); - } - return new ReferenceCountedChannelPool(ImmutableList.copyOf(channels)); - } - public static RemoteCacheClient create( RemoteOptions options, @Nullable Credentials creds, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 04d31513dff003..669c646e42bfb3 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -50,7 +50,6 @@ import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.LocalFilesArtifactUploader; import com.google.devtools.build.lib.buildtool.BuildRequest; -import com.google.devtools.build.lib.buildtool.BuildRequestOptions; import com.google.devtools.build.lib.collect.nestedset.NestedSet; import com.google.devtools.build.lib.events.Event; import com.google.devtools.build.lib.events.Reporter; @@ -94,6 +93,7 @@ import io.grpc.CallCredentials; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -283,6 +283,9 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { String buildRequestId = env.getBuildRequestId(); env.getReporter().handle(Event.info(String.format("Invocation ID: %s", invocationId))); + RxJavaPlugins.setErrorHandler( + error -> env.getReporter().handle(Event.error(Throwables.getStackTraceAsString(error)))); + Path logDir = env.getOutputBase().getRelative(env.getRuntime().getProductName() + "-remote-logs"); cleanAndCreateRemoteLogsDir(logDir); @@ -309,39 +312,29 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { ReferenceCountedChannel cacheChannel = null; ReferenceCountedChannel downloaderChannel = null; - int poolSize = 1; - BuildRequestOptions buildRequestOptions = - env.getOptions().getOptions(BuildRequestOptions.class); - if (buildRequestOptions != null) { - // The following calculation is based on the suggestion from comment - // https://github.com/bazelbuild/bazel/issues/11801#issuecomment-672973245 - // - // The number of concurrent requests for one connection to a gRPC server is limited by - // MAX_CONCURRENT_STREAMS which is normally being 100+. We assume 50 concurrent requests for - // each connection should be fairly well. The number of connections opened by one channel is - // based on the resolved IPs of that server. We assume servers normally have 2 IPs. So the - // number of required channels is calculated as: ceil(jobs / 100). - poolSize = (int) Math.ceil((double) buildRequestOptions.jobs / 100.0); - } + // The number of concurrent requests for one connection to a gRPC server is limited by + // MAX_CONCURRENT_STREAMS which is normally being 100+. We assume 50 concurrent requests for + // each connection should be fairly well. The number of connections opened by one channel is + // based on the resolved IPs of that server. We assume servers normally have 2 IPs. So the + // max concurrency per connection is 100. + int maxConcurrencyPerConnection = 100; + if (enableRemoteExecution) { ImmutableList.Builder interceptors = ImmutableList.builder(); interceptors.add(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)); if (loggingInterceptor != null) { interceptors.add(loggingInterceptor); } - try { - execChannel = - RemoteCacheClientFactory.createGrpcChannelPool( - channelFactory, - poolSize, - remoteOptions.remoteExecutor, - remoteOptions.remoteProxy, - authAndTlsOptions, - interceptors.build()); - } catch (IOException e) { - handleInitFailure(env, e, Code.EXEC_CHANNEL_INIT_FAILURE); - return; - } + execChannel = + new ReferenceCountedChannel( + new GoogleChannelConnectionFactory( + channelFactory, + remoteOptions.remoteExecutor, + remoteOptions.remoteProxy, + authAndTlsOptions, + interceptors.build(), + maxConcurrencyPerConnection)); + // Create a separate channel if --remote_executor and --remote_cache point to different // endpoints. if (remoteOptions.remoteCache.equals(remoteOptions.remoteExecutor)) { @@ -355,19 +348,15 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { if (loggingInterceptor != null) { interceptors.add(loggingInterceptor); } - try { - cacheChannel = - RemoteCacheClientFactory.createGrpcChannelPool( - channelFactory, - poolSize, - remoteOptions.remoteCache, - remoteOptions.remoteProxy, - authAndTlsOptions, - interceptors.build()); - } catch (IOException e) { - handleInitFailure(env, e, Code.CACHE_CHANNEL_INIT_FAILURE); - return; - } + cacheChannel = + new ReferenceCountedChannel( + new GoogleChannelConnectionFactory( + channelFactory, + remoteOptions.remoteCache, + remoteOptions.remoteProxy, + authAndTlsOptions, + interceptors.build(), + maxConcurrencyPerConnection)); } if (enableRemoteDownloader) { @@ -380,19 +369,15 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException { if (loggingInterceptor != null) { interceptors.add(loggingInterceptor); } - try { - downloaderChannel = - RemoteCacheClientFactory.createGrpcChannelPool( - channelFactory, - poolSize, - remoteOptions.remoteDownloader, - remoteOptions.remoteProxy, - authAndTlsOptions, - interceptors.build()); - } catch (IOException e) { - handleInitFailure(env, e, Code.DOWNLOADER_CHANNEL_INIT_FAILURE); - return; - } + downloaderChannel = + new ReferenceCountedChannel( + new GoogleChannelConnectionFactory( + channelFactory, + remoteOptions.remoteDownloader, + remoteOptions.remoteProxy, + authAndTlsOptions, + interceptors.build(), + maxConcurrencyPerConnection)); } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/grpc/ChannelConnectionFactory.java b/src/main/java/com/google/devtools/build/lib/remote/grpc/ChannelConnectionFactory.java new file mode 100644 index 00000000000000..1e626fff82197e --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/remote/grpc/ChannelConnectionFactory.java @@ -0,0 +1,61 @@ +// Copyright 2021 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.remote.grpc; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; +import io.reactivex.rxjava3.core.Single; +import java.io.IOException; + +/** A {@link ConnectionFactory} which creates {@link ChannelConnection}. */ +public interface ChannelConnectionFactory extends ConnectionFactory { + @Override + Single create(); + + /** Returns the max concurrency supported by the underlying {@link ManagedChannel}. */ + int maxConcurrency(); + + /** A {@link Connection} which wraps around {@link ManagedChannel}. */ + class ChannelConnection implements Connection { + private final ManagedChannel channel; + + public ChannelConnection(ManagedChannel channel) { + this.channel = channel; + } + + @Override + public ClientCall call( + MethodDescriptor method, CallOptions options) { + return channel.newCall(method, options); + } + + @Override + public void close() throws IOException { + channel.shutdown(); + try { + channel.awaitTermination(Integer.MAX_VALUE, SECONDS); + } catch (InterruptedException e) { + throw new IOException(e.getMessage(), e); + } + } + + public ManagedChannel getChannel() { + return channel; + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/remote/grpc/DynamicConnectionPool.java b/src/main/java/com/google/devtools/build/lib/remote/grpc/DynamicConnectionPool.java index 480ed66241f5c1..6824563d737a38 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/grpc/DynamicConnectionPool.java +++ b/src/main/java/com/google/devtools/build/lib/remote/grpc/DynamicConnectionPool.java @@ -45,6 +45,10 @@ public DynamicConnectionPool( this.factories = new ArrayList<>(); } + public boolean isClosed() { + return closed.get(); + } + @Override public void close() throws IOException { if (closed.compareAndSet(false, true)) { diff --git a/src/test/java/com/google/devtools/build/lib/remote/BUILD b/src/test/java/com/google/devtools/build/lib/remote/BUILD index 02fa8cd79deb7d..14b2bb3c7a0e14 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/BUILD @@ -68,6 +68,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/disk", + "//src/main/java/com/google/devtools/build/lib/remote/grpc", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/merkletree", "//src/main/java/com/google/devtools/build/lib/remote/options", @@ -98,6 +99,7 @@ java_test( "//third_party:junit4", "//third_party:mockito", "//third_party:netty", + "//third_party:rxjava3", "//third_party:truth", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index 147f045f5793fb..f41b16a16dd640 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -50,6 +50,7 @@ import com.google.devtools.build.lib.remote.ByteStreamUploaderTest.MaybeFailOnceUploadService; import com.google.devtools.build.lib.remote.common.MissingDigestsFinder; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.vfs.DigestHashFunction; @@ -57,7 +58,6 @@ import com.google.devtools.build.lib.vfs.FileSystemUtils; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; -import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -65,6 +65,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; +import io.reactivex.rxjava3.core.Single; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.StandardCharsets; @@ -92,7 +93,8 @@ public class ByteStreamBuildEventArtifactUploaderTest { private ListeningScheduledExecutorService retryService; private Server server; - private ManagedChannel channel; + private ChannelConnectionFactory channelConnectionFactory; + private final FileSystem fs = new InMemoryFileSystem(new JavaClock(), DigestHashFunction.SHA256); private final Path execRoot = fs.getPath("/execroot"); @@ -108,7 +110,19 @@ public final void setUp() throws Exception { .fallbackHandlerRegistry(serviceRegistry) .build() .start(); - channel = InProcessChannelBuilder.forName(serverName).build(); + channelConnectionFactory = + new ChannelConnectionFactory() { + @Override + public Single create() { + return Single.just( + new ChannelConnection(InProcessChannelBuilder.forName(serverName).build())); + } + + @Override + public int maxConcurrency() { + return 100; + } + }; outputRoot = ArtifactRoot.asDerivedRoot(execRoot, RootType.Output, "out"); outputRoot.getRoot().asPath().createDirectoryAndParents(); @@ -123,8 +137,6 @@ public void tearDown() throws Exception { retryService.awaitTermination( com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - channel.shutdownNow(); - channel.awaitTermination(5, TimeUnit.SECONDS); server.shutdownNow(); server.awaitTermination(); } @@ -155,7 +167,7 @@ public void uploadsShouldWork() throws Exception { RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService); - ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel); + ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory); ByteStreamUploader uploader = new ByteStreamUploader( "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); @@ -212,36 +224,37 @@ public void someUploadsFail() throws Exception { filesToUpload.put(file, new LocalFile(file, LocalFileType.OUTPUT)); } String hashOfBlobThatShouldFail = blobsByHash.keySet().iterator().next().toString(); - serviceRegistry.addService(new MaybeFailOnceUploadService(blobsByHash) { - @Override - public StreamObserver write(StreamObserver response) { - StreamObserver delegate = super.write(response); - return new StreamObserver() { + serviceRegistry.addService( + new MaybeFailOnceUploadService(blobsByHash) { @Override - public void onNext(WriteRequest value) { - if (value.getResourceName().contains(hashOfBlobThatShouldFail)) { - response.onError(Status.CANCELLED.asException()); - } else { - delegate.onNext(value); - } + public StreamObserver write(StreamObserver response) { + StreamObserver delegate = super.write(response); + return new StreamObserver() { + @Override + public void onNext(WriteRequest value) { + if (value.getResourceName().contains(hashOfBlobThatShouldFail)) { + response.onError(Status.CANCELLED.asException()); + } else { + delegate.onNext(value); + } + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); + } + + @Override + public void onCompleted() { + delegate.onCompleted(); + } + }; } - - @Override - public void onError(Throwable t) { - delegate.onError(t); - } - - @Override - public void onCompleted() { - delegate.onCompleted(); - } - }; - } - }); + }); RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> new FixedBackoff(1, 0), (e) -> true, retryService); - ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channel); + ReferenceCountedChannel refCntChannel = new ReferenceCountedChannel(channelConnectionFactory); ByteStreamUploader uploader = new ByteStreamUploader( "instance", refCntChannel, CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java index a53278765492d7..9cd30be310e126 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java @@ -37,6 +37,7 @@ import com.google.devtools.build.lib.analysis.BlazeVersionInfo; import com.google.devtools.build.lib.authandtls.CallCredentialsProvider; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -44,7 +45,6 @@ import com.google.protobuf.ByteString; import io.grpc.BindableService; import io.grpc.CallCredentials; -import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerCall; @@ -61,6 +61,7 @@ import io.grpc.stub.MetadataUtils; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; +import io.reactivex.rxjava3.core.Single; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -98,8 +99,9 @@ public class ByteStreamUploaderTest { private final MutableHandlerRegistry serviceRegistry = new MutableHandlerRegistry(); private ListeningScheduledExecutorService retryService; + private final String serverName = "Server for " + this.getClass(); private Server server; - private ManagedChannel channel; + private ChannelConnectionFactory channelConnectionFactory; private RemoteActionExecutionContext context; @Mock private Retrier.Backoff mockBackoff; @@ -108,13 +110,24 @@ public class ByteStreamUploaderTest { public final void setUp() throws Exception { MockitoAnnotations.initMocks(this); - String serverName = "Server for " + this.getClass(); server = InProcessServerBuilder.forName(serverName) .fallbackHandlerRegistry(serviceRegistry) .build() .start(); - channel = InProcessChannelBuilder.forName(serverName).build(); + channelConnectionFactory = + new ChannelConnectionFactory() { + @Override + public Single create() { + return Single.just( + new ChannelConnection(InProcessChannelBuilder.forName(serverName).build())); + } + + @Override + public int maxConcurrency() { + return 100; + } + }; RequestMetadata metadata = TracingMetadataUtils.buildMetadata( "none", @@ -131,8 +144,6 @@ public void tearDown() throws Exception { retryService.awaitTermination( com.google.devtools.build.lib.testutil.TestUtils.WAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS); - channel.shutdownNow(); - channel.awaitTermination(5, TimeUnit.SECONDS); server.shutdownNow(); server.awaitTermination(); } @@ -144,7 +155,7 @@ public void singleBlobUploadShouldWork() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -221,7 +232,7 @@ public void progressiveUploadShouldWork() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); @@ -338,7 +349,7 @@ public void concurrentlyCompletedUploadIsNotRetried() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, 1, retrier); @@ -397,7 +408,7 @@ public void unimplementedQueryShouldRestartUpload() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); @@ -468,7 +479,7 @@ public void earlyWriteResponseShouldCompleteUpload() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); @@ -506,7 +517,7 @@ public void incorrectCommittedSizeFailsUpload() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, 3, retrier); @@ -548,7 +559,7 @@ public void multipleBlobsUploadShouldWork() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -583,7 +594,7 @@ public void contextShouldBePreservedUponRetries() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -699,9 +710,21 @@ public void customHeadersAreAttachedToRequest() throws Exception { new ByteStreamUploader( INSTANCE_NAME, new ReferenceCountedChannel( - InProcessChannelBuilder.forName("Server for " + this.getClass()) - .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)) - .build()), + new ChannelConnectionFactory() { + @Override + public Single create() { + return Single.just( + new ChannelConnection( + InProcessChannelBuilder.forName(serverName) + .intercept(MetadataUtils.newAttachHeadersInterceptor(metadata)) + .build())); + } + + @Override + public int maxConcurrency() { + return 100; + } + }), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -762,7 +785,7 @@ public void sameBlobShouldNotBeUploadedTwice() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -828,7 +851,7 @@ public void errorsShouldBeReported() throws IOException, InterruptedException { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -861,7 +884,7 @@ public void shutdownShouldCancelOngoingUploads() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -925,7 +948,7 @@ public void failureInRetryExecutorShouldBeHandled() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -963,7 +986,7 @@ public void resourceNameWithoutInstanceName() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( /* instanceName= */ null, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -1006,7 +1029,7 @@ public void nonRetryableStatusShouldNotBeRetried() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( /* instanceName= */ null, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -1042,7 +1065,7 @@ public void failedUploadsShouldNotDeduplicate() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -1120,7 +1143,7 @@ public void deduplicationOfUploadsShouldWork() throws Exception { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), CallCredentialsProvider.NO_CREDENTIALS, /* callTimeoutSecs= */ 60, retrier); @@ -1201,7 +1224,7 @@ public void refresh() throws IOException { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), callCredentialsProvider, /* callTimeoutSecs= */ 60, retrier); @@ -1258,7 +1281,7 @@ public void refresh() throws IOException { ByteStreamUploader uploader = new ByteStreamUploader( INSTANCE_NAME, - new ReferenceCountedChannel(channel), + new ReferenceCountedChannel(channelConnectionFactory), callCredentialsProvider, /* callTimeoutSecs= */ 60, retrier); diff --git a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java index 44e915e7345da6..0f28d47911d0bd 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutorTest.java @@ -29,16 +29,19 @@ import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; import com.google.devtools.build.lib.remote.common.OperationObserver; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.common.options.Options; import com.google.longrunning.Operation; import com.google.rpc.Code; +import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.Status; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; +import io.reactivex.rxjava3.core.Single; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -113,10 +116,22 @@ public final void setUp() throws Exception { retryService); ReferenceCountedChannel channel = new ReferenceCountedChannel( - InProcessChannelBuilder.forName(fakeServerName) - .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) - .directExecutor() - .build()); + new ChannelConnectionFactory() { + @Override + public Single create() { + ManagedChannel ch = + InProcessChannelBuilder.forName(fakeServerName) + .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) + .directExecutor() + .build(); + return Single.just(new ChannelConnection(ch)); + } + + @Override + public int maxConcurrency() { + return 100; + } + }); executor = new ExperimentalGrpcRemoteExecutor( diff --git a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java index 74594e7c0192ba..d775c8bdde3f55 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/GrpcCacheClientTest.java @@ -64,6 +64,7 @@ import com.google.devtools.build.lib.remote.Retrier.Backoff; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.merkletree.MerkleTree; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; @@ -85,6 +86,7 @@ import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.ClientInterceptor; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Server; @@ -97,6 +99,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; +import io.reactivex.rxjava3.core.Single; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -220,11 +223,23 @@ private GrpcCacheClient newClient(RemoteOptions remoteOptions, Supplier backoffSupplier, RemoteRetrier.RETRIABLE_GRPC_ERRORS, retryService); ReferenceCountedChannel channel = new ReferenceCountedChannel( - InProcessChannelBuilder.forName(fakeServerName) - .directExecutor() - .intercept(new CallCredentialsInterceptor(creds)) - .intercept(TracingMetadataUtils.newCacheHeadersInterceptor(remoteOptions)) - .build()); + new ChannelConnectionFactory() { + @Override + public Single create() { + ManagedChannel ch = + InProcessChannelBuilder.forName(fakeServerName) + .directExecutor() + .intercept(new CallCredentialsInterceptor(creds)) + .intercept(TracingMetadataUtils.newCacheHeadersInterceptor(remoteOptions)) + .build(); + return Single.just(new ChannelConnection(ch)); + } + + @Override + public int maxConcurrency() { + return 100; + } + }); ByteStreamUploader uploader = new ByteStreamUploader( remoteOptions.remoteInstanceName, diff --git a/src/test/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPoolTest.java b/src/test/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPoolTest.java deleted file mode 100644 index e877a7ba9a2fa5..00000000000000 --- a/src/test/java/com/google/devtools/build/lib/remote/ReferenceCountedChannelPoolTest.java +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2020 The Bazel Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package com.google.devtools.build.lib.remote; - -import static com.google.common.truth.Truth.assertThat; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Tests for {@link com.google.devtools.build.lib.remote.ReferenceCountedChannelPool} */ -@RunWith(JUnit4.class) -public class ReferenceCountedChannelPoolTest { - @Test - public void getChannelIndexTest() { - int poolSize = Integer.MAX_VALUE; - int affinity = Integer.MIN_VALUE; - int index = ReferenceCountedChannelPool.getChannelIndex(poolSize, affinity); - assertThat(index >= 0).isTrue(); - assertThat(index < poolSize).isTrue(); - } -} diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteServerCapabilitiesTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteServerCapabilitiesTest.java index e490854a5c0e9f..4d771f347a6dca 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteServerCapabilitiesTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteServerCapabilitiesTest.java @@ -34,11 +34,13 @@ import com.google.devtools.build.lib.authandtls.GoogleAuthUtils; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.TestUtils; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.common.options.Options; import io.grpc.CallCredentials; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerCall; @@ -50,6 +52,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; +import io.reactivex.rxjava3.core.Single; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -148,10 +151,22 @@ public void getCapabilities( retryService); ReferenceCountedChannel channel = new ReferenceCountedChannel( - InProcessChannelBuilder.forName(fakeServerName) - .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) - .directExecutor() - .build()); + new ChannelConnectionFactory() { + @Override + public Single create() { + ManagedChannel ch = + InProcessChannelBuilder.forName(fakeServerName) + .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) + .directExecutor() + .build(); + return Single.just(new ChannelConnection(ch)); + } + + @Override + public int maxConcurrency() { + return 100; + } + }); RemoteServerCapabilities client = new RemoteServerCapabilities("instance", channel.retain(), null, 3, retrier); @@ -195,7 +210,19 @@ public void getCapabilities( retryService); ReferenceCountedChannel channel = new ReferenceCountedChannel( - InProcessChannelBuilder.forName(fakeServerName).directExecutor().build()); + new ChannelConnectionFactory() { + @Override + public Single create() { + ManagedChannel ch = + InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(); + return Single.just(new ChannelConnection(ch)); + } + + @Override + public int maxConcurrency() { + return 100; + } + }); CallCredentials creds = GoogleAuthUtils.newCallCredentials(Options.getDefaults(AuthAndTLSOptions.class)); RemoteServerCapabilities client = diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java index 28a9a8ee9c5d4c..ebb1f37c4b4b87 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteSpawnRunnerWithGrpcRemoteExecutorTest.java @@ -70,6 +70,7 @@ import com.google.devtools.build.lib.exec.ExecutionOptions; import com.google.devtools.build.lib.exec.util.FakeOwner; import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.FakeSpawnExecutionContext; @@ -90,6 +91,7 @@ import com.google.rpc.PreconditionFailure; import com.google.rpc.PreconditionFailure.Violation; import io.grpc.BindableService; +import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Server; import io.grpc.ServerCall; @@ -101,6 +103,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; +import io.reactivex.rxjava3.core.Single; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; @@ -253,10 +256,23 @@ public PathFragment getExecPath() { retryService); ReferenceCountedChannel channel = new ReferenceCountedChannel( - InProcessChannelBuilder.forName(fakeServerName) - .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) - .directExecutor() - .build()); + new ChannelConnectionFactory() { + @Override + public Single create() { + ManagedChannel ch = + InProcessChannelBuilder.forName(fakeServerName) + .intercept(TracingMetadataUtils.newExecHeadersInterceptor(remoteOptions)) + .directExecutor() + .build(); + + return Single.just(new ChannelConnection(ch)); + } + + @Override + public int maxConcurrency() { + return 100; + } + }); GrpcRemoteExecutor executor = new GrpcRemoteExecutor(channel.retain(), CallCredentialsProvider.NO_CREDENTIALS, retrier); CallCredentialsProvider callCredentialsProvider = diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD index a2687529bed175..9a580a71fbc8ab 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/BUILD @@ -27,6 +27,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/downloader", + "//src/main/java/com/google/devtools/build/lib/remote/grpc", "//src/main/java/com/google/devtools/build/lib/remote/options", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/util", @@ -41,6 +42,7 @@ java_test( "//third_party:guava", "//third_party:junit4", "//third_party:mockito", + "//third_party:rxjava3", "//third_party:truth", "//third_party/grpc:grpc-jar", "//third_party/protobuf:protobuf_java", diff --git a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java index da7be03d003758..abd53e95ece91d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/downloader/GrpcRemoteDownloaderTest.java @@ -40,6 +40,7 @@ import com.google.devtools.build.lib.remote.RemoteRetrier.ExponentialBackoff; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; +import com.google.devtools.build.lib.remote.grpc.ChannelConnectionFactory; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.InMemoryCacheClient; @@ -51,11 +52,13 @@ import com.google.devtools.common.options.Options; import com.google.protobuf.ByteString; import io.grpc.CallCredentials; +import io.grpc.ManagedChannel; import io.grpc.Server; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; +import io.reactivex.rxjava3.core.Single; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -121,7 +124,19 @@ private GrpcRemoteDownloader newDownloader(RemoteCacheClient cacheClient) throws retryService); final ReferenceCountedChannel channel = new ReferenceCountedChannel( - InProcessChannelBuilder.forName(fakeServerName).directExecutor().build()); + new ChannelConnectionFactory() { + @Override + public Single create() { + ManagedChannel ch = + InProcessChannelBuilder.forName(fakeServerName).directExecutor().build(); + return Single.just(new ChannelConnection(ch)); + } + + @Override + public int maxConcurrency() { + return 100; + } + }); return new GrpcRemoteDownloader( "none", "none", diff --git a/src/test/shell/integration/minimal_jdk_test.sh b/src/test/shell/integration/minimal_jdk_test.sh index ba0c03d0def1de..4fb9fa67f5b06a 100755 --- a/src/test/shell/integration/minimal_jdk_test.sh +++ b/src/test/shell/integration/minimal_jdk_test.sh @@ -42,13 +42,13 @@ export BAZEL_SUFFIX="_jdk_minimal" source "$(rlocation "io_bazel/src/test/shell/integration_test_setup.sh")" \ || { echo "integration_test_setup.sh not found!" >&2; exit 1; } -# Bazel's install base is < 280MB with minimal JDK and > 300MB with an all +# Bazel's install base is < 290MB with minimal JDK and > 300MB with an all # modules JDK. -function test_size_less_than_280MB() { +function test_size_less_than_290MB() { bazel info ib=$(bazel info install_base) size=$(du -s "$ib" | cut -d\ -f1) - maxsize=$((1024*280)) + maxsize=$((1024*290)) if [ $size -gt $maxsize ]; then echo "$ib was too big:" 1>&2 du -a "$ib" 1>&2