Skip to content

Commit

Permalink
udpate to use trailer instead of error info
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Oct 17, 2023
1 parent d274fc2 commit 9333ba0
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.COOKIES_HOLDER_KEY;
import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_KEY;
import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_METADATA_KEY;

import com.google.protobuf.ByteString;
import com.google.rpc.ErrorInfo;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
Expand All @@ -30,17 +28,13 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;

/**
* A cookie interceptor that checks the cookie value from returned ErrorInfo, updates the cookie
* holder, and inject it in the header of the next request.
*/
class CookieInterceptor implements ClientInterceptor {

static final Metadata.Key<ErrorInfo> ERROR_INFO_KEY =
ProtoUtils.keyForProto(ErrorInfo.getDefaultInstance());

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions callOptions, Channel channel) {
Expand Down Expand Up @@ -69,13 +63,10 @@ static class UpdateCookieListener<RespT>

@Override
public void onClose(Status status, Metadata trailers) {
if (status != Status.OK && trailers != null) {
ErrorInfo errorInfo = trailers.get(ERROR_INFO_KEY);
if (errorInfo != null) {
CookiesHolder cookieHolder = callOptions.getOption(COOKIES_HOLDER_KEY);
cookieHolder.setRoutingCookie(
ByteString.copyFromUtf8(errorInfo.getMetadataMap().get(ROUTING_COOKIE_KEY)));
}
if (trailers != null && trailers.containsKey(ROUTING_COOKIE_METADATA_KEY)) {
byte[] cookie = trailers.get(ROUTING_COOKIE_METADATA_KEY);
CookiesHolder cookieHolder = callOptions.getOption(COOKIES_HOLDER_KEY);
cookieHolder.setRoutingCookie(ByteString.copyFrom(cookie));
}
super.onClose(status, trailers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ class CookiesHolder {
static final CallOptions.Key<CookiesHolder> COOKIES_HOLDER_KEY =
CallOptions.Key.create("bigtable-cookies");

static final String ROUTING_COOKIE_KEY = "bigtable-routing-cookie";
static final String ROUTING_COOKIE_KEY = "x-goog-cbt-cookie-routing";

static final Metadata.Key<byte[]> ROUTING_COOKIE_METADATA_KEY =
Metadata.Key.of("bigtable-routing-cookie-bin", Metadata.BINARY_BYTE_MARSHALLER);
Metadata.Key.of(ROUTING_COOKIE_KEY + "-bin", Metadata.BINARY_BYTE_MARSHALLER);

@Nullable private ByteString routingCookie;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;

public class CookiesServerStreamingCallable<RequestT, ResponseT>
/** Cookie callable injects a placeholder for bigtable retry cookie. */
class CookiesServerStreamingCallable<RequestT, ResponseT>
extends ServerStreamingCallable<RequestT, ResponseT> {

private final ServerStreamingCallable<RequestT, ResponseT> callable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ public <RowT> ServerStreamingCallable<Query, RowT> createReadRowsCallable(
new TracedServerStreamingCallable<>(
readRowsUserCallable, clientContext.getTracerFactory(), span);

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
ServerStreamingCallable<Query, RowT> withCookie = new CookiesServerStreamingCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
Expand Down Expand Up @@ -1021,6 +1023,8 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
UnaryCallable<RequestT, ResponseT> traced =
new TracedUnaryCallable<>(inner, clientContext.getTracerFactory(), getSpanName(methodName));

// CookieHolder needs to be injected to the CallOptions outside of retries, otherwise retry
// attempts won't see a CookieHolder.
UnaryCallable<RequestT, ResponseT> withCookie = new CookiesUnaryCallable<>(traced);

return withCookie.withDefaultCallContext(clientContext.getDefaultCallContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,21 @@
*/
package com.google.cloud.bigtable.data.v2.stub;

import static com.google.cloud.bigtable.data.v2.stub.CookieInterceptor.ERROR_INFO_KEY;
import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_KEY;
import static com.google.cloud.bigtable.data.v2.stub.CookiesHolder.ROUTING_COOKIE_METADATA_KEY;
import static com.google.common.truth.Truth.assertThat;

import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.bigtable.v2.BigtableGrpc;
import com.google.bigtable.v2.MutateRowRequest;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ReadRowsResponse;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.data.v2.BigtableDataClient;
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
import com.google.cloud.bigtable.data.v2.FakeServiceBuilder;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.common.collect.ImmutableList;
import com.google.rpc.ErrorInfo;
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerCall;
Expand Down Expand Up @@ -92,11 +92,27 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
}

@Test
public void testRetryCookieIsForwarded() {
client.mutateRow(RowMutation.create("fake-table", "fake-row").setCell("cf", "q", "v"));
public void testReadRowsRetryCookieIsForwarded() {
client.readRows(Query.create("fake-table")).iterator().hasNext();

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());
byte[] bytes = serverMetadata.get(1).get(ROUTING_COOKIE_METADATA_KEY);
assertThat(bytes).isNotNull();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("test-routing-cookie");

serverMetadata.clear();
}

@Test
public void testSampleRowKeysRetryCookieIsForwarded() {

client.sampleRowKeys("fake-table");

assertThat(fakeService.count.get()).isGreaterThan(1);
assertThat(serverMetadata.size()).isEqualTo(fakeService.count.get());
byte[] bytes = serverMetadata.get(1).get(ROUTING_COOKIE_METADATA_KEY);
assertThat(bytes).isNotNull();
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("test-routing-cookie");

serverMetadata.clear();
Expand All @@ -113,18 +129,34 @@ class FakeService extends BigtableGrpc.BigtableImplBase {
private AtomicInteger count = new AtomicInteger();

@Override
public void mutateRow(
MutateRowRequest request, StreamObserver<MutateRowResponse> responseObserver) {
public void readRows(
ReadRowsRequest request, StreamObserver<ReadRowsResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
trailers.put(
ROUTING_COOKIE_METADATA_KEY,
ByteString.copyFromUtf8("test-routing-cookie").toByteArray());
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
}
responseObserver.onNext(ReadRowsResponse.getDefaultInstance());
responseObserver.onCompleted();
}

@Override
public void sampleRowKeys(
SampleRowKeysRequest request, StreamObserver<SampleRowKeysResponse> responseObserver) {
if (count.getAndIncrement() < 1) {
Metadata trailers = new Metadata();
ErrorInfo errorInfo =
ErrorInfo.newBuilder().putMetadata(ROUTING_COOKIE_KEY, "test-routing-cookie").build();
trailers.put(ERROR_INFO_KEY, errorInfo);
trailers.put(
ROUTING_COOKIE_METADATA_KEY,
ByteString.copyFromUtf8("test-routing-cookie").toByteArray());
StatusRuntimeException exception = new StatusRuntimeException(Status.UNAVAILABLE, trailers);
responseObserver.onError(exception);
return;
}
responseObserver.onNext(MutateRowResponse.getDefaultInstance());
responseObserver.onNext(SampleRowKeysResponse.getDefaultInstance());
responseObserver.onCompleted();
}
}
Expand Down

0 comments on commit 9333ba0

Please sign in to comment.