From 4c73abd2f4a07808b591dd9178e87715d2f3008d Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Mon, 27 Nov 2023 10:15:12 -0500 Subject: [PATCH] feat: client sends routing cookie back to server (#1888) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: client sends retry cookie back to server * udpate to use trailer instead of error info * updating the header name * address some comments * udpate * update tests and handling of retry cookie * address comments * address comments * add cookie to readChangeStream * also check headers and add a test * simplify code * clean up test * clean up test * update dependency * test * move MetadataSubject to a separate file * add the file * add license * address comments * close client * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot --- README.md | 2 +- .../bigtable/data/v2/stub/CookiesHolder.java | 70 +++ .../data/v2/stub/CookiesInterceptor.java | 96 +++ .../stub/CookiesServerStreamingCallable.java | 48 ++ .../data/v2/stub/CookiesUnaryCallable.java | 44 ++ .../data/v2/stub/EnhancedBigtableStub.java | 64 +- .../bigtable/data/v2/MetadataSubject.java | 60 ++ .../data/v2/stub/CookiesHolderTest.java | 561 ++++++++++++++++++ 8 files changed, 924 insertions(+), 21 deletions(-) create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesInterceptor.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesServerStreamingCallable.java create mode 100644 google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesUnaryCallable.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/MetadataSubject.java create mode 100644 google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java diff --git a/README.md b/README.md index 736624d00a..5caf555e94 100644 --- a/README.md +++ b/README.md @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies: If you are using Gradle 5.x or later, add this to your dependencies: ```Groovy -implementation platform('com.google.cloud:libraries-bom:26.26.0') +implementation platform('com.google.cloud:libraries-bom:26.27.0') implementation 'com.google.cloud:google-cloud-bigtable' ``` diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java new file mode 100644 index 0000000000..7d7ca6a029 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolder.java @@ -0,0 +1,70 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import io.grpc.CallOptions; +import io.grpc.Metadata; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; + +/** A cookie that holds information for retry or routing */ +class CookiesHolder { + + static final CallOptions.Key COOKIES_HOLDER_KEY = + CallOptions.Key.create("bigtable-cookies"); + + /** Routing cookie key prefix. */ + static final String COOKIE_KEY_PREFIX = "x-goog-cbt-cookie"; + + /** A map that stores all the routing cookies. */ + private final Map, String> cookies = new HashMap<>(); + + /** Returns CookiesHolder if presents in CallOptions, otherwise returns null. */ + @Nullable + static CookiesHolder fromCallOptions(CallOptions options) { + // CookiesHolder should be added by CookiesServerStreamingCallable and + // CookiesUnaryCallable for most methods. However, methods like PingAndWarm + // doesn't support routing cookie, in which case this will return null. + return options.getOption(COOKIES_HOLDER_KEY); + } + + /** Add all the routing cookies to headers if any. */ + Metadata injectCookiesInRequestHeaders(Metadata headers) { + for (Metadata.Key key : cookies.keySet()) { + headers.put(key, cookies.get(key)); + } + return headers; + } + + /** + * Iterate through all the keys in initial or trailing metadata, and add all the keys that match + * COOKIE_KEY_PREFIX to cookies. Values in trailers will override the value set in initial + * metadata for the same keys. + */ + void extractCookiesFromMetadata(@Nullable Metadata trailers) { + if (trailers == null) { + return; + } + for (String key : trailers.keys()) { + if (key.startsWith(COOKIE_KEY_PREFIX)) { + Metadata.Key metadataKey = Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER); + String value = trailers.get(metadataKey); + cookies.put(metadataKey, value); + } + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesInterceptor.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesInterceptor.java new file mode 100644 index 0000000000..77387851fa --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesInterceptor.java @@ -0,0 +1,96 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; +import io.grpc.ForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A cookie interceptor that checks the cookie value from returned trailer, updates the cookie + * holder, and inject it in the header of the next request. + */ +class CookiesInterceptor implements ClientInterceptor { + + private static final Logger LOG = Logger.getLogger(CookiesInterceptor.class.getName()); + + @Override + public ClientCall interceptCall( + MethodDescriptor methodDescriptor, CallOptions callOptions, Channel channel) { + return new ForwardingClientCall.SimpleForwardingClientCall( + channel.newCall(methodDescriptor, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + // Gets the CookiesHolder added from CookiesServerStreamingCallable and + // CookiesUnaryCallable. + // Add CookiesHolder content to request headers if there's any. + try { + CookiesHolder cookie = CookiesHolder.fromCallOptions(callOptions); + if (cookie != null) { + headers = cookie.injectCookiesInRequestHeaders(headers); + responseListener = new UpdateCookieListener<>(responseListener, cookie); + } + } catch (Throwable e) { + LOG.warning("Failed to inject cookie to request headers: " + e); + } finally { + super.start(responseListener, headers); + } + } + }; + } + + /** Add headers and trailers to CookiesHolder if there's any. * */ + static class UpdateCookieListener + extends ForwardingClientCallListener.SimpleForwardingClientCallListener { + + private final CookiesHolder cookie; + + UpdateCookieListener(ClientCall.Listener delegate, CookiesHolder cookiesHolder) { + super(delegate); + this.cookie = cookiesHolder; + } + + @Override + public void onHeaders(Metadata headers) { + try { + cookie.extractCookiesFromMetadata(headers); + } catch (Throwable e) { + LOG.log(Level.WARNING, "Failed to extract cookie from response headers.", e); + } finally { + super.onHeaders(headers); + } + } + + @Override + public void onClose(Status status, Metadata trailers) { + try { + cookie.extractCookiesFromMetadata(trailers); + } catch (Throwable e) { + LOG.log(Level.WARNING, "Failed to extract cookie from response trailers.", e); + } finally { + super.onClose(status, trailers); + } + } + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesServerStreamingCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesServerStreamingCallable.java new file mode 100644 index 0000000000..0d012b8ea0 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesServerStreamingCallable.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY; + +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.ResponseObserver; +import com.google.api.gax.rpc.ServerStreamingCallable; + +/** + * The cookie holder will act as operation scoped storage for all retry attempts. Each attempt's + * cookies will be merged into the value holder and will be sent out with the next retry attempt. + */ +class CookiesServerStreamingCallable + extends ServerStreamingCallable { + + private final ServerStreamingCallable callable; + + CookiesServerStreamingCallable(ServerStreamingCallable innerCallable) { + this.callable = innerCallable; + } + + @Override + public void call( + RequestT request, ResponseObserver responseObserver, ApiCallContext context) { + GrpcCallContext grpcCallContext = (GrpcCallContext) context; + callable.call( + request, + responseObserver, + grpcCallContext.withCallOptions( + grpcCallContext.getCallOptions().withOption(COOKIES_HOLDER_KEY, new CookiesHolder()))); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesUnaryCallable.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesUnaryCallable.java new file mode 100644 index 0000000000..b0d42d5955 --- /dev/null +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/CookiesUnaryCallable.java @@ -0,0 +1,44 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY; + +import com.google.api.core.ApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.rpc.ApiCallContext; +import com.google.api.gax.rpc.UnaryCallable; + +/** + * The cookie holder will act as operation scoped storage for all retry attempts. Each attempt's + * cookies will be merged into the value holder and will be sent out with the next retry attempt. + */ +class CookiesUnaryCallable extends UnaryCallable { + private final UnaryCallable innerCallable; + + CookiesUnaryCallable(UnaryCallable callable) { + this.innerCallable = callable; + } + + @Override + public ApiFuture futureCall(RequestT request, ApiCallContext context) { + GrpcCallContext grpcCallContext = (GrpcCallContext) context; + return innerCallable.futureCall( + request, + grpcCallContext.withCallOptions( + grpcCallContext.getCallOptions().withOption(COOKIES_HOLDER_KEY, new CookiesHolder()))); + } +} diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java index 0420e47dcf..705b3027ed 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java @@ -185,6 +185,14 @@ public static EnhancedBigtableStubSettings finalizeSettings( // workaround JWT audience issues patchCredentials(builder); + // patch cookies interceptor + InstantiatingGrpcChannelProvider.Builder transportProvider = null; + if (builder.getTransportChannelProvider() instanceof InstantiatingGrpcChannelProvider) { + transportProvider = + ((InstantiatingGrpcChannelProvider) builder.getTransportChannelProvider()).toBuilder(); + transportProvider.setInterceptorProvider(() -> ImmutableList.of(new CookiesInterceptor())); + } + // Inject channel priming if (settings.isRefreshingChannel()) { // Fix the credentials so that they can be shared @@ -194,20 +202,18 @@ public static EnhancedBigtableStubSettings finalizeSettings( } builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials)); - // Inject the primer - InstantiatingGrpcChannelProvider transportProvider = - (InstantiatingGrpcChannelProvider) settings.getTransportChannelProvider(); - - builder.setTransportChannelProvider( - transportProvider - .toBuilder() - .setChannelPrimer( - BigtableChannelPrimer.create( - credentials, - settings.getProjectId(), - settings.getInstanceId(), - settings.getAppProfileId())) - .build()); + if (transportProvider != null) { + transportProvider.setChannelPrimer( + BigtableChannelPrimer.create( + credentials, + settings.getProjectId(), + settings.getInstanceId(), + settings.getAppProfileId())); + } + } + + if (transportProvider != null) { + builder.setTransportChannelProvider(transportProvider.build()); } ImmutableMap attributes = @@ -365,7 +371,11 @@ public ServerStreamingCallable createReadRowsCallable( new TracedServerStreamingCallable<>( readRowsUserCallable, clientContext.getTracerFactory(), span); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + // CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry + // attempts won't see a CookieHolder. + ServerStreamingCallable withCookie = new CookiesServerStreamingCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -401,7 +411,9 @@ public UnaryCallable createReadRowCallable(RowAdapter new TracedUnaryCallable<>( firstRow, clientContext.getTracerFactory(), getSpanName("ReadRow")); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -642,7 +654,9 @@ private UnaryCallable createBulkMutateRowsCallable() { new TracedUnaryCallable<>( tracedBatcherUnaryCallable, clientContext.getTracerFactory(), spanName); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -924,7 +938,10 @@ public Map extract( ServerStreamingCallable traced = new TracedServerStreamingCallable<>(retrying, clientContext.getTracerFactory(), span); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + ServerStreamingCallable withCookie = + new CookiesServerStreamingCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -1004,7 +1021,10 @@ public Map extract( new TracedServerStreamingCallable<>( readChangeStreamUserCallable, clientContext.getTracerFactory(), span); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + ServerStreamingCallable withCookie = + new CookiesServerStreamingCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } /** @@ -1017,7 +1037,11 @@ private UnaryCallable createUserFacin UnaryCallable traced = new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName)); - return traced.withDefaultCallContext(clientContext.getDefaultCallContext()); + // CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry + // attempts won't see a CookieHolder. + UnaryCallable withCookie = new CookiesUnaryCallable<>(traced); + + return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext()); } private UnaryCallable createPingAndWarmCallable() { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/MetadataSubject.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/MetadataSubject.java new file mode 100644 index 0000000000..5e76ce38cd --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/MetadataSubject.java @@ -0,0 +1,60 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2; + +import static com.google.common.truth.Truth.assertAbout; + +import com.google.common.truth.FailureMetadata; +import com.google.common.truth.Subject; +import io.grpc.Metadata; +import javax.annotation.Nullable; + +/** Utility class to test key-value pairs in {@link io.grpc.Metadata}. */ +public final class MetadataSubject extends Subject { + + @Nullable private final Metadata actual; + + public static Factory metadata() { + return MetadataSubject::new; + } + + private MetadataSubject(FailureMetadata metadata, @Nullable Metadata actual) { + super(metadata, actual); + this.actual = actual; + } + + public static MetadataSubject assertThat(@Nullable Metadata actual) { + return assertAbout(metadata()).that(actual); + } + + public void containsAtLeast(String... keyValuePairs) { + assert actual != null; + for (int i = 0; i < keyValuePairs.length; i += 2) { + check("containsAtLeast()") + .that(actual.get(Metadata.Key.of(keyValuePairs[i], Metadata.ASCII_STRING_MARSHALLER))) + .isEqualTo(keyValuePairs[i + 1]); + } + } + + public void doesNotContainKeys(String... keys) { + assert actual != null; + for (String key : keys) { + check("doesNotContainKeys()") + .that(actual.containsKey(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER))) + .isFalse(); + } + } +} diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java new file mode 100644 index 0000000000..5dac053523 --- /dev/null +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/stub/CookiesHolderTest.java @@ -0,0 +1,561 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.bigtable.data.v2.stub; + +import static com.google.cloud.bigtable.data.v2.MetadataSubject.assertThat; +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; +import com.google.bigtable.v2.BigtableGrpc; +import com.google.bigtable.v2.CheckAndMutateRowRequest; +import com.google.bigtable.v2.CheckAndMutateRowResponse; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsRequest; +import com.google.bigtable.v2.GenerateInitialChangeStreamPartitionsResponse; +import com.google.bigtable.v2.MutateRowRequest; +import com.google.bigtable.v2.MutateRowResponse; +import com.google.bigtable.v2.MutateRowsRequest; +import com.google.bigtable.v2.MutateRowsResponse; +import com.google.bigtable.v2.ReadChangeStreamRequest; +import com.google.bigtable.v2.ReadChangeStreamResponse; +import com.google.bigtable.v2.ReadModifyWriteRowRequest; +import com.google.bigtable.v2.ReadModifyWriteRowResponse; +import com.google.bigtable.v2.ReadRowsRequest; +import com.google.bigtable.v2.ReadRowsResponse; +import com.google.bigtable.v2.SampleRowKeysRequest; +import com.google.bigtable.v2.SampleRowKeysResponse; +import com.google.cloud.bigtable.data.v2.BigtableDataClient; +import com.google.cloud.bigtable.data.v2.BigtableDataSettings; +import com.google.cloud.bigtable.data.v2.FakeServiceBuilder; +import com.google.cloud.bigtable.data.v2.models.BulkMutation; +import com.google.cloud.bigtable.data.v2.models.ConditionalRowMutation; +import com.google.cloud.bigtable.data.v2.models.Mutation; +import com.google.cloud.bigtable.data.v2.models.Query; +import com.google.cloud.bigtable.data.v2.models.ReadChangeStreamQuery; +import com.google.cloud.bigtable.data.v2.models.ReadModifyWriteRow; +import com.google.cloud.bigtable.data.v2.models.RowMutation; +import com.google.cloud.bigtable.data.v2.models.RowMutationEntry; +import io.grpc.ForwardingServerCall; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Server; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.threeten.bp.Duration; + +@RunWith(JUnit4.class) +public class CookiesHolderTest { + private static final Metadata.Key ROUTING_COOKIE_1 = + Metadata.Key.of("x-goog-cbt-cookie-routing", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key ROUTING_COOKIE_2 = + Metadata.Key.of("x-goog-cbt-cookie-random", Metadata.ASCII_STRING_MARSHALLER); + private static final Metadata.Key BAD_KEY = + Metadata.Key.of("x-goog-cbt-not-cookie", Metadata.ASCII_STRING_MARSHALLER); + private static final String testCookie = "test-routing-cookie"; + + private Server server; + private final FakeService fakeService = new FakeService(); + private BigtableDataClient client; + private final List serverMetadata = new ArrayList<>(); + + private final Set methods = new HashSet<>(); + + @Before + public void setup() throws Exception { + ServerInterceptor serverInterceptor = + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + serverMetadata.add(metadata); + if (metadata.containsKey(ROUTING_COOKIE_1)) { + methods.add(serverCall.getMethodDescriptor().getBareMethodName()); + } + return serverCallHandler.startCall(serverCall, metadata); + } + }; + + server = FakeServiceBuilder.create(fakeService).intercept(serverInterceptor).start(); + + BigtableDataSettings.Builder settings = + BigtableDataSettings.newBuilderForEmulator(server.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance"); + + // Override CheckAndMutate and ReadModifyWrite retry settings here. These operations + // are currently not retryable but this could change in the future after we + // have routing cookie sends back meaningful information and changes how retry works. + // Routing cookie still needs to be respected and handled by the callables. + settings + .stubSettings() + .checkAndMutateRowSettings() + .setRetrySettings( + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(10)) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .setMaxAttempts(2) + .build()) + .setRetryableCodes(StatusCode.Code.UNAVAILABLE); + + settings + .stubSettings() + .readModifyWriteRowSettings() + .setRetrySettings( + RetrySettings.newBuilder() + .setInitialRetryDelay(Duration.ofMillis(10)) + .setMaxRetryDelay(Duration.ofMinutes(1)) + .setMaxAttempts(2) + .build()) + .setRetryableCodes(StatusCode.Code.UNAVAILABLE); + + client = BigtableDataClient.create(settings.build()); + } + + @After + public void tearDown() throws Exception { + if (client != null) { + client.close(); + } + if (server != null) { + server.shutdown(); + } + } + + @Test + public void testReadRows() { + client.readRows(Query.create("fake-table")).iterator().hasNext(); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .containsAtLeast(ROUTING_COOKIE_1.name(), "readRows", ROUTING_COOKIE_2.name(), testCookie); + assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testReadRow() { + client.readRow("fake-table", "key"); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .containsAtLeast(ROUTING_COOKIE_1.name(), "readRows", ROUTING_COOKIE_2.name(), testCookie); + assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testMutateRows() { + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("key").setCell("cf", "q", "v"))); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .containsAtLeast( + ROUTING_COOKIE_1.name(), "mutateRows", ROUTING_COOKIE_2.name(), testCookie); + assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testMutateRow() { + client.mutateRow(RowMutation.create("table", "key").setCell("cf", "q", "v")); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .containsAtLeast(ROUTING_COOKIE_1.name(), "mutateRow", ROUTING_COOKIE_2.name(), testCookie); + assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testSampleRowKeys() { + + client.sampleRowKeys("fake-table"); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .containsAtLeast( + ROUTING_COOKIE_1.name(), "sampleRowKeys", ROUTING_COOKIE_2.name(), testCookie); + assertThat(lastMetadata).doesNotContainKeys(BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testNoCookieSucceedReadRows() { + fakeService.returnCookie = false; + + client.readRows(Query.create("fake-table")).iterator().hasNext(); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata).doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name()); + + serverMetadata.clear(); + } + + @Test + public void testNoCookieSucceedReadRow() { + fakeService.returnCookie = false; + + client.readRow("fake-table", "key"); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testNoCookieSucceedMutateRows() { + fakeService.returnCookie = false; + + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("key").setCell("cf", "q", "v"))); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testNoCookieSucceedMutateRow() { + fakeService.returnCookie = false; + + client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v")); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testNoCookieSucceedSampleRowKeys() { + fakeService.returnCookie = false; + + client.sampleRowKeys("fake-table"); + + assertThat(fakeService.count.get()).isGreaterThan(1); + assertThat(serverMetadata).hasSize(fakeService.count.get()); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .doesNotContainKeys(ROUTING_COOKIE_1.name(), ROUTING_COOKIE_2.name(), BAD_KEY.name()); + + serverMetadata.clear(); + } + + @Test + public void testCookiesInHeaders() throws Exception { + // Send 2 cookies in the headers, with routingCookieKey and ROUTING_COOKIE_2. ROUTING_COOKIE_2 + // is also sent in the trailers so the value should be overridden. + final Metadata.Key routingCookieKey = + Metadata.Key.of("x-goog-cbt-cookie-no-override", Metadata.ASCII_STRING_MARSHALLER); + final String routingCookieValue = "no-override"; + ServerInterceptor serverInterceptor = + new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall serverCall, + Metadata metadata, + ServerCallHandler serverCallHandler) { + serverMetadata.add(metadata); + + metadata.put(routingCookieKey, routingCookieValue); + return serverCallHandler.startCall( + new ForwardingServerCall.SimpleForwardingServerCall(serverCall) { + @Override + public void sendHeaders(Metadata headers) { + headers.put(routingCookieKey, routingCookieValue); + headers.put(ROUTING_COOKIE_2, "will-be-overridden"); + super.sendHeaders(headers); + } + }, + metadata); + } + }; + + Server newServer = null; + try { + newServer = FakeServiceBuilder.create(fakeService).intercept(serverInterceptor).start(); + + BigtableDataSettings.Builder settings = + BigtableDataSettings.newBuilderForEmulator(newServer.getPort()) + .setProjectId("fake-project") + .setInstanceId("fake-instance"); + + try (BigtableDataClient client = BigtableDataClient.create(settings.build())) { + + client.readRows(Query.create("table")).iterator().hasNext(); + + Metadata lastMetadata = serverMetadata.get(fakeService.count.get() - 1); + + assertThat(lastMetadata) + .containsAtLeast( + ROUTING_COOKIE_2.name(), testCookie, routingCookieKey.name(), routingCookieValue); + } + } finally { + if (newServer != null) { + newServer.shutdown(); + } + } + } + + @Test + public void testAllMethodsAreCalled() throws InterruptedException { + // This test ensures that all methods respect the retry cookie except for the ones that are + // explicitly added to the methods list. It requires that any newly method is exercised in this + // test. This is enforced by introspecting grpc method descriptors. + client.readRows(Query.create("fake-table")).iterator().hasNext(); + + fakeService.count.set(0); + client.mutateRow(RowMutation.create("fake-table", "key").setCell("cf", "q", "v")); + + fakeService.count.set(0); + client.bulkMutateRows( + BulkMutation.create("fake-table") + .add(RowMutationEntry.create("key").setCell("cf", "q", "v"))); + + fakeService.count.set(0); + client.sampleRowKeys("fake-table"); + + fakeService.count.set(0); + client.checkAndMutateRow( + ConditionalRowMutation.create("fake-table", "key") + .then(Mutation.create().setCell("cf", "q", "v"))); + + fakeService.count.set(0); + client.readModifyWriteRow( + ReadModifyWriteRow.create("fake-table", "key").append("cf", "q", "v")); + + fakeService.count.set(0); + client.generateInitialChangeStreamPartitions("fake-table").iterator().hasNext(); + + fakeService.count.set(0); + client.readChangeStream(ReadChangeStreamQuery.create("fake-table")).iterator().hasNext(); + + Set expected = + BigtableGrpc.getServiceDescriptor().getMethods().stream() + .map(MethodDescriptor::getBareMethodName) + .collect(Collectors.toSet()); + + // Exclude methods that are not supported by routing cookie + methods.add("PingAndWarm"); + + assertThat(methods).containsExactlyElementsIn(expected); + } + + static class FakeService extends BigtableGrpc.BigtableImplBase { + + private boolean returnCookie = true; + private final AtomicInteger count = new AtomicInteger(); + + @Override + public void readRows( + ReadRowsRequest request, StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "readRows"); + responseObserver.onNext(ReadRowsResponse.getDefaultInstance()); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(ReadRowsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void mutateRow( + MutateRowRequest request, StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "mutateRow"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(MutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void mutateRows( + MutateRowsRequest request, StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "mutateRows"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext( + MutateRowsResponse.newBuilder() + .addEntries(MutateRowsResponse.Entry.getDefaultInstance()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void sampleRowKeys( + SampleRowKeysRequest request, StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "sampleRowKeys"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(SampleRowKeysResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void checkAndMutateRow( + CheckAndMutateRowRequest request, + StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "checkAndMutate"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(CheckAndMutateRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void readModifyWriteRow( + ReadModifyWriteRowRequest request, + StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "readModifyWrite"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(ReadModifyWriteRowResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void readChangeStream( + ReadChangeStreamRequest request, + StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "readChangeStream"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext( + ReadChangeStreamResponse.newBuilder() + .setCloseStream(ReadChangeStreamResponse.CloseStream.getDefaultInstance()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void generateInitialChangeStreamPartitions( + GenerateInitialChangeStreamPartitionsRequest request, + StreamObserver responseObserver) { + if (count.getAndIncrement() < 1) { + Metadata trailers = new Metadata(); + maybePopulateCookie(trailers, "generateInitialChangeStreamPartitions"); + StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers); + responseObserver.onError(exception); + return; + } + responseObserver.onNext(GenerateInitialChangeStreamPartitionsResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + private void maybePopulateCookie(Metadata trailers, String label) { + if (returnCookie) { + trailers.put(ROUTING_COOKIE_1, label); + trailers.put(ROUTING_COOKIE_2, testCookie); + trailers.put(BAD_KEY, "bad-key"); + } + } + } +}