From bb5c0c05157043683486f0bffd6c47b803247a60 Mon Sep 17 00:00:00 2001 From: tengzhonger <109308630+tengzhonger@users.noreply.github.com> Date: Thu, 11 Aug 2022 12:14:52 -0400 Subject: [PATCH] feat: Add toByteString/fromByteString for ChangeStreamContinuationToken (#1346) * feat: Add toByteString/fromByteString for ChangeStreamContinuationToken This will be used by the beam connector to write/read to a Bigtable table. This PR also does: 1. Revert the changes in https://github.com/googleapis/java-bigtable/pull/1345 since we can use Mockito to create mock objects for testing. * fix: Update comments * fix: Address comments * fix: Add InternalExtensionOnly annotations for Heartbeat/CloseStream/ChangeStreamMutation Co-authored-by: Teng Zhong --- .../models/ChangeStreamContinuationToken.java | 68 +++++++------ .../data/v2/models/ChangeStreamMutation.java | 31 +++--- .../bigtable/data/v2/models/CloseStream.java | 11 ++- .../bigtable/data/v2/models/Heartbeat.java | 6 +- .../ChangeStreamContinuationTokenTest.java | 99 +++++++++++++++++++ 5 files changed, 164 insertions(+), 51 deletions(-) create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java index bb5363a3a5..af7b15ea4e 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationToken.java @@ -18,11 +18,12 @@ import com.google.api.core.InternalApi; import com.google.bigtable.v2.RowRange; import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; import com.google.common.base.MoreObjects; import com.google.common.base.Objects; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; import java.io.Serializable; import javax.annotation.Nonnull; @@ -30,48 +31,59 @@ public final class ChangeStreamContinuationToken implements Serializable { private static final long serialVersionUID = 524679926247095L; - private transient StreamContinuationToken.Builder builder; + private final StreamContinuationToken tokenProto; - private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken.Builder builder) { - this.builder = builder; + private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken tokenProto) { + this.tokenProto = tokenProto; } - private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException { - input.defaultReadObject(); - builder = StreamContinuationToken.newBuilder().mergeFrom(input); - } - - private void writeObject(ObjectOutputStream output) throws IOException { - output.defaultWriteObject(); - builder.build().writeTo(output); + @InternalApi("Used in Changestream beam pipeline.") + public ChangeStreamContinuationToken( + @Nonnull ByteStringRange byteStringRange, @Nonnull String token) { + this.tokenProto = + StreamContinuationToken.newBuilder() + .setPartition( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(byteStringRange.getStart()) + .setEndKeyOpen(byteStringRange.getEnd()) + .build()) + .build()) + .setToken(token) + .build(); } + // TODO: Change this to return ByteStringRange. public RowRange getRowRange() { - return this.builder.getPartition().getRowRange(); + return this.tokenProto.getPartition().getRowRange(); } public String getToken() { - return this.builder.getToken(); + return this.tokenProto.getToken(); } - /** - * Creates the protobuf. This method is considered an internal implementation detail and not meant - * to be used by applications. - */ - @InternalApi("Used in Changestream beam pipeline.") - public StreamContinuationToken toProto() { - return builder.build(); + // Creates the protobuf. + StreamContinuationToken toProto() { + return tokenProto; } /** Wraps the protobuf {@link StreamContinuationToken}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static ChangeStreamContinuationToken fromProto( + static ChangeStreamContinuationToken fromProto( @Nonnull StreamContinuationToken streamContinuationToken) { - return new ChangeStreamContinuationToken(streamContinuationToken.toBuilder()); + return new ChangeStreamContinuationToken(streamContinuationToken); } - public ChangeStreamContinuationToken clone() { - return new ChangeStreamContinuationToken(this.builder.clone()); + @InternalApi("Used in Changestream beam pipeline.") + public ByteString toByteString() { + return tokenProto.toByteString(); + } + + @InternalApi("Used in Changestream beam pipeline.") + public static ChangeStreamContinuationToken fromByteString(ByteString byteString) + throws InvalidProtocolBufferException { + return new ChangeStreamContinuationToken( + StreamContinuationToken.newBuilder().mergeFrom(byteString).build()); } @Override diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java index f9107220b3..cfb8bb30b7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java @@ -15,7 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.models; -import com.google.api.core.InternalApi; +import com.google.api.core.InternalExtensionOnly; import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type; import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange; import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger; @@ -62,8 +62,11 @@ * builder.deleteCells(...); * ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build(); * } + * + * Make this class non-final so that we can create a subclass to mock it. */ -public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable { +@InternalExtensionOnly("Used in Changestream beam pipeline testing.") +public class ChangeStreamMutation implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 8419520253162024218L; private final ByteString rowKey; @@ -100,8 +103,7 @@ private ChangeStreamMutation(Builder builder) { * ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish * building the logical mutation. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Builder createUserMutation( + static Builder createUserMutation( @Nonnull ByteString rowKey, @Nonnull String sourceClusterId, @Nonnull Timestamp commitTimestamp, @@ -114,8 +116,7 @@ public static Builder createUserMutation( * because `token` and `loWatermark` must be set later when we finish building the logical * mutation. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Builder createGcMutation( + static Builder createGcMutation( @Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) { return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker); } @@ -227,8 +228,7 @@ private Builder(ChangeStreamMutation changeStreamMutation) { this.lowWatermark = changeStreamMutation.lowWatermark; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setCell( + Builder setCell( @Nonnull String familyName, @Nonnull ByteString qualifier, long timestamp, @@ -237,8 +237,7 @@ public Builder setCell( return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder deleteCells( + Builder deleteCells( @Nonnull String familyName, @Nonnull ByteString qualifier, @Nonnull TimestampRange timestampRange) { @@ -246,26 +245,22 @@ public Builder deleteCells( return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder deleteFamily(@Nonnull String familyName) { + Builder deleteFamily(@Nonnull String familyName) { this.entries.add(DeleteFamily.create(familyName)); return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setToken(@Nonnull String token) { + Builder setToken(@Nonnull String token) { this.token = token; return this; } - @InternalApi("Used in Changestream beam pipeline.") - public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) { + Builder setLowWatermark(@Nonnull Timestamp lowWatermark) { this.lowWatermark = lowWatermark; return this; } - @InternalApi("Used in Changestream beam pipeline.") - public ChangeStreamMutation build() { + ChangeStreamMutation build() { Preconditions.checkArgument( token != null && lowWatermark != null, "ChangeStreamMutation must have a continuation token and low watermark."); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java index e871c86697..346b0b60a7 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/CloseStream.java @@ -16,6 +16,7 @@ package com.google.cloud.bigtable.data.v2.models; import com.google.api.core.InternalApi; +import com.google.api.core.InternalExtensionOnly; import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.bigtable.v2.StreamContinuationToken; import com.google.common.base.MoreObjects; @@ -29,7 +30,12 @@ import java.util.List; import javax.annotation.Nonnull; -public final class CloseStream implements ChangeStreamRecord, Serializable { +/** + * A simple wrapper for {@link ReadChangeStreamResponse.CloseStream}. Make this class non-final so + * that we can create a subclass to mock it. + */ +@InternalExtensionOnly("Used in Changestream beam pipeline testing.") +public class CloseStream implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608505L; private final Status status; private transient ImmutableList.Builder @@ -69,8 +75,7 @@ private void writeObject(ObjectOutputStream output) throws IOException { } /** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) { + static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) { return new CloseStream(closeStream.getStatus(), closeStream.getContinuationTokensList()); } diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java index 63c23525f3..40daa9d23a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java @@ -16,12 +16,15 @@ package com.google.cloud.bigtable.data.v2.models; import com.google.api.core.InternalApi; +import com.google.api.core.InternalExtensionOnly; import com.google.auto.value.AutoValue; import com.google.bigtable.v2.ReadChangeStreamResponse; import com.google.protobuf.Timestamp; import java.io.Serializable; import javax.annotation.Nonnull; +/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */ +@InternalExtensionOnly("Used in Changestream beam pipeline testing.") @AutoValue public abstract class Heartbeat implements ChangeStreamRecord, Serializable { private static final long serialVersionUID = 7316215828353608504L; @@ -32,8 +35,7 @@ private static Heartbeat create( } /** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */ - @InternalApi("Used in Changestream beam pipeline.") - public static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) { + static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) { return create( ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()), heartbeat.getLowWatermark()); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java new file mode 100644 index 0000000000..e93dfc70bf --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamContinuationTokenTest.java @@ -0,0 +1,99 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.models; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ChangeStreamContinuationTokenTest { + + private final String TOKEN = "token"; + + private ByteStringRange createFakeByteStringRange() { + return ByteStringRange.create("a", "b"); + } + + // TODO: Get rid of this once we change ChangeStreamContinuationToken::getRowRange() + // to ChangeStreamContinuationToken::getByteStringRange(). + private RowRange rowRangeFromByteStringRange(ByteStringRange byteStringRange) { + return RowRange.newBuilder() + .setStartKeyClosed(byteStringRange.getStart()) + .setEndKeyOpen(byteStringRange.getEnd()) + .build(); + } + + @Test + public void basicTest() throws Exception { + ByteStringRange byteStringRange = createFakeByteStringRange(); + ChangeStreamContinuationToken changeStreamContinuationToken = + new ChangeStreamContinuationToken(byteStringRange, TOKEN); + Assert.assertEquals( + changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(changeStreamContinuationToken); + oos.close(); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray())); + ChangeStreamContinuationToken actual = (ChangeStreamContinuationToken) ois.readObject(); + assertThat(actual).isEqualTo(changeStreamContinuationToken); + } + + @Test + public void fromProtoTest() { + ByteStringRange byteStringRange = createFakeByteStringRange(); + RowRange fakeRowRange = rowRangeFromByteStringRange(byteStringRange); + StreamContinuationToken proto = + StreamContinuationToken.newBuilder() + .setPartition(StreamPartition.newBuilder().setRowRange(fakeRowRange).build()) + .setToken(TOKEN) + .build(); + ChangeStreamContinuationToken changeStreamContinuationToken = + ChangeStreamContinuationToken.fromProto(proto); + Assert.assertEquals(changeStreamContinuationToken.getRowRange(), fakeRowRange); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + Assert.assertEquals( + changeStreamContinuationToken, + ChangeStreamContinuationToken.fromProto(changeStreamContinuationToken.toProto())); + } + + @Test + public void toByteStringTest() throws Exception { + ByteStringRange byteStringRange = createFakeByteStringRange(); + ChangeStreamContinuationToken changeStreamContinuationToken = + new ChangeStreamContinuationToken(byteStringRange, TOKEN); + Assert.assertEquals( + changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange)); + Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN); + Assert.assertEquals( + changeStreamContinuationToken, + ChangeStreamContinuationToken.fromByteString(changeStreamContinuationToken.toByteString())); + } +}