Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support max_commit_delay in Spanner transactions #2854

Merged
merged 17 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ public static ReadQueryUpdateTransactionOption priority(RpcPriority priority) {
return new PriorityOption(priority);
}

public static ReadQueryUpdateTransactionOption maxCommitDelayInMilliSeconds(
int maxCommitDelayInMilliSeconds) {
Preconditions.checkArgument(
maxCommitDelayInMilliSeconds > 0, "maxCommitDelayInMilliSeconds should be greater than 0");
return new MaxCommitDelayOption(maxCommitDelayInMilliSeconds);
}

/**
* Specifying this will cause the reads, queries, updates and writes operations statistics
* collection to be grouped by tag.
Expand Down Expand Up @@ -247,6 +254,21 @@ void appendToOptions(Options options) {

static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption();

/** Option to request {@link MaxCommitDelayOption} for read/write transactions. */
static final class MaxCommitDelayOption extends InternalOption
implements ReadQueryUpdateTransactionOption {
final int maxCommitDelayInMilliSeconds;

MaxCommitDelayOption(int maxCommitDelayInMilliSeconds) {
this.maxCommitDelayInMilliSeconds = maxCommitDelayInMilliSeconds;
}

@Override
void appendToOptions(Options options) {
options.maxCommitDelayInMilliSeconds = maxCommitDelayInMilliSeconds;
}
}

/** Option to request Optimistic Concurrency Control for read/write transactions. */
static final class OptimisticLockOption extends InternalOption implements TransactionOption {
@Override
Expand Down Expand Up @@ -354,6 +376,9 @@ void appendToOptions(Options options) {
}

private boolean withCommitStats;

private Integer maxCommitDelayInMilliSeconds;

private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
Expand All @@ -375,6 +400,14 @@ boolean withCommitStats() {
return withCommitStats;
}

boolean hasMaxCommitDelayInMilliSeconds() {
return maxCommitDelayInMilliSeconds != null;
}

int maxCommitDelayInMilliSeconds() {
return maxCommitDelayInMilliSeconds;
}

boolean hasLimit() {
return limit != null;
}
Expand Down Expand Up @@ -481,6 +514,9 @@ public String toString() {
if (withCommitStats) {
b.append("withCommitStats: ").append(withCommitStats).append(' ');
}
if (maxCommitDelayInMilliSeconds != null) {
b.append("maxCommitDelayInMilliSeconds: ").append(maxCommitDelayInMilliSeconds).append(' ');
}
if (limit != null) {
b.append("limit: ").append(limit).append(' ');
}
Expand Down Expand Up @@ -533,6 +569,7 @@ public boolean equals(Object o) {

Options that = (Options) o;
return Objects.equals(withCommitStats, that.withCommitStats)
&& Objects.equals(maxCommitDelayInMilliSeconds, that.maxCommitDelayInMilliSeconds)
&& (!hasLimit() && !that.hasLimit()
|| hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit()))
&& (!hasPrefetchChunks() && !that.hasPrefetchChunks()
Expand Down Expand Up @@ -562,6 +599,9 @@ public int hashCode() {
if (withCommitStats) {
result = 31 * result + 1231;
}
if (maxCommitDelayInMilliSeconds != null) {
result = 31 * result + maxCommitDelayInMilliSeconds.hashCode();
}
if (limit != null) {
result = 31 * result + limit.hashCode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
Expand Down Expand Up @@ -176,16 +177,21 @@ public CommitResponse writeAtLeastOnceWithOptions(
setActive(null);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
Options options = Options.fromTransactionOptions(transactionOptions);
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
.setSession(name)
.setReturnCommitStats(
Options.fromTransactionOptions(transactionOptions).withCommitStats())
.setReturnCommitStats(options.withCommitStats())
.addAllMutations(mutationsProto)
.setSingleUseTransaction(
TransactionOptions.newBuilder()
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
if (options.hasMaxCommitDelayInMilliSeconds()) {
requestBuilder.setMaxCommitDelay(
Duration.newBuilder().setNanos(options.maxCommitDelayInMilliSeconds() * 1000000).build());
}
RequestOptions commitRequestOptions = getRequestOptions(transactionOptions);

if (commitRequestOptions != null) {
requestBuilder.setRequestOptions(commitRequestOptions);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,13 @@ ApiFuture<CommitResponse> commitAsync() {
}
builder.setRequestOptions(requestOptionsBuilder.build());
}
if (options.hasMaxCommitDelayInMilliSeconds()) {
builder.setMaxCommitDelay(
com.google.protobuf.Duration.newBuilder()
.setNanos(
(int) TimeUnit.MILLISECONDS.toNanos(options.maxCommitDelayInMilliSeconds()))
.build());
}
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3635,6 +3635,112 @@ public void testAsyncTransactionManagerCommitWithPriority() {
assertEquals(Priority.PRIORITY_HIGH, request.getRequestOptions().getPriority());
}

@Test
public void testCommitWithoutMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionRunner runner = client.readWriteTransaction();
runner.run(
transaction -> {
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
return null;
});

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertFalse(request.hasMaxCommitDelay());
}

@Test
public void testCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionRunner runner =
client.readWriteTransaction(Options.maxCommitDelayInMilliSeconds(1000));
runner.run(
transaction -> {
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
return null;
});

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(1000000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testTransactionManagerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
TransactionManager manager =
client.transactionManager(Options.maxCommitDelayInMilliSeconds(1000));
TransactionContext transaction = manager.begin();
transaction.buffer(Mutation.delete("TEST", KeySet.all()));
manager.commit();

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(1000000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testAsyncRunnerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
AsyncRunner runner = client.runAsync(Options.maxCommitDelayInMilliSeconds(1000));
get(
runner.runAsync(
txn -> {
txn.buffer(Mutation.delete("TEST", KeySet.all()));
return ApiFutures.immediateFuture(null);
},
executor));

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(1000000000).build(),
request.getMaxCommitDelay());
}

@Test
public void testAsyncTransactionManagerCommitWithMaxCommitDelay() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
try (AsyncTransactionManager manager =
client.transactionManagerAsync(Options.maxCommitDelayInMilliSeconds(1000))) {
TransactionContextFuture transaction = manager.beginAsync();
get(
transaction
.then(
(txn, input) -> {
txn.buffer(Mutation.delete("TEST", KeySet.all()));
return ApiFutures.immediateFuture(null);
},
executor)
.commitAsync());
}

List<CommitRequest> requests = mockSpanner.getRequestsOfType(CommitRequest.class);
assertThat(requests).hasSize(1);
CommitRequest request = requests.get(0);
assertNotNull(request.getMaxCommitDelay());
assertEquals(
com.google.protobuf.Duration.newBuilder().setNanos(1000000000).build(),
request.getMaxCommitDelay());
}

@Test
public void singleUseNoAction_ClearsCheckedOutSession() {
DatabaseClientImpl client =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,22 @@ public void batchWriteAtLeastOnce() {
}
}

@Test
public void testWriteWithMaxCommitDelay() {
CommitResponse response =
client.writeWithOptions(
Collections.singletonList(
Mutation.newInsertOrUpdateBuilder("T")
.set("K")
.to(lastKey = uniqueString())
.set("StringValue")
.to("v1")
.build()),
Options.maxCommitDelayInMilliSeconds(100));
nginsberg-google marked this conversation as resolved.
Show resolved Hide resolved
assertNotNull(response);
assertNotNull(response.getCommitTimestamp());
}

@Test
public void testWriteReturnsCommitStats() {
assumeFalse("Emulator does not return commit statistics", isUsingEmulator());
Expand Down
Loading