From 53e42b3d65f234b33474be517d14980c31f5c8ef Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 9 Dec 2024 21:11:11 +0530 Subject: [PATCH] feat(spanner): support multiplexed session for Partitioned operations (#3231) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(spanner): support multiplexed session for Partitioned read or query. * chore(spanner): lint fixes * feat(spanner): support multiplexed session for Partitioned DML operations. * lint(spanner): javadoc fixes. * feat(spanner): Updated unit tests of Partitioned operations for Multiplexed Session. * feat(spanner): Updated unit tests of Partitioned operations for Multiplexed Session. * lint(spanner): Apply suggestions from code review Co-authored-by: Knut Olav Løite * lint(spanner): Apply suggestions from code review Co-authored-by: Knut Olav Løite * feat(spanner): Modified BatchClientImpl to store multiplexed session and create fresh session after expiration date. * feat(spanner): Removed env variable for Partitioned Ops ensuring that Multiplexed Session for Partitioned Ops is not available to customers. * lint(spanner): Removed unused variables. --------- Co-authored-by: Knut Olav Løite --- .github/workflows/ci.yaml | 2 + ...tegration-multiplexed-sessions-enabled.cfg | 5 ++ .../spanner/SessionPoolOptionsHelper.java | 8 +++ ...tractMultiplexedSessionDatabaseClient.java | 6 -- .../google/cloud/spanner/BatchClientImpl.java | 57 +++++++++++++++++- .../cloud/spanner/DatabaseClientImpl.java | 13 ++++ .../DelayedMultiplexedSessionTransaction.java | 13 ++++ .../MultiplexedSessionDatabaseClient.java | 7 +++ .../google/cloud/spanner/SessionClient.java | 45 ++++++-------- .../cloud/spanner/SessionPoolOptions.java | 59 ++++++++++++++++--- .../com/google/cloud/spanner/SpannerImpl.java | 27 ++++++++- .../cloud/spanner/BatchClientImplTest.java | 29 +++++++-- .../cloud/spanner/BatchTransactionIdTest.java | 3 + .../IntegrationTestWithClosedSessionsEnv.java | 1 + ...edSessionDatabaseClientMockServerTest.java | 1 + .../RetryOnInvalidatedSessionTest.java | 3 + 16 files changed, 230 insertions(+), 49 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7eca4c6d5f0..ee28d7f8a66 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -53,6 +53,7 @@ jobs: env: JOB_TYPE: test GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true + GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true units-java8: # Building using Java 17 and run the tests with Java 8 runtime name: "units (8)" @@ -92,6 +93,7 @@ jobs: env: JOB_TYPE: test GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true + GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true windows: runs-on: windows-latest steps: diff --git a/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg b/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg index 771405de422..49edd2e8df6 100644 --- a/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg +++ b/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg @@ -36,3 +36,8 @@ env_vars: { key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" value: "true" } + +env_vars: { + key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS" + value: "true" +} diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java index 8e085947711..f19cb8f4a2f 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java @@ -44,4 +44,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSessionForRW( SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSessionForRW) { return sessionPoolOptionsBuilder.setUseMultiplexedSessionForRW(useMultiplexedSessionForRW); } + + // TODO: Remove when multiplexed session for partitioned operations are released. + public static SessionPoolOptions.Builder setUseMultiplexedSessionForPartitionedOperations( + SessionPoolOptions.Builder sessionPoolOptionsBuilder, + boolean useMultiplexedSessionForPartitionedOps) { + return sessionPoolOptionsBuilder.setUseMultiplexedSessionPartitionedOps( + useMultiplexedSessionForPartitionedOps); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index ebfb0e0a774..10ab997d88a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -19,7 +19,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; /** @@ -51,9 +50,4 @@ public ServerStream batchWriteAtLeastOnce( throws SpannerException { throw new UnsupportedOperationException(); } - - @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { - throw new UnsupportedOperationException(); - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index 3d886dd383b..a250fd5ba39 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -30,16 +30,50 @@ import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; import com.google.spanner.v1.TransactionSelector; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** Default implementation for Batch Client interface. */ public class BatchClientImpl implements BatchClient { private final SessionClient sessionClient; - BatchClientImpl(SessionClient sessionClient) { + private final boolean isMultiplexedSessionEnabled; + + /** Lock to protect the multiplexed session. */ + private final ReentrantLock multiplexedSessionLock = new ReentrantLock(); + + /** The duration before we try to replace the multiplexed session. The default is 7 days. */ + private final Duration sessionExpirationDuration; + + /** The expiration date/time of the current multiplexed session. */ + @GuardedBy("multiplexedSessionLock") + private final AtomicReference expirationDate; + + @GuardedBy("multiplexedSessionLock") + private final AtomicReference multiplexedSessionReference; + + BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) { this.sessionClient = checkNotNull(sessionClient); + this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled; + this.sessionExpirationDuration = + Duration.ofMillis( + sessionClient + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getMultiplexedSessionMaintenanceDuration() + .toMillis()); + // Initialize the expiration date to the start of time to avoid unnecessary null checks. + // This also ensured that a new session is created on first request. + this.expirationDate = new AtomicReference<>(Instant.MIN); + this.multiplexedSessionReference = new AtomicReference<>(); } @Override @@ -50,7 +84,12 @@ public String getDatabaseRole() { @Override public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) { - SessionImpl session = sessionClient.createSession(); + SessionImpl session; + if (isMultiplexedSessionEnabled) { + session = getMultiplexedSession(); + } else { + session = sessionClient.createSession(); + } return new BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.newBuilder() .setSession(session) @@ -92,6 +131,20 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc batchTransactionId); } + private SessionImpl getMultiplexedSession() { + this.multiplexedSessionLock.lock(); + try { + if (Clock.systemUTC().instant().isAfter(this.expirationDate.get()) + || this.multiplexedSessionReference.get() == null) { + this.multiplexedSessionReference.set(this.sessionClient.createMultiplexedSession()); + this.expirationDate.set(Clock.systemUTC().instant().plus(this.sessionExpirationDuration)); + } + return this.multiplexedSessionReference.get(); + } finally { + this.multiplexedSessionLock.unlock(); + } + } + private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction implements BatchReadOnlyTransaction { private final String sessionName; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index d7f16f89524..f571354dacb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -36,6 +36,7 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final String clientId; @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; + @VisibleForTesting final boolean useMultiplexedSessionPartitionedOps; @VisibleForTesting final boolean useMultiplexedSessionForRW; final boolean useMultiplexedSessionBlindWrite; @@ -47,6 +48,7 @@ class DatabaseClientImpl implements DatabaseClient { pool, /* useMultiplexedSessionBlindWrite = */ false, /* multiplexedSessionDatabaseClient = */ null, + /* useMultiplexedSessionPartitionedOps= */ false, tracer, /* useMultiplexedSessionForRW = */ false); } @@ -58,6 +60,7 @@ class DatabaseClientImpl implements DatabaseClient { pool, /* useMultiplexedSessionBlindWrite = */ false, /* multiplexedSessionDatabaseClient = */ null, + /* useMultiplexedSessionPartitionedOps= */ false, tracer, /* useMultiplexedSessionForRW = */ false); } @@ -67,12 +70,14 @@ class DatabaseClientImpl implements DatabaseClient { SessionPool pool, boolean useMultiplexedSessionBlindWrite, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient, + boolean useMultiplexedSessionPartitionedOps, TraceWrapper tracer, boolean useMultiplexedSessionForRW) { this.clientId = clientId; this.pool = pool; this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite; this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient; + this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps; this.tracer = tracer; this.useMultiplexedSessionForRW = useMultiplexedSessionForRW; } @@ -309,6 +314,14 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { + if (useMultiplexedSessionPartitionedOps) { + return getMultiplexedSession().executePartitionedUpdate(stmt, options); + } + return executePartitionedUpdateWithPooledSession(stmt, options); + } + + private long executePartitionedUpdateWithPooledSession( + final Statement stmt, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index ad3e6b0cf70..0193805cbeb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -24,6 +24,7 @@ import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutionException; @@ -224,4 +225,16 @@ private SessionReference getSessionReference() { throw SpannerExceptionFactory.propagateInterrupt(interruptedException); } } + + /** + * Execute `stmt` within PARTITIONED_DML transaction using multiplexed session. This method is a + * blocking call as the interface expects to return the output of the `stmt`. + */ + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + SessionReference sessionReference = getSessionReference(); + return new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true) + .executePartitionedUpdate(stmt, options); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index bd709adbd99..01f41a2dfdc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -24,6 +24,7 @@ import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.common.annotations.VisibleForTesting; @@ -553,6 +554,12 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti .transactionManagerAsync(options); } + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + return createMultiplexedSessionTransaction(/* singleUse = */ true) + .executePartitionedUpdate(stmt, options); + } + /** * It is enough with one executor to maintain the multiplexed sessions in all the clients, as they * do not need to be updated often, and the maintenance task is light. The core pool size is set diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 0eed13b018c..a3cbbf33826 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -237,6 +237,19 @@ SessionImpl createSession() { * @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available. */ void createMultiplexedSession(SessionConsumer consumer) { + try { + SessionImpl sessionImpl = createMultiplexedSession(); + consumer.onSessionReady(sessionImpl); + } catch (Throwable t) { + consumer.onSessionCreateFailure(t, 1); + } + } + + /** + * Creates a multiplexed session and returns it. A multiplexed session is not affiliated with any + * GRPC channel. In case of an error during the gRPC calls, an exception will be thrown. + */ + SessionImpl createMultiplexedSession() { ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION); try (IScope s = spanner.getTracer().withSpan(span)) { com.google.spanner.v1.Session session = @@ -253,10 +266,12 @@ void createMultiplexedSession(SessionConsumer consumer) { spanner, new SessionReference( session.getName(), session.getCreateTime(), session.getMultiplexed(), null)); - consumer.onSessionReady(sessionImpl); + span.addAnnotation( + String.format("Request for %d multiplexed session returned %d session", 1, 1)); + return sessionImpl; } catch (Throwable t) { span.setStatus(t); - consumer.onSessionCreateFailure(t, 1); + throw t; } finally { span.end(); } @@ -289,31 +304,7 @@ private CreateMultiplexedSessionsRunnable(SessionConsumer consumer) { @Override public void run() { - ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION); - try (IScope s = spanner.getTracer().withSpan(span)) { - com.google.spanner.v1.Session session = - spanner - .getRpc() - .createSession( - db.getName(), - spanner.getOptions().getDatabaseRole(), - spanner.getOptions().getSessionLabels(), - null, - true); - SessionImpl sessionImpl = - new SessionImpl( - spanner, - new SessionReference( - session.getName(), session.getCreateTime(), session.getMultiplexed(), null)); - span.addAnnotation( - String.format("Request for %d multiplexed session returned %d session", 1, 1)); - consumer.onSessionReady(sessionImpl); - } catch (Throwable t) { - span.setStatus(t); - consumer.onSessionCreateFailure(t, 1); - } finally { - span.end(); - } + createMultiplexedSession(consumer); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index d4f3e598b11..a691f14817f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -85,6 +85,9 @@ public class SessionPoolOptions { private final boolean useMultiplexedSessionForRW; + private final boolean useMultiplexedSessionForPartitionedOps; + + // TODO: Change to use java.time.Duration. private final Duration multiplexedSessionMaintenanceDuration; private SessionPoolOptions(Builder builder) { @@ -127,6 +130,14 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionForRWFromEnvVariable != null) ? useMultiplexedSessionForRWFromEnvVariable : builder.useMultiplexedSessionForRW; + // useMultiplexedSessionPartitionedOps priority => Environment var > private setter > client + // default + Boolean useMultiplexedSessionFromEnvVariablePartitionedOps = + getUseMultiplexedSessionFromEnvVariablePartitionedOps(); + this.useMultiplexedSessionForPartitionedOps = + (useMultiplexedSessionFromEnvVariablePartitionedOps != null) + ? useMultiplexedSessionFromEnvVariablePartitionedOps + : builder.useMultiplexedSessionPartitionedOps; this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; } @@ -349,17 +360,31 @@ public boolean getUseMultiplexedSessionForRW() { return getUseMultiplexedSession() && useMultiplexedSessionForRW; } + @VisibleForTesting + @InternalApi + public boolean getUseMultiplexedSessionPartitionedOps() { + return useMultiplexedSessionForPartitionedOps; + } + private static Boolean getUseMultiplexedSessionFromEnvVariable() { - String useMultiplexedSessionFromEnvVariable = - System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); - if (useMultiplexedSessionFromEnvVariable != null - && useMultiplexedSessionFromEnvVariable.length() > 0) { - if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable) - || "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)) { - return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariable); + return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); + } + + @VisibleForTesting + @InternalApi + protected static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { + // Checks the value of env, GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS + // This returns null until Partitioned Operations is supported. + return null; + } + + private static Boolean parseBooleanEnvVariable(String variableName) { + String envVariable = System.getenv(variableName); + if (envVariable != null && envVariable.length() > 0) { + if ("true".equalsIgnoreCase(envVariable) || "false".equalsIgnoreCase(envVariable)) { + return Boolean.parseBoolean(envVariable); } else { - throw new IllegalArgumentException( - "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS should be either true or false."); + throw new IllegalArgumentException(variableName + " should be either true or false."); } } return null; @@ -585,6 +610,12 @@ public static class Builder { // default. private boolean useMultiplexedSessionForRW = false; + // This field controls the default behavior of session management for Partitioned operations in + // Java client. + // Set useMultiplexedSessionPartitionedOps to true to make multiplexed session for Partitioned + // operations the default. + private boolean useMultiplexedSessionPartitionedOps = false; + private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); private Clock poolMaintainerClock = Clock.INSTANCE; @@ -628,6 +659,7 @@ private Builder(SessionPoolOptions options) { this.useMultiplexedSession = options.useMultiplexedSession; this.useMultiplexedSessionBlindWrite = options.useMultiplexedSessionBlindWrite; this.useMultiplexedSessionForRW = options.useMultiplexedSessionForRW; + this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; } @@ -847,6 +879,15 @@ Builder setUseMultiplexedSessionForRW(boolean useMultiplexedSessionForRW) { return this; } + /** + * Sets whether the client should use multiplexed session for Partitioned operations or not. + * This method is intentionally package-private and intended for internal use. + */ + Builder setUseMultiplexedSessionPartitionedOps(boolean useMultiplexedSessionPartitionedOps) { + this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps; + return this; + } + @VisibleForTesting Builder setMultiplexedSessionMaintenanceDuration( Duration multiplexedSessionMaintenanceDuration) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 1348d586e3a..ed815c77088 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -107,6 +108,11 @@ private static String nextDatabaseClientId(DatabaseId databaseId) { @GuardedBy("this") private final Map dbClients = new HashMap<>(); + @GuardedBy("dbBatchClientLock") + private final Map dbBatchClients = new HashMap<>(); + + private final ReentrantLock dbBatchClientLock = new ReentrantLock(); + private final CloseableExecutorProvider asyncExecutorProvider; @GuardedBy("this") @@ -308,6 +314,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { pool, getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(), multiplexedSessionDatabaseClient, + getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(), useMultiplexedSessionForRW); dbClients.put(db, dbClient); return dbClient; @@ -321,19 +328,37 @@ DatabaseClientImpl createDatabaseClient( SessionPool pool, boolean useMultiplexedSessionBlindWrite, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient, + boolean useMultiplexedSessionPartitionedOps, boolean useMultiplexedSessionForRW) { return new DatabaseClientImpl( clientId, pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, + useMultiplexedSessionPartitionedOps, tracer, useMultiplexedSessionForRW); } @Override public BatchClient getBatchClient(DatabaseId db) { - return new BatchClientImpl(getSessionClient(db)); + if (getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()) { + this.dbBatchClientLock.lock(); + try { + if (this.dbBatchClients.containsKey(db)) { + return this.dbBatchClients.get(db); + } + BatchClientImpl batchClient = + new BatchClientImpl( + getSessionClient(db), /*useMultiplexedSessionPartitionedOps=*/ true); + this.dbBatchClients.put(db, batchClient); + return batchClient; + } finally { + this.dbBatchClientLock.unlock(); + } + } + return new BatchClientImpl( + getSessionClient(db), /*useMultiplexedSessionPartitionedOps=*/ false); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index 8d05df538a5..edafc7ddba9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -36,6 +36,7 @@ import com.google.spanner.v1.Session; import com.google.spanner.v1.Transaction; import io.opentelemetry.api.OpenTelemetry; +import java.time.Duration; import java.util.Collections; import java.util.Map; import org.junit.Before; @@ -56,6 +57,7 @@ public final class BatchClientImplTest { private static final String SESSION_NAME = DB_NAME + "/sessions/s1"; private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn"); private static final String TIMESTAMP = "2017-11-15T10:54:20Z"; + private static boolean isMultiplexedSession = false; @Mock private SpannerRpc gapicRpc; @Mock private SpannerOptions spannerOptions; @@ -68,6 +70,11 @@ public final class BatchClientImplTest { public static void setupOpenTelemetry() { SpannerOptions.resetActiveTracingFramework(); SpannerOptions.enableOpenTelemetryTraces(); + Boolean useMultiplexedSessionFromEnvVariablePartitionedOps = + SessionPoolOptions.getUseMultiplexedSessionFromEnvVariablePartitionedOps(); + isMultiplexedSession = + useMultiplexedSessionFromEnvVariablePartitionedOps != null + && useMultiplexedSessionFromEnvVariablePartitionedOps; } @SuppressWarnings("unchecked") @@ -88,18 +95,32 @@ public void setUp() { when(spannerOptions.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class); when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE); + when(sessionPoolOptions.getUseMultiplexedSessionPartitionedOps()) + .thenReturn(isMultiplexedSession); + when(sessionPoolOptions.getMultiplexedSessionMaintenanceDuration()).thenReturn(Duration.ZERO); when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); @SuppressWarnings("resource") SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions); - client = new BatchClientImpl(spanner.getSessionClient(db)); + client = new BatchClientImpl(spanner.getSessionClient(db), isMultiplexedSession); } @SuppressWarnings("unchecked") @Test public void testBatchReadOnlyTxnWithBound() throws Exception { - Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build(); - when(gapicRpc.createSession(eq(DB_NAME), anyString(), anyMap(), optionsCaptor.capture())) - .thenReturn(sessionProto); + Session sessionProto = + Session.newBuilder().setName(SESSION_NAME).setMultiplexed(isMultiplexedSession).build(); + if (isMultiplexedSession) { + when(gapicRpc.createSession( + eq(DB_NAME), + anyString(), + anyMap(), + optionsCaptor.capture(), + eq(isMultiplexedSession))) + .thenReturn(sessionProto); + } else { + when(gapicRpc.createSession(eq(DB_NAME), anyString(), anyMap(), optionsCaptor.capture())) + .thenReturn(sessionProto); + } com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP); Transaction txnMetadata = Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java index cf431348ce5..81d4de41695 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java @@ -50,5 +50,8 @@ public void serialization() { reserializeAndAssert( new BatchTransactionId( "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE)); + reserializeAndAssert( + new BatchTransactionId( + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE)); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index 6deca476fc3..72cfe0bfe44 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -51,6 +51,7 @@ DatabaseClientImpl createDatabaseClient( SessionPool pool, boolean useMultiplexedSessionBlindWriteIgnore, MultiplexedSessionDatabaseClient ignore, + boolean useMultiplexedSessionPartitionedOpsIgnore, boolean useMultiplexedSessionForRWIgnore) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 9f3d0751471..3121b868b83 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -95,6 +95,7 @@ public void createSpannerInstance() { .setUseMultiplexedSession(true) .setUseMultiplexedSessionBlindWrite(true) .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) // Set the maintainer to loop once every 1ms .setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L)) // Set multiplexed sessions to be replaced once every 1ms diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 42a62be33aa..3032a1cae40 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -1273,6 +1273,9 @@ public void transactionManagerReadRowUsingIndexInvalidatedDuringTransaction() @Test public void partitionedDml() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); assertThrowsSessionNotFoundIfShouldFail( () -> client.executePartitionedUpdate(UPDATE_STATEMENT)); }