diff --git a/google-cloud-firestore/clirr-ignored-differences.xml b/google-cloud-firestore/clirr-ignored-differences.xml index 5c9b4d6a0..7a4c83689 100644 --- a/google-cloud-firestore/clirr-ignored-differences.xml +++ b/google-cloud-firestore/clirr-ignored-differences.xml @@ -17,6 +17,12 @@ + + + 3005 + com/google/cloud/firestore/Transaction + + 7012 diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java index 1613b74dd..921300b47 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java @@ -72,20 +72,22 @@ public Query getQuery() { */ @Nonnull public ApiFuture get() { - return get(null); + return get(null, null); } @Nonnull - ApiFuture get(@Nullable final ByteString transactionId) { + ApiFuture get( + @Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) { AggregateQueryResponseDeliverer responseDeliverer = new AggregateQueryResponseDeliverer( - transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime()); + transactionId, readTime, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime()); runQuery(responseDeliverer); return responseDeliverer.getFuture(); } private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) { - RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId()); + RunAggregationQueryRequest request = + toProto(responseDeliverer.transactionId, responseDeliverer.readTime); AggregateQueryResponseObserver responseObserver = new AggregateQueryResponseObserver(responseDeliverer); ServerStreamingCallable callable = @@ -96,12 +98,17 @@ private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) { private final class AggregateQueryResponseDeliverer { @Nullable private final ByteString transactionId; + @Nullable private final com.google.protobuf.Timestamp readTime; private final long startTimeNanos; private final SettableApiFuture future = SettableApiFuture.create(); private final AtomicBoolean isFutureCompleted = new AtomicBoolean(false); - AggregateQueryResponseDeliverer(@Nullable ByteString transactionId, long startTimeNanos) { + AggregateQueryResponseDeliverer( + @Nullable ByteString transactionId, + @Nullable com.google.protobuf.Timestamp readTime, + long startTimeNanos) { this.transactionId = transactionId; + this.readTime = readTime; this.startTimeNanos = startTimeNanos; } @@ -109,15 +116,6 @@ ApiFuture getFuture() { return future; } - @Nullable - ByteString getTransactionId() { - return transactionId; - } - - long getStartTimeNanos() { - return startTimeNanos; - } - void deliverResult(@Nonnull Map data, Timestamp readTime) { if (isFutureCompleted.compareAndSet(false, true)) { Map mappedData = new HashMap<>(); @@ -176,8 +174,8 @@ private boolean shouldRetry(Throwable throwable) { FirestoreSettings.newBuilder().runAggregationQuerySettings().getRetryableCodes(); return query.shouldRetryQuery( throwable, - responseDeliverer.getTransactionId(), - responseDeliverer.getStartTimeNanos(), + responseDeliverer.transactionId, + responseDeliverer.startTimeNanos, retryableCodes); } @@ -193,11 +191,13 @@ public void onComplete() {} */ @Nonnull public RunAggregationQueryRequest toProto() { - return toProto(null); + return toProto(null, null); } @Nonnull - RunAggregationQueryRequest toProto(@Nullable final ByteString transactionId) { + RunAggregationQueryRequest toProto( + @Nullable final ByteString transactionId, + @Nullable final com.google.protobuf.Timestamp readTime) { RunQueryRequest runQueryRequest = query.toProto(); RunAggregationQueryRequest.Builder request = RunAggregationQueryRequest.newBuilder(); @@ -205,6 +205,9 @@ RunAggregationQueryRequest toProto(@Nullable final ByteString transactionId) { if (transactionId != null) { request.setTransaction(transactionId); } + if (readTime != null) { + request.setReadTime(readTime); + } StructuredAggregationQuery.Builder structuredAggregationQuery = request.getStructuredAggregationQueryBuilder(); diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java index 1fab69e97..82dfc5176 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java @@ -209,13 +209,14 @@ public void getAll( final @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask, @Nonnull final ApiStreamObserver apiStreamObserver) { - this.getAll(documentReferences, fieldMask, null, apiStreamObserver); + this.getAll(documentReferences, fieldMask, null, null, apiStreamObserver); } void getAll( final @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask, @Nullable ByteString transactionId, + @Nullable com.google.protobuf.Timestamp readTime, final ApiStreamObserver apiStreamObserver) { ResponseObserver responseObserver = @@ -304,6 +305,10 @@ public void onComplete() { request.setTransaction(transactionId); } + if (readTime != null) { + request.setReadTime(readTime); + } + for (DocumentReference docRef : documentReferences) { request.addDocuments(docRef.getName()); } @@ -318,17 +323,33 @@ public void onComplete() { streamRequest(request.build(), responseObserver, firestoreClient.batchGetDocumentsCallable()); } + final ApiFuture> getAll( + final @Nonnull DocumentReference[] documentReferences, + @Nullable FieldMask fieldMask, + @Nullable com.google.protobuf.Timestamp readTime) { + return getAll(documentReferences, fieldMask, null, readTime); + } + + private ApiFuture> getAll( + final @Nonnull DocumentReference[] documentReferences, + @Nullable FieldMask fieldMask, + @Nullable ByteString transactionId) { + return getAll(documentReferences, fieldMask, transactionId, null); + } + /** Internal getAll() method that accepts an optional transaction id. */ ApiFuture> getAll( final @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask, - @Nullable ByteString transactionId) { + @Nullable ByteString transactionId, + @Nullable com.google.protobuf.Timestamp readTime) { final SettableApiFuture> futureList = SettableApiFuture.create(); final Map documentSnapshotMap = new HashMap<>(); getAll( documentReferences, fieldMask, transactionId, + readTime, new ApiStreamObserver() { @Override public void onNext(DocumentSnapshot documentSnapshot) { @@ -390,9 +411,16 @@ public ApiFuture runAsyncTransaction( @Nonnull final Transaction.AsyncFunction updateFunction, @Nonnull TransactionOptions transactionOptions) { - TransactionRunner transactionRunner = - new TransactionRunner<>(this, updateFunction, transactionOptions); - return transactionRunner.run(); + if (transactionOptions.getReadTime() != null) { + // READ_ONLY transactions with readTime have no retry, nor transaction state, so we don't need + // a runner. + return updateFunction.updateCallback( + new ReadTimeTransaction(this, transactionOptions.getReadTime())); + } else { + // For READ_ONLY transactions without readTime, there is still strong consistency applied, + // that cannot be tracked client side. + return new ServerSideTransactionRunner<>(this, updateFunction, transactionOptions).run(); + } } @Nonnull diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java index 59ec6f960..865e522c9 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java @@ -1784,7 +1784,7 @@ boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) { */ @Nonnull public ApiFuture get() { - return get(null); + return get(null, null); } /** @@ -1811,7 +1811,7 @@ public ListenerRegistration addSnapshotListener( return Watch.forQuery(this).runWatch(executor, listener); } - ApiFuture get(@Nullable ByteString transactionId) { + ApiFuture get(@Nullable ByteString transactionId, @Nullable Timestamp readTime) { final SettableApiFuture result = SettableApiFuture.create(); internalStream( @@ -1843,7 +1843,7 @@ public void onCompleted() { }, /* startTimeNanos= */ rpcContext.getClock().nanoTime(), transactionId, - /* readTime= */ null); + readTime); return result; } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java new file mode 100644 index 000000000..36f86f4d1 --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ReadTimeTransaction.java @@ -0,0 +1,210 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Timestamp; +import io.opencensus.trace.Tracing; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * The ReadTimeTransaction is a ready-only Transaction that specifies a ReadTime. Unlike a + * `ServerSideTransaction`, we do not need a `transactionId` since we provide a `readTime` on all + * requests. No concurrency control is required, since data in the past is immutable. As with all + * `read-only` transactions, we do not allow any write request. + * + * @see Transaction + */ +final class ReadTimeTransaction extends Transaction { + + public static final String WRITE_EXCEPTION_MSG = + "Firestore ready-only transactions do not support writes"; + private final Timestamp readTime; + + ReadTimeTransaction(FirestoreImpl firestore, Timestamp readTime) { + super(firestore); + Preconditions.checkNotNull(readTime, "readTime cannot be null"); + this.readTime = readTime; + } + + @Override + public boolean hasTransactionId() { + return false; + } + + @Nonnull + @Override + public ApiFuture get(@Nonnull DocumentReference documentRef) { + Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_GETDOCUMENT); + return ApiFutures.transform( + firestore.getAll(new DocumentReference[] {documentRef}, /*fieldMask=*/ null, readTime), + snapshots -> snapshots.isEmpty() ? null : snapshots.get(0), + MoreExecutors.directExecutor()); + } + + @Nonnull + @Override + public ApiFuture> getAll( + @Nonnull DocumentReference... documentReferences) { + return firestore.getAll(documentReferences, /*fieldMask=*/ null, readTime); + } + + @Nonnull + @Override + public ApiFuture> getAll( + @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask) { + return firestore.getAll(documentReferences, /*fieldMask=*/ null, readTime); + } + + @Nonnull + @Override + public ApiFuture get(@Nonnull Query query) { + return query.get(null, com.google.cloud.Timestamp.fromProto(readTime)); + } + + @Nonnull + @Override + public ApiFuture get(@Nonnull AggregateQuery query) { + return query.get(null, readTime); + } + + @Nonnull + @Override + public Transaction create( + @Nonnull DocumentReference documentReference, @Nonnull Map fields) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction create(@Nonnull DocumentReference documentReference, @Nonnull Object pojo) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction set( + @Nonnull DocumentReference documentReference, @Nonnull Map fields) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction set( + @Nonnull DocumentReference documentReference, + @Nonnull Map fields, + @Nonnull SetOptions options) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction set(@Nonnull DocumentReference documentReference, @Nonnull Object pojo) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction set( + @Nonnull DocumentReference documentReference, + @Nonnull Object pojo, + @Nonnull SetOptions options) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction update( + @Nonnull DocumentReference documentReference, @Nonnull Map fields) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction update( + @Nonnull DocumentReference documentReference, + @Nonnull Map fields, + Precondition precondition) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction update( + @Nonnull DocumentReference documentReference, + @Nonnull String field, + @Nullable Object value, + Object... moreFieldsAndValues) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction update( + @Nonnull DocumentReference documentReference, + @Nonnull FieldPath fieldPath, + @Nullable Object value, + Object... moreFieldsAndValues) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction update( + @Nonnull DocumentReference documentReference, + @Nonnull Precondition precondition, + @Nonnull String field, + @Nullable Object value, + Object... moreFieldsAndValues) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction update( + @Nonnull DocumentReference documentReference, + @Nonnull Precondition precondition, + @Nonnull FieldPath fieldPath, + @Nullable Object value, + Object... moreFieldsAndValues) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction delete( + @Nonnull DocumentReference documentReference, @Nonnull Precondition precondition) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Nonnull + @Override + public Transaction delete(@Nonnull DocumentReference documentReference) { + throw new IllegalStateException(WRITE_EXCEPTION_MSG); + } + + @Override + public String toString() { + return String.format("%s{readTime=%s}", getClass().getSimpleName(), readTime); + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransaction.java new file mode 100644 index 000000000..0227c2c4c --- /dev/null +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransaction.java @@ -0,0 +1,217 @@ +/* + * Copyright 2017 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.firestore; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.cloud.firestore.TransactionOptions.TransactionOptionsType; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.firestore.v1.BeginTransactionRequest; +import com.google.firestore.v1.BeginTransactionResponse; +import com.google.firestore.v1.RollbackRequest; +import com.google.firestore.v1.TransactionOptions.ReadOnly; +import com.google.protobuf.ByteString; +import com.google.protobuf.Empty; +import io.opencensus.trace.Tracing; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +/** + * A `ServerSideTransaction` is a `Transaction` that uses server generated `transactionId` on + * requests. The implementation starts with a `beginTransaction` request that receives a + * `transactionId` from server. The `ServerSideTransactionRunner` must either `commit()` or + * `rollback()` when done. + * + * @see Transaction + * @see ServerSideTransactionRunner + */ +final class ServerSideTransaction extends Transaction { + + private static final Logger LOGGER = Logger.getLogger(ServerSideTransaction.class.getName()); + + private static final String READ_BEFORE_WRITE_ERROR_MSG = + "Firestore transactions require all reads to be executed before all writes"; + + private final FirestoreImpl firestore; + + final ByteString transactionId; + + private ServerSideTransaction(FirestoreImpl firestore, ByteString transactionId) { + super(firestore); + this.firestore = firestore; + this.transactionId = transactionId; + } + + public ByteString getTransactionId() { + return transactionId; + } + + public static ApiFuture begin( + FirestoreImpl firestore, + TransactionOptions transactionOptions, + @Nullable ServerSideTransaction previousTransaction) { + Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_BEGINTRANSACTION); + BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder(); + beginTransaction.setDatabase(firestore.getDatabaseName()); + ByteString previousTransactionId = + previousTransaction != null ? previousTransaction.transactionId : null; + + if (TransactionOptionsType.READ_WRITE.equals(transactionOptions.getType()) + && previousTransactionId != null) { + beginTransaction + .getOptionsBuilder() + .getReadWriteBuilder() + .setRetryTransaction(previousTransactionId); + } else if (TransactionOptionsType.READ_ONLY.equals(transactionOptions.getType())) { + final ReadOnly.Builder readOnlyBuilder = ReadOnly.newBuilder(); + if (transactionOptions.getReadTime() != null) { + readOnlyBuilder.setReadTime(transactionOptions.getReadTime()); + } + beginTransaction.getOptionsBuilder().setReadOnly(readOnlyBuilder); + } + + ApiFuture transactionBeginFuture = + firestore.sendRequest( + beginTransaction.build(), firestore.getClient().beginTransactionCallable()); + + return ApiFutures.transform( + transactionBeginFuture, + beginTransactionResponse -> + new ServerSideTransaction(firestore, beginTransactionResponse.getTransaction()), + MoreExecutors.directExecutor()); + } + + /** Commits a transaction. */ + ApiFuture> commit() { + return super.commit(transactionId); + } + + /** Rolls a transaction back and releases all read locks. */ + ApiFuture rollback() { + Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_ROLLBACK); + RollbackRequest req = + RollbackRequest.newBuilder() + .setTransaction(transactionId) + .setDatabase(firestore.getDatabaseName()) + .build(); + + ApiFuture rollbackFuture = + firestore.sendRequest(req, firestore.getClient().rollbackCallable()); + + ApiFuture transform = + ApiFutures.transform(rollbackFuture, resp -> null, MoreExecutors.directExecutor()); + + return ApiFutures.catching( + transform, + Throwable.class, + (error) -> { + LOGGER.log( + Level.WARNING, + "Failed best effort to rollback of transaction " + transactionId, + error); + return null; + }, + MoreExecutors.directExecutor()); + } + + @Override + public boolean hasTransactionId() { + return true; + } + + /** + * Reads the document referred to by the provided DocumentReference. Holds a pessimistic lock on + * the returned document. + * + * @return The contents of the Document at this DocumentReference. + */ + @Override + @Nonnull + public ApiFuture get(@Nonnull DocumentReference documentRef) { + Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_GETDOCUMENT); + Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); + return ApiFutures.transform( + firestore.getAll( + new DocumentReference[] {documentRef}, + /*fieldMask=*/ null, + transactionId, + /*readTime=*/ null), + snapshots -> snapshots.isEmpty() ? null : snapshots.get(0), + MoreExecutors.directExecutor()); + } + + /** + * Retrieves multiple documents from Firestore. Holds a pessimistic lock on all returned + * documents. + * + * @param documentReferences List of Document References to fetch. + */ + @Override + @Nonnull + public ApiFuture> getAll( + @Nonnull DocumentReference... documentReferences) { + Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); + return firestore.getAll( + documentReferences, /*fieldMask=*/ null, transactionId, /*readTime=*/ null); + } + + /** + * Retrieves multiple documents from Firestore, while optionally applying a field mask to reduce + * the amount of data transmitted from the backend. Holds a pessimistic lock on all returned + * documents. + * + * @param documentReferences Array with Document References to fetch. + * @param fieldMask If set, specifies the subset of fields to return. + */ + @Override + @Nonnull + public ApiFuture> getAll( + @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask) { + Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); + return firestore.getAll(documentReferences, fieldMask, transactionId, /*readTime=*/ null); + } + + /** + * Returns the result set from the provided query. Holds a pessimistic lock on all returned + * documents. + * + * @return The contents of the Document at this DocumentReference. + */ + @Override + @Nonnull + public ApiFuture get(@Nonnull Query query) { + Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); + return query.get(transactionId, /*readTime=*/ null); + } + + /** + * Returns the result from the provided aggregate query. Holds a pessimistic lock on all accessed + * documents. + * + * @return The result of the aggregation. + */ + @Override + @Nonnull + public ApiFuture get(@Nonnull AggregateQuery query) { + Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); + return query.get(transactionId, null); + } +} diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransactionRunner.java similarity index 91% rename from google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java rename to google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransactionRunner.java index 6ab4fecd2..5cf4c1f79 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/ServerSideTransactionRunner.java @@ -46,7 +46,7 @@ *

TransactionRunner uses exponential backoff to increase the chance that retries succeed. To * customize the backoff settings, you can specify custom settings via {@link FirestoreOptions}. */ -class TransactionRunner { +final class ServerSideTransactionRunner { private static final Tracer tracer = Tracing.getTracer(); private static final io.opencensus.trace.Status TOO_MANY_RETRIES_STATUS = @@ -62,7 +62,7 @@ class TransactionRunner { private final ExponentialRetryAlgorithm backoffAlgorithm; private final TransactionOptions transactionOptions; private TimedAttemptSettings nextBackoffAttempt; - private Transaction transaction; + private ServerSideTransaction transaction; private int attemptsRemaining; /** @@ -71,7 +71,7 @@ class TransactionRunner { * @param transactionOptions The options determining which executor the {@code userCallback} is * run on and whether the transaction is read-write or read-only */ - TransactionRunner( + ServerSideTransactionRunner( FirestoreImpl firestore, Transaction.AsyncFunction userCallback, TransactionOptions transactionOptions) { @@ -94,8 +94,6 @@ class TransactionRunner { } ApiFuture run() { - this.transaction = new Transaction(firestore, transactionOptions, this.transaction); - --attemptsRemaining; span.addAnnotation( @@ -110,10 +108,18 @@ ApiFuture run() { MoreExecutors.directExecutor()); } + ApiFuture begin() { + ServerSideTransaction previousTransaction = this.transaction; + this.transaction = null; + return ServerSideTransaction.begin(firestore, transactionOptions, previousTransaction); + } + private ApiFuture maybeRollback() { - return transaction.hasTransactionId() - ? transaction.rollback() - : ApiFutures.immediateFuture(null); + return hasTransaction() ? transaction.rollback() : ApiFutures.immediateFuture(null); + } + + private boolean hasTransaction() { + return transaction != null; } /** A callback that invokes the BeginTransaction callback. */ @@ -127,7 +133,7 @@ private ApiFuture rollbackCallback(Void input) { nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt); return ApiFutures.transformAsync( - backoff, TransactionRunner.this::backoffCallback, MoreExecutors.directExecutor()); + backoff, this::backoffCallback, MoreExecutors.directExecutor()); } /** @@ -165,14 +171,15 @@ public void onSuccess(T result) { /** A callback that invokes the BeginTransaction callback. */ private ApiFuture backoffCallback(Void input) { return ApiFutures.transformAsync( - transaction.begin(), this::beginTransactionCallback, MoreExecutors.directExecutor()); + begin(), this::beginTransactionCallback, MoreExecutors.directExecutor()); } /** * The callback for the BeginTransaction RPC, which invokes the user callback and handles all * errors thereafter. */ - private ApiFuture beginTransactionCallback(Void input) { + private ApiFuture beginTransactionCallback(ServerSideTransaction serverSideTransaction) { + this.transaction = serverSideTransaction; return ApiFutures.transformAsync( invokeUserCallback(), this::userFunctionCallback, MoreExecutors.directExecutor()); } @@ -202,7 +209,7 @@ private ApiFuture restartTransactionCallback(Throwable throwable) { } ApiException apiException = (ApiException) throwable; - if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) { + if (isRetryableTransactionError(apiException)) { if (attemptsRemaining > 0) { span.addAnnotation("retrying"); return run(); @@ -248,7 +255,7 @@ private static boolean isRetryableTransactionError(ApiException exception) { private ApiFuture rollbackAndReject(final Throwable throwable) { final SettableApiFuture failedTransaction = SettableApiFuture.create(); - if (transaction.hasTransactionId()) { + if (hasTransaction()) { // We use `addListener()` since we want to return the original exception regardless of // whether rollback() succeeds. transaction diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java index e76c2fdba..22fa4e065 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java @@ -17,19 +17,8 @@ package com.google.cloud.firestore; import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.cloud.firestore.TransactionOptions.TransactionOptionsType; -import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.firestore.v1.BeginTransactionRequest; -import com.google.firestore.v1.BeginTransactionResponse; -import com.google.firestore.v1.RollbackRequest; -import com.google.firestore.v1.TransactionOptions.ReadOnly; -import com.google.protobuf.ByteString; -import com.google.protobuf.Empty; -import io.opencensus.trace.Tracing; +import com.google.api.core.InternalExtensionOnly; import java.util.List; -import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -40,12 +29,17 @@ * * @see Firestore#runTransaction(Function) */ -public final class Transaction extends UpdateBuilder { +@InternalExtensionOnly +public abstract class Transaction extends UpdateBuilder { private static final Logger LOGGER = Logger.getLogger(Transaction.class.getName()); private static final String READ_BEFORE_WRITE_ERROR_MSG = "Firestore transactions require all reads to be executed before all writes"; + protected Transaction(FirestoreImpl firestore) { + super(firestore); + } + /** * User callback that takes a Firestore Transaction. * @@ -66,89 +60,18 @@ public interface AsyncFunction { ApiFuture updateCallback(Transaction transaction); } - private final TransactionOptions transactionOptions; - private ByteString transactionId; - - Transaction( - FirestoreImpl firestore, - TransactionOptions transactionOptions, - @Nullable Transaction previousTransaction) { - super(firestore); - this.transactionOptions = transactionOptions; - this.transactionId = previousTransaction != null ? previousTransaction.transactionId : null; + @Override + protected String className() { + return "Transaction"; } - public boolean hasTransactionId() { - return transactionId != null; - } + public abstract boolean hasTransactionId(); + @Override Transaction wrapResult(int writeIndex) { return this; } - /** Starts a transaction and obtains the transaction id. */ - ApiFuture begin() { - Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_BEGINTRANSACTION); - BeginTransactionRequest.Builder beginTransaction = BeginTransactionRequest.newBuilder(); - beginTransaction.setDatabase(firestore.getDatabaseName()); - - if (TransactionOptionsType.READ_WRITE.equals(transactionOptions.getType()) - && transactionId != null) { - beginTransaction.getOptionsBuilder().getReadWriteBuilder().setRetryTransaction(transactionId); - } else if (TransactionOptionsType.READ_ONLY.equals(transactionOptions.getType())) { - final ReadOnly.Builder readOnlyBuilder = ReadOnly.newBuilder(); - if (transactionOptions.getReadTime() != null) { - readOnlyBuilder.setReadTime(transactionOptions.getReadTime()); - } - beginTransaction.getOptionsBuilder().setReadOnly(readOnlyBuilder); - } - - ApiFuture transactionBeginFuture = - firestore.sendRequest( - beginTransaction.build(), firestore.getClient().beginTransactionCallable()); - - return ApiFutures.transform( - transactionBeginFuture, - beginTransactionResponse -> { - transactionId = beginTransactionResponse.getTransaction(); - return null; - }, - MoreExecutors.directExecutor()); - } - - /** Commits a transaction. */ - ApiFuture> commit() { - return super.commit(transactionId); - } - - /** Rolls a transaction back and releases all read locks. */ - ApiFuture rollback() { - Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_ROLLBACK); - RollbackRequest req = - RollbackRequest.newBuilder() - .setTransaction(transactionId) - .setDatabase(firestore.getDatabaseName()) - .build(); - - ApiFuture rollbackFuture = - firestore.sendRequest(req, firestore.getClient().rollbackCallable()); - - ApiFuture transform = - ApiFutures.transform(rollbackFuture, resp -> null, MoreExecutors.directExecutor()); - - return ApiFutures.catching( - transform, - Throwable.class, - (error) -> { - LOGGER.log( - Level.WARNING, - "Failed best effort to rollback of transaction " + transactionId, - error); - return null; - }, - MoreExecutors.directExecutor()); - } - /** * Reads the document referred to by the provided DocumentReference. Holds a pessimistic lock on * the returned document. @@ -156,14 +79,7 @@ ApiFuture rollback() { * @return The contents of the Document at this DocumentReference. */ @Nonnull - public ApiFuture get(@Nonnull DocumentReference documentRef) { - Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - Tracing.getTracer().getCurrentSpan().addAnnotation(TraceUtil.SPAN_NAME_GETDOCUMENT); - return ApiFutures.transform( - firestore.getAll(new DocumentReference[] {documentRef}, /*fieldMask=*/ null, transactionId), - snapshots -> snapshots.isEmpty() ? null : snapshots.get(0), - MoreExecutors.directExecutor()); - } + public abstract ApiFuture get(@Nonnull DocumentReference documentRef); /** * Retrieves multiple documents from Firestore. Holds a pessimistic lock on all returned @@ -172,12 +88,8 @@ public ApiFuture get(@Nonnull DocumentReference documentRef) { * @param documentReferences List of Document References to fetch. */ @Nonnull - public ApiFuture> getAll( - @Nonnull DocumentReference... documentReferences) { - Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - - return firestore.getAll(documentReferences, /*fieldMask=*/ null, transactionId); - } + public abstract ApiFuture> getAll( + @Nonnull DocumentReference... documentReferences); /** * Retrieves multiple documents from Firestore, while optionally applying a field mask to reduce @@ -188,12 +100,8 @@ public ApiFuture> getAll( * @param fieldMask If set, specifies the subset of fields to return. */ @Nonnull - public ApiFuture> getAll( - @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask) { - Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - - return firestore.getAll(documentReferences, fieldMask, transactionId); - } + public abstract ApiFuture> getAll( + @Nonnull DocumentReference[] documentReferences, @Nullable FieldMask fieldMask); /** * Returns the result set from the provided query. Holds a pessimistic lock on all returned @@ -202,11 +110,7 @@ public ApiFuture> getAll( * @return The contents of the Document at this DocumentReference. */ @Nonnull - public ApiFuture get(@Nonnull Query query) { - Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - - return query.get(transactionId); - } + public abstract ApiFuture get(@Nonnull Query query); /** * Returns the result from the provided aggregate query. Holds a pessimistic lock on all accessed @@ -215,9 +119,5 @@ public ApiFuture get(@Nonnull Query query) { * @return The result of the aggregation. */ @Nonnull - public ApiFuture get(@Nonnull AggregateQuery query) { - Preconditions.checkState(isEmpty(), READ_BEFORE_WRITE_ERROR_MSG); - - return query.get(transactionId); - } + public abstract ApiFuture get(@Nonnull AggregateQuery query); } diff --git a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java index 3b589fa9b..27f5a497d 100644 --- a/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java +++ b/google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java @@ -296,15 +296,17 @@ private T addWrite(DocumentReference documentReference, Write.Builder write) { synchronized (writes) { Preconditions.checkState( !committed, - String.format( - "Cannot modify a %s that has already been committed.", - this.getClass().getSimpleName())); + String.format("Cannot modify a %s that has already been committed.", className())); writes.add(operation); writeIndex = writes.size() - 1; } return wrapResult(writeIndex); } + protected String className() { + return this.getClass().getSimpleName(); + } + /** Removes all values in 'fields' that are not specified in 'fieldMask'. */ private static Map applyFieldMask( Map fields, List fieldMask) { diff --git a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 2c092918b..3766800a5 100644 --- a/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -1785,18 +1785,20 @@ public void readOnlyTransaction_failureWhenAttemptingWrite() }, TransactionOptions.createReadOnlyOptionsBuilder().build()); - try { - runTransaction.get(10, TimeUnit.SECONDS); - } catch (ExecutionException e) { - final Throwable cause = e.getCause(); - assertThat(cause).isInstanceOf(FirestoreException.class); - final Throwable rootCause = ExceptionUtils.getRootCause(cause); - assertThat(rootCause).isInstanceOf(StatusRuntimeException.class); - final StatusRuntimeException invalidArgument = (StatusRuntimeException) rootCause; - final Status status = invalidArgument.getStatus(); - assertThat(status.getCode()).isEqualTo(Code.INVALID_ARGUMENT); - assertThat(status.getDescription()).contains("read-only"); - } + ExecutionException e = + assertThrows( + ExecutionException.class, + () -> { + runTransaction.get(10, TimeUnit.SECONDS); + }); + final Throwable cause = e.getCause(); + assertThat(cause).isInstanceOf(FirestoreException.class); + final Throwable rootCause = ExceptionUtils.getRootCause(cause); + assertThat(rootCause).isInstanceOf(StatusRuntimeException.class); + final StatusRuntimeException invalidArgument = (StatusRuntimeException) rootCause; + final Status status = invalidArgument.getStatus(); + assertThat(status.getCode()).isEqualTo(Code.INVALID_ARGUMENT); + assertThat(status.getDescription()).contains("read-only"); } @Test @@ -1839,7 +1841,11 @@ public void readOnlyTransaction_failureWhenAttemptReadOlderThan60Seconds() // To ensure we exceed this, we use 120 minutes. // If this test fails, we should likely be update documentation to reflect new value. See all // usages of "Read Time" on proto, and within SDK. - final long twoHours = System.currentTimeMillis() / 1000 - 7200; + // + // If Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp within the + // past 7 days. For that reason `twoHours` is calculated to whole minute to more accurately + // catch this situation. + final long twoHours = (System.currentTimeMillis() / 60_000 - 120) * 60; final TransactionOptions options = TransactionOptions.createReadOnlyOptionsBuilder() .setReadTime(