Skip to content

Commit

Permalink
convert to internal changes in change stream classes
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarquezp committed Dec 11, 2024
1 parent ce6a279 commit 51657d8
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant;
import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;

import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger;
Expand Down Expand Up @@ -82,7 +78,7 @@ static Builder createUserMutation(
.setRowKey(rowKey)
.setType(MutationType.USER)
.setSourceClusterId(sourceClusterId)
.setCommitTime(commitTimestamp)
.setCommitTimestamp(commitTimestamp)
.setTieBreaker(tieBreaker);
}

Expand All @@ -97,7 +93,7 @@ static Builder createGcMutation(
.setRowKey(rowKey)
.setType(MutationType.GARBAGE_COLLECTION)
.setSourceClusterId("")
.setCommitTime(commitTimestamp)
.setCommitTimestamp(commitTimestamp)
.setTieBreaker(tieBreaker);
}

Expand All @@ -113,14 +109,7 @@ static Builder createGcMutation(
@Nonnull
public abstract String getSourceClusterId();

/** This method is obsolete. Use {@link #getCommitTime()} instead. */
@ObsoleteApi("Use getCommitTime() instead")
public abstract org.threeten.bp.Instant getCommitTimestamp();

/** Get the commit timestamp of the current mutation. */
public java.time.Instant getCommitTime() {
return toJavaTimeInstant(getCommitTimestamp());
}
public abstract java.time.Instant getCommitTimestamp();

/**
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
Expand All @@ -132,14 +121,8 @@ public java.time.Instant getCommitTime() {
@Nonnull
public abstract String getToken();

/** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */
@ObsoleteApi("Use getEstimatedLowWatermarkTime() instead")
public abstract org.threeten.bp.Instant getEstimatedLowWatermark();

/** Get the low watermark of the current mutation. */
public java.time.Instant getEstimatedLowWatermarkTime() {
return toJavaTimeInstant(getEstimatedLowWatermark());
}
public abstract java.time.Instant getEstimatedLowWatermark();

/** Get the list of mods of the current mutation. */
@Nonnull
Expand All @@ -160,27 +143,15 @@ abstract static class Builder {

abstract Builder setSourceClusterId(@Nonnull String sourceClusterId);

Builder setCommitTime(java.time.Instant commitTimestamp) {
return setCommitTimestamp(toThreetenInstant(commitTimestamp));
}

/** This method is obsolete. Use {@link #setCommitTime(java.time.Instant)} instead. */
@ObsoleteApi("Use setCommitTime(java.time.Instant) instead")
abstract Builder setCommitTimestamp(org.threeten.bp.Instant commitTimestamp);
abstract Builder setCommitTimestamp(java.time.Instant commitTimestamp);

abstract Builder setTieBreaker(int tieBreaker);

abstract ImmutableList.Builder<Entry> entriesBuilder();

abstract Builder setToken(@Nonnull String token);

Builder setLowWatermarkTime(java.time.Instant estimatedLowWatermark) {
return setEstimatedLowWatermark(toThreetenInstant(estimatedLowWatermark));
}

/** This method is obsolete. Use {@link #setLowWatermarkTime(java.time.Instant)} instead. */
@ObsoleteApi("Use setEstimatedLowWatermarkInstant(java.time.Instant) instead")
abstract Builder setEstimatedLowWatermark(org.threeten.bp.Instant estimatedLowWatermark);
abstract Builder setEstimatedLowWatermark(java.time.Instant estimatedLowWatermark);

Builder setCell(
@Nonnull String familyName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void finishCell() {
public ChangeStreamRecord finishChangeStreamMutation(
String token, Instant estimatedLowWatermark) {
this.changeStreamMutationBuilder.setToken(token);
this.changeStreamMutationBuilder.setLowWatermarkTime(estimatedLowWatermark);
this.changeStreamMutationBuilder.setEstimatedLowWatermark(estimatedLowWatermark);
return this.changeStreamMutationBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,11 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant;
import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;

import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.auto.value.AutoValue;
import com.google.bigtable.v2.ReadChangeStreamResponse;
import java.io.Serializable;
import java.time.Instant;
import javax.annotation.Nonnull;

/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */
Expand All @@ -32,30 +29,22 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static final long serialVersionUID = 7316215828353608504L;

private static Heartbeat create(
ChangeStreamContinuationToken changeStreamContinuationToken,
java.time.Instant estimatedLowWatermark) {
return new AutoValue_Heartbeat(
changeStreamContinuationToken, toThreetenInstant(estimatedLowWatermark));
ChangeStreamContinuationToken changeStreamContinuationToken, Instant estimatedLowWatermark) {
return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
return create(
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
java.time.Instant.ofEpochSecond(
Instant.ofEpochSecond(
heartbeat.getEstimatedLowWatermark().getSeconds(),
heartbeat.getEstimatedLowWatermark().getNanos()));
}

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken();

/** This method is obsolete. Use {@link #getEstimatedLowWatermarkInstant()} instead. */
@ObsoleteApi("Use getEstimatedLowWatermarkInstant() instead")
public abstract org.threeten.bp.Instant getEstimatedLowWatermark();

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public java.time.Instant getEstimatedLowWatermarkInstant() {
return toJavaTimeInstant(getEstimatedLowWatermark());
}
public abstract Instant getEstimatedLowWatermark();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;
import static com.google.common.truth.Truth.assertThat;

import com.google.bigtable.v2.MutateRowRequest;
Expand Down Expand Up @@ -46,10 +45,6 @@ public class ChangeStreamMutationTest {
RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID);
private static final Instant FAKE_COMMIT_TIMESTAMP = Instant.ofEpochSecond(0, 1000L);
private static final Instant FAKE_LOW_WATERMARK = Instant.ofEpochSecond(0, 2000L);
private static final org.threeten.bp.Instant FAKE_COMMIT_TIMESTAMP_THREETEN =
toThreetenInstant(FAKE_COMMIT_TIMESTAMP);
private static final org.threeten.bp.Instant FAKE_LOW_WATERMARK_THREETEN =
toThreetenInstant(FAKE_LOW_WATERMARK);

@Test
public void userInitiatedMutationTest() throws IOException, ClassNotFoundException {
Expand Down Expand Up @@ -78,20 +73,18 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setLowWatermarkTime(FAKE_LOW_WATERMARK)
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();

// Test the getters.
assertThat(changeStreamMutation.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
assertThat(changeStreamMutation.getType()).isEqualTo(ChangeStreamMutation.MutationType.USER);
assertThat(changeStreamMutation.getSourceClusterId()).isEqualTo("fake-source-cluster-id");
assertThat(changeStreamMutation.getCommitTime()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP_THREETEN);
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
assertThat(changeStreamMutation.getTieBreaker()).isEqualTo(0);
assertThat(changeStreamMutation.getToken()).isEqualTo("fake-token");
assertThat(changeStreamMutation.getEstimatedLowWatermarkTime()).isEqualTo(FAKE_LOW_WATERMARK);
assertThat(changeStreamMutation.getEstimatedLowWatermark())
.isEqualTo(FAKE_LOW_WATERMARK_THREETEN);
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);

// Test serialization.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Expand Down Expand Up @@ -120,21 +113,19 @@ public void gcMutationTest() throws IOException, ClassNotFoundException {
ByteString.copyFromUtf8("fake-qualifier"),
Range.TimestampRange.create(1000L, 2000L))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();

// Test the getters.
assertThat(changeStreamMutation.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
assertThat(changeStreamMutation.getType())
.isEqualTo(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
Assert.assertTrue(changeStreamMutation.getSourceClusterId().isEmpty());
assertThat(changeStreamMutation.getCommitTime()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP_THREETEN);
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
assertThat(changeStreamMutation.getTieBreaker()).isEqualTo(0);
assertThat(changeStreamMutation.getToken()).isEqualTo("fake-token");
assertThat(changeStreamMutation.getEstimatedLowWatermarkTime()).isEqualTo(FAKE_LOW_WATERMARK);
assertThat(changeStreamMutation.getEstimatedLowWatermark())
.isEqualTo(FAKE_LOW_WATERMARK_THREETEN);
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);

// Test serialization.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Expand Down Expand Up @@ -172,7 +163,7 @@ public void toRowMutationTest() {
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();

// Convert it to a rowMutation and construct a MutateRowRequest.
Expand Down Expand Up @@ -215,7 +206,7 @@ public void toRowMutationWithoutTokenShouldFailTest() {
ChangeStreamMutation.createUserMutation(
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
.deleteFamily("fake-family")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK);
Assert.assertThrows(IllegalStateException.class, builder::build);
}

Expand Down Expand Up @@ -255,7 +246,7 @@ public void toRowMutationEntryTest() {
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();

// Convert it to a rowMutationEntry and construct a MutateRowRequest.
Expand Down Expand Up @@ -295,7 +286,7 @@ public void toRowMutationEntryWithoutTokenShouldFailTest() {
ChangeStreamMutation.createUserMutation(
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
.deleteFamily("fake-family")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK);
Assert.assertThrows(IllegalStateException.class, builder::build);
}

Expand All @@ -320,7 +311,7 @@ public void testWithLongValue() {
1000L,
ByteString.copyFrom(Longs.toByteArray(1L)))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
.build();

RowMutation rowMutation = changeStreamMutation.toRowMutation(TABLE_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void heartbeatTest() {
.build();
Heartbeat actualHeartbeat = Heartbeat.fromProto(heartbeatProto);

assertThat(actualHeartbeat.getEstimatedLowWatermarkInstant())
assertThat(actualHeartbeat.getEstimatedLowWatermark())
.isEqualTo(Instant.ofEpochSecond(lowWatermark.getSeconds(), lowWatermark.getNanos()));
assertThat(actualHeartbeat.getChangeStreamContinuationToken().getPartition())
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
Expand Down
Loading

0 comments on commit 51657d8

Please sign in to comment.