diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java index ab3e63a8dc..1e2338cade 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GhfsGlobalStorageStatistics.java @@ -25,16 +25,16 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseException; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics; +import com.google.cloud.hadoop.util.GcsRequestExecutionEvent; import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus.StatisticsType; import com.google.cloud.hadoop.util.ITraceFactory; import com.google.cloud.hadoop.util.ITraceOperation; import com.google.common.base.Stopwatch; import com.google.common.eventbus.Subscribe; import com.google.common.flogger.GoogleLogger; +import io.grpc.Status; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -239,6 +239,43 @@ private void updateGcsIOSpecificStatistics(int statusCode) { } } + private int grpcToHttpStatusCodeMapping(Status grpcStatusCode) { + // using code.proto as reference + // https://github.com/googleapis/googleapis/blob/master/google/rpc/code.proto + switch (grpcStatusCode.getCode()) { + case OK: + return 200; + case CANCELLED: + return 499; + case INVALID_ARGUMENT: + case FAILED_PRECONDITION: + case OUT_OF_RANGE: + return 400; + case DEADLINE_EXCEEDED: + return 504; + case NOT_FOUND: + return 404; + case ALREADY_EXISTS: + case ABORTED: + return 409; + case PERMISSION_DENIED: + return 403; + case RESOURCE_EXHAUSTED: + return 429; + case UNIMPLEMENTED: + return 501; + case UNAVAILABLE: + return 503; + case UNAUTHENTICATED: + return 401; + case UNKNOWN: + case INTERNAL: + case DATA_LOSS: + default: + return 500; + } + } + /** * Updating the required gcs specific statistics based on HttpResponseException. * @@ -263,23 +300,23 @@ private void subscriberOnGoogleJsonResponseException( /** * Updating the required gcs specific statistics based on HttpResponse. * - * @param response contains statusCode based on which metrics are updated + * @param responseStatus responseStatus status code from HTTP response */ @Subscribe - private void subscriberOnHttpResponse(@Nonnull HttpResponse response) { - updateGcsIOSpecificStatistics(response.getStatusCode()); + private void subscriberOnHttpResponseStatus(@Nonnull Integer responseStatus) { + updateGcsIOSpecificStatistics(responseStatus); } - /** - * Updating the GCS_TOTAL_REQUEST_COUNT - * - * @param request - */ @Subscribe - private void subscriberOnHttpRequest(@Nonnull HttpRequest request) { + private void subscriberOnGcsRequest(@Nonnull GcsRequestExecutionEvent event) { incrementGcsTotalRequestCount(); } + @Subscribe + private void subscriberOnGrpcStatus(@Nonnull Status status) { + updateGcsIOSpecificStatistics(grpcToHttpStatusCodeMapping(status)); + } + /** * Updating the EXCEPTION_COUNT * diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageStatisticsTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageStatisticsTest.java new file mode 100644 index 0000000000..7d01b01a0a --- /dev/null +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleCloudStorageStatisticsTest.java @@ -0,0 +1,129 @@ +/* + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.fs.gcs; + +import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.EXCEPTION_COUNT; +import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_RATE_LIMIT_COUNT; +import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_CLIENT_SIDE_ERROR_COUNT; +import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_REQUEST_COUNT; +import static com.google.cloud.hadoop.gcsio.GoogleCloudStorageStatistics.GCS_SERVER_SIDE_ERROR_COUNT; +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.hadoop.util.GcsRequestExecutionEvent; +import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus; +import com.google.common.flogger.GoogleLogger; +import io.grpc.Status; +import java.util.Iterator; +import org.apache.hadoop.fs.StorageStatistics.LongStatistic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GoogleCloudStorageStatisticsTest { + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + private GhfsGlobalStorageStatistics subscriber = new GhfsGlobalStorageStatistics(); + + @Before + public void setUp() throws Exception { + + GoogleCloudStorageEventBus.register(subscriber); + } + + @After + public void cleanup() throws Exception { + + GoogleCloudStorageEventBus.unregister(subscriber); + } + + private void verifyStatistics(GhfsGlobalStorageStatistics expectedStats) { + Iterator statsIterator = expectedStats.getLongStatistics(); + boolean metricsVerified = true; + while (statsIterator.hasNext()) { + LongStatistic stats = statsIterator.next(); + Long value = subscriber.getLong(stats.getName()); + if (stats.getValue() != value) { + logger.atWarning().log( + "Metric values not matching. for: %s, expected: %d, got: %d", + stats.getName(), stats.getValue(), value); + metricsVerified = false; + break; + } + } + assertThat(metricsVerified).isTrue(); + } + + @Test + public void gcs_requestCounter() throws Exception { + GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent()); + GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics(); + verifyCounterStats.incrementCounter(GCS_REQUEST_COUNT, 1); + verifyStatistics(verifyCounterStats); + } + + @Test + public void gcs_rateLimitCounter() { + // verify for http event i.e. via Apiary + GoogleCloudStorageEventBus.postOnHttpResponseStatus(429); + GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics(); + verifyCounterStats.incrementCounter(GCS_CLIENT_RATE_LIMIT_COUNT, 1); + verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1); + verifyStatistics(verifyCounterStats); + + subscriber.reset(); + + // verify for gRPC event i.e. via java-storage + GoogleCloudStorageEventBus.onGrpcStatus(Status.RESOURCE_EXHAUSTED); + verifyStatistics(verifyCounterStats); + } + + @Test + public void gcs_clientSideErrorCounter() { + GoogleCloudStorageEventBus.postOnHttpResponseStatus(404); + GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics(); + verifyCounterStats.incrementCounter(GCS_CLIENT_SIDE_ERROR_COUNT, 1); + verifyStatistics(verifyCounterStats); + + subscriber.reset(); + + // verify for gRPC event i.e. via java-storage + GoogleCloudStorageEventBus.onGrpcStatus(Status.CANCELLED); + verifyStatistics(verifyCounterStats); + } + + @Test + public void gcs_serverSideErrorCounter() { + GoogleCloudStorageEventBus.postOnHttpResponseStatus(503); + GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics(); + verifyCounterStats.incrementCounter(GCS_SERVER_SIDE_ERROR_COUNT, 1); + verifyStatistics(verifyCounterStats); + + subscriber.reset(); + + // verify for gRPC event i.e. via java-storage + GoogleCloudStorageEventBus.onGrpcStatus(Status.INTERNAL); + verifyStatistics(verifyCounterStats); + } + + @Test + public void gcs_ExceptionCounter() { + GoogleCloudStorageEventBus.postOnException(); + GhfsGlobalStorageStatistics verifyCounterStats = new GhfsGlobalStorageStatistics(); + verifyCounterStats.incrementCounter(EXCEPTION_COUNT, 1); + verifyStatistics(verifyCounterStats); + } +} diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientGrpcStatisticsInterceptor.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientGrpcStatisticsInterceptor.java new file mode 100644 index 0000000000..3f9579f439 --- /dev/null +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientGrpcStatisticsInterceptor.java @@ -0,0 +1,61 @@ +/* + * Copyright 2024 Google Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.gcsio; + +import com.google.cloud.hadoop.util.GcsRequestExecutionEvent; +import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.Status; + +/** This is a gRPC interceptor to capture the statistics related to calls made to gcs backend. */ +@VisibleForTesting +public class GoogleCloudStorageClientGrpcStatisticsInterceptor implements ClientInterceptor { + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new SimpleForwardingClientCall(next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + try { + GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent()); + } finally { + super.start( + new SimpleForwardingClientCallListener(responseListener) { + @Override + public void onClose(Status status, Metadata trailers) { + try { + GoogleCloudStorageEventBus.onGrpcStatus(status); + } finally { + super.onClose(status, trailers); + } + } + }, + headers); + } + } + }; + } +} diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java index d86622c479..b6ad4f3515 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientImpl.java @@ -1293,6 +1293,7 @@ private static Storage createStorage( downscopedAccessTokenFn)); } + list.add(new GoogleCloudStorageClientGrpcStatisticsInterceptor()); return ImmutableList.copyOf(list); }) .setCredentials( diff --git a/util/src/main/java/com/google/cloud/hadoop/util/GcsRequestExecutionEvent.java b/util/src/main/java/com/google/cloud/hadoop/util/GcsRequestExecutionEvent.java new file mode 100644 index 0000000000..331f39314c --- /dev/null +++ b/util/src/main/java/com/google/cloud/hadoop/util/GcsRequestExecutionEvent.java @@ -0,0 +1,23 @@ +/* + * Copyright 2024 Google LLC. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.util; + +import com.google.common.annotations.VisibleForTesting; + +/** This an Event which is published in EvenBus queue whenever a gcs request is created/executed. */ +@VisibleForTesting +public class GcsRequestExecutionEvent {} diff --git a/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java b/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java index 5c613a974d..59c3ac7590 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java @@ -17,10 +17,9 @@ package com.google.cloud.hadoop.util; import com.google.api.client.googleapis.json.GoogleJsonResponseException; -import com.google.api.client.http.HttpRequest; -import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpResponseException; import com.google.common.eventbus.EventBus; +import io.grpc.Status; import java.io.IOException; /** Event Bus class */ @@ -49,6 +48,16 @@ public static void register(Object obj) { eventBus.register(obj); } + /** + * Method to unregister an obj to event bus + * + * @param obj to unregister from event bus + * @throws IllegalArgumentException if the object was not previously registered. + */ + public static void unregister(Object obj) { + eventBus.unregister(obj); + } + /** * Posting GoogleJsonResponseException to invoke corresponding Subscriber method. * @@ -70,19 +79,19 @@ public static void postOnHttpResponseException(HttpResponseException response) { /** * Posting HttpResponse to invoke corresponding Subscriber method. * - * @param response contains statusCode based on which metrics are updated in Subscriber method + * @param responseStatus response status code */ - public static void postOnHttpResponse(HttpResponse response) { - eventBus.post(response); + public static void postOnHttpResponseStatus(int responseStatus) { + eventBus.post(responseStatus); } /** - * Posting HttpRequest to invoke corresponding Subscriber method. + * Posting Gcs request execution event i.e. request to gcs is being initiated. * - * @param request based on which metrics are updated in Subscriber method + * @param event dummy event to map to request execution type. */ - public static void postOnHttpRequest(HttpRequest request) { - eventBus.post(request); + public static void onGcsRequest(GcsRequestExecutionEvent event) { + eventBus.post(event); } /** @@ -100,4 +109,12 @@ public static void postOnException() { public static void postOnStatisticsType() { eventBus.post(StatisticsType.DIRECTORIES_DELETED); } + /** + * Posting grpc Status to invoke the corresponding Subscriber method. + * + * @param status status object of grpc response + */ + public static void onGrpcStatus(Status status) { + eventBus.post(status); + } } diff --git a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java index ba87ef8eff..7046356b81 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java @@ -198,7 +198,7 @@ public boolean handleResponse(HttpRequest request, HttpResponse response, boolea private void logResponseCode(HttpRequest request, HttpResponse response) { // Incrementing GCS Static Statistics using status code of response. - GoogleCloudStorageEventBus.postOnHttpResponse(response); + GoogleCloudStorageEventBus.postOnHttpResponseStatus(response.getStatusCode()); if (RESPONSE_CODES_TO_LOG.contains(response.getStatusCode())) { logger diff --git a/util/src/main/java/com/google/cloud/hadoop/util/interceptors/InvocationIdInterceptor.java b/util/src/main/java/com/google/cloud/hadoop/util/interceptors/InvocationIdInterceptor.java index 24affed679..3c2568f3e0 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/interceptors/InvocationIdInterceptor.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/interceptors/InvocationIdInterceptor.java @@ -19,6 +19,7 @@ import com.google.api.client.http.HttpExecuteInterceptor; import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpRequest; +import com.google.cloud.hadoop.util.GcsRequestExecutionEvent; import com.google.cloud.hadoop.util.GoogleCloudStorageEventBus; import com.google.cloud.hadoop.util.ThreadTrace; import com.google.cloud.hadoop.util.TraceOperation; @@ -69,7 +70,7 @@ public void intercept(HttpRequest request) throws IOException { } else { newValue = invocationEntry; } - GoogleCloudStorageEventBus.postOnHttpRequest(request); + GoogleCloudStorageEventBus.onGcsRequest(new GcsRequestExecutionEvent()); headers.set(GOOG_API_CLIENT, newValue); ThreadTrace tt = TraceOperation.current();