From c3086d99fc2009dcc296a6267f83d017f6e789cb Mon Sep 17 00:00:00 2001 From: tengzhonger <109308630+tengzhonger@users.noreply.github.com> Date: Mon, 15 Aug 2022 12:34:54 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20Change=20CDC=20related=20APIs=20to=20re?= =?UTF-8?q?turn=20ByteStringRange=20instead=20of=20Ro=E2=80=A6=20(#1355)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: Change CDC related APIs to return ByteStringRange instead of RowRange 1. GenerateInitialChangeStreamPartitions 2. ChangeStreamContinuationToken::GetRowRange * fix: Fix tests * fix: Address comments Co-authored-by: Teng Zhong --- .../bigtable/data/v2/BigtableDataClient.java | 23 ++++++++------- .../models/ChangeStreamContinuationToken.java | 16 ++++++---- .../cloud/bigtable/data/v2/models/Range.java | 19 ++++++++++++ .../data/v2/stub/EnhancedBigtableStub.java | 26 +++++++++-------- .../v2/stub/EnhancedBigtableStubSettings.java | 8 ++--- ...ialChangeStreamPartitionsUserCallable.java | 23 +++++++-------- .../data/v2/BigtableDataClientTests.java | 6 ++-- .../ChangeStreamContinuationTokenTest.java | 22 +++++++------- .../v2/models/ChangeStreamRecordTest.java | 11 +++++-- .../bigtable/data/v2/models/RangeTest.java | 11 +++++++ ...ChangeStreamRecordMergingCallableTest.java | 29 ++++++++++--------- ...hangeStreamPartitionsUserCallableTest.java | 17 +++-------- ...ReadChangeStreamMergingAcceptanceTest.java | 14 +++++++-- 13 files changed, 134 insertions(+), 91 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java index acfbff0747..77b909b7a1 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataClient.java @@ -29,7 +29,6 @@ import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; -import com.google.bigtable.v2.RowRange; import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; @@ -37,6 +36,7 @@ import com.google.cloud.bigtable.data.v2.models.Filters.Filter; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; @@ -1503,11 +1503,11 @@ public UnaryCallable readModifyWriteRowCallable() { * String tableId = "[TABLE]"; * * try { - * ServerStream stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId); + * ServerStream stream = bigtableDataClient.generateInitialChangeStreamPartitions(tableId); * int count = 0; * * // Iterator style - * for (RowRange partition : stream) { + * for (ByteStringRange partition : stream) { * if (++count > 10) { * stream.cancel(); * break; @@ -1525,7 +1525,7 @@ public UnaryCallable readModifyWriteRowCallable() { * @see ServerStreamingCallable For call styles. */ @InternalApi("Used in Changestream beam pipeline.") - public ServerStream generateInitialChangeStreamPartitions(String tableId) { + public ServerStream generateInitialChangeStreamPartitions(String tableId) { return generateInitialChangeStreamPartitionsCallable().call(tableId); } @@ -1545,7 +1545,7 @@ public ServerStream generateInitialChangeStreamPartitions(String table * public void onStart(StreamController controller) { * this.controller = controller; * } - * public void onResponse(RowRange partition) { + * public void onResponse(ByteStringRange partition) { * if (++count > 10) { * controller.cancel(); * return; @@ -1568,7 +1568,7 @@ public ServerStream generateInitialChangeStreamPartitions(String table */ @InternalApi("Used in Changestream beam pipeline.") public void generateInitialChangeStreamPartitionsAsync( - String tableId, ResponseObserver observer) { + String tableId, ResponseObserver observer) { generateInitialChangeStreamPartitionsCallable().call(tableId, observer); } @@ -1584,7 +1584,7 @@ public void generateInitialChangeStreamPartitionsAsync( * * // Iterator style * try { - * for(RowRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) { + * for(ByteStringRange partition : bigtableDataClient.generateInitialChangeStreamPartitionsCallable().call(tableId)) { * // Do something with partition * } * } catch (NotFoundException e) { @@ -1595,7 +1595,7 @@ public void generateInitialChangeStreamPartitionsAsync( * * // Sync style * try { - * List partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId); + * List partitions = bigtableDataClient.generateInitialChangeStreamPartitionsCallable().all().call(tableId); * } catch (NotFoundException e) { * System.out.println("Tried to read a non-existent table"); * } catch (RuntimeException e) { @@ -1603,10 +1603,10 @@ public void generateInitialChangeStreamPartitionsAsync( * } * * // Point look up - * ApiFuture partitionFuture = + * ApiFuture partitionFuture = * bigtableDataClient.generateInitialChangeStreamPartitionsCallable().first().futureCall(tableId); * - * ApiFutures.addCallback(partitionFuture, new ApiFutureCallback() { + * ApiFutures.addCallback(partitionFuture, new ApiFutureCallback() { * public void onFailure(Throwable t) { * if (t instanceof NotFoundException) { * System.out.println("Tried to read a non-existent table"); @@ -1626,7 +1626,8 @@ public void generateInitialChangeStreamPartitionsAsync( * @see ServerStreamingCallable For call styles. */ @InternalApi("Used in Changestream beam pipeline.") - public ServerStreamingCallable generateInitialChangeStreamPartitionsCallable() { + public ServerStreamingCallable + generateInitialChangeStreamPartitionsCallable() { return stub.generateInitialChangeStreamPartitionsCallable(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java index af7b15ea4e..06e975c827 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java @@ -54,9 +54,13 @@ public ChangeStreamContinuationToken( .build(); } - // TODO: Change this to return ByteStringRange. - public RowRange getRowRange() { - return this.tokenProto.getPartition().getRowRange(); + /** + * Get the partition of the current continuation token, represented by a {@link ByteStringRange}. + */ + public ByteStringRange getPartition() { + return ByteStringRange.create( + this.tokenProto.getPartition().getRowRange().getStartKeyClosed(), + this.tokenProto.getPartition().getRowRange().getEndKeyOpen()); } public String getToken() { @@ -95,19 +99,19 @@ public boolean equals(Object o) { return false; } ChangeStreamContinuationToken otherToken = (ChangeStreamContinuationToken) o; - return Objects.equal(getRowRange(), otherToken.getRowRange()) + return Objects.equal(getPartition(), otherToken.getPartition()) && Objects.equal(getToken(), otherToken.getToken()); } @Override public int hashCode() { - return Objects.hashCode(getRowRange(), getToken()); + return Objects.hashCode(getPartition(), getToken()); } @Override public String toString() { return MoreObjects.toStringHelper(this) - .add("rowRange", getRowRange()) + .add("partition", getPartition()) .add("token", getToken()) .toString(); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java index 4d7a10ab2a..c56a4163b8 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Range.java @@ -15,10 +15,13 @@ */ package com.google.cloud.bigtable.data.v2.models; +import com.google.api.core.InternalApi; import com.google.api.core.InternalExtensionOnly; +import com.google.bigtable.v2.RowRange; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; @@ -395,6 +398,22 @@ private void writeObject(ObjectOutputStream output) throws IOException { output.defaultWriteObject(); } + @InternalApi("Used in Changestream beam pipeline.") + public static ByteString toByteString(ByteStringRange byteStringRange) { + return RowRange.newBuilder() + .setStartKeyClosed(byteStringRange.getStart()) + .setEndKeyOpen(byteStringRange.getEnd()) + .build() + .toByteString(); + } + + @InternalApi("Used in Changestream beam pipeline.") + public static ByteStringRange toByteStringRange(ByteString byteString) + throws InvalidProtocolBufferException { + RowRange rowRange = RowRange.newBuilder().mergeFrom(byteString).build(); + return ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 4e29f2a3f5..10eef25e7e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -74,6 +74,7 @@ import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; @@ -155,7 +156,7 @@ public class EnhancedBigtableStub implements AutoCloseable { private final UnaryCallable checkAndMutateRowCallable; private final UnaryCallable readModifyWriteRowCallable; - private final ServerStreamingCallable + private final ServerStreamingCallable generateInitialChangeStreamPartitionsCallable; private final ServerStreamingCallable @@ -833,7 +834,7 @@ public Map extract(ReadModifyWriteRowRequest request) { * RowRange}. * */ - private ServerStreamingCallable + private ServerStreamingCallable createGenerateInitialChangeStreamPartitionsCallable() { ServerStreamingCallable< GenerateInitialChangeStreamPartitionsRequest, @@ -862,22 +863,22 @@ public Map extract( .build(), settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes()); - ServerStreamingCallable userCallable = + ServerStreamingCallable userCallable = new GenerateInitialChangeStreamPartitionsUserCallable(base, requestContext); - ServerStreamingCallable withStatsHeaders = + ServerStreamingCallable withStatsHeaders = new StatsHeadersServerStreamingCallable<>(userCallable); // Sometimes GenerateInitialChangeStreamPartitions connections are disconnected via an RST // frame. This error is transient and should be treated similar to UNAVAILABLE. However, this // exception has an INTERNAL error code which by default is not retryable. Convert the exception // so it can be retried in the client. - ServerStreamingCallable convertException = + ServerStreamingCallable convertException = new ConvertStreamExceptionCallable<>(withStatsHeaders); // Copy idle timeout settings for watchdog. - ServerStreamingCallSettings innerSettings = - ServerStreamingCallSettings.newBuilder() + ServerStreamingCallSettings innerSettings = + ServerStreamingCallSettings.newBuilder() .setRetryableCodes( settings.generateInitialChangeStreamPartitionsSettings().getRetryableCodes()) .setRetrySettings( @@ -886,17 +887,17 @@ public Map extract( settings.generateInitialChangeStreamPartitionsSettings().getIdleTimeout()) .build(); - ServerStreamingCallable watched = + ServerStreamingCallable watched = Callables.watched(convertException, innerSettings, clientContext); - ServerStreamingCallable withBigtableTracer = + ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(watched); - ServerStreamingCallable retrying = + ServerStreamingCallable retrying = Callables.retrying(withBigtableTracer, innerSettings, clientContext); SpanName span = getSpanName("GenerateInitialChangeStreamPartitions"); - ServerStreamingCallable traced = + ServerStreamingCallable traced = new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span); return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); @@ -1039,7 +1040,8 @@ public UnaryCallable readModifyWriteRowCallable() { } /** Returns a streaming generate initial change stream partitions callable */ - public ServerStreamingCallable generateInitialChangeStreamPartitionsCallable() { + public ServerStreamingCallable + generateInitialChangeStreamPartitionsCallable() { return generateInitialChangeStreamPartitionsCallable; } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java index 9d2a731018..49eb79f5ca 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStubSettings.java @@ -33,12 +33,12 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.api.gax.rpc.UnaryCallSettings; import com.google.auth.Credentials; -import com.google.bigtable.v2.RowRange; import com.google.cloud.bigtable.Version; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; @@ -212,7 +212,7 @@ public class EnhancedBigtableStubSettings extends StubSettings checkAndMutateRowSettings; private final UnaryCallSettings readModifyWriteRowSettings; - private final ServerStreamingCallSettings + private final ServerStreamingCallSettings generateInitialChangeStreamPartitionsSettings; private final ServerStreamingCallSettings readChangeStreamSettings; @@ -537,7 +537,7 @@ public UnaryCallSettings readModifyWriteRowSettings() { return readModifyWriteRowSettings; } - public ServerStreamingCallSettings + public ServerStreamingCallSettings generateInitialChangeStreamPartitionsSettings() { return generateInitialChangeStreamPartitionsSettings; } @@ -571,7 +571,7 @@ public static class Builder extends StubSettings.Builder checkAndMutateRowSettings; private final UnaryCallSettings.Builder readModifyWriteRowSettings; - private final ServerStreamingCallSettings.Builder + private final ServerStreamingCallSettings.Builder generateInitialChangeStreamPartitionsSettings; private final ServerStreamingCallSettings.Builder readChangeStreamSettings; diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallable.java index 365cf56ff2..ce07018c52 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallable.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallable.java @@ -21,16 +21,16 @@ import com.google.api.gax.rpc.StreamController; import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest; import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse; -import com.google.bigtable.v2.RowRange; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.internal.RequestContext; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; /** * Simple wrapper for GenerateInitialChangeStreamPartitions to wrap the request and response * protobufs. */ public class GenerateInitialChangeStreamPartitionsUserCallable - extends ServerStreamingCallable { + extends ServerStreamingCallable { private final RequestContext requestContext; private final ServerStreamingCallable< GenerateInitialChangeStreamPartitionsRequest, @@ -49,7 +49,7 @@ public GenerateInitialChangeStreamPartitionsUserCallable( @Override public void call( - String tableId, ResponseObserver responseObserver, ApiCallContext context) { + String tableId, ResponseObserver responseObserver, ApiCallContext context) { String tableName = NameUtil.formatTableName( requestContext.getProjectId(), requestContext.getInstanceId(), tableId); @@ -62,12 +62,12 @@ public void call( inner.call(request, new ConvertPartitionToRangeObserver(responseObserver), context); } - private class ConvertPartitionToRangeObserver + private static class ConvertPartitionToRangeObserver implements ResponseObserver { - private final ResponseObserver outerObserver; + private final ResponseObserver outerObserver; - ConvertPartitionToRangeObserver(ResponseObserver observer) { + ConvertPartitionToRangeObserver(ResponseObserver observer) { this.outerObserver = observer; } @@ -78,12 +78,11 @@ public void onStart(final StreamController controller) { @Override public void onResponse(GenerateInitialChangeStreamPartitionsResponse response) { - RowRange rowRange = - RowRange.newBuilder() - .setStartKeyClosed(response.getPartition().getRowRange().getStartKeyClosed()) - .setEndKeyOpen(response.getPartition().getRowRange().getEndKeyOpen()) - .build(); - outerObserver.onResponse(rowRange); + ByteStringRange byteStringRange = + ByteStringRange.create( + response.getPartition().getRowRange().getStartKeyClosed(), + response.getPartition().getRowRange().getEndKeyOpen()); + outerObserver.onResponse(byteStringRange); } @Override diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java index c3850e7e15..f4f23085a2 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/BigtableDataClientTests.java @@ -24,7 +24,6 @@ import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.UnaryCallable; -import com.google.bigtable.v2.RowRange; import com.google.cloud.bigtable.data.v2.models.BulkMutation; import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; @@ -32,6 +31,7 @@ import com.google.cloud.bigtable.data.v2.models.KeyOffset; import com.google.cloud.bigtable.data.v2.models.Mutation; import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; import com.google.cloud.bigtable.data.v2.models.Row; @@ -83,7 +83,7 @@ public class BigtableDataClientTests { @Mock private Batcher mockBulkReadRowsBatcher; @Mock(answer = Answers.RETURNS_DEEP_STUBS) - private ServerStreamingCallable + private ServerStreamingCallable mockGenerateInitialChangeStreamPartitionsCallable; @Mock(answer = Answers.RETURNS_DEEP_STUBS) @@ -342,7 +342,7 @@ public void proxyGenerateInitialChangeStreamPartitionsAsyncTest() { .thenReturn(mockGenerateInitialChangeStreamPartitionsCallable); @SuppressWarnings("unchecked") - ResponseObserver mockObserver = Mockito.mock(ResponseObserver.class); + ResponseObserver mockObserver = Mockito.mock(ResponseObserver.class); bigtableDataClient.generateInitialChangeStreamPartitionsAsync("fake-table", mockObserver); Mockito.verify(mockGenerateInitialChangeStreamPartitionsCallable) diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java index e93dfc70bf..7716ec06f7 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java @@ -39,12 +39,10 @@ private ByteStringRange createFakeByteStringRange() { return ByteStringRange.create("a", "b"); } - // TODO: Get rid of this once we change ChangeStreamContinuationToken::getRowRange() - // to ChangeStreamContinuationToken::getByteStringRange(). - private RowRange rowRangeFromByteStringRange(ByteStringRange byteStringRange) { + private RowRange rowRangeFromPartition(ByteStringRange partition) { return RowRange.newBuilder() - .setStartKeyClosed(byteStringRange.getStart()) - .setEndKeyOpen(byteStringRange.getEnd()) + .setStartKeyClosed(partition.getStart()) + .setEndKeyOpen(partition.getEnd()) .build(); } @@ -53,8 +51,7 @@ public void basicTest() throws Exception { ByteStringRange byteStringRange = createFakeByteStringRange(); ChangeStreamContinuationToken changeStreamContinuationToken = new ChangeStreamContinuationToken(byteStringRange, TOKEN); - Assert.assertEquals( - changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getPartition(), byteStringRange); Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); ByteArrayOutputStream bos = new ByteArrayOutputStream(); @@ -69,15 +66,17 @@ public void basicTest() throws Exception { @Test public void fromProtoTest() { ByteStringRange byteStringRange = createFakeByteStringRange(); - RowRange fakeRowRange = rowRangeFromByteStringRange(byteStringRange); StreamContinuationToken proto = StreamContinuationToken.newBuilder() - .setPartition(StreamPartition.newBuilder().setRowRange(fakeRowRange).build()) + .setPartition( + StreamPartition.newBuilder() + .setRowRange(rowRangeFromPartition(byteStringRange)) + .build()) .setToken(TOKEN) .build(); ChangeStreamContinuationToken changeStreamContinuationToken = ChangeStreamContinuationToken.fromProto(proto); - Assert.assertEquals(changeStreamContinuationToken.getRowRange(), fakeRowRange); + Assert.assertEquals(changeStreamContinuationToken.getPartition(), byteStringRange); Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); Assert.assertEquals( changeStreamContinuationToken, @@ -89,8 +88,7 @@ public void toByteStringTest() throws Exception { ByteStringRange byteStringRange = createFakeByteStringRange(); ChangeStreamContinuationToken changeStreamContinuationToken = new ChangeStreamContinuationToken(byteStringRange, TOKEN); - Assert.assertEquals( - changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getPartition(), byteStringRange); Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); Assert.assertEquals( changeStreamContinuationToken, diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java index 05df603959..c6aa7580dd 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java @@ -21,6 +21,7 @@ import com.google.bigtable.v2.RowRange; import com.google.bigtable.v2.StreamContinuationToken; import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; import com.google.rpc.Status; @@ -119,7 +120,9 @@ public void heartbeatTest() { Heartbeat actualHeartbeat = Heartbeat.fromProto(heartbeatProto); Assert.assertEquals(actualHeartbeat.getLowWatermark(), lowWatermark); - Assert.assertEquals(actualHeartbeat.getChangeStreamContinuationToken().getRowRange(), rowRange); + Assert.assertEquals( + actualHeartbeat.getChangeStreamContinuationToken().getPartition(), + ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); Assert.assertEquals(actualHeartbeat.getChangeStreamContinuationToken().getToken(), token); } @@ -156,11 +159,13 @@ public void closeStreamTest() { Assert.assertEquals(status, actualCloseStream.getStatus()); Assert.assertEquals( - rowRange1, actualCloseStream.getChangeStreamContinuationTokens().get(0).getRowRange()); + actualCloseStream.getChangeStreamContinuationTokens().get(0).getPartition(), + ByteStringRange.create(rowRange1.getStartKeyClosed(), rowRange1.getEndKeyOpen())); Assert.assertEquals( token1, actualCloseStream.getChangeStreamContinuationTokens().get(0).getToken()); Assert.assertEquals( - rowRange2, actualCloseStream.getChangeStreamContinuationTokens().get(1).getRowRange()); + actualCloseStream.getChangeStreamContinuationTokens().get(1).getPartition(), + ByteStringRange.create(rowRange2.getStartKeyClosed(), rowRange2.getEndKeyOpen())); Assert.assertEquals( token2, actualCloseStream.getChangeStreamContinuationTokens().get(1).getToken()); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/RangeTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/RangeTest.java index eebdba5811..96768e1c82 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/RangeTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/RangeTest.java @@ -21,6 +21,7 @@ import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -306,4 +307,14 @@ public void byteStringSerializationTest() throws IOException, ClassNotFoundExcep ByteStringRange actual = (ByteStringRange) ois.readObject(); assertThat(actual).isEqualTo(expected); } + + @Test + public void byteStringRangeToByteStringTest() throws InvalidProtocolBufferException { + ByteStringRange expected = ByteStringRange.create("a", "z"); + + ByteString serialized = ByteStringRange.toByteString(expected); + ByteStringRange deserialized = ByteStringRange.toByteStringRange(serialized); + + assertThat(expected).isEqualTo(deserialized); + } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java index d23eb64765..5cc04f764d 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ChangeStreamRecordMergingCallableTest.java @@ -25,6 +25,7 @@ import com.google.cloud.bigtable.data.v2.models.CloseStream; import com.google.cloud.bigtable.data.v2.models.DefaultChangeStreamRecordAdapter; import com.google.cloud.bigtable.data.v2.models.Heartbeat; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi.ServerStreamingStashCallable; import com.google.protobuf.ByteString; @@ -47,11 +48,15 @@ public class ChangeStreamRecordMergingCallableTest { @Test public void heartbeatTest() { + RowRange rowRange = RowRange.newBuilder().getDefaultInstanceForType(); ReadChangeStreamResponse.Heartbeat heartbeatProto = ReadChangeStreamResponse.Heartbeat.newBuilder() .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) .setContinuationToken( - StreamContinuationToken.newBuilder().setToken("random-token").build()) + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange)) + .setToken("random-token") + .build()) .build(); ReadChangeStreamResponse response = ReadChangeStreamResponse.newBuilder().setHeartbeat(heartbeatProto).build(); @@ -69,8 +74,8 @@ public void heartbeatTest() { Assert.assertTrue(record instanceof Heartbeat); Heartbeat heartbeat = (Heartbeat) record; Assert.assertEquals( - heartbeat.getChangeStreamContinuationToken().getRowRange(), - heartbeatProto.getContinuationToken().getPartition().getRowRange()); + heartbeat.getChangeStreamContinuationToken().getPartition(), + ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); Assert.assertEquals( heartbeat.getChangeStreamContinuationToken().getToken(), heartbeatProto.getContinuationToken().getToken()); @@ -79,16 +84,14 @@ public void heartbeatTest() { @Test public void closeStreamTest() { + RowRange rowRange = + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("")) + .setEndKeyOpen(ByteString.copyFromUtf8("")) + .build(); StreamContinuationToken streamContinuationToken = StreamContinuationToken.newBuilder() - .setPartition( - StreamPartition.newBuilder() - .setRowRange( - RowRange.newBuilder() - .setStartKeyClosed(ByteString.copyFromUtf8("")) - .setEndKeyOpen(ByteString.copyFromUtf8("")) - .build()) - .build()) + .setPartition(StreamPartition.newBuilder().setRowRange(rowRange).build()) .setToken("random-token") .build(); ReadChangeStreamResponse.CloseStream closeStreamProto = @@ -116,8 +119,8 @@ public void closeStreamTest() { ChangeStreamContinuationToken changeStreamContinuationToken = closeStream.getChangeStreamContinuationTokens().get(0); Assert.assertEquals( - changeStreamContinuationToken.getRowRange(), - streamContinuationToken.getPartition().getRowRange()); + changeStreamContinuationToken.getPartition(), + ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen())); Assert.assertEquals( changeStreamContinuationToken.getToken(), streamContinuationToken.getToken()); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallableTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallableTest.java index 908961be77..885b1c6355 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallableTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/GenerateInitialChangeStreamPartitionsUserCallableTest.java @@ -23,10 +23,10 @@ import com.google.bigtable.v2.StreamPartition; import com.google.cloud.bigtable.data.v2.internal.NameUtil; import com.google.cloud.bigtable.data.v2.internal.RequestContext; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.cloud.bigtable.gaxx.testing.FakeStreamingApi; import com.google.common.collect.Lists; import com.google.common.truth.Truth; -import com.google.protobuf.ByteString; import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; @@ -69,24 +69,15 @@ public void responseIsConverted() { GenerateInitialChangeStreamPartitionsResponse.newBuilder() .setPartition( StreamPartition.newBuilder() - .setRowRange( - RowRange.newBuilder() - .setStartKeyClosed(ByteString.copyFromUtf8("apple")) - .setEndKeyOpen(ByteString.copyFromUtf8("banana")) - .build()) + .setRowRange(RowRange.newBuilder().getDefaultInstanceForType()) .build()) .build())); GenerateInitialChangeStreamPartitionsUserCallable generateInitialChangeStreamPartitionsUserCallable = new GenerateInitialChangeStreamPartitionsUserCallable(inner, requestContext); - List results = + List results = generateInitialChangeStreamPartitionsUserCallable.all().call("my-table"); - Truth.assertThat(results) - .containsExactly( - RowRange.newBuilder() - .setStartKeyClosed(ByteString.copyFromUtf8("apple")) - .setEndKeyOpen(ByteString.copyFromUtf8("banana")) - .build()); + Truth.assertThat(results).containsExactly(ByteStringRange.create("", "")); } } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java index 5ae88a7f9f..ef8b9fec9f 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamMergingAcceptanceTest.java @@ -24,6 +24,7 @@ import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.ReadChangeStreamRequest; import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.RowRange; import com.google.bigtable.v2.StreamContinuationToken; import com.google.bigtable.v2.StreamPartition; import com.google.bigtable.v2.TimestampRange; @@ -123,6 +124,7 @@ public void test() throws Exception { for (ChangeStreamRecord record : stream) { if (record instanceof Heartbeat) { Heartbeat heartbeat = (Heartbeat) record; + ChangeStreamContinuationToken token = heartbeat.getChangeStreamContinuationToken(); ReadChangeStreamResponse.Heartbeat heartbeatProto = ReadChangeStreamResponse.Heartbeat.newBuilder() .setContinuationToken( @@ -130,7 +132,10 @@ public void test() throws Exception { .setPartition( StreamPartition.newBuilder() .setRowRange( - heartbeat.getChangeStreamContinuationToken().getRowRange()) + RowRange.newBuilder() + .setStartKeyClosed(token.getPartition().getStart()) + .setEndKeyOpen(token.getPartition().getEnd()) + .build()) .build()) .setToken(heartbeat.getChangeStreamContinuationToken().getToken()) .build()) @@ -152,7 +157,12 @@ public void test() throws Exception { builder.addContinuationTokens( StreamContinuationToken.newBuilder() .setPartition( - StreamPartition.newBuilder().setRowRange(token.getRowRange()).build()) + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(token.getPartition().getStart()) + .setEndKeyOpen(token.getPartition().getEnd()) + .build())) .setToken(token.getToken()) .build()); }