diff --git a/google-cloud-bigtable/clirr-ignored-differences.xml b/google-cloud-bigtable/clirr-ignored-differences.xml
index 588327d0de..8a3edd69c0 100644
--- a/google-cloud-bigtable/clirr-ignored-differences.xml
+++ b/google-cloud-bigtable/clirr-ignored-differences.xml
@@ -71,4 +71,9 @@
8001
com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracerUnaryCallable
+
+
+ 8001
+ com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable
+
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java
similarity index 90%
rename from google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java
rename to google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java
index 0c58f66441..ed50532fae 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/readrows/ReadRowsConvertExceptionCallable.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/ConvertExceptionCallable.java
@@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.data.v2.stub.readrows;
+package com.google.cloud.bigtable.data.v2.stub;
-import com.google.api.core.InternalApi;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InternalException;
@@ -26,14 +25,12 @@
/**
* This callable converts the "Received rst stream" exception into a retryable {@link ApiException}.
*/
-@InternalApi
-public final class ReadRowsConvertExceptionCallable
+final class ConvertExceptionCallable
extends ServerStreamingCallable {
private final ServerStreamingCallable innerCallable;
- public ReadRowsConvertExceptionCallable(
- ServerStreamingCallable innerCallable) {
+ public ConvertExceptionCallable(ServerStreamingCallable innerCallable) {
this.innerCallable = innerCallable;
}
diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
index 1550127e23..301ecd66b5 100644
--- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
+++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java
@@ -86,7 +86,6 @@
import com.google.cloud.bigtable.data.v2.stub.mutaterows.MutateRowsRetryingCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.FilterMarkerRowsCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsBatchingDescriptor;
-import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsConvertExceptionCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsResumptionStrategy;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsRetryCompletedCallable;
import com.google.cloud.bigtable.data.v2.stub.readrows.ReadRowsUserCallable;
@@ -414,7 +413,7 @@ public Map extract(ReadRowsRequest readRowsRequest) {
// should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
// which by default is not retryable. Convert the exception so it can be retried in the client.
ServerStreamingCallable convertException =
- new ReadRowsConvertExceptionCallable<>(withStatsHeaders);
+ new ConvertExceptionCallable<>(withStatsHeaders);
ServerStreamingCallable merging =
new RowMergingCallable<>(convertException, rowAdapter);
@@ -704,6 +703,13 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
ServerStreamingCallable withStatsHeaders =
new StatsHeadersServerStreamingCallable<>(base);
+ // Sometimes MutateRows connections are disconnected via an RST frame. This error is transient
+ // and
+ // should be treated similar to UNAVAILABLE. However, this exception has an INTERNAL error code
+ // which by default is not retryable. Convert the exception so it can be retried in the client.
+ ServerStreamingCallable convertException =
+ new ConvertExceptionCallable<>(withStatsHeaders);
+
RetryAlgorithm retryAlgorithm =
new RetryAlgorithm<>(
new ApiResultRetryAlgorithm(),
@@ -714,7 +720,7 @@ public Map extract(MutateRowsRequest mutateRowsRequest) {
return new MutateRowsRetryingCallable(
clientContext.getDefaultCallContext(),
- withStatsHeaders,
+ convertException,
retryingExecutor,
settings.bulkMutateRowsSettings().getRetryableCodes());
}
diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java
new file mode 100644
index 0000000000..19ab6413a0
--- /dev/null
+++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/mutaterows/MutateRowsRetryTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2022 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.data.v2.stub.mutaterows;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import com.google.api.gax.rpc.InternalException;
+import com.google.bigtable.v2.BigtableGrpc;
+import com.google.bigtable.v2.MutateRowsRequest;
+import com.google.bigtable.v2.MutateRowsResponse;
+import com.google.cloud.bigtable.data.v2.BigtableDataClient;
+import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
+import com.google.cloud.bigtable.data.v2.models.BulkMutation;
+import com.google.cloud.bigtable.data.v2.models.RowMutationEntry;
+import com.google.common.collect.Queues;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import io.grpc.testing.GrpcServerRule;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class MutateRowsRetryTest {
+
+ @Rule public GrpcServerRule serverRule = new GrpcServerRule();
+
+ private FakeBigtableService service;
+ private BigtableDataClient client;
+
+ private AtomicInteger attemptCounter = new AtomicInteger();
+
+ @Before
+ public void setUp() throws IOException {
+ service = new FakeBigtableService();
+ serverRule.getServiceRegistry().addService(service);
+
+ BigtableDataSettings.Builder settings =
+ BigtableDataSettings.newBuilder()
+ .setProjectId("fake-project")
+ .setInstanceId("fake-instance")
+ .setCredentialsProvider(NoCredentialsProvider.create());
+
+ settings
+ .stubSettings()
+ .setTransportChannelProvider(
+ FixedTransportChannelProvider.create(
+ GrpcTransportChannel.create(serverRule.getChannel())))
+ .build();
+
+ this.client = BigtableDataClient.create(settings.build());
+ }
+
+ @Test
+ public void testRetryRstStream() {
+ ApiException exception =
+ new InternalException(
+ new StatusRuntimeException(
+ Status.INTERNAL.withDescription(
+ "INTERNAL: HTTP/2 error code: INTERNAL_ERROR\nReceived Rst Stream")),
+ GrpcStatusCode.of(Status.Code.INTERNAL),
+ false);
+
+ service.expectations.add(exception);
+
+ try {
+ client.bulkMutateRows(
+ BulkMutation.create("fake-table")
+ .add(RowMutationEntry.create("row-key-1").setCell("cf", "q", "v")));
+ } catch (ApiException e) {
+ Assert.fail("Rst stream errors should be retried");
+ }
+
+ Assert.assertEquals(attemptCounter.get(), 2);
+ }
+
+ private class FakeBigtableService extends BigtableGrpc.BigtableImplBase {
+ Queue expectations = Queues.newArrayDeque();
+
+ @Override
+ public void mutateRows(
+ MutateRowsRequest request, StreamObserver responseObserver) {
+ attemptCounter.incrementAndGet();
+ if (expectations.isEmpty()) {
+ responseObserver.onNext(MutateRowsResponse.getDefaultInstance());
+ responseObserver.onCompleted();
+ } else {
+ Exception expectedRpc = expectations.poll();
+ responseObserver.onError(expectedRpc);
+ }
+ }
+ }
+}