Skip to content

Commit

Permalink
Track latency, retry and backoff time of GCS JSON API requests (Googl…
Browse files Browse the repository at this point in the history
…eCloudDataproc#1207)

* Track latency, retry and backoff time of GCS JSON API requests
  • Loading branch information
arunkumarchacko authored Jul 15, 2024
1 parent 009423d commit 1ba5bc2
Show file tree
Hide file tree
Showing 8 changed files with 723 additions and 11 deletions.
119 changes: 119 additions & 0 deletions util/src/main/java/com/google/cloud/hadoop/util/GcsJsonApiEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.util;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;

class GcsJsonApiEvent {
public enum EventType {
BACKOFF,
STARTED,
RETRYSKIPPED,
EXCEPTION,
RESPONSE,
}

public static final String BACKOFFTIME = "BACKOFFTIME";
public static final String RETRYCOUNT = "RETRYCOUNT";
public static final String STATUS_CODE = "STATUS_CODE";
public static final String DURATION = "DURATION";
private final EventType eventType;

// Setting this to object so that we do not have to create the URL string.
private final Object context;

private final String method;

private Map<String, Object> properties;

static GcsJsonApiEvent getResponseEvent(HttpResponse httpResponse, @Nonnegative long duration) {
GcsJsonApiEvent result = new GcsJsonApiEvent(httpResponse.getRequest(), EventType.RESPONSE, 2);
result.set(STATUS_CODE, httpResponse.getStatusCode());
result.set(DURATION, duration);

return result;
}

static GcsJsonApiEvent getRequestStartedEvent(HttpRequest request) {
return new GcsJsonApiEvent(request, EventType.STARTED);
}

static GcsJsonApiEvent getExceptionEvent(HttpRequest httpRequest) {
return new GcsJsonApiEvent(httpRequest, EventType.EXCEPTION);
}

static GcsJsonApiEvent getBackoffEvent(
HttpRequest request, @Nonnegative long backOffTime, @Nonnegative int retryCount) {
return new GcsJsonApiEvent(request, EventType.BACKOFF, 2)
.set(BACKOFFTIME, backOffTime)
.set(RETRYCOUNT, retryCount);
}

@VisibleForTesting
GcsJsonApiEvent(@Nonnull HttpRequest request, EventType eventType) {
this.eventType = eventType;
this.context = request.getUrl();
this.method = request.getRequestMethod();
}

EventType getEventType() {
return eventType;
}

Object getContext() {
return context;
}

String getMethod() {
return method;
}

Object getProperty(String key) {
return properties == null ? null : properties.get(key);
}

private GcsJsonApiEvent(HttpRequest request, EventType eventType, int capacity) {
this(request, eventType);
this.properties = new HashMap<>(capacity, 1);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("method", method)
.add("type", eventType)
.add("properties", properties)
.add("context", context)
.toString();
}

private GcsJsonApiEvent set(String key, Object value) {
checkArgument(properties != null, "properties cannot be null");

this.properties.put(key, value);
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
/** Event Bus class */
public class GoogleCloudStorageEventBus {

public static void postGcsJsonApiEvent(GcsJsonApiEvent gcsJsonApiEvent) {
eventBus.post(gcsJsonApiEvent);
}

/** Hold the instance of the event bus here */
private static EventBus eventBus = new EventBus();

Expand Down
127 changes: 127 additions & 0 deletions util/src/main/java/com/google/cloud/hadoop/util/RequestTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2024 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.hadoop.util;

import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.common.base.MoreObjects;
import com.google.common.base.Stopwatch;
import com.google.common.flogger.GoogleLogger;
import java.util.concurrent.TimeUnit;

class RequestTracker {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private static final long LOGGING_THRESHOLD = 200;
private Stopwatch stopWatch;
private Object context;
private int retryCount;
private long backOffTime;
private HttpRequest request;
private final long startTime = System.currentTimeMillis();

protected RequestTracker() {}

public static RequestTracker create(HttpRequest request) {
return new RequestTracker().init(request);
}

void trackResponse(HttpResponse response) {
// The response might have been already tracked. For eg. if we get an unsuccessful response and
// it given up after the configured retries, RetryHttpRequestInitializer response interceptor
// will also get called.
if (stopWatch.isRunning()) {
postToEventQueue(GcsJsonApiEvent.getResponseEvent(response, stopWatch.elapsed().toMillis()));
stopTracking();
}

if (retryCount != 0) {
// Change to minute
logger.atInfo().atMostEvery(10, TimeUnit.SECONDS).log(
"Operation completed after retries with code '%s'. %s", response.getStatusCode(), this);
}
}

void trackIOException() {
stopTracking();
postToEventQueue(GcsJsonApiEvent.getExceptionEvent(request));
}

void trackUnsuccessfulResponseHandler(HttpResponse response) {
stopTracking();
postToEventQueue(GcsJsonApiEvent.getResponseEvent(response, stopWatch.elapsed().toMillis()));
}

void trackBackOffCompleted(long backOffStartTime) {
long diff = System.currentTimeMillis() - backOffStartTime;
postToEventQueue(GcsJsonApiEvent.getBackoffEvent(request, diff, retryCount));
backOffTime += diff;
}

void trackRetryStarted() {
stopWatch.reset();
stopWatch.start();
retryCount++;
}

void trackRetrySkipped(boolean hasResponse) {
if (!hasResponse && this.retryCount != 0) {
logger.atInfo().atMostEvery(10, TimeUnit.SECONDS).log(
"Retry skipped after %s retries. context=%s", retryCount, this);
}
}

protected void postToEventQueue(GcsJsonApiEvent event) {
GoogleCloudStorageEventBus.postGcsJsonApiEvent(event);
}

protected RequestTracker init(HttpRequest request) {
stopWatch = Stopwatch.createStarted();
context = request.getUrl();
this.request = request;

postToEventQueue(GcsJsonApiEvent.getRequestStartedEvent(request));

return this;
}

private void stopTracking() {
if (stopWatch.isRunning()) {
stopWatch.stop();

if (stopWatch.elapsed().toMillis() > LOGGING_THRESHOLD) {
logger.atInfo().atMostEvery(10, TimeUnit.SECONDS).log(
"Detected high latency for %s. duration=%s",
request.getUrl(), stopWatch.elapsed().toMillis());
}
} else {
// Control can reach here only in case of a bug. Did not want to add an assert due to huge
// blast radius.
logger.atWarning().atMostEvery(1, TimeUnit.MINUTES).log(
"Can stop only an already executing request. details=%s", this);
}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("retryCount", retryCount)
.add("totalBackoffTime", backOffTime)
.add("context", context)
.add("elapsed", System.currentTimeMillis() - startTime)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,17 @@ public void initialize(HttpRequest request) throws IOException {
credentials.initialize(request);
}

RequestTracker tracker = getRequestTracker(request);

request
// Request will be retried if server errors (5XX) or I/O errors are encountered.
.setNumberOfRetries(options.getMaxRequestRetries())
// Set the timeout configurations.
.setConnectTimeout(toIntExact(options.getConnectTimeout().toMillis()))
.setReadTimeout(toIntExact(options.getReadTimeout().toMillis()))
.setUnsuccessfulResponseHandler(new UnsuccessfulResponseHandler(credentials))
.setIOExceptionHandler(new IoExceptionHandler());
.setUnsuccessfulResponseHandler(new UnsuccessfulResponseHandler(credentials, tracker))
.setIOExceptionHandler(new IoExceptionHandler(tracker))
.setResponseInterceptor(tracker::trackResponse);

HttpHeaders headers = request.getHeaders();
if (isNullOrEmpty(headers.getUserAgent()) && !isNullOrEmpty(options.getDefaultUserAgent())) {
Expand All @@ -100,6 +103,10 @@ public void initialize(HttpRequest request) throws IOException {
request.setInterceptor(new InvocationIdInterceptor(request.getInterceptor()));
}

protected RequestTracker getRequestTracker(HttpRequest request) {
return RequestTracker.create(request);
}

public Credentials getCredentials() {
return credentials == null ? null : credentials.getCredentials();
}
Expand Down Expand Up @@ -151,30 +158,38 @@ private static class UnsuccessfulResponseHandler implements HttpUnsuccessfulResp

private final HttpCredentialsAdapter credentials;
private final HttpBackOffUnsuccessfulResponseHandler delegate;
private final RequestTracker tracker;

public UnsuccessfulResponseHandler(HttpCredentialsAdapter credentials) {
public UnsuccessfulResponseHandler(HttpCredentialsAdapter credentials, RequestTracker tracker) {
this.credentials = credentials;
this.delegate =
new HttpBackOffUnsuccessfulResponseHandler(BACKOFF_BUILDER.build())
.setBackOffRequired(BACK_OFF_REQUIRED);
this.tracker = tracker;
}

@Override
public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry)
throws IOException {
logResponseCode(request, response);
tracker.trackUnsuccessfulResponseHandler(response);

if (credentials != null && credentials.handleResponse(request, response, supportsRetry)) {
// If credentials decides it can handle it, the return code or message indicated something
// specific to authentication, and no backoff is desired.
return true;
}

long backOffStartTime = System.currentTimeMillis();
if (delegate.handleResponse(request, response, supportsRetry)) {
tracker.trackBackOffCompleted(backOffStartTime);
// Otherwise, we defer to the judgement of our internal backoff handler.
tracker.trackRetryStarted();
return true;
}

tracker.trackRetrySkipped(true);

escapeRedirectPath(request, response);

return false;
Expand Down Expand Up @@ -224,10 +239,12 @@ private void escapeRedirectPath(HttpRequest request, HttpResponse response) {
private static class IoExceptionHandler implements HttpIOExceptionHandler {

private final HttpIOExceptionHandler delegate;
private final RequestTracker tracker;

public IoExceptionHandler() {
public IoExceptionHandler(RequestTracker tracker) {
// Retry IOExceptions such as "socket timed out" of "insufficient bytes written" with backoff.
this.delegate = new HttpBackOffIOExceptionHandler(BACKOFF_BUILDER.build());
this.tracker = tracker;
}

@Override
Expand All @@ -236,7 +253,20 @@ public boolean handleIOException(HttpRequest httpRequest, boolean supportsRetry)
// We sadly don't get anything helpful to see if this is something we want to log.
// As a result we'll turn down the logging level to debug.
logger.atFine().log("Encountered an IOException when accessing URL %s", httpRequest.getUrl());
return delegate.handleIOException(httpRequest, supportsRetry);
tracker.trackIOException();

long backoffStartTime = System.currentTimeMillis();
boolean result = delegate.handleIOException(httpRequest, supportsRetry);

tracker.trackBackOffCompleted(backoffStartTime);

if (result) {
tracker.trackRetryStarted();
} else {
tracker.trackRetrySkipped(false);
}

return result;
}
}
}
Loading

0 comments on commit 1ba5bc2

Please sign in to comment.