From 051bf2911f13e7c05d54ff6161854950ce351f13 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Mon, 8 Aug 2022 17:19:48 -0400 Subject: [PATCH 1/5] feat: Implement ReadChangeStreamResumptionStrategy --- .../data/v2/stub/EnhancedBigtableStub.java | 8 +- .../ReadChangeStreamResumptionStrategy.java | 96 ++++ .../ReadChangeStreamRetryTest.java | 494 ++++++++++++++++++ 3 files changed, 594 insertions(+), 4 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 7872b1e07e..6b5746fea2 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -82,6 +82,7 @@ import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMergingCallable; import com.google.cloud.bigtable.data.v2.stub.changestream.ListChangeStreamPartitionsUserCallable; +import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamResumptionStrategy; import com.google.cloud.bigtable.data.v2.stub.changestream.ReadChangeStreamUserCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerStreamingCallable; import com.google.cloud.bigtable.data.v2.stub.metrics.BigtableTracerUnaryCallable; @@ -898,7 +899,7 @@ public Map extract( * case of mutations, it will merge the {@link ReadChangeStreamResponse.DataChange}s into * {@link ChangeStreamMutation}. The actual change stream record implementation can be * configured by the {@code changeStreamRecordAdapter} parameter. - *
  • TODO: Retry/resume on failure. + *
  • Retry/resume on failure. *
  • Add tracing & metrics. * */ @@ -939,7 +940,8 @@ public Map extract( // Copy idle timeout settings for watchdog. ServerStreamingCallSettings innerSettings = ServerStreamingCallSettings.newBuilder() - // TODO: setResumptionStrategy. + .setResumptionStrategy( + new ReadChangeStreamResumptionStrategy<>(changeStreamRecordAdapter)) .setRetryableCodes(settings.readChangeStreamSettings().getRetryableCodes()) .setRetrySettings(settings.readChangeStreamSettings().getRetrySettings()) .setIdleTimeout(settings.readChangeStreamSettings().getIdleTimeout()) @@ -951,8 +953,6 @@ public Map extract( ServerStreamingCallable withBigtableTracer = new BigtableTracerStreamingCallable<>(watched); - // TODO: Add ReadChangeStreamRetryCompletedCallable. - ServerStreamingCallable readChangeStreamCallable = Callables.retrying(withBigtableTracer, innerSettings, clientContext); diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java new file mode 100644 index 0000000000..9c709d9f2c --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -0,0 +1,96 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import com.google.api.core.InternalApi; +import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamRequest.Builder; +import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamContinuationTokens; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecordAdapter; + +/** + * An implementation of a {@link StreamResumptionStrategy} for change stream records. This class + * tracks the continuation token and upon retry can build a request to resume the stream from where + * it left off. + * + *

    This class is considered an internal implementation detail and not meant to be used by + * applications. + */ +@InternalApi +public class ReadChangeStreamResumptionStrategy + implements StreamResumptionStrategy { + private final ChangeStreamRecordAdapter changeStreamRecordAdapter; + private String token = null; + + public ReadChangeStreamResumptionStrategy( + ChangeStreamRecordAdapter changeStreamRecordAdapter) { + this.changeStreamRecordAdapter = changeStreamRecordAdapter; + } + + @Override + public boolean canResume() { + return true; + } + + @Override + public StreamResumptionStrategy createNew() { + return new ReadChangeStreamResumptionStrategy<>(changeStreamRecordAdapter); + } + + @Override + public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) { + // Update the token from a Heartbeat or a ChangeStreamMutation. + if (changeStreamRecordAdapter.isHeartbeat(response)) { + this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response); + } + if (changeStreamRecordAdapter.isChangeStreamMutation(response)) { + this.token = changeStreamRecordAdapter.getTokenFromChangeStreamMutation(response); + } + return response; + } + + /** + * {@inheritDoc} + * + *

    Given a request, this implementation will narrow that request to include data changes that + * come after {@link #token}. + */ + @Override + public ReadChangeStreamRequest getResumeRequest(ReadChangeStreamRequest originalRequest) { + // A null token means that we have not successfully read a Heartbeat nor a ChangeStreamMutation, + // so start from the beginning. + if (this.token == null) { + return originalRequest; + } + + Builder builder = originalRequest.toBuilder(); + // We need to clear either start_time or continuation_tokens. + // And just use the StreamPartition and the token to resume the request. + builder.clearStartFrom(); + builder.setContinuationTokens( + StreamContinuationTokens.newBuilder() + .addTokens( + StreamContinuationToken.newBuilder() + .setPartition(originalRequest.getPartition()) + .setToken(this.token) + .build()) + .build()); + + return builder.build(); + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java new file mode 100644 index 0000000000..25674b2b10 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -0,0 +1,494 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub.changestream; + +import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.InternalException; +import com.google.api.gax.rpc.ServerStream; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.Mutation; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.RowRange; +import com.google.bigtable.v2.StreamContinuationToken; +import com.google.bigtable.v2.StreamContinuationTokens; +import com.google.bigtable.v2.StreamPartition; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.internal.NameUtil; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamRecord; +import com.google.cloud.bigtable.data.v2.models.CloseStream; +import com.google.cloud.bigtable.data.v2.models.Heartbeat; +import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; +import com.google.common.collect.Lists; +import com.google.common.collect.Queues; +import com.google.common.truth.Truth; +import com.google.protobuf.ByteString; +import com.google.protobuf.Timestamp; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Queue; +import javax.annotation.Nonnull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ReadChangeStreamRetryTest { + private static final String PROJECT_ID = "fake-project"; + private static final String INSTANCE_ID = "fake-instance"; + private static final String TABLE_ID = "fake-table"; + private static final String START_KEY_CLOSED = "a"; + private static final String END_KEY_OPEN = "b"; + private static final String HEARTBEAT_TOKEN = "heartbeat-token"; + private static final String CLOSE_STREAM_TOKEN = "close-stream-token"; + private static final String DATA_CHANGE_TOKEN = "data-change-token"; + private static Timestamp REQUEST_START_TIME = Timestamp.newBuilder().setSeconds(1).build(); + + @Rule public GrpcServerRule serverRule = new GrpcServerRule(); + private TestBigtableService service; + private BigtableDataClient client; + + @Before + public void setUp() throws IOException { + service = new TestBigtableService(); + serverRule.getServiceRegistry().addService(service); + + BigtableDataSettings.Builder settings = + BigtableDataSettings.newBuilder() + .setProjectId(PROJECT_ID) + .setInstanceId(INSTANCE_ID) + .setCredentialsProvider(NoCredentialsProvider.create()); + + settings + .stubSettings() + .setTransportChannelProvider( + FixedTransportChannelProvider.create( + GrpcTransportChannel.create(serverRule.getChannel()))) + .build(); + + client = BigtableDataClient.create(settings.build()); + } + + @After + public void tearDown() { + if (client != null) { + client.close(); + } + } + + private StreamContinuationToken createStreamContinuationToken(@Nonnull String token) { + return StreamContinuationToken.newBuilder() + .setPartition( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED)) + .setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN)) + .build()) + .build()) + .setToken(token) + .build(); + } + + private ReadChangeStreamResponse.Heartbeat createHeartbeat( + StreamContinuationToken streamContinuationToken) { + return ReadChangeStreamResponse.Heartbeat.newBuilder() + .setContinuationToken(streamContinuationToken) + .setLowWatermark(Timestamp.newBuilder().setSeconds(1000).build()) + .build(); + } + + private ReadChangeStreamResponse.CloseStream createCloseStream() { + return ReadChangeStreamResponse.CloseStream.newBuilder() + .addContinuationTokens(createStreamContinuationToken(CLOSE_STREAM_TOKEN)) + .setStatus(com.google.rpc.Status.newBuilder().setCode(0).build()) + .build(); + } + + private ReadChangeStreamResponse.DataChange createDataChange(boolean done) { + Mutation deleteFromFamily = + Mutation.newBuilder() + .setDeleteFromFamily( + Mutation.DeleteFromFamily.newBuilder().setFamilyName("fake-family").build()) + .build(); + ReadChangeStreamResponse.DataChange.Builder dataChangeBuilder = + ReadChangeStreamResponse.DataChange.newBuilder() + .setType(ReadChangeStreamResponse.DataChange.Type.USER) + .setSourceClusterId("fake-source-cluster-id") + .setRowKey(ByteString.copyFromUtf8("key")) + .setCommitTimestamp(Timestamp.newBuilder().setSeconds(100).build()) + .setTiebreaker(100) + .addChunks( + ReadChangeStreamResponse.MutationChunk.newBuilder().setMutation(deleteFromFamily)); + if (done) { + dataChangeBuilder.setDone(true); + dataChangeBuilder.setLowWatermark(Timestamp.newBuilder().setSeconds(1).build()); + dataChangeBuilder.setToken(DATA_CHANGE_TOKEN); + } + return dataChangeBuilder.build(); + } + + // [{ReadChangeStreamResponse.Heartbeat}] -> [{Heartbeat}] + @Test + public void happyPathHeartbeatTest() { + ReadChangeStreamResponse heartbeatResponse = + ReadChangeStreamResponse.newBuilder() + .setHeartbeat(createHeartbeat(createStreamContinuationToken(HEARTBEAT_TOKEN))) + .build(); + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWith(heartbeatResponse)); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof Heartbeat); + } + + // [{ReadChangeStreamResponse.CloseStream}] -> [{CloseStream}] + @Test + public void happyPathCloseStreamTest() { + ReadChangeStreamResponse closeStreamResponse = + ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWith(closeStreamResponse)); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof CloseStream); + } + + // [{DataChange(done==true)}] -> [{ReadChangeStreamMutation}] + @Test + public void happyPathCompleteDataChangeTest() { + // Setting `done==true` to complete the ChangeStreamMutation. + ReadChangeStreamResponse dataChangeResponse = + ReadChangeStreamResponse.newBuilder().setDataChange(createDataChange(true)).build(); + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWith(dataChangeResponse)); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof ChangeStreamMutation); + } + + // [{UNAVAILABLE}, {ReadChangeStreamResponse.Heartbeat}] -> [{Heartbeat}] + @Test + public void singleHeartbeatImmediateRetryTest() { + ReadChangeStreamResponse heartbeatResponse = + ReadChangeStreamResponse.newBuilder() + .setHeartbeat(createHeartbeat(createStreamContinuationToken(HEARTBEAT_TOKEN))) + .build(); + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE)); + // Resume with the exact same request. + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWith(heartbeatResponse)); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof Heartbeat); + } + + // [{UNAVAILABLE}, {ReadChangeStreamResponse.CloseStream}] -> [{CloseStream}] + @Test + public void singleCloseStreamImmediateRetryTest() { + // CloseStream. + ReadChangeStreamResponse closeStreamResponse = + ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE)); + // Resume with the exact same request. + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWith(closeStreamResponse)); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof CloseStream); + } + + // [{UNAVAILABLE}, {DataChange with done==true}] -> [{(ChangeStreamRecord) ChangeStreamMutation}] + @Test + public void singleCompleteDataChangeImmediateRetryTest() { + // DataChange + ReadChangeStreamResponse dataChangeResponse = + ReadChangeStreamResponse.newBuilder().setDataChange(createDataChange(true)).build(); + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWithStatus(Code.UNAVAILABLE)); + // Resume with the exact same request. + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWith(dataChangeResponse)); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof ChangeStreamMutation); + } + + // [{ReadChangeStreamResponse.Heartbeat}, {UNAVAILABLE}] -> Resume with token from heartbeat. + @Test + public void errorAfterHeartbeatShouldResumeWithTokenTest() { + StreamContinuationToken streamContinuationToken = + createStreamContinuationToken(HEARTBEAT_TOKEN); + ReadChangeStreamResponse heartbeatResponse = + ReadChangeStreamResponse.newBuilder() + .setHeartbeat(createHeartbeat(streamContinuationToken)) + .build(); + service.expectations.add( + RpcExpectation.create() + .expectInitialRequest() + .respondWith(heartbeatResponse) + .respondWithStatus(Code.UNAVAILABLE)); + // Resume the request with the token from the Heartbeat. `startTime` is cleared. + // We don't care about the response here so just do expectRequest. + service.expectations.add( + RpcExpectation.create() + .expectRequest( + StreamContinuationTokens.newBuilder().addTokens(streamContinuationToken).build())); + List actualResults = getResults(); + // This is the Heartbeat we get before UNAVAILABLE. + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof Heartbeat); + } + + // [{ReadChangeStreamResponse.CloseStream}, {UNAVAILABLE}] -> Resume with original request. + @Test + public void errorAfterSingleCloseStreamShouldResumeWithOriginalRequestTest() { + // CloseStream. + ReadChangeStreamResponse closeStreamResponse = + ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); + service.expectations.add( + RpcExpectation.create() + .expectInitialRequest() + .respondWith(closeStreamResponse) + .respondWithStatus(Code.UNAVAILABLE)); + // Resume the request with the original request. + // We don't care about the response here so just do expectRequest. + service.expectations.add(RpcExpectation.create().expectInitialRequest()); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof CloseStream); + } + + // [{DataChange with done==true}, {UNAVAILABLE}] -> Resume with token from DataChange. + @Test + public void errorAfterDataChangeWithDoneShouldResumeWithTokenTest() { + // DataChange + ReadChangeStreamResponse dataChangeResponse = + ReadChangeStreamResponse.newBuilder().setDataChange(createDataChange(true)).build(); + service.expectations.add( + RpcExpectation.create() + .expectInitialRequest() + .respondWith(dataChangeResponse) + .respondWithStatus(Code.UNAVAILABLE)); + // Resume the request with the token from the ChangeStreamMutation. `startTime` is cleared. + // We don't care about the response here so just do expectRequest. + service.expectations.add( + RpcExpectation.create() + .expectRequest( + StreamContinuationTokens.newBuilder() + .addTokens(createStreamContinuationToken(DATA_CHANGE_TOKEN)) + .build())); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof ChangeStreamMutation); + } + + // [{DataChange with done==false}, {UNAVAILABLE}] -> Resume with original request. + @Test + public void errorAfterDataChangeWithoutDoneShouldResumeWithTokenTest() { + // DataChange + ReadChangeStreamResponse dataChangeResponse = + ReadChangeStreamResponse.newBuilder().setDataChange(createDataChange(false)).build(); + service.expectations.add( + RpcExpectation.create() + .expectInitialRequest() + .respondWith(dataChangeResponse) + .respondWithStatus(Code.UNAVAILABLE)); + // Resume the request with the original request, because the previous DataChange didn't + // complete the ChangeStreamMutation(i.e. without `done==true`). + // We don't care about the response here so just do expectRequest. + service.expectations.add(RpcExpectation.create().expectInitialRequest()); + List actualResults = getResults(); + Truth.assertThat(actualResults).isEmpty(); + } + + // [{DataChange with done==true}, {Heartbeat}, {UNAVAILABLE}] -> Resume with token from Heartbeat. + @Test + public void shouldResumeWithLastTokenTest() { + // DataChange + ReadChangeStreamResponse dataChangeResponse = + ReadChangeStreamResponse.newBuilder().setDataChange(createDataChange(true)).build(); + // Heartbeat. + ReadChangeStreamResponse heartbeatResponse = + ReadChangeStreamResponse.newBuilder() + .setHeartbeat(createHeartbeat(createStreamContinuationToken(HEARTBEAT_TOKEN))) + .build(); + service.expectations.add( + RpcExpectation.create() + .expectInitialRequest() + .respondWith(dataChangeResponse) + .respondWith(heartbeatResponse) + .respondWithStatus(Code.UNAVAILABLE)); + // If we receive a DataChange with done==true and a Heartbeat then a retryable error, it should + // resume with the last token, which is the one from the heartbeat. + // If the original request reads with start_time, it'll be resumed with the continuation token. + // We don't care about the response here so just do expectRequest. + service.expectations.add( + RpcExpectation.create() + .expectRequest( + StreamContinuationTokens.newBuilder() + .addTokens(createStreamContinuationToken(HEARTBEAT_TOKEN)) + .build())); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 2); + Assert.assertTrue(actualResults.get(0) instanceof ChangeStreamMutation); + Assert.assertTrue(actualResults.get(1) instanceof Heartbeat); + } + + @Test + public void retryRstStreamExceptionTest() { + ApiException exception = + new InternalException( + new StatusRuntimeException( + Status.INTERNAL.withDescription( + "INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")), + GrpcStatusCode.of(Code.INTERNAL), + false); + ReadChangeStreamResponse heartbeatResponse = + ReadChangeStreamResponse.newBuilder() + .setHeartbeat(createHeartbeat(createStreamContinuationToken(HEARTBEAT_TOKEN))) + .build(); + service.expectations.add( + RpcExpectation.create() + .expectInitialRequest() + .respondWithException(Code.INTERNAL, exception)); + service.expectations.add( + RpcExpectation.create().expectInitialRequest().respondWith(heartbeatResponse)); + List actualResults = getResults(); + Assert.assertEquals(actualResults.size(), 1); + Assert.assertTrue(actualResults.get(0) instanceof Heartbeat); + } + + private List getResults() { + ReadChangeStreamQuery query = + ReadChangeStreamQuery.create(TABLE_ID).startTime(REQUEST_START_TIME); + // Always give it this partition. We don't care. + ServerStream actualRecords = + client.readChangeStream(query.streamPartition(START_KEY_CLOSED, END_KEY_OPEN)); + List actualValues = Lists.newArrayList(); + for (ChangeStreamRecord record : actualRecords) { + actualValues.add(record); + } + return actualValues; + } + + private static class TestBigtableService extends BigtableGrpc.BigtableImplBase { + Queue expectations = Queues.newArrayDeque(); + int i = -1; + + @Override + public void readChangeStream( + ReadChangeStreamRequest request, + StreamObserver responseObserver) { + + RpcExpectation expectedRpc = expectations.poll(); + i++; + + Truth.assertWithMessage("Unexpected request#" + i + ":" + request.toString()) + .that(expectedRpc) + .isNotNull(); + Truth.assertWithMessage("Unexpected request#" + i) + .that(request) + .isEqualTo(expectedRpc.getExpectedRequest()); + + for (ReadChangeStreamResponse response : expectedRpc.responses) { + responseObserver.onNext(response); + } + if (expectedRpc.statusCode.toStatus().isOk()) { + responseObserver.onCompleted(); + } else if (expectedRpc.exception != null) { + responseObserver.onError(expectedRpc.exception); + } else { + responseObserver.onError(expectedRpc.statusCode.toStatus().asRuntimeException()); + } + } + } + + private static class RpcExpectation { + ReadChangeStreamRequest.Builder requestBuilder; + Status.Code statusCode; + ApiException exception; + List responses; + + private RpcExpectation() { + this.requestBuilder = + ReadChangeStreamRequest.newBuilder() + .setTableName(NameUtil.formatTableName(PROJECT_ID, INSTANCE_ID, TABLE_ID)) + .setPartition( + StreamPartition.newBuilder() + .setRowRange( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8(START_KEY_CLOSED)) + .setEndKeyOpen(ByteString.copyFromUtf8(END_KEY_OPEN)) + .build()) + .build()); + this.statusCode = Status.Code.OK; + this.responses = Lists.newArrayList(); + } + + static RpcExpectation create() { + return new RpcExpectation(); + } + + RpcExpectation expectInitialRequest() { + requestBuilder.setStartTime(REQUEST_START_TIME); + return this; + } + + RpcExpectation expectRequest(StreamContinuationTokens continuationTokens) { + requestBuilder.setContinuationTokens(continuationTokens); + return this; + } + + RpcExpectation respondWithStatus(Status.Code code) { + this.statusCode = code; + return this; + } + + RpcExpectation respondWithException(Status.Code code, ApiException exception) { + this.statusCode = code; + this.exception = exception; + return this; + } + + RpcExpectation respondWith(ReadChangeStreamResponse... responses) { + Collections.addAll(this.responses, responses); + return this; + } + + ReadChangeStreamRequest getExpectedRequest() { + return requestBuilder.build(); + } + } +} From 94e3ec7755e5c18dcd5daee7d79b2fd267e44096 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 10 Aug 2022 11:44:26 -0400 Subject: [PATCH 2/5] fix: Address comments --- .../v2/models/ChangeStreamRecordAdapter.java | 4 ++++ .../DefaultChangeStreamRecordAdapter.java | 6 ++++++ .../ReadChangeStreamResumptionStrategy.java | 13 +++++++++++-- .../DefaultChangeStreamRecordAdapterTest.java | 17 +++++++++++++++++ .../changestream/ReadChangeStreamRetryTest.java | 13 ++++++++----- 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java index 6e9715a407..6665a0e56a 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -36,6 +36,10 @@ public interface ChangeStreamRecordAdapter { @InternalApi("Used in Changestream beam pipeline.") boolean isHeartbeat(ChangeStreamRecordT record); + /** Checks if the given change stream record is a CloseStream. */ + @InternalApi("Used in Changestream beam pipeline.") + boolean isCloseStream(ChangeStreamRecordT record); + /** * Get the token from the given Heartbeat record. If the given record is not a Heartbeat, it will * throw an Exception. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java index d8eb632e54..c95618fc69 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java @@ -48,6 +48,12 @@ public String getTokenFromHeartbeat(ChangeStreamRecord record) { return ((Heartbeat) record).getChangeStreamContinuationToken().getToken(); } + /** {@inheritDoc} */ + @Override + public boolean isCloseStream(ChangeStreamRecord record) { + return record instanceof CloseStream; + } + /** {@inheritDoc} */ @Override public boolean isChangeStreamMutation(ChangeStreamRecord record) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java index 9c709d9f2c..70e0efea8d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -36,6 +36,7 @@ public class ReadChangeStreamResumptionStrategy implements StreamResumptionStrategy { private final ChangeStreamRecordAdapter changeStreamRecordAdapter; private String token = null; + private boolean canResume = true; public ReadChangeStreamResumptionStrategy( ChangeStreamRecordAdapter changeStreamRecordAdapter) { @@ -44,7 +45,7 @@ public ReadChangeStreamResumptionStrategy( @Override public boolean canResume() { - return true; + return canResume; } @Override @@ -55,9 +56,14 @@ public StreamResumptionStrategy cr @Override public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) { // Update the token from a Heartbeat or a ChangeStreamMutation. + // If we get a CloseStream, disable resumption and don't re-enable it, since + // the stream is supposed to be closed upon CloseStream. if (changeStreamRecordAdapter.isHeartbeat(response)) { this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response); } + if (changeStreamRecordAdapter.isCloseStream(response)) { + canResume = false; + } if (changeStreamRecordAdapter.isChangeStreamMutation(response)) { this.token = changeStreamRecordAdapter.getTokenFromChangeStreamMutation(response); } @@ -79,8 +85,11 @@ public ReadChangeStreamRequest getResumeRequest(ReadChangeStreamRequest original } Builder builder = originalRequest.toBuilder(); - // We need to clear either start_time or continuation_tokens. + // We need to clear both start_time and continuation_tokens. // And just use the StreamPartition and the token to resume the request. + // The partition should is always the same as the one from the original request, + // because otherwise we would have received a CloseStream with different + // partitions(which indicates tablet split/merge events). builder.clearStartFrom(); builder.setContinuationTokens( StreamContinuationTokens.newBuilder() diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java index e29b914ffc..fc280e2e7e 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -75,6 +75,23 @@ public void getTokenFromHeartbeatTest() { Assert.assertEquals(adapter.getTokenFromHeartbeat(heartbeatRecord), "heartbeat-token"); } + @Test + public void isCloseStreamTest() { + ChangeStreamRecord heartbeatRecord = + Heartbeat.fromProto(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); + ChangeStreamRecord closeStreamRecord = + CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); + ChangeStreamRecord changeStreamMutationRecord = + ChangeStreamMutation.createGcMutation( + ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) + .setToken("token") + .setLowWatermark(Timestamp.getDefaultInstance()) + .build(); + Assert.assertFalse(adapter.isCloseStream(heartbeatRecord)); + Assert.assertTrue(adapter.isCloseStream(closeStreamRecord)); + Assert.assertFalse(adapter.isCloseStream(changeStreamMutationRecord)); + } + @Test(expected = IllegalArgumentException.class) public void getTokenFromHeartbeatInvalidTypeTest() { ChangeStreamRecord closeStreamRecord = diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java index 25674b2b10..3b417ef772 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -22,6 +22,7 @@ import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; +import com.google.api.gax.rpc.UnavailableException; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.ReadChangeStreamRequest; @@ -58,6 +59,7 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -76,6 +78,7 @@ public class ReadChangeStreamRetryTest { @Rule public GrpcServerRule serverRule = new GrpcServerRule(); private TestBigtableService service; private BigtableDataClient client; + @Rule public ExpectedException expect = ExpectedException.none(); @Before public void setUp() throws IOException { @@ -271,8 +274,8 @@ public void errorAfterHeartbeatShouldResumeWithTokenTest() { Assert.assertTrue(actualResults.get(0) instanceof Heartbeat); } - // [{ReadChangeStreamResponse.CloseStream}, {UNAVAILABLE}] -> Resume with original request. - @Test + // [{ReadChangeStreamResponse.CloseStream}, {UNAVAILABLE}] -> Request not resumed. + @Test(expected = UnavailableException.class) public void errorAfterSingleCloseStreamShouldResumeWithOriginalRequestTest() { // CloseStream. ReadChangeStreamResponse closeStreamResponse = @@ -282,10 +285,10 @@ public void errorAfterSingleCloseStreamShouldResumeWithOriginalRequestTest() { .expectInitialRequest() .respondWith(closeStreamResponse) .respondWithStatus(Code.UNAVAILABLE)); - // Resume the request with the original request. - // We don't care about the response here so just do expectRequest. - service.expectations.add(RpcExpectation.create().expectInitialRequest()); + // Request is not resumed. We'll get a CloseStream and then an UNAVAILABLE error. List actualResults = getResults(); + expect.expect(UnavailableException.class); + // This is the CloseStream we get before UNAVAILABLE. Assert.assertEquals(actualResults.size(), 1); Assert.assertTrue(actualResults.get(0) instanceof CloseStream); } From a827dddfa3701a2ff18a1253bd963dc7804d32d7 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 10 Aug 2022 12:04:21 -0400 Subject: [PATCH 3/5] fix: Fix typos --- .../stub/changestream/ReadChangeStreamResumptionStrategy.java | 2 +- .../data/v2/stub/changestream/ReadChangeStreamRetryTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java index 70e0efea8d..5e72496b7d 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -87,7 +87,7 @@ public ReadChangeStreamRequest getResumeRequest(ReadChangeStreamRequest original Builder builder = originalRequest.toBuilder(); // We need to clear both start_time and continuation_tokens. // And just use the StreamPartition and the token to resume the request. - // The partition should is always the same as the one from the original request, + // The partition should always be the same as the one from the original request, // because otherwise we would have received a CloseStream with different // partitions(which indicates tablet split/merge events). builder.clearStartFrom(); diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java index 3b417ef772..ceb0fedff4 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -276,7 +276,7 @@ public void errorAfterHeartbeatShouldResumeWithTokenTest() { // [{ReadChangeStreamResponse.CloseStream}, {UNAVAILABLE}] -> Request not resumed. @Test(expected = UnavailableException.class) - public void errorAfterSingleCloseStreamShouldResumeWithOriginalRequestTest() { + public void errorAfterSingleCloseStreamShouldNotResumeTest() { // CloseStream. ReadChangeStreamResponse closeStreamResponse = ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); From 89daf770ac58d71568963890d4f02a504c8b7b2a Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 10 Aug 2022 14:21:51 -0400 Subject: [PATCH 4/5] fix: Update comments --- .../changestream/ReadChangeStreamResumptionStrategy.java | 8 ++++---- .../v2/stub/changestream/ReadChangeStreamRetryTest.java | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java index 5e72496b7d..1424221348 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -57,7 +57,7 @@ public StreamResumptionStrategy cr public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) { // Update the token from a Heartbeat or a ChangeStreamMutation. // If we get a CloseStream, disable resumption and don't re-enable it, since - // the stream is supposed to be closed upon CloseStream. + // the server will return an OK status after sending a CloseStream. if (changeStreamRecordAdapter.isHeartbeat(response)) { this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response); } @@ -85,10 +85,10 @@ public ReadChangeStreamRequest getResumeRequest(ReadChangeStreamRequest original } Builder builder = originalRequest.toBuilder(); - // We need to clear both start_time and continuation_tokens. - // And just use the StreamPartition and the token to resume the request. + // We need to clear the start_from and use the updated continuation_tokens + // to resume the request. // The partition should always be the same as the one from the original request, - // because otherwise we would have received a CloseStream with different + // otherwise we would receive a CloseStream with different // partitions(which indicates tablet split/merge events). builder.clearStartFrom(); builder.setContinuationTokens( diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java index ceb0fedff4..5bac66e03c 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -275,6 +275,9 @@ public void errorAfterHeartbeatShouldResumeWithTokenTest() { } // [{ReadChangeStreamResponse.CloseStream}, {UNAVAILABLE}] -> Request not resumed. + // This scenario should be very rare because the server will return an OK status + // right after sending a CloseStream. But in case the server fails after sending a + // CloseStream and before returning an OK, it should not resume the request. @Test(expected = UnavailableException.class) public void errorAfterSingleCloseStreamShouldNotResumeTest() { // CloseStream. From da695c41217398702712db1989770ba96937ecb0 Mon Sep 17 00:00:00 2001 From: Teng Zhong Date: Wed, 10 Aug 2022 14:29:52 -0400 Subject: [PATCH 5/5] fix: Address comments --- .../v2/models/ChangeStreamRecordAdapter.java | 4 --- .../DefaultChangeStreamRecordAdapter.java | 6 ----- .../ReadChangeStreamResumptionStrategy.java | 10 +++----- .../DefaultChangeStreamRecordAdapterTest.java | 17 ------------- .../ReadChangeStreamRetryTest.java | 25 ------------------- 5 files changed, 3 insertions(+), 59 deletions(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java index 6665a0e56a..6e9715a407 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordAdapter.java @@ -36,10 +36,6 @@ public interface ChangeStreamRecordAdapter { @InternalApi("Used in Changestream beam pipeline.") boolean isHeartbeat(ChangeStreamRecordT record); - /** Checks if the given change stream record is a CloseStream. */ - @InternalApi("Used in Changestream beam pipeline.") - boolean isCloseStream(ChangeStreamRecordT record); - /** * Get the token from the given Heartbeat record. If the given record is not a Heartbeat, it will * throw an Exception. diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java index c95618fc69..d8eb632e54 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java @@ -48,12 +48,6 @@ public String getTokenFromHeartbeat(ChangeStreamRecord record) { return ((Heartbeat) record).getChangeStreamContinuationToken().getToken(); } - /** {@inheritDoc} */ - @Override - public boolean isCloseStream(ChangeStreamRecord record) { - return record instanceof CloseStream; - } - /** {@inheritDoc} */ @Override public boolean isChangeStreamMutation(ChangeStreamRecord record) { diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java index 1424221348..a3532180fc 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamResumptionStrategy.java @@ -36,7 +36,6 @@ public class ReadChangeStreamResumptionStrategy implements StreamResumptionStrategy { private final ChangeStreamRecordAdapter changeStreamRecordAdapter; private String token = null; - private boolean canResume = true; public ReadChangeStreamResumptionStrategy( ChangeStreamRecordAdapter changeStreamRecordAdapter) { @@ -45,7 +44,7 @@ public ReadChangeStreamResumptionStrategy( @Override public boolean canResume() { - return canResume; + return true; } @Override @@ -56,14 +55,11 @@ public StreamResumptionStrategy cr @Override public ChangeStreamRecordT processResponse(ChangeStreamRecordT response) { // Update the token from a Heartbeat or a ChangeStreamMutation. - // If we get a CloseStream, disable resumption and don't re-enable it, since - // the server will return an OK status after sending a CloseStream. + // We don't worry about resumption after CloseStream, since the server + // will return an OK status right after sending a CloseStream. if (changeStreamRecordAdapter.isHeartbeat(response)) { this.token = changeStreamRecordAdapter.getTokenFromHeartbeat(response); } - if (changeStreamRecordAdapter.isCloseStream(response)) { - canResume = false; - } if (changeStreamRecordAdapter.isChangeStreamMutation(response)) { this.token = changeStreamRecordAdapter.getTokenFromChangeStreamMutation(response); } diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java index fc280e2e7e..e29b914ffc 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapterTest.java @@ -75,23 +75,6 @@ public void getTokenFromHeartbeatTest() { Assert.assertEquals(adapter.getTokenFromHeartbeat(heartbeatRecord), "heartbeat-token"); } - @Test - public void isCloseStreamTest() { - ChangeStreamRecord heartbeatRecord = - Heartbeat.fromProto(ReadChangeStreamResponse.Heartbeat.getDefaultInstance()); - ChangeStreamRecord closeStreamRecord = - CloseStream.fromProto(ReadChangeStreamResponse.CloseStream.getDefaultInstance()); - ChangeStreamRecord changeStreamMutationRecord = - ChangeStreamMutation.createGcMutation( - ByteString.copyFromUtf8("key"), Timestamp.getDefaultInstance(), 0) - .setToken("token") - .setLowWatermark(Timestamp.getDefaultInstance()) - .build(); - Assert.assertFalse(adapter.isCloseStream(heartbeatRecord)); - Assert.assertTrue(adapter.isCloseStream(closeStreamRecord)); - Assert.assertFalse(adapter.isCloseStream(changeStreamMutationRecord)); - } - @Test(expected = IllegalArgumentException.class) public void getTokenFromHeartbeatInvalidTypeTest() { ChangeStreamRecord closeStreamRecord = diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java index 5bac66e03c..a0defe6375 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/changestream/ReadChangeStreamRetryTest.java @@ -22,7 +22,6 @@ import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; -import com.google.api.gax.rpc.UnavailableException; import com.google.bigtable.v2.BigtableGrpc; import com.google.bigtable.v2.Mutation; import com.google.bigtable.v2.ReadChangeStreamRequest; @@ -59,7 +58,6 @@ import org.junit.Before; import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -78,7 +76,6 @@ public class ReadChangeStreamRetryTest { @Rule public GrpcServerRule serverRule = new GrpcServerRule(); private TestBigtableService service; private BigtableDataClient client; - @Rule public ExpectedException expect = ExpectedException.none(); @Before public void setUp() throws IOException { @@ -274,28 +271,6 @@ public void errorAfterHeartbeatShouldResumeWithTokenTest() { Assert.assertTrue(actualResults.get(0) instanceof Heartbeat); } - // [{ReadChangeStreamResponse.CloseStream}, {UNAVAILABLE}] -> Request not resumed. - // This scenario should be very rare because the server will return an OK status - // right after sending a CloseStream. But in case the server fails after sending a - // CloseStream and before returning an OK, it should not resume the request. - @Test(expected = UnavailableException.class) - public void errorAfterSingleCloseStreamShouldNotResumeTest() { - // CloseStream. - ReadChangeStreamResponse closeStreamResponse = - ReadChangeStreamResponse.newBuilder().setCloseStream(createCloseStream()).build(); - service.expectations.add( - RpcExpectation.create() - .expectInitialRequest() - .respondWith(closeStreamResponse) - .respondWithStatus(Code.UNAVAILABLE)); - // Request is not resumed. We'll get a CloseStream and then an UNAVAILABLE error. - List actualResults = getResults(); - expect.expect(UnavailableException.class); - // This is the CloseStream we get before UNAVAILABLE. - Assert.assertEquals(actualResults.size(), 1); - Assert.assertTrue(actualResults.get(0) instanceof CloseStream); - } - // [{DataChange with done==true}, {UNAVAILABLE}] -> Resume with token from DataChange. @Test public void errorAfterDataChangeWithDoneShouldResumeWithTokenTest() {