Skip to content

Commit

Permalink
convert to internal changes in change stream classes ii
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomarquezp committed Dec 12, 2024
1 parent 06501e7 commit ed0213d
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant;
import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;

import com.google.api.core.InternalApi;
Expand Down Expand Up @@ -115,12 +114,12 @@ static Builder createGcMutation(

/** This method is obsolete. Use {@link #getCommitTime()} instead. */
@ObsoleteApi("Use getCommitTime() instead")
public abstract org.threeten.bp.Instant getCommitTimestamp();
public org.threeten.bp.Instant getCommitTimestamp() {
return toThreetenInstant(getCommitTime());
}

/** Get the commit timestamp of the current mutation. */
public java.time.Instant getCommitTime() {
return toJavaTimeInstant(getCommitTimestamp());
}
public abstract java.time.Instant getCommitTime();

/**
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
Expand All @@ -134,12 +133,12 @@ public java.time.Instant getCommitTime() {

/** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */
@ObsoleteApi("Use getEstimatedLowWatermarkTime() instead")
public abstract org.threeten.bp.Instant getEstimatedLowWatermark();
public org.threeten.bp.Instant getEstimatedLowWatermark() {
return toThreetenInstant(getEstimatedLowWatermarkTime());
}

/** Get the low watermark of the current mutation. */
public java.time.Instant getEstimatedLowWatermarkTime() {
return toJavaTimeInstant(getEstimatedLowWatermark());
}
public abstract java.time.Instant getEstimatedLowWatermarkTime();

/** Get the list of mods of the current mutation. */
@Nonnull
Expand All @@ -160,27 +159,15 @@ abstract static class Builder {

abstract Builder setSourceClusterId(@Nonnull String sourceClusterId);

Builder setCommitTime(java.time.Instant commitTimestamp) {
return setCommitTimestamp(toThreetenInstant(commitTimestamp));
}

/** This method is obsolete. Use {@link #setCommitTime(java.time.Instant)} instead. */
@ObsoleteApi("Use setCommitTime(java.time.Instant) instead")
abstract Builder setCommitTimestamp(org.threeten.bp.Instant commitTimestamp);
abstract Builder setCommitTime(java.time.Instant commitTimestamp);

abstract Builder setTieBreaker(int tieBreaker);

abstract ImmutableList.Builder<Entry> entriesBuilder();

abstract Builder setToken(@Nonnull String token);

Builder setLowWatermarkTime(java.time.Instant estimatedLowWatermark) {
return setEstimatedLowWatermark(toThreetenInstant(estimatedLowWatermark));
}

/** This method is obsolete. Use {@link #setLowWatermarkTime(java.time.Instant)} instead. */
@ObsoleteApi("Use setEstimatedLowWatermarkInstant(java.time.Instant) instead")
abstract Builder setEstimatedLowWatermark(org.threeten.bp.Instant estimatedLowWatermark);
abstract Builder setEstimatedLowWatermarkTime(java.time.Instant estimatedLowWatermark);

Builder setCell(
@Nonnull String familyName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public void finishCell() {
public ChangeStreamRecord finishChangeStreamMutation(
String token, Instant estimatedLowWatermark) {
this.changeStreamMutationBuilder.setToken(token);
this.changeStreamMutationBuilder.setLowWatermarkTime(estimatedLowWatermark);
this.changeStreamMutationBuilder.setEstimatedLowWatermarkTime(estimatedLowWatermark);
return this.changeStreamMutationBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant;
import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;

import com.google.api.core.InternalApi;
Expand All @@ -34,8 +33,7 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
private static Heartbeat create(
ChangeStreamContinuationToken changeStreamContinuationToken,
java.time.Instant estimatedLowWatermark) {
return new AutoValue_Heartbeat(
changeStreamContinuationToken, toThreetenInstant(estimatedLowWatermark));
return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark);
}

/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
Expand All @@ -50,12 +48,12 @@ static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken();

/** This method is obsolete. Use {@link #getEstimatedLowWatermarkInstant()} instead. */
@ObsoleteApi("Use getEstimatedLowWatermarkInstant() instead")
public abstract org.threeten.bp.Instant getEstimatedLowWatermark();
/** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */
@ObsoleteApi("Use getEstimatedLowWatermarkTime() instead")
public org.threeten.bp.Instant getEstimatedLowWatermark() {
return toThreetenInstant(getEstimatedLowWatermarkTime());
}

@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
public java.time.Instant getEstimatedLowWatermarkInstant() {
return toJavaTimeInstant(getEstimatedLowWatermark());
}
public abstract java.time.Instant getEstimatedLowWatermarkTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setLowWatermarkTime(FAKE_LOW_WATERMARK)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Test the getters.
Expand Down Expand Up @@ -120,7 +120,7 @@ public void gcMutationTest() throws IOException, ClassNotFoundException {
ByteString.copyFromUtf8("fake-qualifier"),
Range.TimestampRange.create(1000L, 2000L))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Test the getters.
Expand Down Expand Up @@ -172,7 +172,7 @@ public void toRowMutationTest() {
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Convert it to a rowMutation and construct a MutateRowRequest.
Expand Down Expand Up @@ -215,7 +215,7 @@ public void toRowMutationWithoutTokenShouldFailTest() {
ChangeStreamMutation.createUserMutation(
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
.deleteFamily("fake-family")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK);
Assert.assertThrows(IllegalStateException.class, builder::build);
}

Expand Down Expand Up @@ -255,7 +255,7 @@ public void toRowMutationEntryTest() {
Value.rawTimestamp(1000),
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Convert it to a rowMutationEntry and construct a MutateRowRequest.
Expand Down Expand Up @@ -295,7 +295,7 @@ public void toRowMutationEntryWithoutTokenShouldFailTest() {
ChangeStreamMutation.createUserMutation(
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
.deleteFamily("fake-family")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK);
Assert.assertThrows(IllegalStateException.class, builder::build);
}

Expand All @@ -320,7 +320,7 @@ public void testWithLongValue() {
1000L,
ByteString.copyFrom(Longs.toByteArray(1L)))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

RowMutation rowMutation = changeStreamMutation.toRowMutation(TABLE_ID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void heartbeatTest() {
.build();
Heartbeat actualHeartbeat = Heartbeat.fromProto(heartbeatProto);

assertThat(actualHeartbeat.getEstimatedLowWatermarkInstant())
assertThat(actualHeartbeat.getEstimatedLowWatermarkTime())
.isEqualTo(Instant.ofEpochSecond(lowWatermark.getSeconds(), lowWatermark.getNanos()));
assertThat(actualHeartbeat.getChangeStreamContinuationToken().getPartition())
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public void isHeartbeatTest() {
ChangeStreamMutation.createGcMutation(
ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0)
.setToken("token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();
Assert.assertTrue(adapter.isHeartbeat(heartbeatRecord));
Assert.assertFalse(adapter.isHeartbeat(closeStreamRecord));
Expand Down Expand Up @@ -102,7 +102,7 @@ public void isChangeStreamMutationTest() {
ChangeStreamMutation.createGcMutation(
ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0)
.setToken("token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();
Assert.assertFalse(adapter.isChangeStreamMutation(heartbeatRecord));
Assert.assertFalse(adapter.isChangeStreamMutation(closeStreamRecord));
Expand All @@ -115,7 +115,7 @@ public void getTokenFromChangeStreamMutationTest() {
ChangeStreamMutation.createGcMutation(
ByteString.copyFromUtf8("key"), FAKE_COMMIT_TIMESTAMP, 0)
.setToken("change-stream-mutation-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();
Assert.assertEquals(
adapter.getTokenFromChangeStreamMutation(changeStreamMutationRecord),
Expand Down Expand Up @@ -189,7 +189,7 @@ public void singleDeleteFamilyTest() {
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
.deleteFamily("fake-family")
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Create the ChangeStreamMutation through the ChangeStreamRecordBuilder.
Expand Down Expand Up @@ -228,7 +228,7 @@ public void singleDeleteCellTest() {
ByteString.copyFromUtf8("fake-qualifier"),
Range.TimestampRange.create(1000L, 2000L))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Create the ChangeStreamMutation through the ChangeStreamRecordBuilder.
Expand Down Expand Up @@ -261,7 +261,7 @@ public void singleNonChunkedCellTest() {
100L,
ByteString.copyFromUtf8("fake-value"))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Create the ChangeStreamMutation through the ChangeStreamRecordBuilder.
Expand Down Expand Up @@ -293,7 +293,7 @@ public void singleChunkedCellTest() {
100L,
ByteString.copyFromUtf8("fake-value1-value2"))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

// Create the ChangeStreamMutation through the ChangeStreamRecordBuilder.
Expand Down Expand Up @@ -330,7 +330,7 @@ public void multipleChunkedCellsTest() {
}
expectedChangeStreamMutationBuilder
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK);

// Create the ChangeStreamMutation through the ChangeStreamRecordBuilder.
changeStreamRecordBuilder.startUserMutation(
Expand Down Expand Up @@ -372,7 +372,7 @@ public void multipleDifferentModsTest() {
100L,
ByteString.copyFromUtf8("chunked-value"))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK);

// Create the ChangeStreamMutation through the ChangeStreamRecordBuilder.
changeStreamRecordBuilder.startUserMutation(
Expand Down Expand Up @@ -421,7 +421,7 @@ public void resetTest() {
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
.deleteFamily("fake-family")
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();
changeStreamRecordBuilder.startUserMutation(
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0);
Expand All @@ -441,7 +441,7 @@ public void resetTest() {
100L,
ByteString.copyFromUtf8("fake-value1-value2"))
.setToken("fake-token")
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
.setEstimatedLowWatermarkTime(FAKE_LOW_WATERMARK)
.build();

changeStreamRecordBuilder.startUserMutation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void heartbeatTest() {
Instant.ofEpochSecond(
heartbeatProto.getEstimatedLowWatermark().getSeconds(),
heartbeatProto.getEstimatedLowWatermark().getNanos()));
assertThat(heartbeat.getEstimatedLowWatermarkInstant())
assertThat(heartbeat.getEstimatedLowWatermarkTime())
.isEqualTo(
java.time.Instant.ofEpochSecond(
heartbeatProto.getEstimatedLowWatermark().getSeconds(),
Expand Down

0 comments on commit ed0213d

Please sign in to comment.