Skip to content

Commit

Permalink
feat(spanner): support max_commit_delay in Spanner transactions (#2854)
Browse files Browse the repository at this point in the history
* add commit delay options

* feat: add max_commit_delay options for commit requests

* Point stuff to devel

* Make samples work with devel. Add a typo.

* Cleanup stuff

* Remove duplicate definitions.

* Fix merge conflict.

* Remove SpannerSample pointing to devel.

* Add some parans

* Add a unit test to ensure unset commit delay is properly propagated

* Add an integration test.

* change maxCommitDelayInMillis of type Int to maxCommitDelay of type Duration.

* Format

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: rahul yadav <[email protected]>
Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 8, 2024
1 parent 741e4cf commit e2b7ae6
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.RequestOptions.Priority;
import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;

/** Specifies options for various spanner operations */
Expand Down Expand Up @@ -140,6 +141,11 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}

public static ReadQueryUpdateTransactionOption maxCommitDelay(Duration maxCommitDelay) {
Preconditions.checkArgument(!maxCommitDelay.isNegative(), "maxCommitDelay should be positive");
return new MaxCommitDelayOption(maxCommitDelay);
}

/**
* Specifying this will cause the reads, queries, updates and writes operations statistics
* collection to be grouped by tag.
Expand Down Expand Up @@ -247,6 +253,21 @@ void appendToOptions(Options options) {

static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption();

/** Option to request {@link MaxCommitDelayOption} for read/write transactions. */
static final class MaxCommitDelayOption extends InternalOption
implements ReadQueryUpdateTransactionOption {
final Duration maxCommitDelay;

MaxCommitDelayOption(Duration maxCommitDelay) {
this.maxCommitDelay = maxCommitDelay;
}

@Override
void appendToOptions(Options options) {
options.maxCommitDelay = maxCommitDelay;
}
}

/** Option to request Optimistic Concurrency Control for read/write transactions. */
static final class OptimisticLockOption extends InternalOption implements TransactionOption {
@Override
Expand Down Expand Up @@ -354,6 +375,9 @@ void appendToOptions(Options options) {
}

private boolean withCommitStats;

private Duration maxCommitDelay;

private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
Expand All @@ -375,6 +399,14 @@ boolean withCommitStats() {
return withCommitStats;
}

boolean hasMaxCommitDelay() {
return maxCommitDelay != null;
}

Duration maxCommitDelay() {
return maxCommitDelay;
}

boolean hasLimit() {
return limit != null;
}
Expand Down Expand Up @@ -481,6 +513,9 @@ public String toString() {
if (withCommitStats) {
b.append("withCommitStats: ").append(withCommitStats).append(' ');
}
if (maxCommitDelay != null) {
b.append("maxCommitDelay: ").append(maxCommitDelay).append(' ');
}
if (limit != null) {
b.append("limit: ").append(limit).append(' ');
}
Expand Down Expand Up @@ -533,6 +568,7 @@ public boolean equals(Object o) {

Options that = (Options) o;
return Objects.equals(withCommitStats, that.withCommitStats)
&& Objects.equals(maxCommitDelay, that.maxCommitDelay)
&& (!hasLimit() && !that.hasLimit()
|| hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit()))
&& (!hasPrefetchChunks() && !that.hasPrefetchChunks()
Expand Down Expand Up @@ -562,6 +598,9 @@ public int hashCode() {
if (withCommitStats) {
result = 31 * result + 1231;
}
if (maxCommitDelay != null) {
result = 31 * result + maxCommitDelay.hashCode();
}
if (limit != null) {
result = 31 * result + limit.hashCode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
Expand All @@ -60,6 +61,7 @@
* users need not be aware of the actual session management, pooling and handling.
*/
class SessionImpl implements Session {

private static final Tracer tracer = Tracing.getTracer();

/** Keep track of running transactions on this session per thread. */
Expand All @@ -86,8 +88,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
* only have one such transaction active at a time.
*/
interface SessionTransaction {

/** Invalidates the transaction, generally because a new one has been started on the session. */
void invalidate();

/** Registers the current span on the transaction. */
void setSpan(Span span);
}
Expand Down Expand Up @@ -176,16 +180,24 @@ public CommitResponse writeAtLeastOnceWithOptions(
setActive(null);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
Options options = Options.fromTransactionOptions(transactionOptions);
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(
Options.fromTransactionOptions(transactionOptions).withCommitStats())
.setReturnCommitStats(options.withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (options.hasMaxCommitDelay()) {
requestBuilder.setMaxCommitDelay(
Duration.newBuilder()
.setSeconds(options.maxCommitDelay().getSeconds())
.setNanos(options.maxCommitDelay().getNano())
.build());
}
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);

if (commitRequestOptions != null) {
requestBuilder.setRequestOptions(commitRequestOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

/** Default implementation of {@link TransactionRunner}. */
class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {

private static final Tracer tracer = Tracing.getTracer();
private static final Logger txnLogger = Logger.getLogger(TransactionRunner.class.getName());
/**
Expand All @@ -84,6 +85,7 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {

static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {

private Clock clock = new Clock();
Expand Down Expand Up @@ -131,6 +133,7 @@ static Builder newBuilder() {
*/
private class TransactionContextAsyncResultSetImpl extends ForwardingAsyncResultSet
implements ListenableAsyncResultSet {

private TransactionContextAsyncResultSetImpl(ListenableAsyncResultSet delegate) {
super(delegate);
}
Expand Down Expand Up @@ -339,6 +342,13 @@ ApiFuture<CommitResponse> commitAsync() {
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
if (options.hasMaxCommitDelay()) {
builder.setMaxCommitDelay(
com.google.protobuf.Duration.newBuilder()
.setSeconds(options.maxCommitDelay().getSeconds())
.setNanos(options.maxCommitDelay().getNano())
.build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
Expand All @@ -354,6 +364,7 @@ ApiFuture<CommitResponse> commitAsync() {
}

private final class CommitRunnable implements Runnable {

private final SettableApiFuture<CommitResponse> res;
private final ApiFuture<Void> prev;
private final CommitRequest.Builder requestBuilder;
Expand Down Expand Up @@ -575,7 +586,9 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude

@Nullable
String getTransactionTag() {
if (this.options.hasTag()) return this.options.tag();
if (this.options.hasTag()) {
return this.options.tag();
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@

@RunWith(JUnit4.class)
public class DatabaseClientImplTest {

private static final String TEST_PROJECT = "my-project";
private static final String TEST_INSTANCE = "my-instance";
private static final String TEST_DATABASE = "my-database";
Expand Down Expand Up @@ -3635,6 +3636,112 @@ public void testAsyncTransactionManagerCommitWithPriority() {
assertEquals(Priority.PRIORITY_HIGH, request.getRequestOptions().getPriority());
}

@Test
public void testCommitWithoutMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionRunner runner = client.readWriteTransaction();
runner.run(
transaction -> {
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
return null;
});

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertFalse(request.hasMaxCommitDelay());
}

@Test
public void testCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionRunner runner =
client.readWriteTransaction(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
runner.run(
transaction -> {
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
return null;
});

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testTransactionManagerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionManager manager =
client.transactionManager(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
TransactionContext transaction = manager.begin();
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
manager.commit();

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testAsyncRunnerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
AsyncRunner runner = client.runAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
get(
runner.runAsync(
txn -> {
txn.buffer(Mutation.delete("TEST", KeySet.all()));
return ApiFutures.immediateFuture(null);
},
executor));

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testAsyncTransactionManagerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
try (AsyncTransactionManager manager =
client.transactionManagerAsync(Options.maxCommitDelay(java.time.Duration.ofMillis(100)))) {
TransactionContextFuture transaction = manager.beginAsync();
get(
transaction
.then(
(txn, input) -> {
txn.buffer(Mutation.delete("TEST", KeySet.all()));
return ApiFutures.immediateFuture(null);
},
executor)
.commitAsync());
}

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(100000000).build(),
request.getMaxCommitDelay());
}

@Test
public void singleUseNoAction_ClearsCheckedOutSession() {
DatabaseClientImpl client =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ public void batchWriteAtLeastOnce() {
}
}

@Test
public void testWriteWithMaxCommitDelay() {
CommitResponse response =
client.writeWithOptions(
Collections.singletonList(
Mutation.newInsertOrUpdateBuilder("T")
.set("K")
.to(lastKey = uniqueString())
.set("StringValue")
.to("v1")
.build()),
Options.maxCommitDelay(java.time.Duration.ofMillis(100)));
assertNotNull(response);
assertNotNull(response.getCommitTimestamp());
}

@Test
public void testWriteReturnsCommitStats() {
assumeFalse("Emulator does not return commit statistics", isUsingEmulator());
Expand Down

0 comments on commit e2b7ae6

Please sign in to comment.