Skip to content

Commit

Permalink
Enable reusuable_data memory mode by default (#6799)
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg authored Oct 23, 2024
1 parent 5ec1e86 commit 10bca8b
Show file tree
Hide file tree
Showing 15 changed files with 66 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URISyntaxException;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.logging.Logger;

/**
* Utilities for exporter builders.
Expand All @@ -30,6 +31,8 @@
*/
public final class ExporterBuilderUtil {

private static final Logger logger = Logger.getLogger(ExporterBuilderUtil.class.getName());

/** Validate OTLP endpoint. */
public static URI validateEndpoint(String endpoint) {
URI uri;
Expand All @@ -50,7 +53,14 @@ public static URI validateEndpoint(String endpoint) {
/** Invoke the {@code memoryModeConsumer} with the configured {@link MemoryMode}. */
public static void configureExporterMemoryMode(
ConfigProperties config, Consumer<MemoryMode> memoryModeConsumer) {
String memoryModeStr = config.getString("otel.java.experimental.exporter.memory_mode");
String memoryModeStr = config.getString("otel.java.exporter.memory_mode");
if (memoryModeStr == null) {
memoryModeStr = config.getString("otel.java.experimental.exporter.memory_mode");
if (memoryModeStr != null) {
logger.warning(
"otel.java.experimental.exporter.memory_mode was set but has been replaced with otel.java.exporter.memory_mode and will be removed in a future release");
}
}
if (memoryModeStr == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public final class OtlpHttpLogRecordExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/logs";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.REUSABLE_DATA;

private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class OtlpHttpMetricExporterBuilder {

private static final AggregationTemporalitySelector DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR =
AggregationTemporalitySelector.alwaysCumulative();
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.REUSABLE_DATA;

private final HttpExporterBuilder<Marshaler> delegate;
private AggregationTemporalitySelector aggregationTemporalitySelector =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public final class OtlpHttpSpanExporterBuilder {

private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/traces";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.REUSABLE_DATA;

private final HttpExporterBuilder<Marshaler> delegate;
private MemoryMode memoryMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public final class OtlpGrpcLogRecordExporterBuilder {
private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317";
private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL);
private static final long DEFAULT_TIMEOUT_SECS = 10;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.REUSABLE_DATA;

// Visible for testing
final GrpcExporterBuilder<Marshaler> delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public final class OtlpGrpcMetricExporterBuilder {
private static final long DEFAULT_TIMEOUT_SECS = 10;
private static final AggregationTemporalitySelector DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR =
AggregationTemporalitySelector.alwaysCumulative();
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.REUSABLE_DATA;

// Visible for testing
final GrpcExporterBuilder<Marshaler> delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class OtlpGrpcSpanExporterBuilder {
private static final String DEFAULT_ENDPOINT_URL = "http://localhost:4317";
private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL);
private static final long DEFAULT_TIMEOUT_SECS = 10;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.REUSABLE_DATA;

// Visible for testing
final GrpcExporterBuilder<Marshaler> delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void createExporter_GrpcDefaults() {
verify(grpcBuilder, never()).setTrustedCertificates(any());
verify(grpcBuilder, never()).setClientTls(any(), any());
assertThat(grpcBuilder).extracting("delegate").extracting("retryPolicy").isNotNull();
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
}
Mockito.verifyNoInteractions(httpBuilder);
}
Expand Down Expand Up @@ -183,7 +183,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce
config.put("otel.exporter.otlp.logs.compression", "gzip");
config.put("otel.exporter.otlp.timeout", "1s");
config.put("otel.exporter.otlp.logs.timeout", "15s");
config.put("otel.java.experimental.exporter.memory_mode", "reusable_data");
config.put("otel.java.exporter.memory_mode", "immutable_data");

try (LogRecordExporter exporter =
provider.createExporter(DefaultConfigProperties.createFromMap(config))) {
Expand All @@ -196,7 +196,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce
verify(grpcBuilder).setTrustedCertificates(serverTls.certificate().getEncoded());
verify(grpcBuilder)
.setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded());
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
}
Mockito.verifyNoInteractions(httpBuilder);
}
Expand All @@ -216,7 +216,7 @@ void createExporter_HttpDefaults() {
verify(httpBuilder, never()).setTrustedCertificates(any());
verify(httpBuilder, never()).setClientTls(any(), any());
assertThat(httpBuilder).extracting("delegate").extracting("retryPolicy").isNotNull();
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
}
Mockito.verifyNoInteractions(grpcBuilder);
}
Expand Down Expand Up @@ -269,7 +269,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce
config.put("otel.exporter.otlp.logs.compression", "gzip");
config.put("otel.exporter.otlp.timeout", "1s");
config.put("otel.exporter.otlp.logs.timeout", "15s");
config.put("otel.java.experimental.exporter.memory_mode", "reusable_data");
config.put("otel.java.exporter.memory_mode", "immutable_data");

try (LogRecordExporter exporter =
provider.createExporter(DefaultConfigProperties.createFromMap(config))) {
Expand All @@ -282,7 +282,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce
verify(httpBuilder).setTrustedCertificates(serverTls.certificate().getEncoded());
verify(httpBuilder)
.setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded());
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
}
Mockito.verifyNoInteractions(grpcBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void createExporter_GrpcDefaults() {
verify(grpcBuilder, never()).setTrustedCertificates(any());
verify(grpcBuilder, never()).setClientTls(any(), any());
assertThat(grpcBuilder).extracting("delegate").extracting("retryPolicy").isNotNull();
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.IMMUTABLE_DATA);
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.REUSABLE_DATA);
}
Mockito.verifyNoInteractions(httpBuilder);
}
Expand Down Expand Up @@ -178,7 +178,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce
config.put("otel.exporter.otlp.metrics.compression", "gzip");
config.put("otel.exporter.otlp.timeout", "1s");
config.put("otel.exporter.otlp.metrics.timeout", "15s");
config.put("otel.java.experimental.exporter.memory_mode", "reusable_data");
config.put("otel.java.exporter.memory_mode", "immutable_data");

try (MetricExporter exporter =
provider.createExporter(DefaultConfigProperties.createFromMap(config))) {
Expand All @@ -191,7 +191,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce
verify(grpcBuilder).setTrustedCertificates(serverTls.certificate().getEncoded());
verify(grpcBuilder)
.setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded());
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.REUSABLE_DATA);
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.IMMUTABLE_DATA);
}
Mockito.verifyNoInteractions(httpBuilder);
}
Expand All @@ -212,7 +212,7 @@ void createExporter_HttpDefaults() {
verify(httpBuilder, never()).setTrustedCertificates(any());
verify(httpBuilder, never()).setClientTls(any(), any());
assertThat(httpBuilder).extracting("delegate").extracting("retryPolicy").isNotNull();
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.IMMUTABLE_DATA);
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.REUSABLE_DATA);
}
Mockito.verifyNoInteractions(grpcBuilder);
}
Expand Down Expand Up @@ -265,7 +265,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce
config.put("otel.exporter.otlp.metrics.compression", "gzip");
config.put("otel.exporter.otlp.timeout", "1s");
config.put("otel.exporter.otlp.metrics.timeout", "15s");
config.put("otel.java.experimental.exporter.memory_mode", "reusable_data");
config.put("otel.java.exporter.memory_mode", "immutable_data");

try (MetricExporter exporter =
provider.createExporter(DefaultConfigProperties.createFromMap(config))) {
Expand All @@ -278,7 +278,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce
verify(httpBuilder).setTrustedCertificates(serverTls.certificate().getEncoded());
verify(httpBuilder)
.setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded());
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.REUSABLE_DATA);
assertThat(exporter.getMemoryMode()).isEqualTo(MemoryMode.IMMUTABLE_DATA);
}
Mockito.verifyNoInteractions(grpcBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void createExporter_GrpcDefaults() {
verify(grpcBuilder, never()).setTrustedCertificates(any());
verify(grpcBuilder, never()).setClientTls(any(), any());
assertThat(grpcBuilder).extracting("delegate").extracting("retryPolicy").isNotNull();
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
}
Mockito.verifyNoInteractions(httpBuilder);
}
Expand Down Expand Up @@ -184,7 +184,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce
config.put("otel.exporter.otlp.traces.compression", "gzip");
config.put("otel.exporter.otlp.timeout", "1s");
config.put("otel.exporter.otlp.traces.timeout", "15s");
config.put("otel.java.experimental.exporter.memory_mode", "reusable_data");
config.put("otel.java.exporter.memory_mode", "immutable_data");

try (SpanExporter exporter =
provider.createExporter(DefaultConfigProperties.createFromMap(config))) {
Expand All @@ -197,7 +197,7 @@ void createExporter_GrpcWithSignalConfiguration() throws CertificateEncodingExce
verify(grpcBuilder).setTrustedCertificates(serverTls.certificate().getEncoded());
verify(grpcBuilder)
.setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded());
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
}
Mockito.verifyNoInteractions(httpBuilder);
}
Expand All @@ -217,7 +217,7 @@ void createExporter_HttpDefaults() {
verify(httpBuilder, never()).setTrustedCertificates(any());
verify(httpBuilder, never()).setClientTls(any(), any());
assertThat(httpBuilder).extracting("delegate").extracting("retryPolicy").isNotNull();
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
}
Mockito.verifyNoInteractions(grpcBuilder);
}
Expand Down Expand Up @@ -249,7 +249,7 @@ void createExporter_HttpWithGeneralConfiguration() throws CertificateEncodingExc
verify(httpBuilder)
.setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded());
assertThat(httpBuilder).extracting("delegate").extracting("retryPolicy").isNull();
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
}
Mockito.verifyNoInteractions(grpcBuilder);
}
Expand All @@ -273,7 +273,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce
config.put("otel.exporter.otlp.traces.compression", "gzip");
config.put("otel.exporter.otlp.timeout", "1s");
config.put("otel.exporter.otlp.traces.timeout", "15s");
config.put("otel.java.experimental.exporter.memory_mode", "reusable_data");
config.put("otel.java.exporter.memory_mode", "immutable_data");

try (SpanExporter exporter =
provider.createExporter(DefaultConfigProperties.createFromMap(config))) {
Expand All @@ -286,7 +286,7 @@ void createExporter_HttpWithSignalConfiguration() throws CertificateEncodingExce
verify(httpBuilder).setTrustedCertificates(serverTls.certificate().getEncoded());
verify(httpBuilder)
.setClientTls(clientTls.privateKey().getEncoded(), clientTls.certificate().getEncoded());
getMemoryMode(exporter).isEqualTo(MemoryMode.REUSABLE_DATA);
getMemoryMode(exporter).isEqualTo(MemoryMode.IMMUTABLE_DATA);
}
Mockito.verifyNoInteractions(grpcBuilder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,19 @@ public class FakeTelemetryUtil {

private static final String TRACE_ID = "00000000000000000000000000abc123";
private static final String SPAN_ID = "0000000000def456";
private static final InstrumentationScopeInfo SCOPE_INFO =
InstrumentationScopeInfo.builder("testLib")
.setVersion("1.0")
.setSchemaUrl("http://url")
.build();

/** Generate a fake {@link MetricData}. */
public static MetricData generateFakeMetricData() {
long startNs = TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis());
long endNs = startNs + TimeUnit.MILLISECONDS.toNanos(900);
return ImmutableMetricData.createLongSum(
Resource.empty(),
InstrumentationScopeInfo.empty(),
SCOPE_INFO,
"name",
"description",
"1",
Expand Down Expand Up @@ -69,23 +74,15 @@ public static SpanData generateFakeSpanData() {
.setLinks(Collections.emptyList())
.setTotalRecordedLinks(0)
.setTotalRecordedEvents(0)
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("testLib")
.setVersion("1.0")
.setSchemaUrl("http://url")
.build())
.setInstrumentationScopeInfo(SCOPE_INFO)
.build();
}

/** Generate a fake {@link LogRecordData}. */
public static LogRecordData generateFakeLogRecordData() {
return TestLogRecordData.builder()
.setResource(Resource.getDefault())
.setInstrumentationScopeInfo(
InstrumentationScopeInfo.builder("testLib")
.setVersion("1.0")
.setSchemaUrl("http://url")
.build())
.setInstrumentationScopeInfo(SCOPE_INFO)
.setBody("log body")
.setAttributes(Attributes.builder().put("key", "value").build())
.setSeverity(Severity.INFO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -82,7 +83,13 @@ public static PrometheusHttpServerBuilder builder() {
// sequentially.
if (memoryMode == MemoryMode.REUSABLE_DATA) {
executor =
Executors.newSingleThreadExecutor(new DaemonThreadFactory("prometheus-http-server"));
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new DaemonThreadFactory("prometheus-http-server"));
}
try {
this.httpServer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public final class PrometheusHttpServerBuilder {

static final int DEFAULT_PORT = 9464;
private static final String DEFAULT_HOST = "0.0.0.0";
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.IMMUTABLE_DATA;
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.REUSABLE_DATA;

private String host = DEFAULT_HOST;
private int port = DEFAULT_PORT;
Expand All @@ -46,6 +46,7 @@ public final class PrometheusHttpServerBuilder {
this.otelScopeEnabled = builder.otelScopeEnabled;
this.allowedResourceAttributesFilter = builder.allowedResourceAttributesFilter;
this.executor = builder.executor;
this.memoryMode = builder.memoryMode;
this.defaultAggregationSelector = builder.defaultAggregationSelector;
}

Expand Down Expand Up @@ -150,6 +151,11 @@ public PrometheusHttpServerBuilder setDefaultAggregationSelector(
* registered with a {@link io.opentelemetry.sdk.metrics.SdkMeterProvider}.
*/
public PrometheusHttpServer build() {
if (memoryMode == MemoryMode.REUSABLE_DATA && executor != null) {
throw new IllegalArgumentException(
"MemoryMode REUSEABLE_DATA cannot be used with custom executor, "
+ "since data may be corrupted if reading metrics concurrently");
}
return new PrometheusHttpServer(
new PrometheusHttpServerBuilder(this), // copy to prevent modification
host,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,8 @@ void customExecutor() throws IOException {
PrometheusHttpServer.builder()
.setHost("localhost")
.setPort(port)
// Memory mode must be IMMUTABLE_DATA to set custom executor
.setMemoryMode(MemoryMode.IMMUTABLE_DATA)
.setExecutor(scheduledExecutor)
.build()) {
assertThat(server)
Expand Down Expand Up @@ -520,7 +522,7 @@ void toBuilder() {
builder.setAllowedResourceAttributesFilter(resourceAttributesFilter);

ExecutorService executor = Executors.newSingleThreadExecutor();
builder.setExecutor(executor);
builder.setExecutor(executor).setMemoryMode(MemoryMode.IMMUTABLE_DATA);

PrometheusRegistry prometheusRegistry = new PrometheusRegistry();
builder.setPrometheusRegistry(prometheusRegistry);
Expand Down
Loading

0 comments on commit 10bca8b

Please sign in to comment.