Skip to content

Commit

Permalink
fix: fix the connectivity error count caculation
Browse files Browse the repository at this point in the history
  • Loading branch information
mutianf committed Nov 2, 2022
1 parent b12e551 commit 5e3b671
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.ServerStreamingCallable;
import com.google.api.gax.rpc.StreamController;
import com.google.bigtable.v2.ResponseParams;
import com.google.cloud.bigtable.data.v2.stub.SafeResponseObserver;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;

Expand Down Expand Up @@ -101,65 +98,13 @@ protected void onResponseImpl(ResponseT response) {

@Override
protected void onErrorImpl(Throwable t) {
// server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata),
// so it's not checking trailing metadata here.
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, t);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
t.addSuppressed(t);
}

Util.metadataHelper(responseMetadata, tracer, t);
outerObserver.onError(t);
}

@Override
protected void onCompleteImpl() {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
// InvalidProtocolBufferException will only throw if something changed on
// the server side. Location info won't be populated as a result. Ignore
// this error and don't bubble it up to user.
}

Util.metadataHelper(responseMetadata, tracer, null);
outerObserver.onComplete();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.bigtable.v2.ResponseParams;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -79,56 +76,12 @@ class BigtableTracerUnaryCallback<ResponseT> implements ApiFutureCallback<Respon

@Override
public void onFailure(Throwable throwable) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, throwable);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
}
Util.metadataHelper(responseMetadata, tracer, throwable);
}

@Override
public void onSuccess(ResponseT response) {
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);
tracer.recordGfeMetadata(latency, null);
try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
}
Util.metadataHelper(responseMetadata, tracer, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigtable.data.v2.stub.metrics;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcResponseMetadata;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
Expand All @@ -25,10 +26,12 @@
import com.google.bigtable.v2.MutateRowsRequest;
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.ResponseParams;
import com.google.bigtable.v2.SampleRowKeysRequest;
import com.google.bigtable.v2.TableName;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
Expand Down Expand Up @@ -148,4 +151,44 @@ static Long getGfeLatency(Metadata metadata) {
}
return null;
}

static void metadataHelper(
GrpcResponseMetadata responseMetadata, BigtableTracer tracer, Throwable throwable) {
boolean hasTrailer = false;

// server-timing metric will be added through GrpcResponseMetadata#onHeaders(Metadata),
// so it's not checking trailing metadata here.
Metadata metadata = responseMetadata.getMetadata();
Long latency = Util.getGfeLatency(metadata);

try {
// Check both headers and trailers because in different environments the metadata
// could be returned in headers or trailers
if (metadata != null) {
byte[] trailers = metadata.get(Util.METADATA_KEY);
if (trailers == null) {
Metadata trailingMetadata = responseMetadata.getTrailingMetadata();
if (trailingMetadata != null) {
trailers = trailingMetadata.get(Util.METADATA_KEY);
}
}
// If the response is terminated abnormally and we didn't get location information in
// trailers or headers, skip setting the locations
if (trailers != null) {
hasTrailer = true;
ResponseParams decodedTrailers = ResponseParams.parseFrom(trailers);
tracer.setLocations(decodedTrailers.getZoneId(), decodedTrailers.getClusterId());
}
}
} catch (InvalidProtocolBufferException e) {
}

if (hasTrailer && latency == null) {
// For direct path, we won't see GFE server-timing header. However, if we received the
// location trailer, we know that there isn't a connectivity issue. Set the latency to
// 0 so gfe missing header won't get incremented.
latency = 0L;
}
tracer.recordGfeMetadata(latency, throwable);
}
}

0 comments on commit 5e3b671

Please sign in to comment.