Skip to content

Commit

Permalink
feat: Expose some package-private methods to be used by CDC beam code (
Browse files Browse the repository at this point in the history
…googleapis#1345)

Co-authored-by: Teng Zhong <[email protected]>
  • Loading branch information
tengzhonger and Teng Zhong committed Sep 2, 2022
1 parent d3119cf commit 865ea62
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.v2.RowRange;
import com.google.bigtable.v2.StreamContinuationToken;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -57,12 +58,14 @@ public String getToken() {
* Creates the protobuf. This method is considered an internal implementation detail and not meant
* to be used by applications.
*/
StreamContinuationToken toProto() {
@InternalApi("Used in Changestream beam pipeline.")
public StreamContinuationToken toProto() {
return builder.build();
}

/** Wraps the protobuf {@link StreamContinuationToken}. */
static ChangeStreamContinuationToken fromProto(
@InternalApi("Used in Changestream beam pipeline.")
public static ChangeStreamContinuationToken fromProto(
@Nonnull StreamContinuationToken streamContinuationToken) {
return new ChangeStreamContinuationToken(streamContinuationToken.toBuilder());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadChangeStreamResponse.DataChange.Type;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger;
Expand Down Expand Up @@ -99,7 +100,8 @@ private ChangeStreamMutation(Builder builder) {
* ChangeStreamMutation because `token` and `loWatermark` must be set later when we finish
* building the logical mutation.
*/
static Builder createUserMutation(
@InternalApi("Used in Changestream beam pipeline.")
public static Builder createUserMutation(
@Nonnull ByteString rowKey,
@Nonnull String sourceClusterId,
@Nonnull Timestamp commitTimestamp,
Expand All @@ -112,7 +114,8 @@ static Builder createUserMutation(
* because `token` and `loWatermark` must be set later when we finish building the logical
* mutation.
*/
static Builder createGcMutation(
@InternalApi("Used in Changestream beam pipeline.")
public static Builder createGcMutation(
@Nonnull ByteString rowKey, @Nonnull Timestamp commitTimestamp, int tieBreaker) {
return new Builder(rowKey, Type.GARBAGE_COLLECTION, null, commitTimestamp, tieBreaker);
}
Expand Down Expand Up @@ -224,7 +227,8 @@ private Builder(ChangeStreamMutation changeStreamMutation) {
this.lowWatermark = changeStreamMutation.lowWatermark;
}

Builder setCell(
@InternalApi("Used in Changestream beam pipeline.")
public Builder setCell(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
long timestamp,
Expand All @@ -233,30 +237,35 @@ Builder setCell(
return this;
}

Builder deleteCells(
@InternalApi("Used in Changestream beam pipeline.")
public Builder deleteCells(
@Nonnull String familyName,
@Nonnull ByteString qualifier,
@Nonnull TimestampRange timestampRange) {
this.entries.add(DeleteCells.create(familyName, qualifier, timestampRange));
return this;
}

Builder deleteFamily(@Nonnull String familyName) {
@InternalApi("Used in Changestream beam pipeline.")
public Builder deleteFamily(@Nonnull String familyName) {
this.entries.add(DeleteFamily.create(familyName));
return this;
}

Builder setToken(@Nonnull String token) {
@InternalApi("Used in Changestream beam pipeline.")
public Builder setToken(@Nonnull String token) {
this.token = token;
return this;
}

Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
@InternalApi("Used in Changestream beam pipeline.")
public Builder setLowWatermark(@Nonnull Timestamp lowWatermark) {
this.lowWatermark = lowWatermark;
return this;
}

ChangeStreamMutation build() {
@InternalApi("Used in Changestream beam pipeline.")
public ChangeStreamMutation build() {
Preconditions.checkArgument(
token != null && lowWatermark != null,
"ChangeStreamMutation must have a continuation token and low watermark.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ private void writeObject(ObjectOutputStream output) throws IOException {
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.CloseStream}. */
static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) {
@InternalApi("Used in Changestream beam pipeline.")
public static CloseStream fromProto(@Nonnull ReadChangeStreamResponse.CloseStream closeStream) {
return new CloseStream(closeStream.getStatus(), closeStream.getContinuationTokensList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ private static Heartbeat create(
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
@InternalApi("Used in Changestream beam pipeline.")
public static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
return create(
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
heartbeat.getLowWatermark());
Expand Down

0 comments on commit 865ea62

Please sign in to comment.