Skip to content

Commit

Permalink
feat: Add toByteString/fromByteString for ChangeStreamContinuationTok…
Browse files Browse the repository at this point in the history
…en (#1346)

* feat: Add toByteString/fromByteString for ChangeStreamContinuationToken

This will be used by the beam connector to write/read to a Bigtable table.

This PR also does:
1. Revert the changes in #1345
since we can use Mockito to create mock objects for testing.

* fix: Update comments

* fix: Address comments

* fix: Add InternalExtensionOnly annotations for Heartbeat/CloseStream/ChangeStreamMutation

Co-authored-by: Teng Zhong <[email protected]>
  • Loading branch information
tengzhonger and Teng Zhong authored Aug 11, 2022
1 parent f1176ae commit bb5c0c0
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,60 +18,72 @@
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamPartition;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Serializable;
import javax.annotation.Nonnull;

/** A simple wrapper for {@link StreamContinuationToken}. */
public final class ChangeStreamContinuationToken implements Serializable {
private static final long serialVersionUID = 524679926247095L;

private transient StreamContinuationToken.Builder builder;
private final StreamContinuationToken tokenProto;

private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken.Builder builder) {
this.builder = builder;
private ChangeStreamContinuationToken(@Nonnull StreamContinuationToken tokenProto) {
this.tokenProto = tokenProto;
}

private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
input.defaultReadObject();
builder = StreamContinuationToken.newBuilder().mergeFrom(input);
}

private void writeObject(ObjectOutputStream output) throws IOException {
output.defaultWriteObject();
builder.build().writeTo(output);
@InternalApi("Used in Changestream beam pipeline.")
public ChangeStreamContinuationToken(
@Nonnull ByteStringRange byteStringRange, @Nonnull String token) {
this.tokenProto =
StreamContinuationToken.newBuilder()
.setPartition(
StreamPartition.newBuilder()
.setRowRange(
RowRange.newBuilder()
.setStartKeyClosed(byteStringRange.getStart())
.setEndKeyOpen(byteStringRange.getEnd())
.build())
.build())
.setToken(token)
.build();
}

// TODO: Change this to return ByteStringRange.
public RowRange getRowRange() {
return this.builder.getPartition().getRowRange();
return this.tokenProto.getPartition().getRowRange();
}

public String getToken() {
return this.builder.getToken();
return this.tokenProto.getToken();
}

/**
* Creates the protobuf. This method is considered an internal implementation detail and not meant
* to be used by applications.
*/
@InternalApi("Used in Changestream beam pipeline.")
public StreamContinuationToken toProto() {
return builder.build();
// Creates the protobuf.
StreamContinuationToken toProto() {
return tokenProto;
}

/** Wraps the protobuf {@link StreamContinuationToken}. */
@InternalApi("Used in Changestream beam pipeline.")
public static ChangeStreamContinuationToken fromProto(
static ChangeStreamContinuationToken fromProto(
@Nonnull StreamContinuationToken streamContinuationToken) {
return new ChangeStreamContinuationToken(streamContinuationToken.toBuilder());
return new ChangeStreamContinuationToken(streamContinuationToken);
}

public ChangeStreamContinuationToken clone() {
return new ChangeStreamContinuationToken(this.builder.clone());
@InternalApi("Used in Changestream beam pipeline.")
public ByteString toByteString() {
return tokenProto.toByteString();
}

@InternalApi("Used in Changestream beam pipeline.")
public static ChangeStreamContinuationToken fromByteString(ByteString byteString)
throws InvalidProtocolBufferException {
return new ChangeStreamContinuationToken(
StreamContinuationToken.newBuilder().mergeFrom(byteString).build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger;
Expand Down Expand Up @@ -62,8 +62,11 @@
* builder.deleteCells(...);
* ChangeStreamMutation changeStreamMutation = builder.setToken(...).setLowWatermark().build();
* }</pre>
*
* Make this class non-final so that we can create a subclass to mock it.
*/
public final class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
@InternalExtensionOnly("Used in Changestream beam pipeline testing.")
public class ChangeStreamMutation implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 8419520253162024218L;

private final ByteString rowKey;
Expand Down Expand Up @@ -100,8 +103,7 @@ private ChangeStreamMutation(Builder builder) {
* ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish
* building the logical mutation.
*/
@InternalApi("Used in Changestream beam pipeline.")
public static Builder createUserMutation(
static Builder createUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
Expand All @@ -114,8 +116,7 @@ public static Builder createUserMutation(
* because `token` and `loWatermark` must be set later when we finish building the logical
* mutation.
*/
@InternalApi("Used in Changestream beam pipeline.")
public static Builder createGcMutation(
static Builder createGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
}
Expand Down Expand Up @@ -227,8 +228,7 @@ private Builder(ChangeStreamMutation changeStreamMutation) {
this.lowWatermark = changeStreamMutation.lowWatermark;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder setCell(
Builder setCell(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
long timestamp,
Expand All @@ -237,35 +237,30 @@ public Builder setCell(
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder deleteCells(
Builder deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
this.entries.add(DeleteCells.create(familyName, qualifier, timestampRange));
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder deleteFamily(@Nonnull String familyName) {
Builder deleteFamily(@Nonnull String familyName) {
this.entries.add(DeleteFamily.create(familyName));
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder setToken(@Nonnull String token) {
Builder setToken(@Nonnull String token) {
this.token = token;
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
this.lowWatermark = lowWatermark;
return this;
}

@InternalApi("Used in Changestream beam pipeline.")
public ChangeStreamMutation build() {
ChangeStreamMutation build() {
Preconditions.checkArgument(
token != null && lowWatermark != null,
"ChangeStreamMutation must have a continuation token and low watermark.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.common.base.MoreObjects;
Expand All @@ -29,7 +30,12 @@
import java.util.List;
import javax.annotation.Nonnull;

public final class CloseStream implements ChangeStreamRecord, Serializable {
/**
* A simple wrapper for {@link ReadChangeStreamResponse.CloseStream}. Make this class non-final so
* that we can create a subclass to mock it.
*/
@InternalExtensionOnly("Used in Changestream beam pipeline testing.")
public class CloseStream implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608505L;
private final Status status;
private transient ImmutableList.Builder<ChangeStreamContinuationToken>
Expand Down Expand Up @@ -69,8 +75,7 @@ private void writeObject(ObjectOutputStream output) throws IOException {
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
@InternalApi("Used in Changestream beam pipeline.")
public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) {
static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) {
return new CloseStream(closeStream.getStatus(), closeStream.getContinuationTokensList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.api.core.InternalExtensionOnly;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import com.google.protobuf.Timestamp;
import java.io.Serializable;
import javax.annotation.Nonnull;

/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */
@InternalExtensionOnly("Used in Changestream beam pipeline testing.")
@AutoValue
public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;
Expand All @@ -32,8 +35,7 @@ private static Heartbeat create(
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
@InternalApi("Used in Changestream beam pipeline.")
public static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
return create(
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
heartbeat.getLowWatermark());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.common.truth.Truth.assertThat;

import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.bigtable.v2.StreamPartition;
import com.google.cloud.bigtable.data.v2.models.Range.ByteStringRange;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class ChangeStreamContinuationTokenTest {

private final String TOKEN = "token";

private ByteStringRange createFakeByteStringRange() {
return ByteStringRange.create("a", "b");
}

// TODO: Get rid of this once we change ChangeStreamContinuationToken::getRowRange()
// to ChangeStreamContinuationToken::getByteStringRange().
private RowRange rowRangeFromByteStringRange(ByteStringRange byteStringRange) {
return RowRange.newBuilder()
.setStartKeyClosed(byteStringRange.getStart())
.setEndKeyOpen(byteStringRange.getEnd())
.build();
}

@Test
public void basicTest() throws Exception {
ByteStringRange byteStringRange = createFakeByteStringRange();
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(byteStringRange, TOKEN);
Assert.assertEquals(
changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange));
Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN);

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(changeStreamContinuationToken);
oos.close();
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()));
ChangeStreamContinuationToken actual = (ChangeStreamContinuationToken) ois.readObject();
assertThat(actual).isEqualTo(changeStreamContinuationToken);
}

@Test
public void fromProtoTest() {
ByteStringRange byteStringRange = createFakeByteStringRange();
RowRange fakeRowRange = rowRangeFromByteStringRange(byteStringRange);
StreamContinuationToken proto =
StreamContinuationToken.newBuilder()
.setPartition(StreamPartition.newBuilder().setRowRange(fakeRowRange).build())
.setToken(TOKEN)
.build();
ChangeStreamContinuationToken changeStreamContinuationToken =
ChangeStreamContinuationToken.fromProto(proto);
Assert.assertEquals(changeStreamContinuationToken.getRowRange(), fakeRowRange);
Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN);
Assert.assertEquals(
changeStreamContinuationToken,
ChangeStreamContinuationToken.fromProto(changeStreamContinuationToken.toProto()));
}

@Test
public void toByteStringTest() throws Exception {
ByteStringRange byteStringRange = createFakeByteStringRange();
ChangeStreamContinuationToken changeStreamContinuationToken =
new ChangeStreamContinuationToken(byteStringRange, TOKEN);
Assert.assertEquals(
changeStreamContinuationToken.getRowRange(), rowRangeFromByteStringRange(byteStringRange));
Assert.assertEquals(changeStreamContinuationToken.getToken(), TOKEN);
Assert.assertEquals(
changeStreamContinuationToken,
ChangeStreamContinuationToken.fromByteString(changeStreamContinuationToken.toByteString()));
}
}

0 comments on commit bb5c0c0

Please sign in to comment.