Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: remove custom transport executor #2366

Merged
merged 3 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.api.core.InternalApi;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.GaxProperties;
import com.google.api.gax.grpc.GaxGrpcProperties;
import com.google.api.gax.grpc.GrpcCallContext;
Expand Down Expand Up @@ -185,7 +184,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand All @@ -200,8 +198,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -211,53 +207,6 @@
/** Implementation of Cloud Spanner remote calls using Gapic libraries. */
@InternalApi
public class GapicSpannerRpc implements SpannerRpc {

/**
* {@link ExecutorProvider} that keeps track of the executors that are created and shuts these
* down when the {@link SpannerRpc} is closed.
*/
private static final class ManagedInstantiatingExecutorProvider implements ExecutorProvider {

// 4 Gapic clients * 4 channels per client.
private static final int DEFAULT_MIN_THREAD_COUNT = 16;
private final List<ScheduledExecutorService> executors = new LinkedList<>();
private final ThreadFactory threadFactory;

private ManagedInstantiatingExecutorProvider(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}

@Override
public boolean shouldAutoClose() {
return false;
}

@Override
public ScheduledExecutorService getExecutor() {
int numCpus = Runtime.getRuntime().availableProcessors();
int numThreads = Math.max(DEFAULT_MIN_THREAD_COUNT, numCpus);
ScheduledExecutorService executor =
new ScheduledThreadPoolExecutor(numThreads, threadFactory);
synchronized (this) {
executors.add(executor);
}
return executor;
}

/** Shuts down all executors that have been created by this {@link ExecutorProvider}. */
private synchronized void shutdown() {
for (ScheduledExecutorService executor : executors) {
executor.shutdown();
}
}

private void awaitTermination() throws InterruptedException {
for (ScheduledExecutorService executor : executors) {
executor.awaitTermination(10L, TimeUnit.SECONDS);
}
}
}

private static final PathTemplate PROJECT_NAME_TEMPLATE =
PathTemplate.create("projects/{project}");
private static final PathTemplate OPERATION_NAME_TEMPLATE =
Expand All @@ -277,7 +226,6 @@ private void awaitTermination() throws InterruptedException {
CLIENT_LIBRARY_LANGUAGE + "/" + GaxProperties.getLibraryVersion(GapicSpannerRpc.class);
private static final String API_FILE = "grpc-gcp-apiconfig.json";

private final ManagedInstantiatingExecutorProvider executorProvider;
private boolean rpcIsClosed;
private final SpannerStub spannerStub;
private final SpannerStub partitionedDmlStub;
Expand Down Expand Up @@ -356,13 +304,6 @@ public GapicSpannerRpc(final SpannerOptions options) {
this.compressorName = options.getCompressorName();

if (initializeStubs) {
// Create a managed executor provider.
this.executorProvider =
new ManagedInstantiatingExecutorProvider(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(options.getTransportChannelExecutorThreadNameFormat())
.build());
// First check if SpannerOptions provides a TransportChannelProvider. Create one
// with information gathered from SpannerOptions if none is provided
InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder =
Expand All @@ -373,11 +314,6 @@ public GapicSpannerRpc(final SpannerOptions options) {
.setMaxInboundMetadataSize(MAX_METADATA_SIZE)
.setPoolSize(options.getNumChannels())

// Before updating this method to setExecutor, please verify with a code owner on
// the lowest version of gax-grpc that needs to be supported. Currently v1.47.17,
// which doesn't support the setExecutor variant.
.setExecutorProvider(executorProvider)

// Set a keepalive time of 120 seconds to help long running
// commit GRPC calls succeed
.setKeepAliveTime(Duration.ofSeconds(GRPC_KEEPALIVE_SECONDS))
Expand Down Expand Up @@ -536,7 +472,6 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
this.databaseAdminStubSettings = null;
this.spannerWatchdog = null;
this.partitionedDmlRetrySettings = null;
this.executorProvider = null;
}
}

Expand Down Expand Up @@ -1932,15 +1867,13 @@ public void shutdown() {
this.instanceAdminStub.close();
this.databaseAdminStub.close();
this.spannerWatchdog.shutdown();
this.executorProvider.shutdown();

try {
this.spannerStub.awaitTermination(10L, TimeUnit.SECONDS);
this.partitionedDmlStub.awaitTermination(10L, TimeUnit.SECONDS);
this.instanceAdminStub.awaitTermination(10L, TimeUnit.SECONDS);
this.databaseAdminStub.awaitTermination(10L, TimeUnit.SECONDS);
this.spannerWatchdog.awaitTermination(10L, TimeUnit.SECONDS);
this.executorProvider.awaitTermination();
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
Expand All @@ -1954,7 +1887,6 @@ public void shutdownNow() {
this.instanceAdminStub.close();
this.databaseAdminStub.close();
this.spannerWatchdog.shutdown();
this.executorProvider.shutdown();

this.spannerStub.shutdownNow();
this.partitionedDmlStub.shutdownNow();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public ScheduledExecutorService get() {

@Override
public void release(ScheduledExecutorService executor) {
executor.shutdown();
try {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

executor.shutdown();
} catch (Throwable ignore) {
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -923,14 +923,22 @@ public void testCustomAsyncExecutorProvider() {
@Test
public void testDefaultNumChannelsWithGrpcGcpExtensionEnabled() {
SpannerOptions options =
SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build();
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

.enableGrpcGcpExtension()
.build();

assertEquals(SpannerOptions.GRPC_GCP_ENABLED_DEFAULT_CHANNELS, options.getNumChannels());
}

@Test
public void testDefaultNumChannelsWithGrpcGcpExtensionDisabled() {
SpannerOptions options = SpannerOptions.newBuilder().setProjectId("test-project").build();
SpannerOptions options =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

.build();

assertEquals(SpannerOptions.DEFAULT_CHANNELS, options.getNumChannels());
}
Expand All @@ -943,6 +951,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() {
SpannerOptions options1 =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

.setNumChannels(numChannels)
.enableGrpcGcpExtension()
.build();
Expand All @@ -954,6 +963,7 @@ public void testNumChannelsWithGrpcGcpExtensionEnabled() {
SpannerOptions options2 =
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

.enableGrpcGcpExtension()
.setNumChannels(numChannels)
.build();
Expand All @@ -972,12 +982,19 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionDisabled() {
Spanner spanner2 = options1.getService();

assertNotSame(spanner1, spanner2);

spanner1.close();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

spanner2.close();
}

@Test
public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() {
SpannerOptions options =
SpannerOptions.newBuilder().setProjectId("test-project").enableGrpcGcpExtension().build();
SpannerOptions.newBuilder()
.setProjectId("test-project")
.setCredentials(NoCredentials.getInstance())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

.enableGrpcGcpExtension()
.build();
SpannerOptions options1 = options.toBuilder().build();
assertEquals(true, options.isGrpcGcpExtensionEnabled());
assertEquals(options.isGrpcGcpExtensionEnabled(), options1.isGrpcGcpExtensionEnabled());
Expand All @@ -986,5 +1003,8 @@ public void checkCreatedInstanceWhenGrpcGcpExtensionEnabled() {
Spanner spanner2 = options1.getService();

assertNotSame(spanner1, spanner2);

spanner1.close();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleans up errors that are being logged during tests, clogging up the test log.

spanner2.close();
}
}
Loading