Skip to content

Commit

Permalink
Add grpc gcs statistics support (#1158) (#1190)
Browse files Browse the repository at this point in the history
  • Loading branch information
singhravidutt committed Aug 1, 2024
1 parent ac23990 commit 69a7ed0
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@
import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics;
import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus.StatisticsType;
import com.google.cloud.hadoop.util.ITraceFactory;
import com.google.cloud.hadoop.util.ITraceOperation;
import com.google.common.base.Stopwatch;
import com.google.common.eventbus.Subscribe;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -239,6 +239,43 @@ private void updateGcsIOSpecificStatistics(int statusCode) {
}
}

private int grpcToHttpStatusCodeMapping(Status grpcStatusCode) {
// using code.proto as reference
// https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto
switch (grpcStatusCode.getCode()) {
case OK:
return 200;
case CANCELLED:
return 499;
case INVALID_ARGUMENT:
case FAILED_PRECONDITION:
case OUT_OF_RANGE:
return 400;
case DEADLINE_EXCEEDED:
return 504;

Check warning on line 255 in gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java

View check run for this annotation

Codecov / codecov/patch

gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java#L255

Added line #L255 was not covered by tests
case NOT_FOUND:
return 404;
case ALREADY_EXISTS:
case ABORTED:
return 409;

Check warning on line 260 in gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java

View check run for this annotation

Codecov / codecov/patch

gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java#L260

Added line #L260 was not covered by tests
case PERMISSION_DENIED:
return 403;

Check warning on line 262 in gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java

View check run for this annotation

Codecov / codecov/patch

gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java#L262

Added line #L262 was not covered by tests
case RESOURCE_EXHAUSTED:
return 429;
case UNIMPLEMENTED:
return 501;

Check warning on line 266 in gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java

View check run for this annotation

Codecov / codecov/patch

gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java#L266

Added line #L266 was not covered by tests
case UNAVAILABLE:
return 503;

Check warning on line 268 in gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java

View check run for this annotation

Codecov / codecov/patch

gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java#L268

Added line #L268 was not covered by tests
case UNAUTHENTICATED:
return 401;
case UNKNOWN:
case INTERNAL:
case DATA_LOSS:
default:
return 500;
}
}

/**
* Updating the required gcs specific statistics based on HttpResponseException.
*
Expand All @@ -263,23 +300,23 @@ private void subscriberOnGoogleJsonResponseException(
/**
* Updating the required gcs specific statistics based on HttpResponse.
*
* @param response contains statusCode based on which metrics are updated
* @param responseStatus responseStatus status code from HTTP response
*/
@Subscribe
private void subscriberOnHttpResponse(@Nonnull HttpResponse response) {
updateGcsIOSpecificStatistics(response.getStatusCode());
private void subscriberOnHttpResponseStatus(@Nonnull Integer responseStatus) {
updateGcsIOSpecificStatistics(responseStatus);
}

/**
* Updating the GCS_TOTAL_REQUEST_COUNT
*
* @param request
*/
@Subscribe
private void subscriberOnHttpRequest(@Nonnull HttpRequest request) {
private void subscriberOnGcsRequest(@Nonnull GcsRequestExecutionEvent event) {
incrementGcsTotalRequestCount();
}

@Subscribe
private void subscriberOnGrpcStatus(@Nonnull Status status) {
updateGcsIOSpecificStatistics(grpcToHttpStatusCodeMapping(status));
}

/**
* Updating the EXCEPTION_COUNT
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.fs.gcs;

import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.EXCEPTION_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_RATE_LIMIT_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_SIDE_ERROR_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_REQUEST_COUNT;
import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_SERVER_SIDE_ERROR_COUNT;
import static com.google.common.truth.Truth.assertThat;

import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.common.flogger.GoogleLogger;
import io.grpc.Status;
import java.util.Iterator;
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class GoogleCloudStorageStatisticsTest {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private GhfsGlobalStorageStatistics subscriber = new GhfsGlobalStorageStatistics();

@Before
public void setUp() throws Exception {

GoogleCloudStorageEventBus.register(subscriber);
}

@After
public void cleanup() throws Exception {

GoogleCloudStorageEventBus.unregister(subscriber);
}

private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) {
Iterator<LongStatistic> statsIterator = expectedStats.getLongStatistics();
boolean metricsVerified = true;
while (statsIterator.hasNext()) {
LongStatistic stats = statsIterator.next();
Long value = subscriber.getLong(stats.getName());
if (stats.getValue() != value) {
logger.atWarning().log(
"Metric values not matching. for: %s, expected: %d, got: %d",
stats.getName(), stats.getValue(), value);
metricsVerified = false;
break;
}
}
assertThat(metricsVerified).isTrue();
}

@Test
public void gcs_requestCounter() throws Exception {
GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_REQUEST_COUNT, 1);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_rateLimitCounter() {
// verify for http event i.e. via Apiary
GoogleCloudStorageEventBus.postOnHttpResponseStatus(429);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_CLIENT_RATE_LIMIT_COUNT, 1);
verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);

subscriber.reset();

// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.RESOURCE_EXHAUSTED);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_clientSideErrorCounter() {
GoogleCloudStorageEventBus.postOnHttpResponseStatus(404);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);

subscriber.reset();

// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.CANCELLED);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_serverSideErrorCounter() {
GoogleCloudStorageEventBus.postOnHttpResponseStatus(503);
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(GCS_SERVER_SIDE_ERROR_COUNT, 1);
verifyStatistics(verifyCounterStats);

subscriber.reset();

// verify for gRPC event i.e. via java-storage
GoogleCloudStorageEventBus.onGrpcStatus(Status.INTERNAL);
verifyStatistics(verifyCounterStats);
}

@Test
public void gcs_ExceptionCounter() {
GoogleCloudStorageEventBus.postOnException();
GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics();
verifyCounterStats.incrementCounter(EXCEPTION_COUNT, 1);
verifyStatistics(verifyCounterStats);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2024 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.gcsio;

import com.google.cloud.hadoop.util.GcsRequestExecutionEvent;
import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;

/** This is a gRPC interceptor to capture the statistics related to calls made to gcs backend. */
@VisibleForTesting
public class GoogleCloudStorageClientGrpcStatisticsInterceptor implements ClientInterceptor {

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
try {
GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent());
} finally {
super.start(
new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onClose(Status status, Metadata trailers) {
try {
GoogleCloudStorageEventBus.onGrpcStatus(status);
} finally {
super.onClose(status, trailers);
}
}
},
headers);
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,7 @@ private static Storage createStorage(
downscopedAccessTokenFn));
}

list.add(new GoogleCloudStorageClientGrpcStatisticsInterceptor());
return ImmutableList.copyOf(list);
})
.setCredentials(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2024 Google LLC. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.util;

import com.google.common.annotations.VisibleForTesting;

/** This an Event which is published in EvenBus queue whenever a gcs request is created/executed. */
@VisibleForTesting
public class GcsRequestExecutionEvent {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
package com.google.cloud.hadoop.util;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.common.eventbus.EventBus;
import io.grpc.Status;
import java.io.IOException;

/** Event Bus class */
Expand Down Expand Up @@ -49,6 +48,16 @@ public static void register(Object obj) {
eventBus.register(obj);
}

/**
* Method to unregister an obj to event bus
*
* @param obj to unregister from event bus
* @throws IllegalArgumentException if the object was not previously registered.
*/
public static void unregister(Object obj) {
eventBus.unregister(obj);
}

/**
* Posting GoogleJsonResponseException to invoke corresponding Subscriber method.
*
Expand All @@ -70,19 +79,19 @@ public static void postOnHttpResponseException(HttpResponseException response) {
/**
* Posting HttpResponse to invoke corresponding Subscriber method.
*
* @param response contains statusCode based on which metrics are updated in Subscriber method
* @param responseStatus response status code
*/
public static void postOnHttpResponse(HttpResponse response) {
eventBus.post(response);
public static void postOnHttpResponseStatus(int responseStatus) {
eventBus.post(responseStatus);
}

/**
* Posting HttpRequest to invoke corresponding Subscriber method.
* Posting Gcs request execution event i.e. request to gcs is being initiated.
*
* @param request based on which metrics are updated in Subscriber method
* @param event dummy event to map to request execution type.
*/
public static void postOnHttpRequest(HttpRequest request) {
eventBus.post(request);
public static void onGcsRequest(GcsRequestExecutionEvent event) {
eventBus.post(event);
}

/**
Expand All @@ -100,4 +109,12 @@ public static void postOnException() {
public static void postOnStatisticsType() {
eventBus.post(StatisticsType.DIRECTORIES_DELETED);
}
/**
* Posting grpc Status to invoke the corresponding Subscriber method.
*
* @param status status object of grpc response
*/
public static void onGrpcStatus(Status status) {
eventBus.post(status);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ public boolean handleResponse(HttpRequest request, HttpResponse response, boolea

private void logResponseCode(HttpRequest request, HttpResponse response) {
// Incrementing GCS Static Statistics using status code of response.
GoogleCloudStorageEventBus.postOnHttpResponse(response);
GoogleCloudStorageEventBus.postOnHttpResponseStatus(response.getStatusCode());

if (RESPONSE_CODES_TO_LOG.contains(response.getStatusCode())) {
logger
Expand Down
Loading

0 comments on commit 69a7ed0

Please sign in to comment.