Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: reverse scans public preview #1711

Merged
merged 7 commits into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions google-cloud-bigtable/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,15 @@
<method>*</method>
<to>*</to>
</difference>
<!-- Removed methods in an internal class -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigtable/data/v2/internal/RowSetUtil</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/bigtable/data/v2/stub/readrows/RowMerger</className>
<method>*</method>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class RowMergerUtil implements AutoCloseable {

public RowMergerUtil() {
RowBuilder<Row> rowBuilder = new DefaultRowAdapter().createRowBuilder();
merger = new RowMerger<>(rowBuilder);
merger = new RowMerger<>(rowBuilder, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,80 +50,79 @@ public final class RowSetUtil {
private RowSetUtil() {}

/**
* Splits the provided {@link RowSet} along the provided splitPoint into 2 segments. The right
* segment will contain all keys that are strictly greater than the splitPoint and all {@link
* RowRange}s truncated to start right after the splitPoint. The primary usecase is to resume a
* broken ReadRows stream.
* Removes all the keys and range parts that fall on or before the splitPoint.
*
* <p>The direction of "before" is determined by fromStart: for forward scans fromStart is true and
* all of the keys and range segments that would've been read prior to the splitPoint are removed
* (i.e. everything that sorts lexicographically at or before the split point). For reverse scans,
* fromStart is false and all segments that sort lexicographically at or after the split point are
* removed. The primary use case is to resume a broken ReadRows stream.
*/
@Nonnull
public static Split split(@Nonnull RowSet rowSet, @Nonnull ByteString splitPoint) {
// Edgecase: splitPoint is the leftmost key ("")
if (splitPoint.isEmpty()) {
return Split.of(null, rowSet);
}
public static RowSet erase(RowSet rowSet, ByteString splitPoint, boolean fromStart) {
RowSet.Builder newRowSet = RowSet.newBuilder();

// An empty RowSet represents a full table scan. Make that explicit so that there is RowRange to
// split.
if (rowSet.getRowKeysList().isEmpty() && rowSet.getRowRangesList().isEmpty()) {
rowSet = RowSet.newBuilder().addRowRanges(RowRange.getDefaultInstance()).build();
}

RowSet.Builder leftBuilder = RowSet.newBuilder();
boolean leftIsEmpty = true;
RowSet.Builder rightBuilder = RowSet.newBuilder();
boolean rightIsEmpty = true;

// Handle point lookups
for (ByteString key : rowSet.getRowKeysList()) {
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) <= 0) {
leftBuilder.addRowKeys(key);
leftIsEmpty = false;
if (fromStart) {
// key is right of the split
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) > 0) {
newRowSet.addRowKeys(key);
}
} else {
rightBuilder.addRowKeys(key);
rightIsEmpty = false;
// key is left of the split
if (ByteStringComparator.INSTANCE.compare(key, splitPoint) < 0) {
newRowSet.addRowKeys(key);
}
}
}

for (RowRange range : rowSet.getRowRangesList()) {
StartPoint startPoint = StartPoint.extract(range);
int startCmp =
ComparisonChain.start()
.compare(startPoint.value, splitPoint, ByteStringComparator.INSTANCE)
// when value lies on the split point, only closed start points are on the left
.compareTrueFirst(startPoint.isClosed, true)
.result();

// Range is fully on the right side
if (startCmp > 0) {
rightBuilder.addRowRanges(range);
rightIsEmpty = false;
continue;
// Handle ranges
for (RowRange rowRange : rowSet.getRowRangesList()) {
RowRange newRange = truncateRange(rowRange, splitPoint, fromStart);
if (newRange != null) {
newRowSet.addRowRanges(newRange);
}
}

EndPoint endPoint = EndPoint.extract(range);
int endCmp =
ComparisonChain.start()
// empty (true) end key means rightmost regardless of the split point
.compareFalseFirst(endPoint.value.isEmpty(), false)
.compare(endPoint.value, splitPoint, ByteStringComparator.INSTANCE)
// don't care if the endpoint is open/closed: both will be on the left if the value is
// <=
.result();

if (endCmp <= 0) {
// Range is fully on the left
leftBuilder.addRowRanges(range);
leftIsEmpty = false;
} else {
// Range is split
leftBuilder.addRowRanges(range.toBuilder().setEndKeyClosed(splitPoint));
leftIsEmpty = false;
rightBuilder.addRowRanges(range.toBuilder().setStartKeyOpen(splitPoint));
rightIsEmpty = false;
// Return the new rowset if there is anything left to read
RowSet result = newRowSet.build();
if (result.getRowKeysList().isEmpty() && result.getRowRangesList().isEmpty()) {
return null;
}
return result;
}

private static RowRange truncateRange(RowRange range, ByteString split, boolean fromStart) {
if (fromStart) {
// range end is on or left of the split: skip
if (EndPoint.extract(range).compareTo(new EndPoint(split, true)) <= 0) {
return null;
}
} else {
// range is on or right of the split
if (StartPoint.extract(range).compareTo(new StartPoint(split, true)) >= 0) {
return null;
}
}
RowRange.Builder newRange = range.toBuilder();

if (fromStart) {
// range start is on or left of the split
if (StartPoint.extract(range).compareTo(new StartPoint(split, true)) <= 0) {
newRange.setStartKeyOpen(split);
}
} else {
// range end is on or right of the split
if (EndPoint.extract(range).compareTo(new EndPoint(split, true)) >= 0) {
newRange.setEndKeyOpen(split);
}
}

return Split.of(
leftIsEmpty ? null : leftBuilder.build(), rightIsEmpty ? null : rightBuilder.build());
return newRange.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,26 @@ public Query limit(long limit) {
return this;
}

/**
* Return rows in reverse order.
*
* <p>The rows will be streamed in reverse lexicographic order of the keys. The row key ranges are
* still expected to be oriented the same way as for forward scans, i.e. {@code [a,c]} where
* {@code a <= c}. The contents of each row keep the same ordering as in a forward scan. This is
* particularly useful to get the last N records before a key:
*
* <pre>{@code
* query
* .range(ByteStringRange.unbounded().endOpen("key"))
* .limit(10)
* .reversed(true)
* }</pre>
*
* @param enable value to store in the request's {@code reversed} field; {@code true} requests a
*     reverse scan
* @return this query, so that calls can be chained
*/
public Query reversed(boolean enable) {
builder.setReversed(enable);
return this;
}

/**
* Split this query into multiple queries that can be evenly distributed across Bigtable nodes and
* be run in parallel. This method takes the results from {@link
Expand Down Expand Up @@ -379,11 +399,12 @@ public boolean advance(@Nonnull ByteString lastSeenRowKey) {

// Drop the row ranges / row keys that were already read (on or before lastSeenRowKey in scan
// order). Return false if there's nothing left to read.
RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey);
if (split.getRight() == null) {
RowSet remaining =
RowSetUtil.erase(query.builder.getRows(), lastSeenRowKey, !query.builder.getReversed());
if (remaining == null) {
return false;
}
query.builder.setRows(split.getRight());
query.builder.setRows(remaining);
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ private Builder() {
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
.build());

featureFlags = FeatureFlags.newBuilder();
featureFlags = FeatureFlags.newBuilder().setReverseScans(true);
}

private Builder(EnhancedBigtableStubSettings settings) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public ReadRowsRequest getResumeRequest(ReadRowsRequest originalRequest) {
return originalRequest;
}

RowSet remaining = RowSetUtil.split(originalRequest.getRows(), lastKey).getRight();
RowSet remaining =
RowSetUtil.erase(originalRequest.getRows(), lastKey, !originalRequest.getReversed());

// Edge case: retrying a fulfilled request.
// A fulfilled request is one that has had all of its row keys and ranges fulfilled, or if it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public class RowMerger<RowT> implements Reframer<RowT, ReadRowsResponse> {
private final StateMachine<RowT> stateMachine;
private Queue<RowT> mergedRows;

public RowMerger(RowBuilder<RowT> rowBuilder) {
stateMachine = new StateMachine<>(rowBuilder);
public RowMerger(RowBuilder<RowT> rowBuilder, boolean reversed) {
stateMachine = new StateMachine<>(rowBuilder, reversed);
mergedRows = new ArrayDeque<>();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public RowMergingCallable(
public void call(
ReadRowsRequest request, ResponseObserver<RowT> responseObserver, ApiCallContext context) {
RowBuilder<RowT> rowBuilder = rowAdapter.createRowBuilder();
RowMerger<RowT> merger = new RowMerger<>(rowBuilder);
RowMerger<RowT> merger = new RowMerger<>(rowBuilder, request.getReversed());
ReframingResponseObserver<ReadRowsResponse, RowT> innerObserver =
new ReframingResponseObserver<>(responseObserver, merger);
inner.call(request, innerObserver, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
*/
final class StateMachine<RowT> {
private final RowBuilder<RowT> adapter;
private boolean reversed;
private State currentState;
private ByteString lastCompleteRowKey;

Expand All @@ -102,9 +103,11 @@ final class StateMachine<RowT> {
* Initialize a new state machine that's ready for a new row.
*
* @param adapter The adapter that will build the final row.
* @param reversed If true, row keys are validated to be strictly decreasing instead of strictly
* increasing.
*/
StateMachine(RowBuilder<RowT> adapter) {
StateMachine(RowBuilder<RowT> adapter, boolean reversed) {
this.adapter = adapter;
this.reversed = reversed;
reset();
}

Expand Down Expand Up @@ -261,9 +264,15 @@ State handleChunk(CellChunk chunk) {
validate(chunk.hasFamilyName(), "AWAITING_NEW_ROW: family missing");
validate(chunk.hasQualifier(), "AWAITING_NEW_ROW: qualifier missing");
if (lastCompleteRowKey != null) {
validate(
ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, chunk.getRowKey()) < 0,
"AWAITING_NEW_ROW: key must be strictly increasing");

int cmp = ByteStringComparator.INSTANCE.compare(lastCompleteRowKey, chunk.getRowKey());
String direction = "increasing";
if (reversed) {
cmp *= -1;
direction = "decreasing";
}

validate(cmp < 0, "AWAITING_NEW_ROW: key must be strictly " + direction);
}

rowKey = chunk.getRowKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.WatchdogProvider;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.FeatureFlags;
import com.google.bigtable.v2.InstanceName;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
Expand All @@ -36,8 +37,14 @@
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerTransportFilter;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
Expand Down Expand Up @@ -78,12 +85,24 @@ public class BigtableDataClientFactoryTest {

private final BlockingQueue<Attributes> setUpAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue<Attributes> terminateAttributes = new LinkedBlockingDeque<>();
private final BlockingQueue<Metadata> requestMetadata = new LinkedBlockingDeque<>();

@Before
public void setUp() throws IOException {
service = new FakeBigtableService();
server =
FakeServiceBuilder.create(service)
.intercept(
new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
requestMetadata.add(headers);
return next.startCall(call, headers);
}
})
.addTransportFilter(
new ServerTransportFilter() {
@Override
Expand Down Expand Up @@ -276,6 +295,24 @@ public void testCreateWithRefreshingChannel() throws Exception {
assertThat(terminateAttributes).hasSize(poolSize);
}

/**
* Verifies that the "bigtable-features" request header sent by a default client decodes to a
* {@link FeatureFlags} message with reverse scans enabled.
*/
@Test
public void testFeatureFlags() throws Exception {
try (BigtableDataClientFactory factory = BigtableDataClientFactory.create(defaultSettings);
BigtableDataClient client = factory.createDefault()) {

// Discard headers captured before this point so that the next entry in the queue belongs to
// the mutateRow call below (presumably client creation can issue its own RPCs - confirm).
requestMetadata.clear();
client.mutateRow(RowMutation.create("some-table", "some-key").deleteRow());
}

// take() blocks until the server interceptor has recorded headers for a call.
Metadata metadata = requestMetadata.take();
String encodedValue =
metadata.get(Metadata.Key.of("bigtable-features", Metadata.ASCII_STRING_MARSHALLER));
// The header value is read as an ASCII string and base64url-decoded into the serialized proto.
FeatureFlags featureFlags =
FeatureFlags.parseFrom(BaseEncoding.base64Url().decode(encodedValue));

assertThat(featureFlags.getReverseScans()).isTrue();
}

@Test
public void testBulkMutationFlowControllerConfigured() throws Exception {
BigtableDataSettings settings =
Expand Down Expand Up @@ -306,6 +343,7 @@ private static class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
volatile MutateRowRequest lastRequest;
BlockingQueue<ReadRowsRequest> readRowsRequests = new LinkedBlockingDeque<>();
BlockingQueue<PingAndWarmRequest> pingAndWarmRequests = new LinkedBlockingDeque<>();

private ApiFunction<ReadRowsRequest, ReadRowsResponse> readRowsCallback =
new ApiFunction<ReadRowsRequest, ReadRowsResponse>() {
@Override
Expand Down
Loading