From ec035598068ad739b6f88d4bda7fe93b16ece2d7 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Sat, 2 Nov 2024 14:04:58 -0400 Subject: [PATCH 01/16] chore: remodel unary callables as server streaming callables with an adapter at the end Change-Id: I8708dff0e192d7647ef2cb361fc0992e1ddd2b24 --- .../stub/BigtableUnaryOperationCallable.java | 183 ++++++++++++++++++ .../data/v2/stub/EnhancedBigtableStub.java | 87 ++++++++- .../v2/stub/EnhancedBigtableStubSettings.java | 6 + .../TransformingServerStreamingCallable.java | 72 +++++++ 4 files changed, 338 insertions(+), 10 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/TransformingServerStreamingCallable.java diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java new file mode 100644 index 0000000000..202498987d --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableUnaryOperationCallable.java @@ -0,0 +1,183 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import com.google.api.core.AbstractApiFuture; +import com.google.api.core.ApiFuture; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.InternalException; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; +import com.google.api.gax.rpc.StreamController; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.api.gax.tracing.ApiTracer; +import com.google.api.gax.tracing.ApiTracerFactory; +import com.google.api.gax.tracing.SpanName; +import com.google.common.base.Preconditions; +import io.grpc.Status; +import java.util.logging.Level; +import java.util.logging.Logger; +import javax.annotation.Nullable; + +/** + * Helper to convert a fake {@link ServerStreamingCallable} (ie only up to 1 response) into a {@link + * UnaryCallable}. It is intended to be the outermost callable of a chain. + * + *

Responsibilities: + * + *

*/ public UnaryCallable createReadRowCallable(RowAdapter rowAdapter) { - ServerStreamingCallable readRowsCallable = - createReadRowsBaseCallable( - ServerStreamingCallSettings.newBuilder() - .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) - .setRetrySettings(settings.readRowSettings().getRetrySettings()) - .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) - .build(), - rowAdapter); - if (!EnhancedBigtableStubSettings.SKIP_TRAILERS) { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable( + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .build(), + rowAdapter); + ReadRowsUserCallable readRowCallable = new ReadRowsUserCallable<>(readRowsCallable, requestContext); ReadRowsFirstCallable firstRow = new ReadRowsFirstCallable<>(readRowCallable); @@ -580,6 +581,15 @@ public UnaryCallable createReadRowCallable(RowAdapter firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); } else { + ServerStreamingCallable readRowsCallable = + createReadRowsBaseCallable( + ServerStreamingCallSettings.newBuilder() + .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) + .setRetrySettings(settings.readRowSettings().getRetrySettings()) + .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .build(), + rowAdapter, + new SimpleStreamResumptionStrategy<>()); ServerStreamingCallable readRowCallable = new TransformingServerStreamingCallable<>( readRowsCallable, @@ -595,6 +605,11 @@ public UnaryCallable createReadRowCallable(RowAdapter } } + private ServerStreamingCallable createReadRowsBaseCallable( + ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { + return createReadRowsBaseCallable( + readRowsSettings, rowAdapter, new ReadRowsResumptionStrategy(rowAdapter)); + } /** * Creates a callable chain to handle ReadRows RPCs. The chain will: * @@ -611,8 +626,9 @@ public UnaryCallable createReadRowCallable(RowAdapter *

NOTE: the caller is responsible for adding tracing & metrics. */ private ServerStreamingCallable createReadRowsBaseCallable( - ServerStreamingCallSettings readRowsSettings, RowAdapter rowAdapter) { - + ServerStreamingCallSettings readRowsSettings, + RowAdapter rowAdapter, + StreamResumptionStrategy resumptionStrategy) { ServerStreamingCallable base = GrpcRawCallableFactory.createServerStreamingCallable( GrpcCallSettings.newBuilder() @@ -653,7 +669,7 @@ public Map extract(ReadRowsRequest readRowsRequest) { // ReadRowsRequest -> ReadRowsResponse callable). ServerStreamingCallSettings innerSettings = ServerStreamingCallSettings.newBuilder() - .setResumptionStrategy(new ReadRowsResumptionStrategy<>(rowAdapter)) + .setResumptionStrategy(resumptionStrategy) .setRetryableCodes(readRowsSettings.getRetryableCodes()) .setRetrySettings(readRowsSettings.getRetrySettings()) .setIdleTimeout(readRowsSettings.getIdleTimeout()) @@ -1344,7 +1360,7 @@ private UnaryCallable createUnar return createUnaryCallableNew( methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); } else { - return createUnaryCallableNew( + return createUnaryCallableOld( methodDescriptor, headerParamsFn, callSettings, requestTransformer, responseTranformer); } } From 0c2bc8414b478af4eef6650548d13cc13d5c2245 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 5 Nov 2024 11:55:36 -0500 Subject: [PATCH 15/16] typo Change-Id: I8202e935975e1a55606265c502fe7573b8a4acb0 --- .../cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 4d26c3329d..580f0a1ae9 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -1426,7 +1426,7 @@ private UnaryCallable createUnar base = new BigtableTracerStreamingCallable<>(base); - base = withRetries(base, convertUnaryToServerStreamingSettings(settings.mutateRowSettings())); + base = withRetries(base, convertUnaryToServerStreamingSettings(callSettings)); ServerStreamingCallable transformed = new TransformingServerStreamingCallable<>(base, requestTransformer, responseTranformer); From 066ae08e82282af308ba2ebaebd50f01fb3844eb Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Tue, 5 Nov 2024 12:02:51 -0500 Subject: [PATCH 16/16] disable watchdog for the new ReadRow callable chain Change-Id: I4522719a65f24d27fb9dccde031c3b1cc04042c2 --- .../cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 580f0a1ae9..a1100231fa 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -586,7 +586,8 @@ public UnaryCallable createReadRowCallable(RowAdapter ServerStreamingCallSettings.newBuilder() .setRetryableCodes(settings.readRowSettings().getRetryableCodes()) .setRetrySettings(settings.readRowSettings().getRetrySettings()) - .setIdleTimeout(settings.readRowSettings().getRetrySettings().getTotalTimeout()) + .setIdleTimeoutDuration(Duration.ZERO) + .setWaitTimeoutDuration(Duration.ZERO) .build(), rowAdapter, new SimpleStreamResumptionStrategy<>());