Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add response protos #1246

Merged
merged 18 commits into from
Jul 11, 2022
Merged
9 changes: 8 additions & 1 deletion google-cloud-bigtable-stats/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-impl</artifactId>
<scope>test</scope>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
Expand Down Expand Up @@ -110,6 +110,13 @@
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<ignoreNonCompile>true</ignoreNonCompile>
</configuration>
</plugin>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets pull this up to the root pom

</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
/** For registering built-in metric views */
@InternalApi("For internal use only")
public class BuiltinViews {

@VisibleForTesting
static final ImmutableSet<View> BIGTABLE_BUILTIN_VIEWS =
ImmutableSet.of(
Expand All @@ -44,7 +45,7 @@ void registerPrivateViews(ViewManager viewManager) {
}
}

public void registerBigtableBuiltinViews() {
public static void registerBigtableBuiltinViews() {
ViewManager viewManager = Stats.getViewManager();
for (View view : BIGTABLE_BUILTIN_VIEWS) {
viewManager.registerView(view);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@

import com.google.api.core.InternalApi;
import com.google.api.gax.tracing.SpanName;
import com.google.common.annotations.VisibleForTesting;
import io.opencensus.stats.Stats;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* Wrapper class for accessing opencensus. We use a shaded version of opencensus to avoid polluting
Expand All @@ -34,4 +38,16 @@ public static StatsRecorderWrapper createRecorder(
return new StatsRecorderWrapper(
operationType, spanName, statsAttributes, Stats.getStatsRecorder());
}

// This is used in integration tests to get the tag value strings from view manager because Stats
// is relocated to com.google.bigtable.veneer.repackaged.io.opencensus.
@VisibleForTesting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use InternalApi("Visible for testing"). We arent supposed to expose guava on our public surface

public static List<String> getOperationLatencyViewTagValueStrings() {
return Stats.getViewManager().getView(BuiltinViewConstants.OPERATION_LATENCIES_VIEW.getName())
.getAggregationMap().entrySet().stream()
.map(Map.Entry::getKey)
.flatMap(x -> x.stream())
.map(x -> x.asString())
.collect(Collectors.toCollection(ArrayList::new));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,26 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

/**
* This callable will
*
* <p>-inject a {@link GrpcResponseMetadata} to access the headers and trailers returned by gRPC
* methods upon completion. The {@link BigtableTracer} will process metrics that were injected in
* the header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment the
* gfe_header_missing_counter in this case.
*
* <p>-Call {@link BigtableTracer#onRequest()} to record the request events in a stream.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
* <li>-Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
* header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment
* the gfe_header_missing_counter in this case.
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>-Call {@link BigtableTracer#onRequest(int)} to record the request events in a stream.
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class BigtableTracerStreamingCallable<RequestT, ResponseT>
Expand Down Expand Up @@ -102,6 +103,14 @@ public void onError(Throwable t) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, t);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}

outerObserver.onError(t);
}

Expand All @@ -110,6 +119,14 @@ public void onComplete() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}

outerObserver.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,24 @@
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import javax.annotation.Nonnull;

/**
* This callable will inject a {@link GrpcResponseMetadata} to access the headers and trailers
* returned by gRPC methods upon completion. The {@link BigtableTracer} will process metrics that
* were injected in the header/trailer and publish them to OpenCensus. If {@link
* GrpcResponseMetadata#getMetadata()} returned null, it probably means that the request has never
* reached GFE, and it'll increment the gfe_header_missing_counter in this case.
*
* <p>This class is considered an internal implementation detail and not meant to be used by
* applications.
* This callable will:
* <li>- Inject a {@link GrpcResponseMetadata} to access the headers returned by gRPC methods upon
* completion. The {@link BigtableTracer} will process metrics that were injected in the
* header/trailer and publish them to OpenCensus. If {@link GrpcResponseMetadata#getMetadata()}
* returned null, it probably means that the request has never reached GFE, and it'll increment
* the gfe_header_missing_counter in this case.
* <li>-This class will also access trailers from {@link GrpcResponseMetadata} to record zone and
* cluster ids.
* <li>This class is considered an internal implementation detail and not meant to be used by
* applications.
*/
@InternalApi
public class BigtableTracerUnaryCallable<RequestT, ResponseT>
Expand Down Expand Up @@ -78,13 +82,27 @@ public void onFailure(Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, throwable);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}
}

@Override
public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
byte[] trailers =
metadata.get(Metadata.Key.of(Util.RESPONSE_PRAMS_KEY, Metadata.BINARY_BYTE_MARSHALLER));
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
} catch (NullPointerException | InvalidProtocolBufferException e) {
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class Util {
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER);
private static final Pattern SERVER_TIMING_HEADER_PATTERN = Pattern.compile(".*dur=(?<dur>\\d+)");

static final String TRAILER_KEY = "x-goog-ext-425905942-bin";
static final String RESPONSE_PRAMS_KEY = "x-goog-ext-425905942-bin";

/** Convert an exception into a value that can be used to create an OpenCensus tag value. */
static String extractStatus(@Nullable Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.data.v2.models.Query;
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.stats.BuiltinViews;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class StreamingMetricsMetadataIT {
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();

@BeforeClass
public static void setUpClass() {
assume()
.withMessage("StreamingMetricsMetadataIT is not supported on Emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);
BuiltinViews.registerBigtableBuiltinViews();
}

@Test
public void testSuccess() throws Exception {
String prefix = UUID.randomUUID().toString();
String uniqueKey = prefix + "-read";

Query query = Query.create(testEnvRule.env().getTableId()).rowKey(uniqueKey);
ArrayList<Row> rows = Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query));

ApiFuture<List<Cluster>> clustersFuture =
testEnvRule
.env()
.getInstanceAdminClient()
.listClustersAsync(testEnvRule.env().getInstanceId());

List<Cluster> clusters = clustersFuture.get(1, TimeUnit.MINUTES);

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains(clusters.get(0).getZone());
assertThat(tagValueStrings).contains(clusters.get(0).getId());
}

@Test
public void testFailure() throws InterruptedException {
Query query = Query.create("non-exist-table");
try {
Lists.newArrayList(testEnvRule.env().getDataClient().readRows(query));
} catch (NotFoundException e) {
}

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains("undefined");
assertThat(tagValueStrings).contains("undefined");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.data.v2.it;

import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.TruthJUnit.assume;

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.bigtable.admin.v2.models.Cluster;
import com.google.cloud.bigtable.data.v2.models.RowMutation;
import com.google.cloud.bigtable.stats.BuiltinViews;
import com.google.cloud.bigtable.stats.StatsWrapper;
import com.google.cloud.bigtable.test_helpers.env.EmulatorEnv;
import com.google.cloud.bigtable.test_helpers.env.TestEnvRule;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class UnaryMetricsMetadataIT {
@ClassRule public static TestEnvRule testEnvRule = new TestEnvRule();

@BeforeClass
public static void setUpClass() {
assume()
.withMessage("UnaryMetricsMetadataIT is not supported on Emulator")
.that(testEnvRule.env())
.isNotInstanceOf(EmulatorEnv.class);
BuiltinViews.registerBigtableBuiltinViews();
}

@Test
public void testSuccess() throws Exception {
String rowKey = UUID.randomUUID().toString();
String familyId = testEnvRule.env().getFamilyId();

ApiFuture<Void> future =
testEnvRule
.env()
.getDataClient()
.mutateRowCallable()
.futureCall(
RowMutation.create(testEnvRule.env().getTableId(), rowKey)
.setCell(familyId, "q", "myVal"));

future.get(1, TimeUnit.MINUTES);

ApiFuture<List<Cluster>> clustersFuture =
testEnvRule
.env()
.getInstanceAdminClient()
.listClustersAsync(testEnvRule.env().getInstanceId());
List<Cluster> clusters = clustersFuture.get(1, TimeUnit.MINUTES);

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains(clusters.get(0).getZone());
assertThat(tagValueStrings).contains(clusters.get(0).getId());
}

@Test
public void testFailure() throws InterruptedException {
String rowKey = UUID.randomUUID().toString();
String familyId = testEnvRule.env().getFamilyId();

try {
testEnvRule
.env()
.getDataClient()
.mutateRowCallable()
.call(RowMutation.create("non-exist-table", rowKey).setCell(familyId, "q", "myVal"));
} catch (NotFoundException e) {
}

// give opencensus some time to populate view data
Thread.sleep(100);

List<String> tagValueStrings = StatsWrapper.getOperationLatencyViewTagValueStrings();
assertThat(tagValueStrings).contains("undefined");
assertThat(tagValueStrings).contains("undefined");
}
}
Loading