Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiplexed session for blind write with single use transaction #3229

Merged
merged 10 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,7 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
throw new UnsupportedOperationException();
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,21 @@ PooledSessionFuture getSession() {

@VisibleForTesting
DatabaseClient getMultiplexedSession() {
if (this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) {
if (canUseMultiplexedSessions()) {
return this.multiplexedSessionDatabaseClient;
}
return pool.getMultiplexedSessionWithFallback();
}

private MultiplexedSessionDatabaseClient getMultiplexedSessionDatabaseClient() {
return canUseMultiplexedSessions() ? this.multiplexedSessionDatabaseClient : null;
}

private boolean canUseMultiplexedSessions() {
return this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
Expand Down Expand Up @@ -114,6 +122,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
if (getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient()
.writeAtLeastOnceWithOptions(mutations, options);
}
return runWithSessionRetry(
session -> session.writeAtLeastOnceWithOptions(mutations, options));
} catch (RuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;

/**
* Represents a delayed execution of a transaction on a multiplexed session. The execution is
Expand Down Expand Up @@ -119,4 +121,37 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
.readOnlyTransaction(bound),
MoreExecutors.directExecutor()));
}

/**
* This is a blocking method, as the interface that it implements is also defined as a blocking
* method.
*/
@Override
public CommitResponse writeAtLeastOnceWithOptions(
olavloite marked this conversation as resolved.
Show resolved Hide resolved
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
SessionReference sessionReference = getSessionReference();
try (MultiplexedSessionTransaction transaction =
new MultiplexedSessionTransaction(client, span, sessionReference, NO_CHANNEL_HINT, true)) {
return transaction.writeAtLeastOnceWithOptions(mutations, options);
}
}

/**
* Gets the session reference that this delayed transaction is waiting for. This method should
* only be called by methods that are allowed to be blocking.
*/
private SessionReference getSessionReference() {
try {
return this.sessionFuture.get();
} catch (ExecutionException executionException) {
// Propagate the underlying exception as a RuntimeException (SpannerException is also a
// RuntimeException).
if (executionException.getCause() instanceof RuntimeException) {
throw (RuntimeException) executionException.getCause();
}
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionClient.SessionConsumer;
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -107,6 +108,14 @@ void onReadDone() {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
CommitResponse response = super.writeAtLeastOnceWithOptions(mutations, options);
onTransactionDone();
return response;
}

@Override
void onTransactionDone() {
boolean markedDone = false;
Expand Down Expand Up @@ -358,6 +367,13 @@ private int getSingleUseChannelHint() {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
return createMultiplexedSessionTransaction(true)
.writeAtLeastOnceWithOptions(mutations, options);
}

@Override
public ReadContext singleUse() {
return createMultiplexedSessionTransaction(true).singleUse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
Expand All @@ -24,15 +25,21 @@
import static org.junit.Assert.assertTrue;

import com.google.cloud.NoCredentials;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.connection.RandomResultSetGenerator;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.Session;
import io.grpc.Status;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -309,6 +316,156 @@ public void testMaintainerInvalidatesMultiplexedSessionClientIfUnimplemented() {
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testWriteAtLeastOnceAborted() {
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
// Force the Commit RPC to return Aborted the first time it is called. The exception is cleared
// after the first call, so the retry should succeed.
mockSpanner.setCommitExecutionTime(
SimulatedExecutionTime.ofException(
mockSpanner.createAbortedException(ByteString.copyFromUtf8("test"))));
Timestamp timestamp =
client.writeAtLeastOnce(
Collections.singletonList(
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()));
assertNotNull(timestamp);

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertEquals(2, commitRequests.size());
olavloite marked this conversation as resolved.
Show resolved Hide resolved
for (CommitRequest request : commitRequests) {
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
}

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testWriteAtLeastOnce() {
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
Timestamp timestamp =
client.writeAtLeastOnce(
Collections.singletonList(
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()));
assertNotNull(timestamp);

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(commitRequests).hasSize(1);
CommitRequest commit = commitRequests.get(0);
assertNotNull(commit.getSingleUseTransaction());
assertTrue(commit.getSingleUseTransaction().hasReadWrite());
assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams());
assertNotNull(commit.getRequestOptions());
assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority());
assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testWriteAtLeastOnceWithCommitStats() {
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
CommitResponse response =
client.writeAtLeastOnceWithOptions(
Collections.singletonList(
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()),
Options.commitStats());
assertNotNull(response);
assertNotNull(response.getCommitTimestamp());
assertNotNull(response.getCommitStats());

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(commitRequests).hasSize(1);
CommitRequest commit = commitRequests.get(0);
assertNotNull(commit.getSingleUseTransaction());
assertTrue(commit.getSingleUseTransaction().hasReadWrite());
assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams());
assertNotNull(commit.getRequestOptions());
assertEquals(Priority.PRIORITY_UNSPECIFIED, commit.getRequestOptions().getPriority());
assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testWriteAtLeastOnceWithOptions() {
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
client.writeAtLeastOnceWithOptions(
Collections.singletonList(
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()),
Options.priority(RpcPriority.LOW));

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(commitRequests).hasSize(1);
CommitRequest commit = commitRequests.get(0);
assertNotNull(commit.getSingleUseTransaction());
assertTrue(commit.getSingleUseTransaction().hasReadWrite());
assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams());
assertNotNull(commit.getRequestOptions());
assertEquals(Priority.PRIORITY_LOW, commit.getRequestOptions().getPriority());
assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testWriteAtLeastOnceWithTagOptions() {
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
client.writeAtLeastOnceWithOptions(
Collections.singletonList(
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()),
Options.tag("app=spanner,env=test"));

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(commitRequests).hasSize(1);
CommitRequest commit = commitRequests.get(0);
assertNotNull(commit.getSingleUseTransaction());
assertTrue(commit.getSingleUseTransaction().hasReadWrite());
assertFalse(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams());
assertNotNull(commit.getRequestOptions());
assertThat(commit.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test");
assertThat(commit.getRequestOptions().getRequestTag()).isEmpty();
assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testWriteAtLeastOnceWithExcludeTxnFromChangeStreams() {
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
client.writeAtLeastOnceWithOptions(
Collections.singletonList(
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()),
Options.excludeTxnFromChangeStreams());

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(commitRequests).hasSize(1);
CommitRequest commit = commitRequests.get(0);
assertNotNull(commit.getSingleUseTransaction());
assertTrue(commit.getSingleUseTransaction().hasReadWrite());
assertTrue(commit.getSingleUseTransaction().getExcludeTxnFromChangeStreams());
assertTrue(mockSpanner.getSession(commit.getSession()).getMultiplexed());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
assertNotNull(client.multiplexedSessionDatabaseClient);
SessionReference sessionReference =
Expand Down
Loading