From f579642dd06a77428807fe96fb96c4f0c101d75e Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Thu, 1 Aug 2024 15:58:43 +0530 Subject: [PATCH 01/11] feat(spanner): support multiplexed session for Partitioned read or query. --- .../google/cloud/spanner/BatchClientImpl.java | 19 +++++-- .../cloud/spanner/BatchTransactionId.java | 9 +++- .../google/cloud/spanner/SessionClient.java | 52 +++++++++---------- .../cloud/spanner/SessionPoolOptions.java | 51 ++++++++++++++++++ .../com/google/cloud/spanner/SpannerImpl.java | 2 +- .../cloud/spanner/BatchClientImplTest.java | 2 +- .../cloud/spanner/BatchTransactionIdTest.java | 15 ++++-- 7 files changed, 110 insertions(+), 40 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index 22fb9f710c1..aff88718331 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -37,9 +37,11 @@ /** Default implementation for Batch Client interface. */ public class BatchClientImpl implements BatchClient { private final SessionClient sessionClient; + private final boolean isMultiplexedSessionEnabled; - BatchClientImpl(SessionClient sessionClient) { + BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) { this.sessionClient = checkNotNull(sessionClient); + this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled; } @Override @@ -50,7 +52,12 @@ public String getDatabaseRole() { @Override public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) { - SessionImpl session = sessionClient.createSession(); + SessionImpl session; + if (isMultiplexedSessionEnabled) { + session = sessionClient.createMultiplexedSession(); + } else { + session = sessionClient.createSession(); + } return new BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.newBuilder() .setSession(session) @@ -71,7 +78,7 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) { @Override public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batchTransactionId) { SessionImpl session = - sessionClient.sessionWithId(checkNotNull(batchTransactionId).getSessionId()); + sessionClient.sessionWithId(checkNotNull(batchTransactionId).getSessionId(), batchTransactionId.isMultiplexedSession()); return new BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.newBuilder() .setSession(session) @@ -93,12 +100,14 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction implements BatchReadOnlyTransaction { private final String sessionName; + private final boolean isMultiplexedSession; private final Map options; BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) { super(builder.setTimestampBound(bound)); this.sessionName = session.getName(); + this.isMultiplexedSession = session.getIsMultiplexed(); this.options = session.getOptions(); initTransaction(); } @@ -108,11 +117,13 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa super(builder.setTransactionId(batchTransactionId.getTransactionId())); this.sessionName = session.getName(); this.options = session.getOptions(); + this.isMultiplexedSession = session.getIsMultiplexed(); } @Override public BatchTransactionId getBatchTransactionId() { - return new BatchTransactionId(sessionName, getTransactionId(), getReadTimestamp()); + return new BatchTransactionId(sessionName, getTransactionId(), getReadTimestamp(), + session.getIsMultiplexed()); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java index a5a02ac1360..fc919d4a282 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java @@ -32,12 +32,15 @@ public class BatchTransactionId implements Serializable { private final ByteString transactionId; private final String sessionId; private final Timestamp timestamp; + private final boolean isMultiplexedSession; private static final long serialVersionUID = 8067099123096783939L; - BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp) { + BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp, + boolean isMultiplexedSession) { this.transactionId = Preconditions.checkNotNull(transactionId); this.sessionId = Preconditions.checkNotNull(sessionId); this.timestamp = Preconditions.checkNotNull(timestamp); + this.isMultiplexedSession = isMultiplexedSession; } ByteString getTransactionId() { @@ -52,6 +55,10 @@ Timestamp getTimestamp() { return timestamp; } + public boolean isMultiplexedSession() { + return isMultiplexedSession; + } + @Override public String toString() { return String.format( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 0eed13b018c..11cd6fe10f6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -237,6 +237,19 @@ SessionImpl createSession() { * @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available. */ void createMultiplexedSession(SessionConsumer consumer) { + try { + SessionImpl sessionImpl = createMultiplexedSession(); + consumer.onSessionReady(sessionImpl); + } catch (Throwable t) { + consumer.onSessionCreateFailure(t, 1); + } + } + + /** + * Create a multiplexed session and returns it. A multiplexed session is not affiliated with any + * GRPC channel. In case of an error on the gRPC calls, the error will be returned. + */ + SessionImpl createMultiplexedSession() { ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION); try (IScope s = spanner.getTracer().withSpan(span)) { com.google.spanner.v1.Session session = @@ -253,10 +266,12 @@ void createMultiplexedSession(SessionConsumer consumer) { spanner, new SessionReference( session.getName(), session.getCreateTime(), session.getMultiplexed(), null)); - consumer.onSessionReady(sessionImpl); + span.addAnnotation( + String.format("Request for %d multiplexed session returned %d session", 1, 1)); + return sessionImpl; } catch (Throwable t) { span.setStatus(t); - consumer.onSessionCreateFailure(t, 1); + throw t; } finally { span.end(); } @@ -289,31 +304,7 @@ private CreateMultiplexedSessionsRunnable(SessionConsumer consumer) { @Override public void run() { - ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION); - try (IScope s = spanner.getTracer().withSpan(span)) { - com.google.spanner.v1.Session session = - spanner - .getRpc() - .createSession( - db.getName(), - spanner.getOptions().getDatabaseRole(), - spanner.getOptions().getSessionLabels(), - null, - true); - SessionImpl sessionImpl = - new SessionImpl( - spanner, - new SessionReference( - session.getName(), session.getCreateTime(), session.getMultiplexed(), null)); - span.addAnnotation( - String.format("Request for %d multiplexed session returned %d session", 1, 1)); - consumer.onSessionReady(sessionImpl); - } catch (Throwable t) { - span.setStatus(t); - consumer.onSessionCreateFailure(t, 1); - } finally { - span.end(); - } + createMultiplexedSession(consumer); } } @@ -423,10 +414,15 @@ private List internalBatchCreateSessions( /** Returns a {@link SessionImpl} that references the existing session with the given name. */ SessionImpl sessionWithId(String name) { + return sessionWithId(name, false); + } + + /** Returns a {@link SessionImpl} that references the existing session with the given name. */ + SessionImpl sessionWithId(String name, boolean isMultiplexedSession) { final Map options; synchronized (this) { options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); } - return new SessionImpl(spanner, new SessionReference(name, options)); + return new SessionImpl(spanner, new SessionReference(name, /*createTime= */ null, isMultiplexedSession, options)); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index ba2eedbccb8..a4a85e313b2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -73,6 +73,8 @@ public class SessionPoolOptions { private final boolean useMultiplexedSession; + private final boolean useMultiplexedSessionForPartitionedOps; + // TODO: Change to use java.time.Duration. private final Duration multiplexedSessionMaintenanceDuration; @@ -108,6 +110,12 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionFromEnvVariable != null) ? useMultiplexedSessionFromEnvVariable : builder.useMultiplexedSession; + // useMultiplexedSessionPartitionedOps priority => Environment var > private setter > client default + Boolean useMultiplexedSessionFromEnvVariablePartitionedOps = getUseMultiplexedSessionFromEnvVariablePartitionedOps(); + this.useMultiplexedSessionForPartitionedOps = + (useMultiplexedSessionFromEnvVariablePartitionedOps != null) + ? useMultiplexedSessionFromEnvVariablePartitionedOps + : builder.useMultiplexedSessionPartitionedOps; this.multiplexedSessionMaintenanceDuration = builder.multiplexedSessionMaintenanceDuration; } @@ -307,6 +315,12 @@ public boolean getUseMultiplexedSession() { return useMultiplexedSession; } + @VisibleForTesting + @InternalApi + public boolean getUseMultiplexedSessionPartitionedOps() { + return useMultiplexedSessionForPartitionedOps; + } + private static Boolean getUseMultiplexedSessionFromEnvVariable() { String useMultiplexedSessionFromEnvVariable = System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); @@ -323,6 +337,22 @@ private static Boolean getUseMultiplexedSessionFromEnvVariable() { return null; } + private static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { + String useMultiplexedSessionFromEnvVariablePartitionedOps = + System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"); + if (useMultiplexedSessionFromEnvVariablePartitionedOps != null + && useMultiplexedSessionFromEnvVariablePartitionedOps.length() > 0) { + if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariablePartitionedOps) + || "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariablePartitionedOps)) { + return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariablePartitionedOps); + } else { + throw new IllegalArgumentException( + "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS should be either true or false."); + } + } + return null; + } + Duration getMultiplexedSessionMaintenanceDuration() { return multiplexedSessionMaintenanceDuration; } @@ -529,6 +559,10 @@ public static class Builder { // Set useMultiplexedSession to true to make multiplexed session the default. private boolean useMultiplexedSession = false; + // This field controls the default behavior of session management in Java client. + // Set useMultiplexedSessionPartitionedOps to true to make multiplexed session the default. + private boolean useMultiplexedSessionPartitionedOps = false; + private Duration multiplexedSessionMaintenanceDuration = Duration.ofDays(7); private Clock poolMaintainerClock = Clock.INSTANCE; @@ -570,6 +604,7 @@ private Builder(SessionPoolOptions options) { this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold; this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions; this.useMultiplexedSession = options.useMultiplexedSession; + this.useMultiplexedSessionPartitionedOps = options.useMultiplexedSessionForPartitionedOps; this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration; this.poolMaintainerClock = options.poolMaintainerClock; } @@ -757,6 +792,22 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) { return this; } + /** + * Sets whether the client should use multiplexed session or not. If set to true, the client + * optimises and runs multiple applicable requests concurrently on a single session. A single + * multiplexed session is sufficient to handle all concurrent traffic. + * + *

When set to false, the client uses the regular session cached in the session pool for + * running 1 concurrent transaction per session. We require to provision sufficient sessions by + * making use of {@link SessionPoolOptions#minSessions} and {@link + * SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in + * higher latencies. + */ + Builder setUseMultiplexedSessionPartitionedOps(boolean useMultiplexedSessionPartitionedOps) { + this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps; + return this; + } + @VisibleForTesting Builder setMultiplexedSessionMaintenanceDuration( Duration multiplexedSessionMaintenanceDuration) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 6aa0d646a84..5a348fe7f3b 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -317,7 +317,7 @@ DatabaseClientImpl createDatabaseClient( @Override public BatchClient getBatchClient(DatabaseId db) { - return new BatchClientImpl(getSessionClient(db)); + return new BatchClientImpl(getSessionClient(db), getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index 8d05df538a5..a0e693b7c8a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -91,7 +91,7 @@ public void setUp() { when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); @SuppressWarnings("resource") SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions); - client = new BatchClientImpl(spanner.getSessionClient(db)); + client = new BatchClientImpl(spanner.getSessionClient(db), false); } @SuppressWarnings("unchecked") diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java index cf431348ce5..a6d59e4d376 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java @@ -34,14 +34,18 @@ public void equalAndHashCode() { new EqualsTester() .addEqualityGroup( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE), + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, + false), new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE)) + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, + false)) .addEqualityGroup( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE), + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, + false), new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE)) + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, + false)) .testEquals(); } @@ -49,6 +53,7 @@ public void equalAndHashCode() { public void serialization() { reserializeAndAssert( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE)); + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, + false)); } } From 8c3bd2f67e40b766cbad2ae61bed30df45d45ce9 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Thu, 1 Aug 2024 16:00:18 +0530 Subject: [PATCH 02/11] chore(spanner): lint fixes --- .../com/google/cloud/spanner/BatchClientImpl.java | 8 +++++--- .../google/cloud/spanner/BatchTransactionId.java | 5 ++++- .../com/google/cloud/spanner/SessionClient.java | 3 ++- .../google/cloud/spanner/SessionPoolOptions.java | 6 ++++-- .../com/google/cloud/spanner/SpannerImpl.java | 4 +++- .../cloud/spanner/BatchTransactionIdTest.java | 15 +++++---------- 6 files changed, 23 insertions(+), 18 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index aff88718331..30626e45281 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -78,7 +78,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) { @Override public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batchTransactionId) { SessionImpl session = - sessionClient.sessionWithId(checkNotNull(batchTransactionId).getSessionId(), batchTransactionId.isMultiplexedSession()); + sessionClient.sessionWithId( + checkNotNull(batchTransactionId).getSessionId(), + batchTransactionId.isMultiplexedSession()); return new BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.newBuilder() .setSession(session) @@ -122,8 +124,8 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa @Override public BatchTransactionId getBatchTransactionId() { - return new BatchTransactionId(sessionName, getTransactionId(), getReadTimestamp(), - session.getIsMultiplexed()); + return new BatchTransactionId( + sessionName, getTransactionId(), getReadTimestamp(), session.getIsMultiplexed()); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java index fc919d4a282..36f870be4c8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java @@ -35,7 +35,10 @@ public class BatchTransactionId implements Serializable { private final boolean isMultiplexedSession; private static final long serialVersionUID = 8067099123096783939L; - BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp, + BatchTransactionId( + String sessionId, + ByteString transactionId, + Timestamp timestamp, boolean isMultiplexedSession) { this.transactionId = Preconditions.checkNotNull(transactionId); this.sessionId = Preconditions.checkNotNull(sessionId); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index 11cd6fe10f6..2a377ad631f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -423,6 +423,7 @@ SessionImpl sessionWithId(String name, boolean isMultiplexedSession) { synchronized (this) { options = optionMap(SessionOption.channelHint(sessionChannelCounter++)); } - return new SessionImpl(spanner, new SessionReference(name, /*createTime= */ null, isMultiplexedSession, options)); + return new SessionImpl( + spanner, new SessionReference(name, /*createTime= */ null, isMultiplexedSession, options)); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index a4a85e313b2..46c5eb29036 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -110,8 +110,10 @@ private SessionPoolOptions(Builder builder) { (useMultiplexedSessionFromEnvVariable != null) ? useMultiplexedSessionFromEnvVariable : builder.useMultiplexedSession; - // useMultiplexedSessionPartitionedOps priority => Environment var > private setter > client default - Boolean useMultiplexedSessionFromEnvVariablePartitionedOps = getUseMultiplexedSessionFromEnvVariablePartitionedOps(); + // useMultiplexedSessionPartitionedOps priority => Environment var > private setter > client + // default + Boolean useMultiplexedSessionFromEnvVariablePartitionedOps = + getUseMultiplexedSessionFromEnvVariablePartitionedOps(); this.useMultiplexedSessionForPartitionedOps = (useMultiplexedSessionFromEnvVariablePartitionedOps != null) ? useMultiplexedSessionFromEnvVariablePartitionedOps diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 5a348fe7f3b..5d3cdd3eba6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -317,7 +317,9 @@ DatabaseClientImpl createDatabaseClient( @Override public BatchClient getBatchClient(DatabaseId db) { - return new BatchClientImpl(getSessionClient(db), getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); + return new BatchClientImpl( + getSessionClient(db), + getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java index a6d59e4d376..cd158502704 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java @@ -34,18 +34,14 @@ public void equalAndHashCode() { new EqualsTester() .addEqualityGroup( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, - false), + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, false), new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, - false)) + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, false)) .addEqualityGroup( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, - false), + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, false), new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, - false)) + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, false)) .testEquals(); } @@ -53,7 +49,6 @@ public void equalAndHashCode() { public void serialization() { reserializeAndAssert( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, - false)); + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, false)); } } From b208044b8917b41829c5c777d665f5e23354a1ed Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Fri, 2 Aug 2024 13:45:22 +0530 Subject: [PATCH 03/11] feat(spanner): support multiplexed session for Partitioned DML operations. --- ...tractMultiplexedSessionDatabaseClient.java | 6 ---- .../cloud/spanner/DatabaseClientImpl.java | 25 +++++++++++++-- .../DelayedMultiplexedSessionTransaction.java | 20 ++++++++++++ .../MultiplexedSessionDatabaseClient.java | 6 ++++ .../cloud/spanner/SessionPoolOptions.java | 32 ++++++------------- .../com/google/cloud/spanner/SpannerImpl.java | 12 +++++-- .../IntegrationTestWithClosedSessionsEnv.java | 5 ++- 7 files changed, 72 insertions(+), 34 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java index 92035a18418..dba86d511a5 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java @@ -19,7 +19,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.spanner.v1.BatchWriteResponse; /** @@ -88,9 +87,4 @@ public AsyncRunner runAsync(TransactionOption... options) { public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { throw new UnsupportedOperationException(); } - - @Override - public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { - throw new UnsupportedOperationException(); - } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index a20bcd9e925..9c62e037f3d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -35,26 +35,39 @@ class DatabaseClientImpl implements DatabaseClient { private final TraceWrapper tracer; @VisibleForTesting final String clientId; @VisibleForTesting final SessionPool pool; + final boolean useMultiplexedSessionPartitionedOps; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; @VisibleForTesting DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) { - this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this( + "", + pool, + /* multiplexedSessionDatabaseClient= */ null, + /* useMultiplexedSessionPartitionedOps= */ false, + tracer); } @VisibleForTesting DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) { - this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer); + this( + clientId, + pool, + /* multiplexedSessionDatabaseClient= */ null, + /* useMultiplexedSessionPartitionedOps= */ false, + tracer); } DatabaseClientImpl( String clientId, SessionPool pool, @Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient, + boolean useMultiplexedSessionPartitionedOps, TraceWrapper tracer) { this.clientId = clientId; this.pool = pool; this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient; + this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps; this.tracer = tracer; } @@ -261,6 +274,14 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { + if (useMultiplexedSessionPartitionedOps) { + return getMultiplexedSession().executePartitionedUpdate(stmt, options); + } + return executePartitionedUpdateSession(stmt, options); + } + + private long executePartitionedUpdateSession( + final Statement stmt, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options)); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 928927d49a0..543e6e3d0ac 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -22,7 +22,9 @@ import com.google.api.core.ApiFutures; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.common.util.concurrent.MoreExecutors; +import java.util.concurrent.ExecutionException; /** * Represents a delayed execution of a transaction on a multiplexed session. The execution is @@ -119,4 +121,22 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { .readOnlyTransaction(bound), MoreExecutors.directExecutor())); } + + /** + * Execute `stmt` within PARTITIONED_DML transaction using multiplexed session. This method is a + * blocking call as the interface expects to return the output of the `stmt`. + */ + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + try { + SessionReference sessionReference = this.sessionFuture.get(); + return new MultiplexedSessionTransaction( + client, span, sessionReference, NO_CHANNEL_HINT, true) + .executePartitionedUpdate(stmt, options); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index e742481be2c..a5dc483e1a3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -21,6 +21,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.common.annotations.VisibleForTesting; @@ -388,6 +389,11 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { return createMultiplexedSessionTransaction(false).readOnlyTransaction(bound); } + @Override + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { + return createMultiplexedSessionTransaction(true).executePartitionedUpdate(stmt, options); + } + /** * It is enough with one executor to maintain the multiplexed sessions in all the clients, as they * do not need to be updated often, and the maintenance task is light. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 46c5eb29036..51950e258aa 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -324,32 +324,20 @@ public boolean getUseMultiplexedSessionPartitionedOps() { } private static Boolean getUseMultiplexedSessionFromEnvVariable() { - String useMultiplexedSessionFromEnvVariable = - System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); - if (useMultiplexedSessionFromEnvVariable != null - && useMultiplexedSessionFromEnvVariable.length() > 0) { - if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable) - || "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariable)) { - return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariable); - } else { - throw new IllegalArgumentException( - "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS should be either true or false."); - } - } - return null; + return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); } private static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { - String useMultiplexedSessionFromEnvVariablePartitionedOps = - System.getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"); - if (useMultiplexedSessionFromEnvVariablePartitionedOps != null - && useMultiplexedSessionFromEnvVariablePartitionedOps.length() > 0) { - if ("true".equalsIgnoreCase(useMultiplexedSessionFromEnvVariablePartitionedOps) - || "false".equalsIgnoreCase(useMultiplexedSessionFromEnvVariablePartitionedOps)) { - return Boolean.parseBoolean(useMultiplexedSessionFromEnvVariablePartitionedOps); + return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"); + } + + private static Boolean parseBooleanEnvVariable(String variableName) { + String envVariable = System.getenv(variableName); + if (envVariable != null && envVariable.length() > 0) { + if ("true".equalsIgnoreCase(envVariable) || "false".equalsIgnoreCase(envVariable)) { + return Boolean.parseBoolean(envVariable); } else { - throw new IllegalArgumentException( - "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS should be either true or false."); + throw new IllegalArgumentException(variableName + " should be either true or false."); } } return null; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 5d3cdd3eba6..4907b78ee35 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -300,7 +300,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { numMultiplexedSessionsReleased); pool.maybeWaitOnMinSessions(); DatabaseClientImpl dbClient = - createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient); + createDatabaseClient( + clientId, + pool, + multiplexedSessionDatabaseClient, + getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); dbClients.put(db, dbClient); return dbClient; } @@ -311,8 +315,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { DatabaseClientImpl createDatabaseClient( String clientId, SessionPool pool, - @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) { - return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer); + @Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient, + boolean useMultiplexedSessionPartitionedOps) { + return new DatabaseClientImpl( + clientId, pool, multiplexedSessionClient, useMultiplexedSessionPartitionedOps, tracer); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java index b71771ae2ca..c1564567bc4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java @@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl { @Override DatabaseClientImpl createDatabaseClient( - String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) { + String clientId, + SessionPool pool, + MultiplexedSessionDatabaseClient ignore, + boolean useMultiplexedSessionPartitionedOpsIgnore) { return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer); } } From 938ef533bc86ad2de3bd1667d21e9688c3e3c990 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 25 Nov 2024 18:17:21 +0530 Subject: [PATCH 04/11] lint(spanner): javadoc fixes. --- .../java/com/google/cloud/spanner/BatchTransactionId.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java index 36f870be4c8..2e17b70e37a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java @@ -65,8 +65,8 @@ public boolean isMultiplexedSession() { @Override public String toString() { return String.format( - "transactionId: %s, sessionId: %s, timestamp: %s", - transactionId.toStringUtf8(), sessionId, timestamp); + "transactionId: %s, sessionId: %s, timestamp: %s, isMultiplexedSession: %s", + transactionId.toStringUtf8(), sessionId, timestamp, isMultiplexedSession); } @Override From df8b4c0dc445cbde6cf2913441346baef8030c09 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 25 Nov 2024 19:32:39 +0530 Subject: [PATCH 05/11] feat(spanner): Updated unit tests of Partitioned operations for Multiplexed Session. --- .../cloud/spanner/DatabaseClientImpl.java | 6 ++-- .../DelayedMultiplexedSessionTransaction.java | 2 +- .../cloud/spanner/SessionPoolOptions.java | 4 ++- .../com/google/cloud/spanner/SpannerImpl.java | 4 +-- .../cloud/spanner/BatchClientImplTest.java | 31 ++++++++++++++++--- ...edSessionDatabaseClientMockServerTest.java | 1 + .../RetryOnInvalidatedSessionTest.java | 3 ++ 7 files changed, 40 insertions(+), 11 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 44a5f484373..efc359415a9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -36,7 +36,7 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final String clientId; @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; - final boolean useMultiplexedSessionPartitionedOps; + @VisibleForTesting final boolean useMultiplexedSessionPartitionedOps; @VisibleForTesting final boolean useMultiplexedSessionForRW; final boolean useMultiplexedSessionBlindWrite; @@ -48,7 +48,7 @@ class DatabaseClientImpl implements DatabaseClient { pool, /* useMultiplexedSessionBlindWrite = */ false, /* multiplexedSessionDatabaseClient = */ null, - /* useMultiplexedSessionPartitionedOps= */ false, + /* useMultiplexedSessionPartitionedOps= */ false, tracer, /* useMultiplexedSessionForRW = */ false); } @@ -60,7 +60,7 @@ class DatabaseClientImpl implements DatabaseClient { pool, /* useMultiplexedSessionBlindWrite = */ false, /* multiplexedSessionDatabaseClient = */ null, - /* useMultiplexedSessionPartitionedOps= */ false, + /* useMultiplexedSessionPartitionedOps= */ false, tracer, /* useMultiplexedSessionForRW = */ false); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 47896a7699a..1db7a2d2b6d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -23,8 +23,8 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutionException; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index c21f4b26200..24ff4a76c73 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -360,7 +360,9 @@ private static Boolean getUseMultiplexedSessionFromEnvVariable() { return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); } - private static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { + @VisibleForTesting + @InternalApi + protected static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 35adcc2eedf..976aba9af37 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -308,7 +308,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { pool, getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(), multiplexedSessionDatabaseClient, - getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(), + getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(), useMultiplexedSessionForRW); dbClients.put(db, dbClient); return dbClient; @@ -329,7 +329,7 @@ DatabaseClientImpl createDatabaseClient( pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, - useMultiplexedSessionPartitionedOps, + useMultiplexedSessionPartitionedOps, tracer, useMultiplexedSessionForRW); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index a0e693b7c8a..efd13bcf943 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -56,6 +56,7 @@ public final class BatchClientImplTest { private static final String SESSION_NAME = DB_NAME + "/sessions/s1"; private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn"); private static final String TIMESTAMP = "2017-11-15T10:54:20Z"; + private static boolean isMultiplexedSession = false; @Mock private SpannerRpc gapicRpc; @Mock private SpannerOptions spannerOptions; @@ -68,6 +69,11 @@ public final class BatchClientImplTest { public static void setupOpenTelemetry() { SpannerOptions.resetActiveTracingFramework(); SpannerOptions.enableOpenTelemetryTraces(); + Boolean useMultiplexedSessionFromEnvVariablePartitionedOps = + SessionPoolOptions.getUseMultiplexedSessionFromEnvVariablePartitionedOps(); + isMultiplexedSession = + useMultiplexedSessionFromEnvVariablePartitionedOps != null + && useMultiplexedSessionFromEnvVariablePartitionedOps; } @SuppressWarnings("unchecked") @@ -88,18 +94,31 @@ public void setUp() { when(spannerOptions.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class); when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE); + when(sessionPoolOptions.getUseMultiplexedSessionPartitionedOps()) + .thenReturn(isMultiplexedSession); when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); @SuppressWarnings("resource") SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions); - client = new BatchClientImpl(spanner.getSessionClient(db), false); + client = new BatchClientImpl(spanner.getSessionClient(db), isMultiplexedSession); } @SuppressWarnings("unchecked") @Test public void testBatchReadOnlyTxnWithBound() throws Exception { - Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build(); - when(gapicRpc.createSession(eq(DB_NAME), anyString(), anyMap(), optionsCaptor.capture())) - .thenReturn(sessionProto); + Session sessionProto = + Session.newBuilder().setName(SESSION_NAME).setMultiplexed(isMultiplexedSession).build(); + if (isMultiplexedSession) { + when(gapicRpc.createSession( + eq(DB_NAME), + anyString(), + anyMap(), + optionsCaptor.capture(), + eq(isMultiplexedSession))) + .thenReturn(sessionProto); + } else { + when(gapicRpc.createSession(eq(DB_NAME), anyString(), anyMap(), optionsCaptor.capture())) + .thenReturn(sessionProto); + } com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP); Transaction txnMetadata = Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build(); @@ -113,12 +132,14 @@ public void testBatchReadOnlyTxnWithBound() throws Exception { assertThat(batchTxn.getReadTimestamp()).isEqualTo(t); assertThat(batchTxn.getReadTimestamp()) .isEqualTo(batchTxn.getBatchTransactionId().getTimestamp()); + assertEquals(batchTxn.getBatchTransactionId().isMultiplexedSession(), isMultiplexedSession); } @Test public void testBatchReadOnlyTxnWithTxnId() { when(txnID.getSessionId()).thenReturn(SESSION_NAME); when(txnID.getTransactionId()).thenReturn(TXN_ID); + when(txnID.isMultiplexedSession()).thenReturn(isMultiplexedSession); Timestamp t = Timestamp.parseTimestamp(TIMESTAMP); when(txnID.getTimestamp()).thenReturn(t); @@ -128,6 +149,8 @@ public void testBatchReadOnlyTxnWithTxnId() { assertThat(batchTxn.getReadTimestamp()).isEqualTo(t); assertThat(batchTxn.getReadTimestamp()) .isEqualTo(batchTxn.getBatchTransactionId().getTimestamp()); + assertThat(batchTxn.getBatchTransactionId().isMultiplexedSession()) + .isEqualTo(isMultiplexedSession); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 4dc1da62e7b..b424b457bb0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -95,6 +95,7 @@ public void createSpannerInstance() { .setUseMultiplexedSession(true) .setUseMultiplexedSessionBlindWrite(true) .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) // Set the maintainer to loop once every 1ms .setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L)) // Set multiplexed sessions to be replaced once every 1ms diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 42a62be33aa..3032a1cae40 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -1273,6 +1273,9 @@ public void transactionManagerReadRowUsingIndexInvalidatedDuringTransaction() @Test public void partitionedDml() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); assertThrowsSessionNotFoundIfShouldFail( () -> client.executePartitionedUpdate(UPDATE_STATEMENT)); } From af625f4722a6d05c0b225861ab2c29617c88967d Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 25 Nov 2024 19:32:39 +0530 Subject: [PATCH 06/11] feat(spanner): Updated unit tests of Partitioned operations for Multiplexed Session. --- .github/workflows/ci.yaml | 2 ++ ...tegration-multiplexed-sessions-enabled.cfg | 5 +++ .../cloud/spanner/DatabaseClientImpl.java | 6 ++-- .../DelayedMultiplexedSessionTransaction.java | 2 +- .../cloud/spanner/SessionPoolOptions.java | 4 ++- .../com/google/cloud/spanner/SpannerImpl.java | 4 +-- .../cloud/spanner/BatchClientImplTest.java | 31 ++++++++++++++++--- ...edSessionDatabaseClientMockServerTest.java | 1 + .../RetryOnInvalidatedSessionTest.java | 3 ++ 9 files changed, 47 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 7eca4c6d5f0..ee28d7f8a66 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -53,6 +53,7 @@ jobs: env: JOB_TYPE: test GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true + GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true units-java8: # Building using Java 17 and run the tests with Java 8 runtime name: "units (8)" @@ -92,6 +93,7 @@ jobs: env: JOB_TYPE: test GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true + GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true windows: runs-on: windows-latest steps: diff --git a/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg b/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg index 771405de422..49edd2e8df6 100644 --- a/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg +++ b/.kokoro/presubmit/integration-multiplexed-sessions-enabled.cfg @@ -36,3 +36,8 @@ env_vars: { key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS" value: "true" } + +env_vars: { + key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS" + value: "true" +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 44a5f484373..efc359415a9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -36,7 +36,7 @@ class DatabaseClientImpl implements DatabaseClient { @VisibleForTesting final String clientId; @VisibleForTesting final SessionPool pool; @VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient; - final boolean useMultiplexedSessionPartitionedOps; + @VisibleForTesting final boolean useMultiplexedSessionPartitionedOps; @VisibleForTesting final boolean useMultiplexedSessionForRW; final boolean useMultiplexedSessionBlindWrite; @@ -48,7 +48,7 @@ class DatabaseClientImpl implements DatabaseClient { pool, /* useMultiplexedSessionBlindWrite = */ false, /* multiplexedSessionDatabaseClient = */ null, - /* useMultiplexedSessionPartitionedOps= */ false, + /* useMultiplexedSessionPartitionedOps= */ false, tracer, /* useMultiplexedSessionForRW = */ false); } @@ -60,7 +60,7 @@ class DatabaseClientImpl implements DatabaseClient { pool, /* useMultiplexedSessionBlindWrite = */ false, /* multiplexedSessionDatabaseClient = */ null, - /* useMultiplexedSessionPartitionedOps= */ false, + /* useMultiplexedSessionPartitionedOps= */ false, tracer, /* useMultiplexedSessionForRW = */ false); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 47896a7699a..1db7a2d2b6d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -23,8 +23,8 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction; import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction; -import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutionException; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index c21f4b26200..24ff4a76c73 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -360,7 +360,9 @@ private static Boolean getUseMultiplexedSessionFromEnvVariable() { return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"); } - private static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { + @VisibleForTesting + @InternalApi + protected static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 35adcc2eedf..976aba9af37 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -308,7 +308,7 @@ public DatabaseClient getDatabaseClient(DatabaseId db) { pool, getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(), multiplexedSessionDatabaseClient, - getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(), + getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps(), useMultiplexedSessionForRW); dbClients.put(db, dbClient); return dbClient; @@ -329,7 +329,7 @@ DatabaseClientImpl createDatabaseClient( pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, - useMultiplexedSessionPartitionedOps, + useMultiplexedSessionPartitionedOps, tracer, useMultiplexedSessionForRW); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index a0e693b7c8a..efd13bcf943 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -56,6 +56,7 @@ public final class BatchClientImplTest { private static final String SESSION_NAME = DB_NAME + "/sessions/s1"; private static final ByteString TXN_ID = ByteString.copyFromUtf8("my-txn"); private static final String TIMESTAMP = "2017-11-15T10:54:20Z"; + private static boolean isMultiplexedSession = false; @Mock private SpannerRpc gapicRpc; @Mock private SpannerOptions spannerOptions; @@ -68,6 +69,11 @@ public final class BatchClientImplTest { public static void setupOpenTelemetry() { SpannerOptions.resetActiveTracingFramework(); SpannerOptions.enableOpenTelemetryTraces(); + Boolean useMultiplexedSessionFromEnvVariablePartitionedOps = + SessionPoolOptions.getUseMultiplexedSessionFromEnvVariablePartitionedOps(); + isMultiplexedSession = + useMultiplexedSessionFromEnvVariablePartitionedOps != null + && useMultiplexedSessionFromEnvVariablePartitionedOps; } @SuppressWarnings("unchecked") @@ -88,18 +94,31 @@ public void setUp() { when(spannerOptions.getTransportOptions()).thenReturn(transportOptions); SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class); when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE); + when(sessionPoolOptions.getUseMultiplexedSessionPartitionedOps()) + .thenReturn(isMultiplexedSession); when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); @SuppressWarnings("resource") SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions); - client = new BatchClientImpl(spanner.getSessionClient(db), false); + client = new BatchClientImpl(spanner.getSessionClient(db), isMultiplexedSession); } @SuppressWarnings("unchecked") @Test public void testBatchReadOnlyTxnWithBound() throws Exception { - Session sessionProto = Session.newBuilder().setName(SESSION_NAME).build(); - when(gapicRpc.createSession(eq(DB_NAME), anyString(), anyMap(), optionsCaptor.capture())) - .thenReturn(sessionProto); + Session sessionProto = + Session.newBuilder().setName(SESSION_NAME).setMultiplexed(isMultiplexedSession).build(); + if (isMultiplexedSession) { + when(gapicRpc.createSession( + eq(DB_NAME), + anyString(), + anyMap(), + optionsCaptor.capture(), + eq(isMultiplexedSession))) + .thenReturn(sessionProto); + } else { + when(gapicRpc.createSession(eq(DB_NAME), anyString(), anyMap(), optionsCaptor.capture())) + .thenReturn(sessionProto); + } com.google.protobuf.Timestamp timestamp = Timestamps.parse(TIMESTAMP); Transaction txnMetadata = Transaction.newBuilder().setId(TXN_ID).setReadTimestamp(timestamp).build(); @@ -113,12 +132,14 @@ public void testBatchReadOnlyTxnWithBound() throws Exception { assertThat(batchTxn.getReadTimestamp()).isEqualTo(t); assertThat(batchTxn.getReadTimestamp()) .isEqualTo(batchTxn.getBatchTransactionId().getTimestamp()); + assertEquals(batchTxn.getBatchTransactionId().isMultiplexedSession(), isMultiplexedSession); } @Test public void testBatchReadOnlyTxnWithTxnId() { when(txnID.getSessionId()).thenReturn(SESSION_NAME); when(txnID.getTransactionId()).thenReturn(TXN_ID); + when(txnID.isMultiplexedSession()).thenReturn(isMultiplexedSession); Timestamp t = Timestamp.parseTimestamp(TIMESTAMP); when(txnID.getTimestamp()).thenReturn(t); @@ -128,6 +149,8 @@ public void testBatchReadOnlyTxnWithTxnId() { assertThat(batchTxn.getReadTimestamp()).isEqualTo(t); assertThat(batchTxn.getReadTimestamp()) .isEqualTo(batchTxn.getBatchTransactionId().getTimestamp()); + assertThat(batchTxn.getBatchTransactionId().isMultiplexedSession()) + .isEqualTo(isMultiplexedSession); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 4dc1da62e7b..b424b457bb0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -95,6 +95,7 @@ public void createSpannerInstance() { .setUseMultiplexedSession(true) .setUseMultiplexedSessionBlindWrite(true) .setUseMultiplexedSessionForRW(true) + .setUseMultiplexedSessionPartitionedOps(true) // Set the maintainer to loop once every 1ms .setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L)) // Set multiplexed sessions to be replaced once every 1ms diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 42a62be33aa..3032a1cae40 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -1273,6 +1273,9 @@ public void transactionManagerReadRowUsingIndexInvalidatedDuringTransaction() @Test public void partitionedDml() throws InterruptedException { + assumeFalse( + "Multiplexed session do not throw a SessionNotFound errors. ", + spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); assertThrowsSessionNotFoundIfShouldFail( () -> client.executePartitionedUpdate(UPDATE_STATEMENT)); } From 00a25787acabd17829777169a16f18146a9bc340 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Wed, 27 Nov 2024 12:58:06 +0530 Subject: [PATCH 07/11] lint(spanner): Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Olav Løite --- .../java/com/google/cloud/spanner/DatabaseClientImpl.java | 2 +- .../cloud/spanner/DelayedMultiplexedSessionTransaction.java | 4 ++-- .../cloud/spanner/MultiplexedSessionDatabaseClient.java | 2 +- .../src/main/java/com/google/cloud/spanner/SessionClient.java | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index efc359415a9..9ef55aba65a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -320,7 +320,7 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption... return executePartitionedUpdateSession(stmt, options); } - private long executePartitionedUpdateSession( + private long executePartitionedUpdateWithPooledSession( final Statement stmt, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 1db7a2d2b6d..dc63285f8ca 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -233,9 +233,9 @@ private SessionReference getSessionReference() { @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { try { - SessionReference sessionReference = this.sessionFuture.get(); + SessionReference sessionReference = getSessionReference(); return new MultiplexedSessionTransaction( - client, span, sessionReference, NO_CHANNEL_HINT, true) + client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true) .executePartitionedUpdate(stmt, options); } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 824ecc32c23..a882ce798f9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -556,7 +556,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { - return createMultiplexedSessionTransaction(true).executePartitionedUpdate(stmt, options); + return createMultiplexedSessionTransaction(/* singleUse = */ true).executePartitionedUpdate(stmt, options); } /** diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index b7c338b9ad1..927fa6a51d6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -246,8 +246,8 @@ void createMultiplexedSession(SessionConsumer consumer) { } /** - * Create a multiplexed session and returns it. A multiplexed session is not affiliated with any - * GRPC channel. In case of an error on the gRPC calls, the error will be returned. + * Creates a multiplexed session and returns it. A multiplexed session is not affiliated with any + * GRPC channel. In case of an error during the gRPC calls, an exception will be thrown. */ SessionImpl createMultiplexedSession() { ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION); From 13b94e39ea792151f96a3699e4235ef4d1588656 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Wed, 27 Nov 2024 12:58:06 +0530 Subject: [PATCH 08/11] lint(spanner): Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Knut Olav Løite --- .../java/com/google/cloud/spanner/DatabaseClientImpl.java | 2 +- .../cloud/spanner/DelayedMultiplexedSessionTransaction.java | 4 ++-- .../cloud/spanner/MultiplexedSessionDatabaseClient.java | 2 +- .../src/main/java/com/google/cloud/spanner/SessionClient.java | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index efc359415a9..9ef55aba65a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -320,7 +320,7 @@ public long executePartitionedUpdate(final Statement stmt, final UpdateOption... return executePartitionedUpdateSession(stmt, options); } - private long executePartitionedUpdateSession( + private long executePartitionedUpdateWithPooledSession( final Statement stmt, final UpdateOption... options) { ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION); try (IScope s = tracer.withSpan(span)) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java index 1db7a2d2b6d..dc63285f8ca 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java @@ -233,9 +233,9 @@ private SessionReference getSessionReference() { @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { try { - SessionReference sessionReference = this.sessionFuture.get(); + SessionReference sessionReference = getSessionReference(); return new MultiplexedSessionTransaction( - client, span, sessionReference, NO_CHANNEL_HINT, true) + client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true) .executePartitionedUpdate(stmt, options); } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 824ecc32c23..a882ce798f9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -556,7 +556,7 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti @Override public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { - return createMultiplexedSessionTransaction(true).executePartitionedUpdate(stmt, options); + return createMultiplexedSessionTransaction(/* singleUse = */ true).executePartitionedUpdate(stmt, options); } /** diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java index b7c338b9ad1..927fa6a51d6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java @@ -246,8 +246,8 @@ void createMultiplexedSession(SessionConsumer consumer) { } /** - * Create a multiplexed session and returns it. A multiplexed session is not affiliated with any - * GRPC channel. In case of an error on the gRPC calls, the error will be returned. + * Creates a multiplexed session and returns it. A multiplexed session is not affiliated with any + * GRPC channel. In case of an error during the gRPC calls, an exception will be thrown. */ SessionImpl createMultiplexedSession() { ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION); From f4271cc3fdfce1e77c170115a03bc32b3aed5272 Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 9 Dec 2024 14:15:53 +0530 Subject: [PATCH 09/11] feat(spanner): Modified BatchClientImpl to store multiplexed session and create fresh session after expiration date. --- .../google/cloud/spanner/BatchClientImpl.java | 51 ++++++++++++++++++- .../com/google/cloud/spanner/SpannerImpl.java | 24 ++++++++- .../cloud/spanner/BatchClientImplTest.java | 2 + 3 files changed, 74 insertions(+), 3 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index b65b9b952af..f7866ea092f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -30,18 +30,53 @@ import com.google.spanner.v1.PartitionReadRequest; import com.google.spanner.v1.PartitionResponse; import com.google.spanner.v1.TransactionSelector; +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; /** Default implementation for Batch Client interface. */ public class BatchClientImpl implements BatchClient { private final SessionClient sessionClient; + private final boolean isMultiplexedSessionEnabled; + /** Lock to protect the multiplexed session. */ + private final ReentrantLock multiplexedSessionLock = new ReentrantLock(); + + /** The duration before we try to replace the multiplexed session. The default is 7 days. */ + private final Duration sessionExpirationDuration; + + /** The expiration date/time of the current multiplexed session. */ + @GuardedBy("multiplexedSessionLock") + private final AtomicReference expirationDate; + + @GuardedBy("multiplexedSessionLock") + private final AtomicReference multiplexedSessionReference; + + private final Clock clock; + BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) { this.sessionClient = checkNotNull(sessionClient); this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled; + this.sessionExpirationDuration = + Duration.ofMillis( + sessionClient + .getSpanner() + .getOptions() + .getSessionPoolOptions() + .getMultiplexedSessionMaintenanceDuration() + .toMillis()); + // Initialize the expiration date to the start of time to avoid unnecessary null checks. + // This also ensured that a new session is created on first request. + this.expirationDate = new AtomicReference<>(Instant.MIN); + this.multiplexedSessionReference = new AtomicReference<>(); + clock = Clock.systemUTC(); } @Override @@ -54,7 +89,7 @@ public String getDatabaseRole() { public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) { SessionImpl session; if (isMultiplexedSessionEnabled) { - session = sessionClient.createMultiplexedSession(); + session = getMultiplexedSession(); } else { session = sessionClient.createSession(); } @@ -99,6 +134,20 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc batchTransactionId); } + private SessionImpl getMultiplexedSession() { + this.multiplexedSessionLock.lock(); + try { + if (this.clock.instant().isAfter(this.expirationDate.get()) + || this.multiplexedSessionReference.get() == null) { + this.multiplexedSessionReference.set(this.sessionClient.createMultiplexedSession()); + this.expirationDate.set(this.clock.instant().plus(this.sessionExpirationDuration)); + } + return this.multiplexedSessionReference.get(); + } finally { + this.multiplexedSessionLock.unlock(); + } + } + private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction implements BatchReadOnlyTransaction { private final String sessionName; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java index 976aba9af37..1ae9e5119ce 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java @@ -51,6 +51,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -107,6 +108,11 @@ private static String nextDatabaseClientId(DatabaseId databaseId) { @GuardedBy("this") private final Map dbClients = new HashMap<>(); + @GuardedBy("dbBatchClientLock") + private final Map dbBatchClients = new HashMap<>(); + + private final ReentrantLock dbBatchClientLock = new ReentrantLock(); + private final CloseableExecutorProvider asyncExecutorProvider; @GuardedBy("this") @@ -336,9 +342,23 @@ DatabaseClientImpl createDatabaseClient( @Override public BatchClient getBatchClient(DatabaseId db) { + if (getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()) { + this.dbBatchClientLock.lock(); + try { + if (this.dbBatchClients.containsKey(db)) { + return this.dbBatchClients.get(db); + } + BatchClientImpl batchClient = + new BatchClientImpl( + getSessionClient(db), /*useMultiplexedSessionPartitionedOps=*/ true); + this.dbBatchClients.put(db, batchClient); + return batchClient; + } finally { + this.dbBatchClientLock.unlock(); + } + } return new BatchClientImpl( - getSessionClient(db), - getOptions().getSessionPoolOptions().getUseMultiplexedSessionPartitionedOps()); + getSessionClient(db), /*useMultiplexedSessionPartitionedOps=*/ false); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index 62884841a6a..84dd2913699 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -47,6 +47,7 @@ import org.mockito.Captor; import org.mockito.Mock; import org.mockito.Mockito; +import org.threeten.bp.Duration; /** Unit tests for {@link com.google.cloud.spanner.BatchClientImpl}. */ @RunWith(JUnit4.class) @@ -96,6 +97,7 @@ public void setUp() { when(sessionPoolOptions.getPoolMaintainerClock()).thenReturn(Clock.INSTANCE); when(sessionPoolOptions.getUseMultiplexedSessionPartitionedOps()) .thenReturn(isMultiplexedSession); + when(sessionPoolOptions.getMultiplexedSessionMaintenanceDuration()).thenReturn(Duration.ZERO); when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); @SuppressWarnings("resource") SpannerImpl spanner = new SpannerImpl(gapicRpc, spannerOptions); From 263901f938fe077da68d53929ce074f0ed680faf Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 9 Dec 2024 14:58:33 +0530 Subject: [PATCH 10/11] feat(spanner): Removed env variable for Partitioned Ops ensuring that Multiplexed Session for Partitioned Ops is not available to customers. --- .../google/cloud/spanner/SessionPoolOptionsHelper.java | 8 ++++++++ .../java/com/google/cloud/spanner/SessionPoolOptions.java | 4 +++- .../com/google/cloud/spanner/BatchClientImplTest.java | 1 - 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java index 8e085947711..f19cb8f4a2f 100644 --- a/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java +++ b/google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java @@ -44,4 +44,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSessionForRW( SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSessionForRW) { return sessionPoolOptionsBuilder.setUseMultiplexedSessionForRW(useMultiplexedSessionForRW); } + + // TODO: Remove when multiplexed session for partitioned operations are released. + public static SessionPoolOptions.Builder setUseMultiplexedSessionForPartitionedOperations( + SessionPoolOptions.Builder sessionPoolOptionsBuilder, + boolean useMultiplexedSessionForPartitionedOps) { + return sessionPoolOptionsBuilder.setUseMultiplexedSessionPartitionedOps( + useMultiplexedSessionForPartitionedOps); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java index 584c0705d49..a691f14817f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java @@ -373,7 +373,9 @@ private static Boolean getUseMultiplexedSessionFromEnvVariable() { @VisibleForTesting @InternalApi protected static Boolean getUseMultiplexedSessionFromEnvVariablePartitionedOps() { - return parseBooleanEnvVariable("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"); + // Checks the value of env, GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS + // This returns null until Partitioned Operations is supported. + return null; } private static Boolean parseBooleanEnvVariable(String variableName) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java index bf485e2e12f..edafc7ddba9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchClientImplTest.java @@ -36,7 +36,6 @@ import com.google.spanner.v1.Session; import com.google.spanner.v1.Transaction; import io.opentelemetry.api.OpenTelemetry; - import java.time.Duration; import java.util.Collections; import java.util.Map; From def751b9c9c1541388098101fe7aa3b9784f80ca Mon Sep 17 00:00:00 2001 From: Pratick Chokhani Date: Mon, 9 Dec 2024 20:23:18 +0530 Subject: [PATCH 11/11] lint(spanner): Removed unused variables. --- .../com/google/cloud/spanner/BatchClientImpl.java | 13 +++---------- .../google/cloud/spanner/BatchTransactionId.java | 6 +----- .../cloud/spanner/BatchTransactionIdTest.java | 12 ++++++------ 3 files changed, 10 insertions(+), 21 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index f7866ea092f..a250fd5ba39 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -59,8 +59,6 @@ public class BatchClientImpl implements BatchClient { @GuardedBy("multiplexedSessionLock") private final AtomicReference multiplexedSessionReference; - private final Clock clock; - BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) { this.sessionClient = checkNotNull(sessionClient); this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled; @@ -76,7 +74,6 @@ public class BatchClientImpl implements BatchClient { // This also ensured that a new session is created on first request. this.expirationDate = new AtomicReference<>(Instant.MIN); this.multiplexedSessionReference = new AtomicReference<>(); - clock = Clock.systemUTC(); } @Override @@ -137,10 +134,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc private SessionImpl getMultiplexedSession() { this.multiplexedSessionLock.lock(); try { - if (this.clock.instant().isAfter(this.expirationDate.get()) + if (Clock.systemUTC().instant().isAfter(this.expirationDate.get()) || this.multiplexedSessionReference.get() == null) { this.multiplexedSessionReference.set(this.sessionClient.createMultiplexedSession()); - this.expirationDate.set(this.clock.instant().plus(this.sessionExpirationDuration)); + this.expirationDate.set(Clock.systemUTC().instant().plus(this.sessionExpirationDuration)); } return this.multiplexedSessionReference.get(); } finally { @@ -151,14 +148,12 @@ private SessionImpl getMultiplexedSession() { private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction implements BatchReadOnlyTransaction { private final String sessionName; - private final boolean isMultiplexedSession; private final Map options; BatchReadOnlyTransactionImpl( MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) { super(builder.setTimestampBound(bound)); this.sessionName = session.getName(); - this.isMultiplexedSession = session.getIsMultiplexed(); this.options = session.getOptions(); initTransaction(); } @@ -168,13 +163,11 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa super(builder.setTransactionId(batchTransactionId.getTransactionId())); this.sessionName = session.getName(); this.options = session.getOptions(); - this.isMultiplexedSession = session.getIsMultiplexed(); } @Override public BatchTransactionId getBatchTransactionId() { - return new BatchTransactionId( - sessionName, getTransactionId(), getReadTimestamp(), session.getIsMultiplexed()); + return new BatchTransactionId(sessionName, getTransactionId(), getReadTimestamp()); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java index 2e2b4f89001..a5a02ac1360 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchTransactionId.java @@ -34,11 +34,7 @@ public class BatchTransactionId implements Serializable { private final Timestamp timestamp; private static final long serialVersionUID = 8067099123096783939L; - BatchTransactionId( - String sessionId, - ByteString transactionId, - Timestamp timestamp, - boolean isMultiplexedSession) { + BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp) { this.transactionId = Preconditions.checkNotNull(transactionId); this.sessionId = Preconditions.checkNotNull(sessionId); this.timestamp = Preconditions.checkNotNull(timestamp); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java index 10d1a28ed8a..81d4de41695 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BatchTransactionIdTest.java @@ -34,14 +34,14 @@ public void equalAndHashCode() { new EqualsTester() .addEqualityGroup( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, false), + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE), new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, false)) + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE)) .addEqualityGroup( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, true), + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE), new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE, true)) + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MAX_VALUE)) .testEquals(); } @@ -49,9 +49,9 @@ public void equalAndHashCode() { public void serialization() { reserializeAndAssert( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, false)); + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE)); reserializeAndAssert( new BatchTransactionId( - "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE, true)); + "testSession", ByteString.copyFromUtf8("testTxn"), Timestamp.MIN_VALUE)); } }