diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java index bb5363a3a5..992310edfc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java @@ -18,11 +18,11 @@ import com.google.api.core.InternalApi; import com.google.bigtable.v2.RowRange; import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import com.google.protobuf.ByteString; import java.io.Serializable; import javax.annotation.Nonnull; @@ -30,48 +30,62 @@ public final class ChangeStreamContinuationToken implements Serializable { private static final long serialVersionUID = 524679926247095L; - private transient StreamContinuationToken.Builder builder; + private final StreamContinuationToken proto; - private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken.Builder builder) { - this.builder = builder; + private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken proto) { + this.proto = proto; } - private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { - input.defaultReadObject(); - builder = StreamContinuationToken.newBuilder().mergeFrom(input); - } - - private void writeObject(ObjectOutputStream output) throws IOException { - output.defaultWriteObject(); - builder.build().writeTo(output); + @InternalApi("Used in Changestream beam pipeline.") + public ChangeStreamContinuationToken( + @Nonnull ByteStringRange byteStringRange, @Nonnull String token) { + this.proto = + StreamContinuationToken.newBuilder() + .setPartition( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(byteStringRange.getStart()) + .setEndKeyOpen(byteStringRange.getEnd()) + .build()) + .build()) + .setToken(token) + .build(); } + // TODO: Change this to return ByteStringRange. public RowRange getRowRange() { - return this.builder.getPartition().getRowRange(); + return this.proto.getPartition().getRowRange(); } public String getToken() { - return this.builder.getToken(); + return this.proto.getToken(); } /** * Creates the protobuf. This method is considered an internal implementation detail and not meant * to be used by applications. */ - @InternalApi("Used in Changestream beam pipeline.") - public StreamContinuationToken toProto() { - return builder.build(); + StreamContinuationToken toProto() { + return proto; } /** Wraps the protobuf {@link StreamContinuationToken}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static ChangeStreamContinuationToken fromProto( + static ChangeStreamContinuationToken fromProto( @Nonnull StreamContinuationToken streamContinuationToken) { - return new ChangeStreamContinuationToken(streamContinuationToken.toBuilder()); + return new ChangeStreamContinuationToken(streamContinuationToken); } - public ChangeStreamContinuationToken clone() { - return new ChangeStreamContinuationToken(this.builder.clone()); + @InternalApi("Used in Changestream beam pipeline.") + public ByteString toByteString() { + return proto.toByteString(); + } + + @InternalApi("Used in Changestream beam pipeline.") + public static ChangeStreamContinuationToken fromByteString(ByteString byteString) + throws Exception { + return new ChangeStreamContinuationToken( + StreamContinuationToken.newBuilder().mergeFrom(byteString).build()); } @Override diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index f9107220b3..92c0ccbd47 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -15,7 +15,6 @@ */ package com.google.cloud.bigtable.data.v2.models; -import com.google.api.core.InternalApi; import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger; @@ -63,7 +62,7 @@ * ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build(); * } */ -public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable { +public class ChangeStreamMutation implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 8419520253162024218L; private final ByteString rowKey; @@ -100,8 +99,7 @@ private ChangeStreamMutation(Builder builder) { * ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish * building the logical mutation. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Builder createUserMutation( + static Builder createUserMutation( @Nonnull ByteString rowKey, @Nonnull String sourceClusterId, @Nonnull Timestamp commitTimestamp, @@ -114,8 +112,7 @@ public static Builder createUserMutation( * because `token` and `loWatermark` must be set later when we finish building the logical * mutation. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Builder createGcMutation( + static Builder createGcMutation( @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) { return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker); } @@ -227,8 +224,7 @@ private Builder(ChangeStreamMutation changeStreamMutation) { this.lowWatermark = changeStreamMutation.lowWatermark; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setCell( + Builder setCell( @Nonnull String familyName, @Nonnull ByteString qualifier, long timestamp, @@ -237,8 +233,7 @@ public Builder setCell( return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder deleteCells( + Builder deleteCells( @Nonnull String familyName, @Nonnull ByteString qualifier, @Nonnull TimestampRange timestampRange) { @@ -246,26 +241,22 @@ public Builder deleteCells( return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder deleteFamily(@Nonnull String familyName) { + Builder deleteFamily(@Nonnull String familyName) { this.entries.add(DeleteFamily.create(familyName)); return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setToken(@Nonnull String token) { + Builder setToken(@Nonnull String token) { this.token = token; return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) { + Builder setLowWatermark(@Nonnull Timestamp lowWatermark) { this.lowWatermark = lowWatermark; return this; } - @InternalApi("Used in Changestream beam pipeline.") - public ChangeStreamMutation build() { + ChangeStreamMutation build() { Preconditions.checkArgument( token != null && lowWatermark != null, "ChangeStreamMutation must have a continuation token and low watermark."); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index e871c86697..e245ce615e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -29,7 +29,7 @@ import java.util.List; import javax.annotation.Nonnull; -public final class CloseStream implements ChangeStreamRecord, Serializable { +public class CloseStream implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608505L; private final Status status; private transient ImmutableList.Builder @@ -69,8 +69,7 @@ private void writeObject(ObjectOutputStream output) throws IOException { } /** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) { + static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) { return new CloseStream(closeStream.getStatus(), closeStream.getContinuationTokensList()); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index 63c23525f3..f2371c8507 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -32,8 +32,7 @@ private static Heartbeat create( } /** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) { + static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) { return create( ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()), heartbeat.getLowWatermark()); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java new file mode 100644 index 0000000000..e1ba6c68f6 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ChangeStreamContinuationTokenTest { + + private final String TOKEN = "token"; + + private ByteStringRange createFakeByteStringRange() { + return ByteStringRange.create("a", "b"); + } + + // TODO: Get rid of this once we change ChangeStreamContinuationToken::getRowRange() + // to ChangeStreamContinuationToken::getByteStringRange(). + private RowRange rowRangeFromByteStringRange(ByteStringRange byteStringRange) { + return RowRange.newBuilder() + .setStartKeyClosed(byteStringRange.getStart()) + .setEndKeyOpen(byteStringRange.getEnd()) + .build(); + } + + @Test + public void basicTest() throws Exception { + ByteStringRange byteStringRange = createFakeByteStringRange(); + ChangeStreamContinuationToken changeStreamContinuationToken = + new ChangeStreamContinuationToken(byteStringRange, TOKEN); + Assert.assertEquals( + changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(changeStreamContinuationToken); + oos.close(); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); + ChangeStreamContinuationToken actual = (ChangeStreamContinuationToken) ois.readObject(); + assertThat(actual).isEqualTo(changeStreamContinuationToken); + } + + @Test + public void toProtoTest() { + ByteStringRange byteStringRange = createFakeByteStringRange(); + RowRange fakeRowRange = rowRangeFromByteStringRange(byteStringRange); + StreamContinuationToken proto = + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(fakeRowRange).build()) + .setToken(TOKEN) + .build(); + ChangeStreamContinuationToken changeStreamContinuationToken = + ChangeStreamContinuationToken.fromProto(proto); + Assert.assertEquals(changeStreamContinuationToken.getRowRange(), fakeRowRange); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + Assert.assertEquals( + changeStreamContinuationToken, + ChangeStreamContinuationToken.fromProto(changeStreamContinuationToken.toProto())); + } + + @Test + public void toByteStringTest() throws Exception { + ByteStringRange byteStringRange = createFakeByteStringRange(); + ChangeStreamContinuationToken changeStreamContinuationToken = + new ChangeStreamContinuationToken(byteStringRange, TOKEN); + Assert.assertEquals( + changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + Assert.assertEquals( + changeStreamContinuationToken, + ChangeStreamContinuationToken.fromByteString(changeStreamContinuationToken.toByteString())); + } +}