Skip to content

Commit

Permalink
feat: support multiplexed session for blind write with single use tra…
Browse files Browse the repository at this point in the history
…nsaction (#3229)

* feat(spanner): support multiplexed session for blind write with single use transaction.

* test(spanner): added test for the support of multiplexed session for blind writes (writeAtLeastOnce)

* chore(spanner): lint

* fix(spanner): updated the adoption for blind write into GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS.

* chore: simplify code to make it easier to reuse for later additions

* feat(spanner): added flag to control use of multiplexed session for blind write. This flag will be used by systest.

* lint(spanner): javadoc fixes.

---------

Co-authored-by: Knut Olav Løite <[email protected]>
  • Loading branch information
pratickchokhani and olavloite authored Sep 17, 2024
1 parent c54abd1 commit b3e2b0f
Show file tree
Hide file tree
Showing 10 changed files with 296 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -803,10 +803,13 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
.setTotalTimeout(rpcTimeout)
.build();

com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions =
SessionPoolOptionsHelper.setUseMultiplexedSession(
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession)
.build();
com.google.cloud.spanner.SessionPoolOptions.Builder poolOptionsBuilder =
com.google.cloud.spanner.SessionPoolOptions.newBuilder();
SessionPoolOptionsHelper.setUseMultiplexedSession(
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
SessionPoolOptionsHelper.setUseMultiplexedSessionBlindWrite(
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = poolOptionsBuilder.build();
// Cloud Spanner Client does not support global retry settings,
// Thus, we need to add retry settings to each individual stub.
SpannerOptions.Builder optionsBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSession(
SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSession) {
return sessionPoolOptionsBuilder.setUseMultiplexedSession(useMultiplexedSession);
}

// TODO: Remove when multiplexed session for blind write is released.
public static SessionPoolOptions.Builder setUseMultiplexedSessionBlindWrite(
SessionPoolOptions.Builder sessionPoolOptionsBuilder,
boolean useMultiplexedSessionBlindWrite) {
return sessionPoolOptionsBuilder.setUseMultiplexedSessionBlindWrite(
useMultiplexedSessionBlindWrite);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
throw new UnsupportedOperationException();
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,37 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;

final boolean useMultiplexedSessionBlindWrite;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this(
"",
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer);
}

@VisibleForTesting
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this(
clientId,
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer);
}

DatabaseClientImpl(
String clientId,
SessionPool pool,
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
TraceWrapper tracer) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.tracer = tracer;
}
Expand All @@ -65,13 +79,21 @@ PooledSessionFuture getSession() {

@VisibleForTesting
DatabaseClient getMultiplexedSession() {
if (this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) {
if (canUseMultiplexedSessions()) {
return this.multiplexedSessionDatabaseClient;
}
return pool.getMultiplexedSessionWithFallback();
}

private MultiplexedSessionDatabaseClient getMultiplexedSessionDatabaseClient() {
return canUseMultiplexedSessions() ? this.multiplexedSessionDatabaseClient : null;
}

private boolean canUseMultiplexedSessions() {
return this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
Expand Down Expand Up @@ -114,6 +136,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
.writeAtLeastOnceWithOptions(mutations, options);
}
return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

/**
* Represents a delayed execution of a transaction on a multiplexed session. The execution is
Expand Down Expand Up @@ -119,4 +121,37 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.readOnlyTransaction(bound),
MoreExecutors.directExecutor()));
}

/**
* This is a blocking method, as the interface that it implements is also defined as a blocking
* method.
*/
@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
SessionReference sessionReference = getSessionReference();
try (MultiplexedSessionTransaction transaction =
new MultiplexedSessionTransaction(client, span, sessionReference, NO_CHANNEL_HINT, true)) {
return transaction.writeAtLeastOnceWithOptions(mutations, options);
}
}

/**
* Gets the session reference that this delayed transaction is waiting for. This method should
* only be called by methods that are allowed to be blocking.
*/
private SessionReference getSessionReference() {
try {
return this.sessionFuture.get();
} catch (ExecutionException executionException) {
// Propagate the underlying exception as a RuntimeException (SpannerException is also a
// RuntimeException).
if (executionException.getCause() instanceof RuntimeException) {
throw (RuntimeException) executionException.getCause();
}
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -107,6 +108,14 @@ void onReadDone() {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
CommitResponse response = super.writeAtLeastOnceWithOptions(mutations, options);
onTransactionDone();
return response;
}

@Override
void onTransactionDone() {
boolean markedDone = false;
Expand Down Expand Up @@ -358,6 +367,13 @@ private int getSingleUseChannelHint() {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
return createMultiplexedSessionTransaction(true)
.writeAtLeastOnceWithOptions(mutations, options);
}

@Override
public ReadContext singleUse() {
return createMultiplexedSessionTransaction(true).singleUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public class SessionPoolOptions {

private final boolean useMultiplexedSession;

/**
* Controls whether multiplexed session is enabled for blind write or not. This is only used for
* systest soak. TODO: Remove when multiplexed session for blind write is released.
*/
private final boolean useMultiplexedSessionBlindWrite;

private final boolean useMultiplexedSessionForRW;

// TODO: Change to use java.time.Duration.
Expand Down Expand Up @@ -110,6 +116,7 @@ private SessionPoolOptions(Builder builder) {
(useMultiplexedSessionFromEnvVariable != null)
? useMultiplexedSessionFromEnvVariable
: builder.useMultiplexedSession;
this.useMultiplexedSessionBlindWrite = builder.useMultiplexedSessionBlindWrite;
// useMultiplexedSessionForRW priority => Environment var > private setter > client default
Boolean useMultiplexedSessionForRWFromEnvVariable =
getUseMultiplexedSessionForRWFromEnvVariable();
Expand Down Expand Up @@ -184,6 +191,7 @@ public int hashCode() {
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock,
this.useMultiplexedSession,
this.useMultiplexedSessionBlindWrite,
this.useMultiplexedSessionForRW,
this.multiplexedSessionMaintenanceDuration);
}
Expand Down Expand Up @@ -318,6 +326,12 @@ public boolean getUseMultiplexedSession() {
return useMultiplexedSession;
}

@VisibleForTesting
@InternalApi
protected boolean getUseMultiplexedSessionBlindWrite() {
return getUseMultiplexedSession() && useMultiplexedSessionBlindWrite;
}

@VisibleForTesting
@InternalApi
public boolean getUseMultiplexedSessionForRW() {
Expand Down Expand Up @@ -554,6 +568,9 @@ public static class Builder {
// Set useMultiplexedSession to true to make multiplexed session the default.
private boolean useMultiplexedSession = false;

// TODO: Remove when multiplexed session for blind write is released.
private boolean useMultiplexedSessionBlindWrite = false;

// This field controls the default behavior of session management for RW operations in Java
// client.
// Set useMultiplexedSessionForRW to true to make multiplexed session for RW operations the
Expand Down Expand Up @@ -601,6 +618,7 @@ private Builder(SessionPoolOptions options) {
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.useMultiplexedSession = options.useMultiplexedSession;
this.useMultiplexedSessionBlindWrite = options.useMultiplexedSessionBlindWrite;
this.useMultiplexedSessionForRW = options.useMultiplexedSessionForRW;
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
this.poolMaintainerClock = options.poolMaintainerClock;
Expand Down Expand Up @@ -789,6 +807,17 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
return this;
}

/**
* This method enables multiplexed sessions for blind writes. This method will be removed in the
* future when multiplexed sessions has been made the default for all operations.
*/
@InternalApi
@VisibleForTesting
Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWrite) {
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
return this;
}

/**
* Sets whether the client should use multiplexed session for R/W operations or not. This method
* is intentionally package-private and intended for internal use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
numMultiplexedSessionsReleased);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient =
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
createDatabaseClient(
clientId,
pool,
getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(),
multiplexedSessionDatabaseClient);
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -314,8 +318,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
DatabaseClientImpl createDatabaseClient(
String clientId,
SessionPool pool,
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) {
return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer);
return new DatabaseClientImpl(
clientId, pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, tracer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl {

@Override
DatabaseClientImpl createDatabaseClient(
String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) {
String clientId,
SessionPool pool,
boolean useMultiplexedSessionBlindWriteIgnore,
MultiplexedSessionDatabaseClient ignore) {
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
}
}
Expand Down
Loading

0 comments on commit b3e2b0f

Please sign in to comment.