Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-22089] Fix query retry in Java FirestoreIO #22175

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@

## Bugfixes

* Fixed a condition where retrying queries would yield an incorrect cursor in the Java SDK Firestore Connector ([#22089](https://github.com/apache/beam/issues/22089)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.firestore.v1.stub.FirestoreStub;
import com.google.firestore.v1.BatchGetDocumentsRequest;
import com.google.firestore.v1.BatchGetDocumentsResponse;
import com.google.firestore.v1.BatchGetDocumentsResponse.ResultCase;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.ListCollectionIdsRequest;
import com.google.firestore.v1.ListCollectionIdsResponse;
Expand All @@ -43,15 +44,11 @@
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
import com.google.firestore.v1.StructuredQuery;
import com.google.firestore.v1.StructuredQuery.Direction;
import com.google.firestore.v1.StructuredQuery.FieldReference;
import com.google.firestore.v1.StructuredQuery.Order;
import com.google.firestore.v1.Value;
import com.google.protobuf.Message;
import com.google.protobuf.ProtocolStringList;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.ImplicitlyWindowedFirestoreDoFn;
Expand Down Expand Up @@ -109,46 +106,31 @@ protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
StructuredQuery.Builder builder;
List<Order> orderByList = query.getOrderByList();
// if the orderByList is empty that means the default sort of "__name__ ASC" will be used
// Before we can set the cursor to the last document name read, we need to explicitly add
// the order of "__name__ ASC" because a cursor value must map to an order by
if (orderByList.isEmpty()) {
builder =
query
.toBuilder()
.addOrderBy(
Order.newBuilder()
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
.setDirection(Direction.ASCENDING)
.build())
.setStartAt(
Cursor.newBuilder()
.setBefore(false)
.addValues(
Value.newBuilder()
.setReferenceValue(runQueryResponse.getDocument().getName())
.build()));
} else {
Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
for (Order order : orderByList) {
String fieldPath = order.getField().getFieldPath();
Value value = fieldsMap.get(fieldPath);
if (value != null) {
cursor.addValues(value);
} else if ("__name__".equals(fieldPath)) {
cursor.addValues(
Value.newBuilder()
.setReferenceValue(runQueryResponse.getDocument().getName())
.build());
}
StructuredQuery.Builder builder = query.toBuilder();
builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
for (Order order : builder.getOrderByList()) {
Value value =
QueryUtils.lookupDocumentValue(
runQueryResponse.getDocument(), order.getField().getFieldPath());
if (value == null) {
throw new IllegalStateException(
String.format(
"Failed to build query resumption token, field '%s' not found in doc with __name__ '%s'",
order.getField().getFieldPath(), runQueryResponse.getDocument().getName()));
}
builder = query.toBuilder().setStartAt(cursor.build());
cursor.addValues(value);
}
builder.setStartAt(cursor.build());
return element.toBuilder().setStructuredQuery(builder.build()).build();
}

@Override
protected @Nullable RunQueryResponse resumptionValue(
@Nullable RunQueryResponse previousValue, RunQueryResponse nextValue) {
// We need a document to resume, may be null if reporting partial progress.
return nextValue.hasDocument() ? nextValue : previousValue;
}
}

/**
Expand Down Expand Up @@ -380,6 +362,13 @@ protected BatchGetDocumentsRequest setStartFrom(
"Unable to determine BatchGet resumption point. Most recently received doc __name__ '%s'",
foundName != null ? foundName : missing));
}

@Override
protected @Nullable BatchGetDocumentsResponse resumptionValue(
@Nullable BatchGetDocumentsResponse previousValue, BatchGetDocumentsResponse newValue) {
// No sense in resuming from an empty result.
return newValue.getResultCase() == ResultCase.RESULT_NOT_SET ? previousValue : newValue;
}
}

/**
Expand Down Expand Up @@ -407,6 +396,8 @@ protected StreamingFirestoreV1ReadFn(

protected abstract InT setStartFrom(InT element, OutT out);

protected abstract @Nullable OutT resumptionValue(@Nullable OutT previousValue, OutT newValue);

@Override
public final void processElement(ProcessContext c) throws Exception {
@SuppressWarnings(
Expand All @@ -421,14 +412,14 @@ public final void processElement(ProcessContext c) throws Exception {
}

Instant start = clock.instant();
InT request =
lastReceivedValue == null ? element : setStartFrom(element, lastReceivedValue);
try {
InT request =
lastReceivedValue == null ? element : setStartFrom(element, lastReceivedValue);
attempt.recordRequestStart(start);
ServerStream<OutT> serverStream = getCallable(firestoreStub).call(request);
attempt.recordRequestSuccessful(clock.instant());
for (OutT out : serverStream) {
lastReceivedValue = out;
lastReceivedValue = resumptionValue(lastReceivedValue, out);
attempt.recordStreamValue(clock.instant());
c.output(out);
}
Expand Down
Loading