diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 7f325665542..eb8633d0449 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -23,7 +23,6 @@ import com.google.api.core.InternalApi; import com.google.api.core.NanoClock; import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.GaxProperties; import com.google.api.gax.grpc.GaxGrpcProperties; import com.google.api.gax.grpc.GrpcCallContext; @@ -185,7 +184,6 @@ import java.nio.charset.StandardCharsets; import java.util.Comparator; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -200,8 +198,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -211,53 +207,6 @@ /** Implementation of Cloud Spanner remote calls using Gapic libraries. */ @InternalApi public class GapicSpannerRpc implements SpannerRpc { - - /** - * {@link ExecutorProvider} that keeps track of the executors that are created and shuts these - * down when the {@link SpannerRpc} is closed. - */ - private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider { - - // 4 Gapic clients * 4 channels per client. - private static final int DEFAULT_MIN_THREAD_COUNT = 16; - private final List executors = new LinkedList<>(); - private final ThreadFactory threadFactory; - - private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) { - this.threadFactory = threadFactory; - } - - @Override - public boolean shouldAutoClose() { - return false; - } - - @Override - public ScheduledExecutorService getExecutor() { - int numCpus = Runtime.getRuntime().availableProcessors(); - int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus); - ScheduledExecutorService executor = - new ScheduledThreadPoolExecutor(numThreads, threadFactory); - synchronized (this) { - executors.add(executor); - } - return executor; - } - - /** Shuts down all executors that have been created by this {@link ExecutorProvider}. */ - private synchronized void shutdown() { - for (ScheduledExecutorService executor : executors) { - executor.shutdown(); - } - } - - private void awaitTermination() throws InterruptedException { - for (ScheduledExecutorService executor : executors) { - executor.awaitTermination(10L, TimeUnit.SECONDS); - } - } - } - private static final PathTemplate PROJECT_NAME_TEMPLATE = PathTemplate.create("projects/{project}"); private static final PathTemplate OPERATION_NAME_TEMPLATE = @@ -277,7 +226,6 @@ private void awaitTermination() throws InterruptedException { CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class); private static final String API_FILE = "grpc-gcp-apiconfig.json"; - private final ManagedInstantiatingExecutorProvider executorProvider; private boolean rpcIsClosed; private final SpannerStub spannerStub; private final SpannerStub partitionedDmlStub; @@ -356,13 +304,6 @@ public GapicSpannerRpc(final SpannerOptions options) { this.compressorName = options.getCompressorName(); if (initializeStubs) { - // Create a managed executor provider. - this.executorProvider = - new ManagedInstantiatingExecutorProvider( - new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat(options.getTransportChannelExecutorThreadNameFormat()) - .build()); // First check if SpannerOptions provides a TransportChannelProvider. Create one // with information gathered from SpannerOptions if none is provided InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder = @@ -373,11 +314,6 @@ public GapicSpannerRpc(final SpannerOptions options) { .setMaxInboundMetadataSize(MAX_METADATA_SIZE) .setPoolSize(options.getNumChannels()) - // Before updating this method to setExecutor, please verify with a code owner on - // the lowest version of gax-grpc that needs to be supported. Currently v1.47.17, - // which doesn't support the setExecutor variant. - .setExecutorProvider(executorProvider) - // Set a keepalive time of 120 seconds to help long running // commit GRPC calls succeed .setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS)) @@ -536,7 +472,6 @@ public UnaryCallable createUnaryCalla this.databaseAdminStubSettings = null; this.spannerWatchdog = null; this.partitionedDmlRetrySettings = null; - this.executorProvider = null; } } @@ -1932,7 +1867,6 @@ public void shutdown() { this.instanceAdminStub.close(); this.databaseAdminStub.close(); this.spannerWatchdog.shutdown(); - this.executorProvider.shutdown(); try { this.spannerStub.awaitTermination(10L, TimeUnit.SECONDS); @@ -1940,7 +1874,6 @@ public void shutdown() { this.instanceAdminStub.awaitTermination(10L, TimeUnit.SECONDS); this.databaseAdminStub.awaitTermination(10L, TimeUnit.SECONDS); this.spannerWatchdog.awaitTermination(10L, TimeUnit.SECONDS); - this.executorProvider.awaitTermination(); } catch (InterruptedException e) { throw SpannerExceptionFactory.propagateInterrupt(e); } @@ -1954,7 +1887,6 @@ public void shutdownNow() { this.instanceAdminStub.close(); this.databaseAdminStub.close(); this.spannerWatchdog.shutdown(); - this.executorProvider.shutdown(); this.spannerStub.shutdownNow(); this.partitionedDmlStub.shutdownNow(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 6aaf28a5fc4..3a595358fea 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -54,7 +54,10 @@ public ScheduledExecutorService get() { @Override public void release(ScheduledExecutorService executor) { - executor.shutdown(); + try { + executor.shutdown(); + } catch (Throwable ignore) { + } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 55a15809a48..8819dab462b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -923,14 +923,22 @@ public void testCustomAsyncExecutorProvider() { @Test public void testDefaultNumChannelsWithGrpcGcpExtensionEnabled() { SpannerOptions options = - SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build(); + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .enableGrpcGcpExtension() + .build(); assertEquals(SpannerOptions.GRPC_GCP_ENABLED_DEFAULT_CHANNELS, options.getNumChannels()); } @Test public void testDefaultNumChannelsWithGrpcGcpExtensionDisabled() { - SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .build(); assertEquals(SpannerOptions.DEFAULT_CHANNELS, options.getNumChannels()); } @@ -943,6 +951,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() { SpannerOptions options1 = SpannerOptions.newBuilder() .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) .setNumChannels(numChannels) .enableGrpcGcpExtension() .build(); @@ -954,6 +963,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() { SpannerOptions options2 = SpannerOptions.newBuilder() .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) .enableGrpcGcpExtension() .setNumChannels(numChannels) .build(); @@ -972,12 +982,19 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionDisabled() { Spanner spanner2 = options1.getService(); assertNotSame(spanner1, spanner2); + + spanner1.close(); + spanner2.close(); } @Test public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() { SpannerOptions options = - SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build(); + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .enableGrpcGcpExtension() + .build(); SpannerOptions options1 = options.toBuilder().build(); assertEquals(true, options.isGrpcGcpExtensionEnabled()); assertEquals(options.isGrpcGcpExtensionEnabled(), options1.isGrpcGcpExtensionEnabled()); @@ -986,5 +1003,8 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() { Spanner spanner2 = options1.getService(); assertNotSame(spanner1, spanner2); + + spanner1.close(); + spanner2.close(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java deleted file mode 100644 index 91919b15234..00000000000 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsThreadTest.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.spanner; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.cloud.NoCredentials; -import com.google.cloud.spanner.connection.AbstractMockServerTest; -import com.google.common.base.Stopwatch; -import com.google.spanner.admin.database.v1.ListDatabasesResponse; -import com.google.spanner.admin.instance.v1.ListInstancesResponse; -import io.grpc.ManagedChannelBuilder; -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public class SpannerOptionsThreadTest extends AbstractMockServerTest { - private static final int NUMBER_OF_TEST_RUNS = 2; - private static final int DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT = 4; - private static final int NUM_GAPIC_CLIENTS = 4; - private static final int NUM_THREADS = - Math.max( - DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * NUM_GAPIC_CLIENTS, - Runtime.getRuntime().availableProcessors()); - private static final String SPANNER_THREAD_NAME = "Cloud-Spanner-TransportChannel"; - private static final String THREAD_PATTERN = "%s-[0-9]+"; - - private final DatabaseId dbId = DatabaseId.of("p", "i", "d"); - - private SpannerOptions createOptions() { - return SpannerOptions.newBuilder() - .setProjectId("p") - // Set a custom channel configurator to allow http instead of https. - .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) - .setHost("http://localhost:" + getPort()) - .setCredentials(NoCredentials.getInstance()) - .build(); - } - - @Test - public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException { - int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); - for (int i = 0; i < NUMBER_OF_TEST_RUNS; i++) { - waitForStartup(); - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)).isAtMost(baseThreadCount); - // Create Spanner instance. - // We make a copy of the options instance, as SpannerOptions caches any service object - // that has been handed out. - SpannerOptions options = createOptions(); - Spanner spanner = options.getService(); - // Get a database client and do a query. This should initiate threads for the Spanner service. - DatabaseClient client = spanner.getDatabaseClient(dbId); - List resultSets = new ArrayList<>(); - // SpannerStub affiliates a channel with a session, so we need to use multiple sessions - // to ensure we also hit multiple channels. - for (int i2 = 0; i2 < options.getSessionPoolOptions().getMaxSessions(); i2++) { - ResultSet rs = client.singleUse().executeQuery(SELECT_COUNT_STATEMENT); - // Execute ResultSet#next() to send the query to Spanner. - rs.next(); - // Delay closing the result set in order to force the use of multiple sessions. - // As each session is linked to one transport channel, using multiple different - // sessions should initialize multiple transport channels. - resultSets.add(rs); - // Check whether the number of expected threads has been reached. - if (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) == NUM_THREADS + baseThreadCount) { - break; - } - } - for (ResultSet rs : resultSets) { - rs.close(); - } - // Check the number of threads after the query. Doing a request should initialize a thread - // pool for the underlying SpannerClient. - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(NUM_THREADS + baseThreadCount); - - // Then do a request to the InstanceAdmin service and check the number of threads. - // Doing a request should initialize a thread pool for the underlying InstanceAdminClient. - for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) { - InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient(); - mockInstanceAdmin.addResponse(ListInstancesResponse.getDefaultInstance()); - instanceAdminClient.listInstances(); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(NUM_THREADS + baseThreadCount); - - // Then do a request to the DatabaseAdmin service and check the number of threads. - // Doing a request should initialize a thread pool for the underlying DatabaseAdminClient. - for (int i2 = 0; i2 < DEFAULT_NUM_CHANNELS_PER_GAPIC_CLIENT * 2; i2++) { - DatabaseAdminClient databaseAdminClient = spanner.getDatabaseAdminClient(); - mockDatabaseAdmin.addResponse(ListDatabasesResponse.getDefaultInstance()); - databaseAdminClient.listDatabases(dbId.getInstanceId().getInstance()); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)) - .isEqualTo(NUM_THREADS + baseThreadCount); - - // Now close the Spanner instance and check whether the threads are shutdown or not. - spanner.close(); - // Wait a little to allow the threads to actually shutdown. - Stopwatch watch = Stopwatch.createStarted(); - while (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) > baseThreadCount - && watch.elapsed(TimeUnit.SECONDS) < 2) { - Thread.sleep(50L); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)).isAtMost(baseThreadCount); - } - } - - @Test - public void testMultipleSpannersFromSameSpannerOptions() throws InterruptedException { - waitForStartup(); - int baseThreadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); - SpannerOptions options = createOptions(); - try (Spanner spanner1 = options.getService()) { - // Having both in the try-with-resources block is not possible, as it is the same instance. - // One will be closed before the other, and the closing of the second instance would fail. - Spanner spanner2 = options.getService(); - assertThat(spanner1).isSameInstanceAs(spanner2); - DatabaseClient client1 = spanner1.getDatabaseClient(dbId); - DatabaseClient client2 = spanner2.getDatabaseClient(dbId); - assertThat(client1).isSameInstanceAs(client2); - try (ResultSet rs1 = client1.singleUse().executeQuery(SELECT_COUNT_STATEMENT); - ResultSet rs2 = client2.singleUse().executeQuery(SELECT_COUNT_STATEMENT)) { - while (rs1.next() && rs2.next()) { - // Do nothing, just consume the result sets. - } - } - } - Stopwatch watch = Stopwatch.createStarted(); - while (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) > baseThreadCount - && watch.elapsed(TimeUnit.SECONDS) < 2) { - Thread.sleep(50L); - } - assertThat(getNumberOfThreadsWithName(SPANNER_THREAD_NAME)).isAtMost(baseThreadCount); - } - - private void waitForStartup() throws InterruptedException { - // Wait until the IT environment has already started all base worker threads. - int threadCount; - Stopwatch watch = Stopwatch.createStarted(); - do { - threadCount = getNumberOfThreadsWithName(SPANNER_THREAD_NAME); - Thread.sleep(100L); - } while (getNumberOfThreadsWithName(SPANNER_THREAD_NAME) > threadCount - && watch.elapsed(TimeUnit.SECONDS) < 5); - } - - private int getNumberOfThreadsWithName(String serviceName) { - Pattern pattern = Pattern.compile(String.format(THREAD_PATTERN, serviceName)); - Set threadSet = Thread.getAllStackTraces().keySet(); - int res = 0; - for (Thread thread : threadSet) { - if (pattern.matcher(thread.getName()).matches()) { - res++; - } - } - return res; - } -} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java index 9789168776f..919eedf6071 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerThreadsTest.java @@ -16,7 +16,6 @@ package com.google.cloud.spanner; -import static com.google.common.truth.Truth.assertThat; import static org.hamcrest.CoreMatchers.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; @@ -185,6 +184,7 @@ public void testCloseAllThreadsWhenClosingSpanner() throws InterruptedException while (getNumberOfThreadsWithName(threadName, false, initialNumberOfThreads) > initialNumberOfThreads && watch.elapsed(TimeUnit.SECONDS) < 2) { + //noinspection BusyWait Thread.sleep(10L); } assertThat( @@ -241,6 +241,7 @@ && getNumberOfThreadsWithName(threadName, false, initialNumberOfThreads) while (getNumberOfThreadsWithName(threadName, false, initialNumberOfThreads) > initialNumberOfThreads && watch.elapsed(TimeUnit.SECONDS) < 5) { + //noinspection BusyWait Thread.sleep(10L); } assertEquals( @@ -283,7 +284,7 @@ private int getNumberOfThreadsWithName(String serviceName, boolean dumpStack, in } } if (dumpStack && res > expected) { - found.stream().forEach(t -> dumpThread(t)); + found.forEach(this::dumpThread); } return res; }