diff --git a/util/src/main/java/com/google/cloud/hadoop/util/GcsJsonApiEvent.java b/util/src/main/java/com/google/cloud/hadoop/util/GcsJsonApiEvent.java new file mode 100644 index 0000000000..732f6b4843 --- /dev/null +++ b/util/src/main/java/com/google/cloud/hadoop/util/GcsJsonApiEvent.java @@ -0,0 +1,119 @@ +/* + * Copyright 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpResponse; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.MoreObjects; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; + +class GcsJsonApiEvent { + public enum EventType { + BACKOFF, + STARTED, + RETRYSKIPPED, + EXCEPTION, + RESPONSE, + } + + public static final String BACKOFFTIME = "BACKOFFTIME"; + public static final String RETRYCOUNT = "RETRYCOUNT"; + public static final String STATUS_CODE = "STATUS_CODE"; + public static final String DURATION = "DURATION"; + private final EventType eventType; + + // Setting this to object so that we do not have to create the URL string. + private final Object context; + + private final String method; + + private Map properties; + + static GcsJsonApiEvent getResponseEvent(HttpResponse httpResponse, @Nonnegative long duration) { + GcsJsonApiEvent result = new GcsJsonApiEvent(httpResponse.getRequest(), EventType.RESPONSE, 2); + result.set(STATUS_CODE, httpResponse.getStatusCode()); + result.set(DURATION, duration); + + return result; + } + + static GcsJsonApiEvent getRequestStartedEvent(HttpRequest request) { + return new GcsJsonApiEvent(request, EventType.STARTED); + } + + static GcsJsonApiEvent getExceptionEvent(HttpRequest httpRequest) { + return new GcsJsonApiEvent(httpRequest, EventType.EXCEPTION); + } + + static GcsJsonApiEvent getBackoffEvent( + HttpRequest request, @Nonnegative long backOffTime, @Nonnegative int retryCount) { + return new GcsJsonApiEvent(request, EventType.BACKOFF, 2) + .set(BACKOFFTIME, backOffTime) + .set(RETRYCOUNT, retryCount); + } + + @VisibleForTesting + GcsJsonApiEvent(@Nonnull HttpRequest request, EventType eventType) { + this.eventType = eventType; + this.context = request.getUrl(); + this.method = request.getRequestMethod(); + } + + EventType getEventType() { + return eventType; + } + + Object getContext() { + return context; + } + + String getMethod() { + return method; + } + + Object getProperty(String key) { + return properties == null ? null : properties.get(key); + } + + private GcsJsonApiEvent(HttpRequest request, EventType eventType, int capacity) { + this(request, eventType); + this.properties = new HashMap<>(capacity, 1); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("method", method) + .add("type", eventType) + .add("properties", properties) + .add("context", context) + .toString(); + } + + private GcsJsonApiEvent set(String key, Object value) { + checkArgument(properties != null, "properties cannot be null"); + + this.properties.put(key, value); + return this; + } +} diff --git a/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java b/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java index 13a41b614f..973b0a7561 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/GoogleCloudStorageEventBus.java @@ -25,6 +25,10 @@ /** Event Bus class */ public class GoogleCloudStorageEventBus { + public static void postGcsJsonApiEvent(GcsJsonApiEvent gcsJsonApiEvent) { + eventBus.post(gcsJsonApiEvent); + } + /** Hold the instance of the event bus here */ private static EventBus eventBus = new EventBus(); diff --git a/util/src/main/java/com/google/cloud/hadoop/util/RequestTracker.java b/util/src/main/java/com/google/cloud/hadoop/util/RequestTracker.java new file mode 100644 index 0000000000..1658816a34 --- /dev/null +++ b/util/src/main/java/com/google/cloud/hadoop/util/RequestTracker.java @@ -0,0 +1,127 @@ +/* + * Copyright 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.util; + +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpResponse; +import com.google.common.base.MoreObjects; +import com.google.common.base.Stopwatch; +import com.google.common.flogger.GoogleLogger; +import java.util.concurrent.TimeUnit; + +class RequestTracker { + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + private static final long LOGGING_THRESHOLD = 200; + private Stopwatch stopWatch; + private Object context; + private int retryCount; + private long backOffTime; + private HttpRequest request; + private final long startTime = System.currentTimeMillis(); + + protected RequestTracker() {} + + public static RequestTracker create(HttpRequest request) { + return new RequestTracker().init(request); + } + + void trackResponse(HttpResponse response) { + // The response might have been already tracked. For eg. if we get an unsuccessful response and + // it given up after the configured retries, RetryHttpRequestInitializer response interceptor + // will also get called. + if (stopWatch.isRunning()) { + postToEventQueue(GcsJsonApiEvent.getResponseEvent(response, stopWatch.elapsed().toMillis())); + stopTracking(); + } + + if (retryCount != 0) { + // Change to minute + logger.atInfo().atMostEvery(10, TimeUnit.SECONDS).log( + "Operation completed after retries with code '%s'. %s", response.getStatusCode(), this); + } + } + + void trackIOException() { + stopTracking(); + postToEventQueue(GcsJsonApiEvent.getExceptionEvent(request)); + } + + void trackUnsuccessfulResponseHandler(HttpResponse response) { + stopTracking(); + postToEventQueue(GcsJsonApiEvent.getResponseEvent(response, stopWatch.elapsed().toMillis())); + } + + void trackBackOffCompleted(long backOffStartTime) { + long diff = System.currentTimeMillis() - backOffStartTime; + postToEventQueue(GcsJsonApiEvent.getBackoffEvent(request, diff, retryCount)); + backOffTime += diff; + } + + void trackRetryStarted() { + stopWatch.reset(); + stopWatch.start(); + retryCount++; + } + + void trackRetrySkipped(boolean hasResponse) { + if (!hasResponse && this.retryCount != 0) { + logger.atInfo().atMostEvery(10, TimeUnit.SECONDS).log( + "Retry skipped after %s retries. context=%s", retryCount, this); + } + } + + protected void postToEventQueue(GcsJsonApiEvent event) { + GoogleCloudStorageEventBus.postGcsJsonApiEvent(event); + } + + protected RequestTracker init(HttpRequest request) { + stopWatch = Stopwatch.createStarted(); + context = request.getUrl(); + this.request = request; + + postToEventQueue(GcsJsonApiEvent.getRequestStartedEvent(request)); + + return this; + } + + private void stopTracking() { + if (stopWatch.isRunning()) { + stopWatch.stop(); + + if (stopWatch.elapsed().toMillis() > LOGGING_THRESHOLD) { + logger.atInfo().atMostEvery(10, TimeUnit.SECONDS).log( + "Detected high latency for %s. duration=%s", + request.getUrl(), stopWatch.elapsed().toMillis()); + } + } else { + // Control can reach here only in case of a bug. Did not want to add an assert due to huge + // blast radius. + logger.atWarning().atMostEvery(1, TimeUnit.MINUTES).log( + "Can stop only an already executing request. details=%s", this); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("retryCount", retryCount) + .add("totalBackoffTime", backOffTime) + .add("context", context) + .add("elapsed", System.currentTimeMillis() - startTime) + .toString(); + } +} diff --git a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java index 07c48a668a..e3c8f105bb 100644 --- a/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java +++ b/util/src/main/java/com/google/cloud/hadoop/util/RetryHttpInitializer.java @@ -80,14 +80,17 @@ public void initialize(HttpRequest request) throws IOException { credentials.initialize(request); } + RequestTracker tracker = getRequestTracker(request); + request // Request will be retried if server errors (5XX) or I/O errors are encountered. .setNumberOfRetries(options.getMaxRequestRetries()) // Set the timeout configurations. .setConnectTimeout(toIntExact(options.getConnectTimeout().toMillis())) .setReadTimeout(toIntExact(options.getReadTimeout().toMillis())) - .setUnsuccessfulResponseHandler(new UnsuccessfulResponseHandler(credentials)) - .setIOExceptionHandler(new IoExceptionHandler()); + .setUnsuccessfulResponseHandler(new UnsuccessfulResponseHandler(credentials, tracker)) + .setIOExceptionHandler(new IoExceptionHandler(tracker)) + .setResponseInterceptor(tracker::trackResponse); HttpHeaders headers = request.getHeaders(); if (isNullOrEmpty(headers.getUserAgent()) && !isNullOrEmpty(options.getDefaultUserAgent())) { @@ -100,6 +103,10 @@ public void initialize(HttpRequest request) throws IOException { request.setInterceptor(new InvocationIdInterceptor(request.getInterceptor())); } + protected RequestTracker getRequestTracker(HttpRequest request) { + return RequestTracker.create(request); + } + public Credentials getCredentials() { return credentials == null ? null : credentials.getCredentials(); } @@ -151,18 +158,21 @@ private static class UnsuccessfulResponseHandler implements HttpUnsuccessfulResp private final HttpCredentialsAdapter credentials; private final HttpBackOffUnsuccessfulResponseHandler delegate; + private final RequestTracker tracker; - public UnsuccessfulResponseHandler(HttpCredentialsAdapter credentials) { + public UnsuccessfulResponseHandler(HttpCredentialsAdapter credentials, RequestTracker tracker) { this.credentials = credentials; this.delegate = new HttpBackOffUnsuccessfulResponseHandler(BACKOFF_BUILDER.build()) .setBackOffRequired(BACK_OFF_REQUIRED); + this.tracker = tracker; } @Override public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry) throws IOException { logResponseCode(request, response); + tracker.trackUnsuccessfulResponseHandler(response); if (credentials != null && credentials.handleResponse(request, response, supportsRetry)) { // If credentials decides it can handle it, the return code or message indicated something @@ -170,11 +180,16 @@ public boolean handleResponse(HttpRequest request, HttpResponse response, boolea return true; } + long backOffStartTime = System.currentTimeMillis(); if (delegate.handleResponse(request, response, supportsRetry)) { + tracker.trackBackOffCompleted(backOffStartTime); // Otherwise, we defer to the judgement of our internal backoff handler. + tracker.trackRetryStarted(); return true; } + tracker.trackRetrySkipped(true); + escapeRedirectPath(request, response); return false; @@ -224,10 +239,12 @@ private void escapeRedirectPath(HttpRequest request, HttpResponse response) { private static class IoExceptionHandler implements HttpIOExceptionHandler { private final HttpIOExceptionHandler delegate; + private final RequestTracker tracker; - public IoExceptionHandler() { + public IoExceptionHandler(RequestTracker tracker) { // Retry IOExceptions such as "socket timed out" of "insufficient bytes written" with backoff. this.delegate = new HttpBackOffIOExceptionHandler(BACKOFF_BUILDER.build()); + this.tracker = tracker; } @Override @@ -236,7 +253,20 @@ public boolean handleIOException(HttpRequest httpRequest, boolean supportsRetry) // We sadly don't get anything helpful to see if this is something we want to log. // As a result we'll turn down the logging level to debug. logger.atFine().log("Encountered an IOException when accessing URL %s", httpRequest.getUrl()); - return delegate.handleIOException(httpRequest, supportsRetry); + tracker.trackIOException(); + + long backoffStartTime = System.currentTimeMillis(); + boolean result = delegate.handleIOException(httpRequest, supportsRetry); + + tracker.trackBackOffCompleted(backoffStartTime); + + if (result) { + tracker.trackRetryStarted(); + } else { + tracker.trackRetrySkipped(false); + } + + return result; } } } diff --git a/util/src/test/java/com/google/cloud/hadoop/util/GcsJsonApiEventTest.java b/util/src/test/java/com/google/cloud/hadoop/util/GcsJsonApiEventTest.java new file mode 100644 index 0000000000..41d221da26 --- /dev/null +++ b/util/src/test/java/com/google/cloud/hadoop/util/GcsJsonApiEventTest.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.util; + +import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.emptyResponse; +import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.mockTransport; +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpResponse; +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class GcsJsonApiEventTest { + private static final String URL = "http://fake-url.com"; + + @Test + public void testGetResponseEvent() throws IOException { + int duration = Math.abs(ThreadLocalRandom.current().nextInt(1000000)); + GcsJsonApiEvent event = GcsJsonApiEvent.getResponseEvent(getResponse(), duration); + + assertThat(event.getContext().toString()).isEqualTo(URL); + assertThat(event.getMethod()).isEqualTo("GET"); + assertThat(event.getEventType()).isEqualTo(GcsJsonApiEvent.EventType.RESPONSE); + assertThat(event.getProperty(GcsJsonApiEvent.DURATION)).isEqualTo(duration); + } + + @Test + public void testGetExceptionEvent() throws IOException { + GcsJsonApiEvent event = GcsJsonApiEvent.getExceptionEvent(getResponse().getRequest()); + + assertThat(event.getContext().toString()).isEqualTo(URL); + assertThat(event.getMethod()).isEqualTo("GET"); + assertThat(event.getEventType()).isEqualTo(GcsJsonApiEvent.EventType.EXCEPTION); + } + + private HttpResponse getResponse() throws IOException { + return mockTransport(emptyResponse(200)) + .createRequestFactory() + .buildGetRequest(new GenericUrl(URL)) + .execute(); + } + + @Test + public void testGetNonExistingProperty() throws IOException { + GcsJsonApiEvent event = GcsJsonApiEvent.getResponseEvent(getResponse(), 1); + assertThat(event.getProperty("foo")).isNull(); + } +} diff --git a/util/src/test/java/com/google/cloud/hadoop/util/RequestTrackerTest.java b/util/src/test/java/com/google/cloud/hadoop/util/RequestTrackerTest.java new file mode 100644 index 0000000000..7f2c4f42de --- /dev/null +++ b/util/src/test/java/com/google/cloud/hadoop/util/RequestTrackerTest.java @@ -0,0 +1,146 @@ +/* + * Copyright 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.util; + +import static com.google.cloud.hadoop.util.TestRequestTracker.ExpectedEventDetails; +import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.emptyResponse; +import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.mockTransport; + +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpResponse; +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import junit.framework.TestCase; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class RequestTrackerTest extends TestCase { + private static final String URL = "http://fake-url.com"; + public static final int STATUS_CODE = 200; + private TestRequestTracker tracker; + private HttpResponse response; + private HttpRequest request; + + @Before + public void setup() throws IOException { + this.request = getHttpRequest(STATUS_CODE); + this.response = request.execute(); + this.tracker = new TestRequestTracker(); + tracker.init(response.getRequest()); + } + + @Test + public void testOnResponse() throws IOException { + tracker.verifyEvents(List.of(ExpectedEventDetails.getStarted(URL))); + tracker.trackResponse(response); + tracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, STATUS_CODE))); + } + + @Test + public void testOnIOException() throws IOException { + tracker.trackIOException(); + tracker.verifyEvents( + List.of(ExpectedEventDetails.getStarted(URL), ExpectedEventDetails.getException(URL))); + } + + @Test + public void testOnUnsuccessfulResponseHandler() { + tracker.trackUnsuccessfulResponseHandler(response); + tracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, STATUS_CODE))); + } + + @Test + public void testBackOff() { + tracker.trackUnsuccessfulResponseHandler(response); + int backOff1 = doBackOffRandom(); + tracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, STATUS_CODE), + ExpectedEventDetails.getBackoff(URL, 0, backOff1))); + + int backOff2 = doBackOffRandom(); + tracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, STATUS_CODE), + ExpectedEventDetails.getBackoff(URL, 0, backOff1), + ExpectedEventDetails.getBackoff(URL, 0, backOff2))); + } + + private int doBackOffRandom() { + int backoffTime = ThreadLocalRandom.current().nextInt(1, 20000); + tracker.trackBackOffCompleted(System.currentTimeMillis() - backoffTime); + return backoffTime; + } + + @Test + public void testRetryEvents() { + tracker.trackUnsuccessfulResponseHandler(response); + tracker.trackRetryStarted(); + int backOff1 = doBackOffRandom(); + tracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, STATUS_CODE), + ExpectedEventDetails.getBackoff(URL, 1, backOff1))); + + tracker.trackUnsuccessfulResponseHandler(response); + tracker.trackRetryStarted(); + int backOff2 = doBackOffRandom(); + tracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, STATUS_CODE), + ExpectedEventDetails.getBackoff(URL, 1, backOff1), + ExpectedEventDetails.getResponse(URL, STATUS_CODE), + ExpectedEventDetails.getBackoff(URL, 2, backOff2))); + } + + @Test + public void testRetrySkipped() { + tracker.trackUnsuccessfulResponseHandler(response); + tracker.trackRetryStarted(); + int backOff1 = doBackOffRandom(); + tracker.trackUnsuccessfulResponseHandler(response); + tracker.trackRetrySkipped(true); + + tracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, STATUS_CODE), + ExpectedEventDetails.getBackoff(URL, 1, backOff1), + ExpectedEventDetails.getResponse(URL, STATUS_CODE))); + } + + private static HttpRequest getHttpRequest(int statusCode) throws IOException { + return mockTransport(emptyResponse(statusCode)) + .createRequestFactory() + .buildGetRequest(new GenericUrl(URL)); + } +} diff --git a/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java b/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java index 39408cd954..1152e16fbd 100644 --- a/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java +++ b/util/src/test/java/com/google/cloud/hadoop/util/RetryHttpInitializerTest.java @@ -16,6 +16,7 @@ package com.google.cloud.hadoop.util; +import static com.google.cloud.hadoop.util.TestRequestTracker.ExpectedEventDetails; import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.emptyResponse; import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.inputStreamResponse; import static com.google.cloud.hadoop.util.testing.MockHttpTransportHelper.mockTransport; @@ -37,6 +38,8 @@ import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.time.Duration; +import java.util.List; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -47,6 +50,14 @@ */ @RunWith(JUnit4.class) public class RetryHttpInitializerTest { + public static final String URL = "http://fake-url.com"; + private TestRequestTracker requestTracker; + + @Before + public void beforeTest() { + this.requestTracker = new TestRequestTracker(); + } + @Test public void testConstructorNullCredentials() { createRetryHttpInitializer(/* credentials= */ null); @@ -59,7 +70,7 @@ public void successfulRequest_authenticated() throws IOException { mockTransport(emptyResponse(200)) .createRequestFactory(createRetryHttpInitializer(new FakeCredentials(authHeaderValue))); - HttpRequest req = requestFactory.buildGetRequest(new GenericUrl("http://fake-url.com")); + HttpRequest req = requestFactory.buildGetRequest(new GenericUrl(URL)); assertThat(req.getHeaders()) .containsAtLeast( @@ -73,6 +84,9 @@ public void successfulRequest_authenticated() throws IOException { assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT)) .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK); + + requestTracker.verifyEvents( + List.of(ExpectedEventDetails.getStarted(URL), ExpectedEventDetails.getResponse(URL, 200))); } @Test @@ -82,7 +96,7 @@ public void forbiddenResponse_failsWithoutRetries() throws IOException { mockTransport(emptyResponse(403)) .createRequestFactory(createRetryHttpInitializer(new FakeCredentials(authHeaderValue))); - HttpRequest req = requestFactory.buildGetRequest(new GenericUrl("http://fake-url.com")); + HttpRequest req = requestFactory.buildGetRequest(new GenericUrl(URL)); assertThat(req.getHeaders()) .containsAtLeast( @@ -94,6 +108,11 @@ public void forbiddenResponse_failsWithoutRetries() throws IOException { assertThat((String) req.getHeaders().get(InvocationIdInterceptor.GOOG_API_CLIENT)) .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(thrown.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_FORBIDDEN); + + requestTracker.verifyEvents( + List.of( + TestRequestTracker.ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, 403))); } @Test @@ -113,7 +132,7 @@ private void errorCodeResponse_succeedsAfterRetries(int statusCode) throws Excep mockTransport(emptyResponse(statusCode), emptyResponse(statusCode), emptyResponse(200)) .createRequestFactory(createRetryHttpInitializer(new FakeCredentials(authHeaderValue))); - HttpRequest req = requestFactory.buildGetRequest(new GenericUrl("http://fake-url.com")); + HttpRequest req = requestFactory.buildGetRequest(new GenericUrl(URL)); assertThat(req.getHeaders()) .containsAtLeast( @@ -126,6 +145,60 @@ private void errorCodeResponse_succeedsAfterRetries(int statusCode) throws Excep .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(res).isNotNull(); assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK); + + requestTracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, statusCode), + ExpectedEventDetails.getBackoff(URL, 0), + ExpectedEventDetails.getResponse(URL, statusCode), + ExpectedEventDetails.getBackoff(URL, 1), + ExpectedEventDetails.getResponse(URL, 200))); + } + + @Test + public void errorCodeResponse_failsAfterMaxRetries() throws Exception { + int statusCode = 429; + String authHeaderValue = "Bearer: y2.WAKiHahzxGS_a1bd40RjNUF"; + HttpRequestFactory requestFactory = + mockTransport( + emptyResponse(statusCode), + emptyResponse(statusCode), + emptyResponse(statusCode), + emptyResponse(statusCode), + emptyResponse(statusCode), + emptyResponse(statusCode), + emptyResponse(statusCode)) + .createRequestFactory(createRetryHttpInitializer(new FakeCredentials(authHeaderValue))); + + HttpRequest req = requestFactory.buildGetRequest(new GenericUrl(URL)); + + assertThat(req.getHeaders()) + .containsAtLeast( + "user-agent", ImmutableList.of("foo-user-agent"), + "header-key", "header-value", + "authorization", ImmutableList.of(authHeaderValue)); + + try { + HttpResponse res = req.execute(); + } catch (HttpResponseException exception) { + // Ignore. Expected. + } + + requestTracker.verifyEvents( + List.of( + ExpectedEventDetails.getStarted(URL), + ExpectedEventDetails.getResponse(URL, statusCode), + ExpectedEventDetails.getBackoff(URL, 0), + ExpectedEventDetails.getResponse(URL, statusCode), + ExpectedEventDetails.getBackoff(URL, 1), + ExpectedEventDetails.getResponse(URL, statusCode), + ExpectedEventDetails.getBackoff(URL, 2), + ExpectedEventDetails.getResponse(URL, statusCode), + ExpectedEventDetails.getBackoff(URL, 3), + ExpectedEventDetails.getResponse(URL, statusCode), + ExpectedEventDetails.getBackoff(URL, 4), + ExpectedEventDetails.getResponse(URL, statusCode))); } @Test @@ -140,7 +213,7 @@ public void ioExceptionResponse_succeedsAfterRetries() throws Exception { emptyResponse(200)) .createRequestFactory(createRetryHttpInitializer(new FakeCredentials(authHeaderValue))); - HttpRequest req = requestFactory.buildGetRequest(new GenericUrl("http://fake-url.com")); + HttpRequest req = requestFactory.buildGetRequest(new GenericUrl(URL)); assertThat(req.getHeaders()) .containsAtLeast( @@ -153,10 +226,17 @@ public void ioExceptionResponse_succeedsAfterRetries() throws Exception { .contains(InvocationIdInterceptor.GCCL_INVOCATION_ID_PREFIX); assertThat(res).isNotNull(); assertThat(res.getStatusCode()).isEqualTo(HttpStatusCodes.STATUS_CODE_OK); + + // TODO: For some reason the IOException handler is not getting called. Check why that is the + // case. + requestTracker.verifyEvents( + List.of( + TestRequestTracker.ExpectedEventDetails.getStarted(URL), + TestRequestTracker.ExpectedEventDetails.getResponse(URL, 200))); } - private static RetryHttpInitializer createRetryHttpInitializer(Credentials credentials) { - return new RetryHttpInitializer( + private TestRetryHttpInitializer createRetryHttpInitializer(Credentials credentials) { + return new TestRetryHttpInitializer( credentials, RetryHttpInitializerOptions.builder() .setDefaultUserAgent("foo-user-agent") @@ -166,4 +246,23 @@ private static RetryHttpInitializer createRetryHttpInitializer(Credentials crede .setReadTimeout(Duration.ofSeconds(5)) .build()); } + + // Helper class which help provide a custom test implementation of RequestTracker + private class TestRetryHttpInitializer extends RetryHttpInitializer { + private boolean isInitialized; + + public TestRetryHttpInitializer(Credentials credentials, RetryHttpInitializerOptions build) { + super(credentials, build); + } + + @Override + protected RequestTracker getRequestTracker(HttpRequest request) { + if (!this.isInitialized) { + requestTracker.init(request); + this.isInitialized = true; + } + + return requestTracker; + } + } } diff --git a/util/src/test/java/com/google/cloud/hadoop/util/TestRequestTracker.java b/util/src/test/java/com/google/cloud/hadoop/util/TestRequestTracker.java new file mode 100644 index 0000000000..fd8341b5b5 --- /dev/null +++ b/util/src/test/java/com/google/cloud/hadoop/util/TestRequestTracker.java @@ -0,0 +1,120 @@ +/* + * Copyright 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.util; + +import static com.google.common.truth.Truth.assertThat; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestRequestTracker extends RequestTracker { + private final List events = new ArrayList<>(); + + public List getEvents() { + return events; + } + + @Override + protected void postToEventQueue(GcsJsonApiEvent event) { + this.events.add(event); + } + + void verifyEvents(List expectedEvents) { + List actualEvents = getEvents(); + assertThat(actualEvents.size()).isEqualTo(expectedEvents.size()); + + for (int i = 0; i < actualEvents.size(); i++) { + GcsJsonApiEvent actual = actualEvents.get(i); + ExpectedEventDetails expected = expectedEvents.get(i); + + assertThat(actual.getEventType()).isEqualTo(expected.eventType); + assertThat(actual.getContext().toString()).isEqualTo(expected.context.toString()); + + GcsJsonApiEvent.EventType eventType = actual.getEventType(); + if (eventType == GcsJsonApiEvent.EventType.RESPONSE) { + verifyNotEmpty(actual, GcsJsonApiEvent.DURATION); + } + + if (eventType == GcsJsonApiEvent.EventType.BACKOFF) { + verifyNotEmpty(actual, GcsJsonApiEvent.BACKOFFTIME); + } + + for (String key : expected.properties.keySet()) { + if (key.equals(GcsJsonApiEvent.BACKOFFTIME)) { + long backOffTime = (long) actual.getProperty(key); + int expectedBackoffTime = (int) expected.properties.get(GcsJsonApiEvent.BACKOFFTIME); + assertThat(backOffTime).isAtLeast(expectedBackoffTime); + // Adding a buffer of 10 seconds. If this is not sufficient increase the threshold or + // remove this check. + assertThat(backOffTime).isLessThan(expectedBackoffTime + 10); + } else { + assertThat(actual.getProperty(key)).isEqualTo(expected.properties.get(key)); + } + } + } + } + + private void verifyNotEmpty(GcsJsonApiEvent actual, String duration) { + assertThat(actual.getProperty(duration)).isNotNull(); + } + + public static class ExpectedEventDetails { + final GcsJsonApiEvent.EventType eventType; + final String context; + Map properties = new HashMap<>(); + + private ExpectedEventDetails(GcsJsonApiEvent.EventType eventType, String url) { + this.eventType = eventType; + this.context = url; + } + + public static ExpectedEventDetails getStarted(String url) { + return new ExpectedEventDetails(GcsJsonApiEvent.EventType.STARTED, url); + } + + public static ExpectedEventDetails getResponse(String url, int statusCode) { + ExpectedEventDetails result = + new ExpectedEventDetails(GcsJsonApiEvent.EventType.RESPONSE, url); + result.properties.put(GcsJsonApiEvent.STATUS_CODE, statusCode); + + return result; + } + + public static ExpectedEventDetails getBackoff(String url, int retryCount) { + ExpectedEventDetails result = + new ExpectedEventDetails(GcsJsonApiEvent.EventType.BACKOFF, url); + result.properties.put(GcsJsonApiEvent.RETRYCOUNT, retryCount); + + return result; + } + + public static ExpectedEventDetails getBackoff(String url, int retryCount, int backOff) { + ExpectedEventDetails result = + new ExpectedEventDetails(GcsJsonApiEvent.EventType.BACKOFF, url); + result.properties.put(GcsJsonApiEvent.RETRYCOUNT, retryCount); + result.properties.put(GcsJsonApiEvent.BACKOFFTIME, backOff); + + return result; + } + + public static ExpectedEventDetails getException(String url) { + return new ExpectedEventDetails(GcsJsonApiEvent.EventType.EXCEPTION, url); + } + } +}