Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support multiplexed session for Partitioned operations #3231

Merged
merged 20 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
f579642
feat(spanner): support multiplexed session for Partitioned read or qu…
pratickchokhani Aug 1, 2024
8c3bd2f
chore(spanner): lint fixes
pratickchokhani Aug 1, 2024
b208044
feat(spanner): support multiplexed session for Partitioned DML operat…
pratickchokhani Aug 2, 2024
e36a6bb
Merge branch 'main' of https://github.com/googleapis/java-spanner int…
pratickchokhani Nov 22, 2024
938ef53
lint(spanner): javadoc fixes.
pratickchokhani Nov 25, 2024
df8b4c0
feat(spanner): Updated unit tests of Partitioned operations for Multi…
pratickchokhani Nov 25, 2024
af625f4
feat(spanner): Updated unit tests of Partitioned operations for Multi…
pratickchokhani Nov 25, 2024
9b99037
Merge branch 'partitioned-query' of github.com:pratickchokhani/java-s…
pratickchokhani Nov 25, 2024
8bde6f8
Merge branch 'partitioned-query' of github.com:pratickchokhani/java-s…
pratickchokhani Nov 25, 2024
44f1105
Merge branch 'partitioned-query' of github.com:pratickchokhani/java-s…
pratickchokhani Nov 25, 2024
00a2578
lint(spanner): Apply suggestions from code review
pratickchokhani Nov 27, 2024
13b94e3
lint(spanner): Apply suggestions from code review
pratickchokhani Nov 27, 2024
8c85310
Merge branch 'partitioned-query' of github.com:pratickchokhani/java-s…
pratickchokhani Nov 27, 2024
36fd5d5
Merge branch 'partitioned-query' of github.com:pratickchokhani/java-s…
pratickchokhani Nov 27, 2024
61de125
Merge branch 'partitioned-query' of github.com:pratickchokhani/java-s…
pratickchokhani Dec 5, 2024
3a66903
Merge branch 'main' of https://github.com/googleapis/java-spanner int…
pratickchokhani Dec 5, 2024
f4271cc
feat(spanner): Modified BatchClientImpl to store multiplexed session …
pratickchokhani Dec 9, 2024
2db1cc6
Merge branch 'main' of https://github.com/googleapis/java-spanner int…
pratickchokhani Dec 9, 2024
263901f
feat(spanner): Removed env variable for Partitioned Ops ensuring that…
pratickchokhani Dec 9, 2024
def751b
lint(spanner): Removed unused variables.
pratickchokhani Dec 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ jobs:
env:
JOB_TYPE: test
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true
units-java8:
# Building using Java 17 and run the tests with Java 8 runtime
name: "units (8)"
Expand Down Expand Up @@ -92,6 +93,7 @@ jobs:
env:
JOB_TYPE: test
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS: true
GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS: true
windows:
runs-on: windows-latest
steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,8 @@ env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS"
value: "true"
}

env_vars: {
key: "GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS_PARTITIONED_OPS"
value: "true"
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;

/**
Expand Down Expand Up @@ -51,9 +50,4 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@
/** Default implementation for Batch Client interface. */
public class BatchClientImpl implements BatchClient {
private final SessionClient sessionClient;
private final boolean isMultiplexedSessionEnabled;

BatchClientImpl(SessionClient sessionClient) {
BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
this.sessionClient = checkNotNull(sessionClient);
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
}

@Override
Expand All @@ -50,7 +52,12 @@ public String getDatabaseRole() {

@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session = sessionClient.createSession();
SessionImpl session;
if (isMultiplexedSessionEnabled) {
session = sessionClient.createMultiplexedSession();
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
} else {
session = sessionClient.createSession();
}
return new BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
Expand All @@ -72,7 +79,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batchTransactionId) {
SessionImpl session =
sessionClient.sessionWithId(checkNotNull(batchTransactionId).getSessionId());
sessionClient.sessionWithId(
checkNotNull(batchTransactionId).getSessionId(),
batchTransactionId.isMultiplexedSession());
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
return new BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.newBuilder()
.setSession(session)
Expand All @@ -95,12 +104,14 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
implements BatchReadOnlyTransaction {
private final String sessionName;
private final boolean isMultiplexedSession;
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
private final Map<SpannerRpc.Option, ?> options;

BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) {
super(builder.setTimestampBound(bound));
this.sessionName = session.getName();
this.isMultiplexedSession = session.getIsMultiplexed();
this.options = session.getOptions();
initTransaction();
}
Expand All @@ -110,11 +121,13 @@ private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransa
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
this.sessionName = session.getName();
this.options = session.getOptions();
this.isMultiplexedSession = session.getIsMultiplexed();
}

@Override
public BatchTransactionId getBatchTransactionId() {
return new BatchTransactionId(sessionName, getTransactionId(), getReadTimestamp());
return new BatchTransactionId(
sessionName, getTransactionId(), getReadTimestamp(), session.getIsMultiplexed());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ public class BatchTransactionId implements Serializable {
private final ByteString transactionId;
private final String sessionId;
private final Timestamp timestamp;
private final boolean isMultiplexedSession;
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
private static final long serialVersionUID = 8067099123096783939L;

BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp) {
BatchTransactionId(
String sessionId,
ByteString transactionId,
Timestamp timestamp,
boolean isMultiplexedSession) {
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
this.transactionId = Preconditions.checkNotNull(transactionId);
this.sessionId = Preconditions.checkNotNull(sessionId);
this.timestamp = Preconditions.checkNotNull(timestamp);
this.isMultiplexedSession = isMultiplexedSession;
}

ByteString getTransactionId() {
Expand All @@ -52,11 +58,15 @@ Timestamp getTimestamp() {
return timestamp;
}

public boolean isMultiplexedSession() {
return isMultiplexedSession;
}

@Override
public String toString() {
return String.format(
"transactionId: %s, sessionId: %s, timestamp: %s",
transactionId.toStringUtf8(), sessionId, timestamp);
"transactionId: %s, sessionId: %s, timestamp: %s, isMultiplexedSession: %s",
transactionId.toStringUtf8(), sessionId, timestamp, isMultiplexedSession);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class DatabaseClientImpl implements DatabaseClient {
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
@VisibleForTesting final boolean useMultiplexedSessionForRW;

final boolean useMultiplexedSessionBlindWrite;
Expand All @@ -47,6 +48,7 @@ class DatabaseClientImpl implements DatabaseClient {
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
}
Expand All @@ -58,6 +60,7 @@ class DatabaseClientImpl implements DatabaseClient {
pool,
/* useMultiplexedSessionBlindWrite = */ false,
/* multiplexedSessionDatabaseClient = */ null,
/* useMultiplexedSessionPartitionedOps= */ false,
tracer,
/* useMultiplexedSessionForRW = */ false);
}
Expand All @@ -67,12 +70,14 @@ class DatabaseClientImpl implements DatabaseClient {
SessionPool pool,
boolean useMultiplexedSessionBlindWrite,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
boolean useMultiplexedSessionPartitionedOps,
TraceWrapper tracer,
boolean useMultiplexedSessionForRW) {
this.clientId = clientId;
this.pool = pool;
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.useMultiplexedSessionPartitionedOps = useMultiplexedSessionPartitionedOps;
this.tracer = tracer;
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
}
Expand Down Expand Up @@ -309,6 +314,14 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
if (useMultiplexedSessionPartitionedOps) {
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
}
return executePartitionedUpdateSession(stmt, options);
}

private long executePartitionedUpdateSession(
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
final Statement stmt, final UpdateOption... options) {
ISpan span = tracer.spanBuilder(PARTITION_DML_TRANSACTION);
try (IScope s = tracer.withSpan(span)) {
return runWithSessionRetry(session -> session.executePartitionedUpdate(stmt, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -224,4 +225,22 @@ private SessionReference getSessionReference() {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}

/**
* Execute `stmt` within PARTITIONED_DML transaction using multiplexed session. This method is a
* blocking call as the interface expects to return the output of the `stmt`.
*/
@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
try {
SessionReference sessionReference = this.sessionFuture.get();
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
return new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, true)
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
.executePartitionedUpdate(stmt, options);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -553,6 +554,11 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti
.transactionManagerAsync(options);
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
return createMultiplexedSessionTransaction(true).executePartitionedUpdate(stmt, options);
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* It is enough with one executor to maintain the multiplexed sessions in all the clients, as they
* do not need to be updated often, and the maintenance task is light. The core pool size is set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@ SessionImpl createSession() {
* @param consumer The {@link SessionConsumer} to use for callbacks when sessions are available.
*/
void createMultiplexedSession(SessionConsumer consumer) {
try {
SessionImpl sessionImpl = createMultiplexedSession();
consumer.onSessionReady(sessionImpl);
} catch (Throwable t) {
consumer.onSessionCreateFailure(t, 1);
}
}

/**
* Create a multiplexed session and returns it. A multiplexed session is not affiliated with any
* GRPC channel. In case of an error on the gRPC calls, the error will be returned.
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
*/
SessionImpl createMultiplexedSession() {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
Expand All @@ -253,10 +266,12 @@ void createMultiplexedSession(SessionConsumer consumer) {
spanner,
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
consumer.onSessionReady(sessionImpl);
span.addAnnotation(
String.format("Request for %d multiplexed session returned %d session", 1, 1));
return sessionImpl;
} catch (Throwable t) {
span.setStatus(t);
consumer.onSessionCreateFailure(t, 1);
throw t;
} finally {
span.end();
}
Expand Down Expand Up @@ -289,31 +304,7 @@ private CreateMultiplexedSessionsRunnable(SessionConsumer consumer) {

@Override
public void run() {
ISpan span = spanner.getTracer().spanBuilder(SpannerImpl.CREATE_MULTIPLEXED_SESSION);
try (IScope s = spanner.getTracer().withSpan(span)) {
com.google.spanner.v1.Session session =
spanner
.getRpc()
.createSession(
db.getName(),
spanner.getOptions().getDatabaseRole(),
spanner.getOptions().getSessionLabels(),
null,
true);
SessionImpl sessionImpl =
new SessionImpl(
spanner,
new SessionReference(
session.getName(), session.getCreateTime(), session.getMultiplexed(), null));
span.addAnnotation(
String.format("Request for %d multiplexed session returned %d session", 1, 1));
consumer.onSessionReady(sessionImpl);
} catch (Throwable t) {
span.setStatus(t);
consumer.onSessionCreateFailure(t, 1);
} finally {
span.end();
}
createMultiplexedSession(consumer);
}
}

Expand Down Expand Up @@ -423,10 +414,16 @@ private List<SessionImpl> internalBatchCreateSessions(

/** Returns a {@link SessionImpl} that references the existing session with the given name. */
SessionImpl sessionWithId(String name) {
return sessionWithId(name, /*isMultiplexedSession= */ false);
}

/** Returns a {@link SessionImpl} that references the existing session with the given name. */
SessionImpl sessionWithId(String name, boolean isMultiplexedSession) {
pratickchokhani marked this conversation as resolved.
Show resolved Hide resolved
final Map<SpannerRpc.Option, ?> options;
synchronized (this) {
options = optionMap(SessionOption.channelHint(sessionChannelCounter++));
}
return new SessionImpl(spanner, new SessionReference(name, options));
return new SessionImpl(
spanner, new SessionReference(name, /*createTime= */ null, isMultiplexedSession, options));
}
}
Loading
Loading