Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiplexed session for blind write with single use transaction #3229

Merged
merged 10 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -803,10 +803,13 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
.setTotalTimeout(rpcTimeout)
.build();

com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions =
SessionPoolOptionsHelper.setUseMultiplexedSession(
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession)
.build();
com.google.cloud.spanner.SessionPoolOptions.Builder poolOptionsBuilder =
com.google.cloud.spanner.SessionPoolOptions.newBuilder();
SessionPoolOptionsHelper.setUseMultiplexedSession(
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
SessionPoolOptionsHelper.setUseMultiplexedSessionBlindWrite(
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = poolOptionsBuilder.build();
// Cloud Spanner Client does not support global retry settings,
// Thus, we need to add retry settings to each individual stub.
SpannerOptions.Builder optionsBuilder =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSession(
SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSession) {
return sessionPoolOptionsBuilder.setUseMultiplexedSession(useMultiplexedSession);
}

// TODO: Remove when Builder.setUseMultiplexedSession(..) has been made public.
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
public static SessionPoolOptions.Builder setUseMultiplexedSessionBlindWrite(
SessionPoolOptions.Builder sessionPoolOptionsBuilder,
boolean useMultiplexedSessionBlindWrite) {
return sessionPoolOptionsBuilder.setUseMultiplexedSessionBlindWrite(
useMultiplexedSessionBlindWrite);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,37 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;

final boolean useMultiplexedSessionBlindWrite;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this(
"",
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer);
}

@VisibleForTesting
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
this(
clientId,
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
tracer);
}

DatabaseClientImpl(
String clientId,
SessionPool pool,
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
TraceWrapper tracer) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.tracer = tracer;
}
Expand Down Expand Up @@ -122,7 +136,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
if (getMultiplexedSessionDatabaseClient() != null) {
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
.writeAtLeastOnceWithOptions(mutations, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public class SessionPoolOptions {

private final boolean useMultiplexedSession;

/**
* Controls whether multiplexed session is enabled for blind write or not. This is only used for
* systest soak. Should be removed once released.
*/
private final boolean useMultiplexedSessionBlindWrite;
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved

private final boolean useMultiplexedSessionForRW;

// TODO: Change to use java.time.Duration.
Expand Down Expand Up @@ -110,6 +116,7 @@ private SessionPoolOptions(Builder builder) {
(useMultiplexedSessionFromEnvVariable != null)
? useMultiplexedSessionFromEnvVariable
: builder.useMultiplexedSession;
this.useMultiplexedSessionBlindWrite = builder.useMultiplexedSessionBlindWrite;
// useMultiplexedSessionForRW priority => Environment var > private setter > client default
Boolean useMultiplexedSessionForRWFromEnvVariable =
getUseMultiplexedSessionForRWFromEnvVariable();
Expand Down Expand Up @@ -184,6 +191,7 @@ public int hashCode() {
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock,
this.useMultiplexedSession,
this.useMultiplexedSessionBlindWrite,
this.useMultiplexedSessionForRW,
this.multiplexedSessionMaintenanceDuration);
}
Expand Down Expand Up @@ -318,6 +326,12 @@ public boolean getUseMultiplexedSession() {
return useMultiplexedSession;
}

@VisibleForTesting
@InternalApi
public boolean getUseMultiplexedSessionBlindWrite() {
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
return getUseMultiplexedSession() && useMultiplexedSessionBlindWrite;
}

@VisibleForTesting
@InternalApi
public boolean getUseMultiplexedSessionForRW() {
Expand Down Expand Up @@ -554,6 +568,11 @@ public static class Builder {
// Set useMultiplexedSession to true to make multiplexed session the default.
private boolean useMultiplexedSession = false;

// This field controls the default behavior of session management in Java client.
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
// Set useMultiplexedSessionBlindWrite to true to make multiplexed session the default for blind
// write.
private boolean useMultiplexedSessionBlindWrite = false;

// This field controls the default behavior of session management for RW operations in Java
// client.
// Set useMultiplexedSessionForRW to true to make multiplexed session for RW operations the
Expand Down Expand Up @@ -601,6 +620,7 @@ private Builder(SessionPoolOptions options) {
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.useMultiplexedSession = options.useMultiplexedSession;
this.useMultiplexedSessionBlindWrite = options.useMultiplexedSessionBlindWrite;
this.useMultiplexedSessionForRW = options.useMultiplexedSessionForRW;
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
this.poolMaintainerClock = options.poolMaintainerClock;
Expand Down Expand Up @@ -789,6 +809,22 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
return this;
}

/**
* Sets whether the client should use multiplexed session for blind write or not. If set to
* true, the client optimises and runs multiple applicable requests concurrently on a single
* session. A single multiplexed session is sufficient to handle all concurrent traffic.
*
* <p>When set to false, the client uses the regular session cached in the session pool for
* running 1 concurrent transaction per session. We require to provision sufficient sessions by
* making use of {@link SessionPoolOptions#minSessions} and {@link
* SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in
* higher latencies.
*/
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWrite) {
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
return this;
}

/**
* Sets whether the client should use multiplexed session for R/W operations or not. This method
* is intentionally package-private and intended for internal use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
numMultiplexedSessionsReleased);
pool.maybeWaitOnMinSessions();
DatabaseClientImpl dbClient =
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
createDatabaseClient(
clientId,
pool,
getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(),
multiplexedSessionDatabaseClient);
dbClients.put(db, dbClient);
return dbClient;
}
Expand All @@ -314,8 +318,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
DatabaseClientImpl createDatabaseClient(
String clientId,
SessionPool pool,
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) {
return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer);
return new DatabaseClientImpl(
clientId, pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, tracer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl {

@Override
DatabaseClientImpl createDatabaseClient(
String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) {
String clientId,
SessionPool pool,
boolean useMultiplexedSessionBlindWriteIgnore,
MultiplexedSessionDatabaseClient ignore) {
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public void createSpannerInstance() {
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setUseMultiplexedSession(true)
.setUseMultiplexedSessionBlindWrite(true)
// Set the maintainer to loop once every 1ms
.setMultiplexedSessionMaintenanceLoopFrequency(Duration.ofMillis(1L))
// Set multiplexed sessions to be replaced once every 1ms
Expand Down
Loading