Skip to content

Commit

Permalink
Add http/protobuf retry (#3983)
Browse files Browse the repository at this point in the history
* Add http/protobuf retry

* PR feedback
  • Loading branch information
jack-berg authored Dec 17, 2021
1 parent 2fb3d63 commit 6f755cc
Show file tree
Hide file tree
Showing 43 changed files with 497 additions and 223 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.otlp.internal.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporter;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
Expand Down Expand Up @@ -183,6 +185,16 @@ void invalidConfig() {
"Unsupported compression method. Supported compression methods include: gzip, none.");
}

// Checks that the "delegate" OkHttpExporterBuilder can be obtained reflectively from an
// OtlpHttpLogExporterBuilder, and that setting the default RetryPolicy on it does not throw.
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
OkHttpExporterBuilder.getDelegateBuilder(
OtlpHttpLogExporterBuilder.class, OtlpHttpLogExporter.builder())
.setRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}

@Test
void testExportUncompressed() {
server.enqueue(successResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.exporter.otlp.internal.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporter;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
Expand Down Expand Up @@ -188,6 +190,16 @@ void invalidConfig() {
.hasMessage("preferredTemporality");
}

// Checks that the "delegate" OkHttpExporterBuilder can be obtained reflectively from an
// OtlpHttpMetricExporterBuilder, and that setting the default RetryPolicy on it does not throw.
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
OkHttpExporterBuilder.getDelegateBuilder(
OtlpHttpMetricExporterBuilder.class, OtlpHttpMetricExporter.builder())
.setRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}

@Test
void testExportUncompressed() {
server.enqueue(successResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporter;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpExporterBuilder;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.traces.ResourceSpansMarshaler;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
Expand Down Expand Up @@ -171,6 +173,16 @@ void invalidConfig() {
"Unsupported compression method. Supported compression methods include: gzip, none.");
}

// Checks that the "delegate" OkHttpExporterBuilder can be obtained reflectively from an
// OtlpHttpSpanExporterBuilder, and that setting the default RetryPolicy on it does not throw.
@Test
void testBuilderDelegate() {
assertThatCode(
() ->
OkHttpExporterBuilder.getDelegateBuilder(
OtlpHttpSpanExporterBuilder.class, OtlpHttpSpanExporter.builder())
.setRetryPolicy(RetryPolicy.getDefault()))
.doesNotThrowAnyException();
}

@Test
void testExportUncompressed() {
server.enqueue(successResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.grpc.stub.MetadataUtils;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -120,7 +120,7 @@ public DefaultGrpcExporterBuilder<T> addHeader(String key, String value) {
}

@Override
public GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy) {
public GrpcExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.grpc.ManagedChannel;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

Expand All @@ -28,7 +28,7 @@ public interface GrpcExporterBuilder<T extends Marshaler> {

GrpcExporterBuilder<T> addHeader(String key, String value);

GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy);
GrpcExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy);

GrpcExporterBuilder<T> setMeterProvider(MeterProvider meterProvider);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import io.opentelemetry.exporter.otlp.internal.CodedInputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
* Utilities for working with gRPC status without requiring dependencies on gRPC.
Expand All @@ -27,23 +25,6 @@ public final class GrpcStatusUtil {
public static final String GRPC_STATUS_UNAVAILABLE = "14";
public static final String GRPC_STATUS_DATA_LOSS = "15";

/**
 * gRPC status codes that are considered retryable. Stored as an unmodifiable view so that callers
 * of {@link #retryableStatusCodes()} cannot alter the shared, process-wide set.
 */
private static final Set<String> RETRYABLE_STATUS_CODES;

static {
  Set<String> retryableStatusCodes = new HashSet<>();
  retryableStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_CANCELLED);
  retryableStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_DEADLINE_EXCEEDED);
  retryableStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_RESOURCE_EXHAUSTED);
  retryableStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_ABORTED);
  retryableStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_OUT_OF_RANGE);
  retryableStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_UNAVAILABLE);
  retryableStatusCodes.add(GrpcStatusUtil.GRPC_STATUS_DATA_LOSS);
  // Fully qualified so that no additional import is required in this file.
  RETRYABLE_STATUS_CODES = java.util.Collections.unmodifiableSet(retryableStatusCodes);
}

/**
 * Returns the retryable gRPC status codes.
 *
 * @return an unmodifiable set of gRPC status code strings; attempts to modify it throw {@link
 *     UnsupportedOperationException}
 */
public static Set<String> retryableStatusCodes() {
  return RETRYABLE_STATUS_CODES;
}

/** Parses the message out of a serialized gRPC Status. */
public static String getStatusMessage(byte[] serializedStatus) throws IOException {
CodedInputStream input = CodedInputStream.newInstance(serializedStatus);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -82,7 +83,7 @@ public static void setTrustedCertificatesPem(
*/
public static Map<String, ?> toServiceConfig(String serviceName, RetryPolicy retryPolicy) {
List<Double> retryableStatusCodes =
GrpcStatusUtil.retryableStatusCodes().stream().map(Double::parseDouble).collect(toList());
RetryUtil.retryableGrpcStatusCodes().stream().map(Double::parseDouble).collect(toList());

Map<String, Object> retryConfig = new HashMap<>();
retryConfig.put("retryableStatusCodes", retryableStatusCodes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.io.IOException;
Expand Down Expand Up @@ -235,7 +236,7 @@ static boolean isRetryable(Response response) {
// We don't check trailers for retry since retryable error codes always come with response
// headers, not trailers, in practice.
String grpcStatus = response.header(GRPC_STATUS);
return GrpcStatusUtil.retryableStatusCodes().contains(grpcStatus);
return RetryUtil.retryableGrpcStatusCodes().contains(grpcStatus);
}

// From grpc-java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import io.grpc.ManagedChannel;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.exporter.otlp.internal.okhttp.OkHttpUtil;
import io.opentelemetry.exporter.otlp.internal.okhttp.RetryInterceptor;
import io.opentelemetry.exporter.otlp.internal.retry.RetryInterceptor;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
Expand Down Expand Up @@ -109,7 +109,7 @@ public OkHttpGrpcExporterBuilder<T> addHeader(String key, String value) {
}

@Override
public GrpcExporterBuilder<T> addRetryPolicy(RetryPolicy retryPolicy) {
public GrpcExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.exporter.otlp.internal.ExporterMetrics;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil;
import io.opentelemetry.exporter.otlp.internal.retry.RetryUtil;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import java.io.IOException;
Expand Down Expand Up @@ -131,6 +132,10 @@ public CompletableResultCode shutdown() {
return result;
}

/**
 * Returns whether the HTTP status code of {@code response} is contained in {@link
 * RetryUtil#retryableHttpResponseCodes()}.
 */
static boolean isRetryable(Response response) {
  int statusCode = response.code();
  return RetryUtil.retryableHttpResponseCodes().contains(statusCode);
}

private static RequestBody gzipRequestBody(RequestBody requestBody) {
return new RequestBody() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.TlsUtil;
import io.opentelemetry.exporter.otlp.internal.retry.RetryInterceptor;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
Expand All @@ -31,6 +34,7 @@ public final class OkHttpExporterBuilder<T extends Marshaler> {
private boolean compressionEnabled = false;
@Nullable private Headers.Builder headersBuilder;
@Nullable private byte[] trustedCertificatesPem;
@Nullable private RetryPolicy retryPolicy;
private MeterProvider meterProvider = MeterProvider.noop();

public OkHttpExporterBuilder(String type, String defaultEndpoint) {
Expand Down Expand Up @@ -91,6 +95,11 @@ public OkHttpExporterBuilder<T> setMeterProvider(MeterProvider meterProvider) {
return this;
}

/**
 * Stores the {@link RetryPolicy} to use for the exporter being built. When a policy has been set,
 * {@code build()} adds a {@link RetryInterceptor} configured with it to the HTTP client; when none
 * has been set, no retry interceptor is added.
 *
 * @return this builder, to allow call chaining
 */
public OkHttpExporterBuilder<T> setRetryPolicy(RetryPolicy retryPolicy) {
this.retryPolicy = retryPolicy;
return this;
}

public OkHttpExporter<T> build() {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder()
Expand All @@ -110,7 +119,32 @@ public OkHttpExporter<T> build() {

Headers headers = headersBuilder == null ? null : headersBuilder.build();

if (retryPolicy != null) {
clientBuilder.addInterceptor(new RetryInterceptor(retryPolicy, OkHttpExporter::isRetryable));
}

return new OkHttpExporter<>(
type, clientBuilder.build(), meterProvider, endpoint, headers, compressionEnabled);
}

/**
 * Reads, via reflection, the field named "delegate" declared on {@code type} from {@code instance}
 * and returns it as an {@link OkHttpExporterBuilder}.
 *
 * @param type the class that declares the "delegate" field
 * @param instance the object whose "delegate" field is read
 * @return the value of the "delegate" field
 * @throws IllegalArgumentException if {@code type} declares no field called "delegate", if the
 *     field cannot be accessed, or if its value is not an {@link OkHttpExporterBuilder}
 */
public static <T> OkHttpExporterBuilder<?> getDelegateBuilder(Class<T> type, T instance) {
  final Object delegate;
  try {
    Field delegateField = type.getDeclaredField("delegate");
    // The field is expected to be non-public, so open it up before reading.
    delegateField.setAccessible(true);
    delegate = delegateField.get(instance);
  } catch (NoSuchFieldException | IllegalAccessException e) {
    throw new IllegalArgumentException("Unable to access delegate reflectively.", e);
  }
  if (delegate instanceof OkHttpExporterBuilder) {
    return (OkHttpExporterBuilder<?>) delegate;
  }
  throw new IllegalArgumentException("delegate field is not type OkHttpExporterBuilder");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.okhttp;
package io.opentelemetry.exporter.otlp.internal.retry;

import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;
package io.opentelemetry.exporter.otlp.internal.retry;

import com.google.auto.value.AutoValue;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;
package io.opentelemetry.exporter.otlp.internal.retry;

import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.retry;

import io.opentelemetry.exporter.otlp.internal.grpc.GrpcStatusUtil;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Holds the sets of gRPC status codes and HTTP response codes that are treated as retryable.
 *
 * <p>This class is internal and is hence not for public use. Its APIs are unstable and can change at
 * any time.
 */
public final class RetryUtil {

  // Both sets are wrapped as unmodifiable so the shared instances handed out by the accessors
  // below cannot be altered by callers.
  private static final Set<String> RETRYABLE_GRPC_STATUS_CODES =
      Collections.unmodifiableSet(
          new HashSet<>(
              Arrays.asList(
                  GrpcStatusUtil.GRPC_STATUS_CANCELLED,
                  GrpcStatusUtil.GRPC_STATUS_DEADLINE_EXCEEDED,
                  GrpcStatusUtil.GRPC_STATUS_RESOURCE_EXHAUSTED,
                  GrpcStatusUtil.GRPC_STATUS_ABORTED,
                  GrpcStatusUtil.GRPC_STATUS_OUT_OF_RANGE,
                  GrpcStatusUtil.GRPC_STATUS_UNAVAILABLE,
                  GrpcStatusUtil.GRPC_STATUS_DATA_LOSS)));

  private static final Set<Integer> RETRYABLE_HTTP_STATUS_CODES =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList(429, 502, 503, 504)));

  /** Not instantiable; this class only exposes static accessors. */
  private RetryUtil() {}

  /**
   * Returns the retryable gRPC status codes.
   *
   * @return an unmodifiable set of gRPC status code strings
   */
  public static Set<String> retryableGrpcStatusCodes() {
    return RETRYABLE_GRPC_STATUS_CODES;
  }

  /**
   * Returns the retryable HTTP status codes.
   *
   * @return an unmodifiable set containing 429, 502, 503 and 504
   */
  public static Set<Integer> retryableHttpResponseCodes() {
    return RETRYABLE_HTTP_STATUS_CODES;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import io.grpc.ManagedChannelBuilder;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import io.opentelemetry.exporter.otlp.internal.retry.RetryPolicy;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONAssert;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal.okhttp;
package io.opentelemetry.exporter.otlp.internal.retry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doNothing;
Expand All @@ -14,7 +14,6 @@
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.testing.junit5.server.mock.MockWebServerExtension;
import io.opentelemetry.exporter.otlp.internal.RetryPolicy;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.exporter.otlp.internal;
package io.opentelemetry.exporter.otlp.internal.retry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down
Loading

0 comments on commit 6f755cc

Please sign in to comment.