Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Transactions with readTime will omit begin and commit transaction requests, and instead pass readTime on individual read requests. #1565

Merged
merged 10 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions google-cloud-firestore/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- ReadTimeTransaction - added abstract modifier to Transaction class-->
<difference>
<differenceType>3005</differenceType>
<className>com/google/cloud/firestore/Transaction</className>
</difference>

<!-- Shutdown/Shutdown Now -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,22 @@ public Query getQuery() {
*/
@Nonnull
public ApiFuture<AggregateQuerySnapshot> get() {
return get(null);
return get(null, null);
}

@Nonnull
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
ApiFuture<AggregateQuerySnapshot> get(
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
AggregateQueryResponseDeliverer responseDeliverer =
new AggregateQueryResponseDeliverer(
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
transactionId, readTime, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
runQuery(responseDeliverer);
return responseDeliverer.getFuture();
}

private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId());
RunAggregationQueryRequest request =
toProto(responseDeliverer.transactionId, responseDeliverer.readTime);
AggregateQueryResponseObserver responseObserver =
new AggregateQueryResponseObserver(responseDeliverer);
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
Expand All @@ -96,28 +98,24 @@ private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
private final class AggregateQueryResponseDeliverer {

@Nullable private final ByteString transactionId;
@Nullable private final com.google.protobuf.Timestamp readTime;
private final long startTimeNanos;
private final SettableApiFuture<AggregateQuerySnapshot> future = SettableApiFuture.create();
private final AtomicBoolean isFutureCompleted = new AtomicBoolean(false);

AggregateQueryResponseDeliverer(@Nullable ByteString transactionId, long startTimeNanos) {
AggregateQueryResponseDeliverer(
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
long startTimeNanos) {
this.transactionId = transactionId;
this.readTime = readTime;
this.startTimeNanos = startTimeNanos;
}

ApiFuture<AggregateQuerySnapshot> getFuture() {
return future;
}

@Nullable
ByteString getTransactionId() {
return transactionId;
}

long getStartTimeNanos() {
return startTimeNanos;
}

void deliverResult(@Nonnull Map<String, Value> data, Timestamp readTime) {
if (isFutureCompleted.compareAndSet(false, true)) {
Map<String, Value> mappedData = new HashMap<>();
Expand Down Expand Up @@ -176,8 +174,8 @@ private boolean shouldRetry(Throwable throwable) {
FirestoreSettings.newBuilder().runAggregationQuerySettings().getRetryableCodes();
return query.shouldRetryQuery(
throwable,
responseDeliverer.getTransactionId(),
responseDeliverer.getStartTimeNanos(),
responseDeliverer.transactionId,
responseDeliverer.startTimeNanos,
retryableCodes);
}

Expand All @@ -193,18 +191,23 @@ public void onComplete() {}
*/
@Nonnull
public RunAggregationQueryRequest toProto() {
return toProto(null);
return toProto(null, null);
}

@Nonnull
RunAggregationQueryRequest toProto(@Nullable final ByteString transactionId) {
RunAggregationQueryRequest toProto(
@Nullable final ByteString transactionId,
@Nullable final com.google.protobuf.Timestamp readTime) {
RunQueryRequest runQueryRequest = query.toProto();

RunAggregationQueryRequest.Builder request = RunAggregationQueryRequest.newBuilder();
request.setParent(runQueryRequest.getParent());
if (transactionId != null) {
request.setTransaction(transactionId);
}
if (readTime != null) {
request.setReadTime(readTime);
}

StructuredAggregationQuery.Builder structuredAggregationQuery =
request.getStructuredAggregationQueryBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ public void getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nonnull final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {
this.getAll(documentReferences, fieldMask, null, apiStreamObserver);
this.getAll(documentReferences, fieldMask, null, null, apiStreamObserver);
}

void getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime,
final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {

ResponseObserver<BatchGetDocumentsResponse> responseObserver =
Expand Down Expand Up @@ -304,6 +305,10 @@ public void onComplete() {
request.setTransaction(transactionId);
}

if (readTime != null) {
request.setReadTime(readTime);
}

for (DocumentReference docRef : documentReferences) {
request.addDocuments(docRef.getName());
}
Expand All @@ -318,17 +323,33 @@ public void onComplete() {
streamRequest(request.build(), responseObserver, firestoreClient.batchGetDocumentsCallable());
}

final ApiFuture<List<DocumentSnapshot>> getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable com.google.protobuf.Timestamp readTime) {
return getAll(documentReferences, fieldMask, null, readTime);
}

private ApiFuture<List<DocumentSnapshot>> getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId) {
return getAll(documentReferences, fieldMask, transactionId, null);
}

/** Internal getAll() method that accepts an optional transaction id. */
ApiFuture<List<DocumentSnapshot>> getAll(
final @Nonnull DocumentReference[] documentReferences,
@Nullable FieldMask fieldMask,
@Nullable ByteString transactionId) {
@Nullable ByteString transactionId,
@Nullable com.google.protobuf.Timestamp readTime) {
final SettableApiFuture<List<DocumentSnapshot>> futureList = SettableApiFuture.create();
final Map<DocumentReference, DocumentSnapshot> documentSnapshotMap = new HashMap<>();
getAll(
documentReferences,
fieldMask,
transactionId,
readTime,
new ApiStreamObserver<DocumentSnapshot>() {
@Override
public void onNext(DocumentSnapshot documentSnapshot) {
Expand Down Expand Up @@ -390,9 +411,16 @@ public <T> ApiFuture<T> runAsyncTransaction(
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
@Nonnull TransactionOptions transactionOptions) {

TransactionRunner<T> transactionRunner =
new TransactionRunner<>(this, updateFunction, transactionOptions);
return transactionRunner.run();
if (transactionOptions.getReadTime() != null) {
// READ_ONLY transactions with readTime have no retry, nor transaction state, so we don't need
// a runner.
return updateFunction.updateCallback(
new ReadTimeTransaction(this, transactionOptions.getReadTime()));
} else {
// For READ_ONLY transactions without readTime, there is still strong consistency applied,
// that cannot be tracked client side.
return new ServerSideTransactionRunner<>(this, updateFunction, transactionOptions).run();
}
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1784,7 +1784,7 @@ boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) {
*/
@Nonnull
public ApiFuture<QuerySnapshot> get() {
return get(null);
return get(null, null);
}

/**
Expand All @@ -1811,7 +1811,7 @@ public ListenerRegistration addSnapshotListener(
return Watch.forQuery(this).runWatch(executor, listener);
}

ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId) {
ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId, @Nullable Timestamp readTime) {
final SettableApiFuture<QuerySnapshot> result = SettableApiFuture.create();

internalStream(
Expand Down Expand Up @@ -1843,7 +1843,7 @@ public void onCompleted() {
},
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
transactionId,
/* readTime= */ null);
readTime);

return result;
}
Expand Down
Loading
Loading