Skip to content

Commit

Permalink
chore(spanner): handle commit retry protocol extension for mux rw (#3449
Browse files Browse the repository at this point in the history
)

In a read-write transaction using a multiplexed session with (read/query + mutation) operations, the CommitResponse from the backend during the commit RPC may include a `MultiplexedSessionRetry` field (indicated by a precommit token). This field signals that the commit RPC should be retried once using the new precommit token. During this retry, mutations should not be resent, as they were already buffered in spanFE during the initial commit RPC call.
  • Loading branch information
harshachinta authored Nov 11, 2024
1 parent 28e67f9 commit 015bd98
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ ApiFuture<CommitResponse> commitAsync() {
}
builder.addAllMutations(mutationsProto);
finishOps.addListener(
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
new CommitRunnable(
res, finishOps, builder, /* retryAttemptDueToCommitProtocolExtension = */ false),
MoreExecutors.directExecutor());
return res;
}

Expand All @@ -418,14 +420,17 @@ private final class CommitRunnable implements Runnable {
private final SettableApiFuture<CommitResponse> res;
private final ApiFuture<Void> prev;
private final CommitRequest.Builder requestBuilder;
private final boolean retryAttemptDueToCommitProtocolExtension;

CommitRunnable(
SettableApiFuture<CommitResponse> res,
ApiFuture<Void> prev,
CommitRequest.Builder requestBuilder) {
CommitRequest.Builder requestBuilder,
boolean retryAttemptDueToCommitProtocolExtension) {
this.res = res;
this.prev = prev;
this.requestBuilder = requestBuilder;
this.retryAttemptDueToCommitProtocolExtension = retryAttemptDueToCommitProtocolExtension;
}

@Override
Expand Down Expand Up @@ -459,6 +464,13 @@ public void run() {
// Set the precommit token in the CommitRequest for multiplexed sessions.
requestBuilder.setPrecommitToken(getLatestPrecommitToken());
}
if (retryAttemptDueToCommitProtocolExtension) {
// When a retry occurs due to the commit protocol extension, clear all mutations because
// they were already buffered in SpanFE during the previous attempt.
requestBuilder.clearMutations();
span.addAnnotation(
"Retrying commit operation with a new precommit token obtained from the previous CommitResponse");
}
final CommitRequest commitRequest = requestBuilder.build();
span.addAnnotation("Starting Commit");
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
Expand All @@ -479,6 +491,29 @@ public void run() {
return;
}
com.google.spanner.v1.CommitResponse proto = commitFuture.get();

// If the CommitResponse includes a precommit token, the client will retry the
// commit RPC once with the new token and clear any existing mutations.
// This case is applicable only when the read-write transaction uses multiplexed
// session.
if (proto.hasPrecommitToken() && !retryAttemptDueToCommitProtocolExtension) {
// track the latest pre commit token
onPrecommitToken(proto.getPrecommitToken());
span.addAnnotation(
"Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field");
opSpan.end();

// Retry the commit RPC with the latest precommit token from CommitResponse.
new CommitRunnable(
res,
prev,
requestBuilder,
/* retryAttemptDueToCommitProtocolExtension = */ true)
.run();

// Exit to prevent further processing in this attempt.
return;
}
if (!proto.hasCommitTimestamp()) {
throw newSpannerException(
ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ private static void checkStreamException(
private ConcurrentMap<ByteString, Boolean> isPartitionedDmlTransaction =
new ConcurrentHashMap<>();
private ConcurrentMap<ByteString, Boolean> abortedTransactions = new ConcurrentHashMap<>();
private ConcurrentMap<ByteString, Boolean> commitRetryTransactions = new ConcurrentHashMap<>();
private final AtomicBoolean abortNextTransaction = new AtomicBoolean();
private final AtomicBoolean abortNextStatement = new AtomicBoolean();
private final AtomicBoolean ignoreNextInlineBeginRequest = new AtomicBoolean();
Expand Down Expand Up @@ -2045,15 +2046,23 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
return;
}
simulateAbort(session, request.getTransactionId());
commitTransaction(transaction.getId());
CommitResponse.Builder responseBuilder =
CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp());
if (request.getReturnCommitStats()) {
responseBuilder.setCommitStats(
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
// This is not really always equal, but at least it returns a value.
.setMutationCount(request.getMutationsCount())
.build());
CommitResponse.Builder responseBuilder = CommitResponse.newBuilder();
Optional<Boolean> commitRetry =
Optional.fromNullable(commitRetryTransactions.get(request.getTransactionId()));
if (commitRetry.or(Boolean.FALSE) && session.getMultiplexed()) {
responseBuilder.setPrecommitToken(
getCommitResponsePrecommitToken(request.getTransactionId()));
commitRetryTransactions.remove(request.getTransactionId());
} else {
commitTransaction(transaction.getId());
responseBuilder.setCommitTimestamp(getCurrentGoogleTimestamp());
if (request.getReturnCommitStats()) {
responseBuilder.setCommitStats(
com.google.spanner.v1.CommitResponse.CommitStats.newBuilder()
// This is not really always equal, but at least it returns a value.
.setMutationCount(request.getMutationsCount())
.build());
}
}
responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted();
Expand Down Expand Up @@ -2134,6 +2143,14 @@ void markAbortedTransaction(ByteString transactionId) {
transactionSequenceNo.remove(transactionId);
}

public void markCommitRetryOnTransaction(ByteString transactionId) {
Transaction transaction = transactions.get(transactionId);
if (transaction == null || !isReadWriteTransaction(transactionId)) {
return;
}
commitRetryTransactions.putIfAbsent(transactionId, Boolean.TRUE);
}

@Override
public void partitionQuery(
PartitionQueryRequest request, StreamObserver<PartitionResponse> responseObserver) {
Expand Down Expand Up @@ -2527,6 +2544,11 @@ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken
return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId);
}

static MultiplexedSessionPrecommitToken getCommitResponsePrecommitToken(
ByteString transactionId) {
return getPrecommitToken("CommitResponsePrecommitToken", transactionId);
}

static MultiplexedSessionPrecommitToken getPrecommitToken(
String value, ByteString transactionId) {
transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,66 @@ public void testOtherUnimplementedError_ReadWriteTransactionStillUsesMultiplexed
assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

@Test
public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
// This test simulates the commit retry protocol extension which occurs when a read-write
// transaction contains read/query + mutation operations.
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) {
//noinspection StatementWithEmptyBody
while (resultSet.next()) {
// ignore
}
}

Mutation mutation =
Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build();
transaction.buffer(mutation);

TransactionContextImpl impl = (TransactionContextImpl) transaction;
// Force the Commit RPC to return a CommitResponse with MultiplexedSessionRetry field
// set.
// This scenario is only possible when a read-write transaction contains read/query +
// mutation operations.
mockSpanner.markCommitRetryOnTransaction(impl.transactionId);
return null;
});

List<ExecuteSqlRequest> executeSqlRequests =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
assertEquals(1, executeSqlRequests.size());
// Verify the request is executed using multiplexed sessions
assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed());

List<CommitRequest> commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertEquals(2, commitRequests.size());
assertNotNull(commitRequests.get(0).getPrecommitToken());
assertEquals(
ByteString.copyFromUtf8("PartialResultSetPrecommitToken"),
commitRequests.get(0).getPrecommitToken().getPrecommitToken());
// Verify that the first request has mutations set
assertTrue(commitRequests.get(0).getMutationsCount() > 0);

// Second CommitRequest should contain the latest precommit token received via the
// CommitResponse in previous attempt.
assertNotNull(commitRequests.get(1).getPrecommitToken());
assertEquals(
ByteString.copyFromUtf8("CommitResponsePrecommitToken"),
commitRequests.get(1).getPrecommitToken().getPrecommitToken());
// Verify that the commit retry request does not have any mutations set
assertEquals(0, commitRequests.get(1).getMutationsCount());

assertNotNull(client.multiplexedSessionDatabaseClient);
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
}

private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
assertNotNull(client.multiplexedSessionDatabaseClient);
SessionReference sessionReference =
Expand Down

0 comments on commit 015bd98

Please sign in to comment.