Skip to content

Commit

Permalink
Merge pull request #22175: [BEAM-22089] Fix query retry in Java Fires…
Browse files Browse the repository at this point in the history
…toreIO
  • Loading branch information
chamikaramj authored Jul 15, 2022
2 parents 64bcc7d + ad3af31 commit 2b42751
Show file tree
Hide file tree
Showing 5 changed files with 817 additions and 146 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@

## Bugfixes

* Fixed a condition where retrying queries would yield an incorrect cursor in the Java SDK Firestore Connector ([#22089](https://github.com/apache/beam/issues/22089)).
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## Known Issues

* ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.firestore.v1.stub.FirestoreStub;
import com.google.firestore.v1.BatchGetDocumentsRequest;
import com.google.firestore.v1.BatchGetDocumentsResponse;
import com.google.firestore.v1.BatchGetDocumentsResponse.ResultCase;
import com.google.firestore.v1.Cursor;
import com.google.firestore.v1.ListCollectionIdsRequest;
import com.google.firestore.v1.ListCollectionIdsResponse;
Expand All @@ -43,15 +44,11 @@
import com.google.firestore.v1.RunQueryRequest;
import com.google.firestore.v1.RunQueryResponse;
import com.google.firestore.v1.StructuredQuery;
import com.google.firestore.v1.StructuredQuery.Direction;
import com.google.firestore.v1.StructuredQuery.FieldReference;
import com.google.firestore.v1.StructuredQuery.Order;
import com.google.firestore.v1.Value;
import com.google.protobuf.Message;
import com.google.protobuf.ProtocolStringList;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.firestore.FirestoreDoFn.ImplicitlyWindowedFirestoreDoFn;
Expand Down Expand Up @@ -109,46 +106,31 @@ protected ServerStreamingCallable<RunQueryRequest, RunQueryResponse> getCallable
protected RunQueryRequest setStartFrom(
RunQueryRequest element, RunQueryResponse runQueryResponse) {
StructuredQuery query = element.getStructuredQuery();
StructuredQuery.Builder builder;
List<Order> orderByList = query.getOrderByList();
// if the orderByList is empty that means the default sort of "__name__ ASC" will be used
// Before we can set the cursor to the last document name read, we need to explicitly add
// the order of "__name__ ASC" because a cursor value must map to an order by
if (orderByList.isEmpty()) {
builder =
query
.toBuilder()
.addOrderBy(
Order.newBuilder()
.setField(FieldReference.newBuilder().setFieldPath("__name__").build())
.setDirection(Direction.ASCENDING)
.build())
.setStartAt(
Cursor.newBuilder()
.setBefore(false)
.addValues(
Value.newBuilder()
.setReferenceValue(runQueryResponse.getDocument().getName())
.build()));
} else {
Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
Map<String, Value> fieldsMap = runQueryResponse.getDocument().getFieldsMap();
for (Order order : orderByList) {
String fieldPath = order.getField().getFieldPath();
Value value = fieldsMap.get(fieldPath);
if (value != null) {
cursor.addValues(value);
} else if ("__name__".equals(fieldPath)) {
cursor.addValues(
Value.newBuilder()
.setReferenceValue(runQueryResponse.getDocument().getName())
.build());
}
StructuredQuery.Builder builder = query.toBuilder();
builder.addAllOrderBy(QueryUtils.getImplicitOrderBy(query));
Cursor.Builder cursor = Cursor.newBuilder().setBefore(false);
for (Order order : builder.getOrderByList()) {
Value value =
QueryUtils.lookupDocumentValue(
runQueryResponse.getDocument(), order.getField().getFieldPath());
if (value == null) {
throw new IllegalStateException(
String.format(
"Failed to build query resumption token, field '%s' not found in doc with __name__ '%s'",
order.getField().getFieldPath(), runQueryResponse.getDocument().getName()));
}
builder = query.toBuilder().setStartAt(cursor.build());
cursor.addValues(value);
}
builder.setStartAt(cursor.build());
return element.toBuilder().setStructuredQuery(builder.build()).build();
}

@Override
protected @Nullable RunQueryResponse resumptionValue(
@Nullable RunQueryResponse previousValue, RunQueryResponse nextValue) {
// We need a document to resume, may be null if reporting partial progress.
return nextValue.hasDocument() ? nextValue : previousValue;
}
}

/**
Expand Down Expand Up @@ -380,6 +362,13 @@ protected BatchGetDocumentsRequest setStartFrom(
"Unable to determine BatchGet resumption point. Most recently received doc __name__ '%s'",
foundName != null ? foundName : missing));
}

@Override
protected @Nullable BatchGetDocumentsResponse resumptionValue(
@Nullable BatchGetDocumentsResponse previousValue, BatchGetDocumentsResponse newValue) {
// No sense in resuming from an empty result.
return newValue.getResultCase() == ResultCase.RESULT_NOT_SET ? previousValue : newValue;
}
}

/**
Expand Down Expand Up @@ -407,6 +396,8 @@ protected StreamingFirestoreV1ReadFn(

protected abstract InT setStartFrom(InT element, OutT out);

protected abstract @Nullable OutT resumptionValue(@Nullable OutT previousValue, OutT newValue);

@Override
public final void processElement(ProcessContext c) throws Exception {
@SuppressWarnings(
Expand All @@ -421,14 +412,14 @@ public final void processElement(ProcessContext c) throws Exception {
}

Instant start = clock.instant();
InT request =
lastReceivedValue == null ? element : setStartFrom(element, lastReceivedValue);
try {
InT request =
lastReceivedValue == null ? element : setStartFrom(element, lastReceivedValue);
attempt.recordRequestStart(start);
ServerStream<OutT> serverStream = getCallable(firestoreStub).call(request);
attempt.recordRequestSuccessful(clock.instant());
for (OutT out : serverStream) {
lastReceivedValue = out;
lastReceivedValue = resumptionValue(lastReceivedValue, out);
attempt.recordStreamValue(clock.instant());
c.output(out);
}
Expand Down
Loading

0 comments on commit 2b42751

Please sign in to comment.