Skip to content

Commit

Permalink
add stdout memory mode (#6774)
Browse files Browse the repository at this point in the history
  • Loading branch information
zeitlinger authored Oct 17, 2024
1 parent 07b6903 commit 1ac476b
Show file tree
Hide file tree
Showing 27 changed files with 533 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@

package io.opentelemetry.exporter.logging.otlp.internal.logs;

import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LogReusableDataMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.Collection;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -33,11 +35,16 @@ public final class OtlpStdoutLogRecordExporter implements LogRecordExporter {
private final Logger logger;
private final JsonWriter jsonWriter;
private final boolean wrapperJsonObject;
private final MemoryMode memoryMode;
private final Function<Collection<LogRecordData>, CompletableResultCode> marshaler;

OtlpStdoutLogRecordExporter(Logger logger, JsonWriter jsonWriter, boolean wrapperJsonObject) {
OtlpStdoutLogRecordExporter(
Logger logger, JsonWriter jsonWriter, boolean wrapperJsonObject, MemoryMode memoryMode) {
this.logger = logger;
this.jsonWriter = jsonWriter;
this.wrapperJsonObject = wrapperJsonObject;
this.memoryMode = memoryMode;
marshaler = createMarshaler(jsonWriter, memoryMode, wrapperJsonObject);
}

/** Returns a new {@link OtlpStdoutLogRecordExporterBuilder}. */
Expand All @@ -46,25 +53,35 @@ public static OtlpStdoutLogRecordExporterBuilder builder() {
return new OtlpStdoutLogRecordExporterBuilder(LOGGER).setOutput(System.out);
}

private static Function<Collection<LogRecordData>, CompletableResultCode> createMarshaler(
JsonWriter jsonWriter, MemoryMode memoryMode, boolean wrapperJsonObject) {
if (wrapperJsonObject) {
LogReusableDataMarshaler reusableDataMarshaler =
new LogReusableDataMarshaler(
memoryMode, (marshaler, numItems) -> jsonWriter.write(marshaler));
return reusableDataMarshaler::export;
} else {
return logs -> {
// no support for low allocation marshaler
for (ResourceLogsMarshaler marshaler : ResourceLogsMarshaler.create(logs)) {
CompletableResultCode resultCode = jsonWriter.write(marshaler);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;
}
}
return CompletableResultCode.ofSuccess();
};
}
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

if (wrapperJsonObject) {
LogsRequestMarshaler request = LogsRequestMarshaler.create(logs);
return jsonWriter.write(request);
} else {
for (ResourceLogsMarshaler resourceLogs : ResourceLogsMarshaler.create(logs)) {
CompletableResultCode resultCode = jsonWriter.write(resourceLogs);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;
}
}
return CompletableResultCode.ofSuccess();
}
return marshaler.apply(logs);
}

@Override
Expand All @@ -87,6 +104,7 @@ public String toString() {
StringJoiner joiner = new StringJoiner(", ", "OtlpStdoutLogRecordExporter{", "}");
joiner.add("jsonWriter=" + jsonWriter);
joiner.add("wrapperJsonObject=" + wrapperJsonObject);
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.LoggerJsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.StreamJsonWriter;
import io.opentelemetry.sdk.common.export.MemoryMode;
import java.io.OutputStream;
import java.util.logging.Logger;

Expand All @@ -27,6 +28,7 @@ public final class OtlpStdoutLogRecordExporterBuilder {
private final Logger logger;
private JsonWriter jsonWriter;
private boolean wrapperJsonObject = true;
private MemoryMode memoryMode = MemoryMode.IMMUTABLE_DATA;

public OtlpStdoutLogRecordExporterBuilder(Logger logger) {
this.logger = logger;
Expand All @@ -44,6 +46,17 @@ public OtlpStdoutLogRecordExporterBuilder setWrapperJsonObject(boolean wrapperJs
return this;
}

/**
* Set the {@link MemoryMode}. If unset, defaults to {@link MemoryMode#IMMUTABLE_DATA}.
*
* <p>When memory mode is {@link MemoryMode#REUSABLE_DATA}, serialization is optimized to reduce
* memory allocation.
*/
public OtlpStdoutLogRecordExporterBuilder setMemoryMode(MemoryMode memoryMode) {
this.memoryMode = memoryMode;
return this;
}

/**
* Sets the exporter to use the specified output stream.
*
Expand Down Expand Up @@ -71,6 +84,10 @@ public OtlpStdoutLogRecordExporterBuilder setOutput(Logger logger) {
* @return a new exporter's instance
*/
public OtlpStdoutLogRecordExporter build() {
return new OtlpStdoutLogRecordExporter(logger, jsonWriter, wrapperJsonObject);
if (memoryMode == MemoryMode.REUSABLE_DATA && !wrapperJsonObject) {
throw new IllegalArgumentException(
"Reusable data mode is not supported without wrapperJsonObject");
}
return new OtlpStdoutLogRecordExporter(logger, jsonWriter, wrapperJsonObject, memoryMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.exporter.logging.otlp.internal.logs;

import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
Expand All @@ -31,6 +32,7 @@ public String getName() {
@Override
public LogRecordExporter create(StructuredConfigProperties config) {
OtlpStdoutLogRecordExporterBuilder builder = OtlpStdoutLogRecordExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.exporter.logging.otlp.internal.logs;

import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
Expand All @@ -20,6 +21,7 @@ public final class OtlpStdoutLogRecordExporterProvider
@Override
public LogRecordExporter createExporter(ConfigProperties config) {
OtlpStdoutLogRecordExporterBuilder builder = OtlpStdoutLogRecordExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package io.opentelemetry.exporter.logging.otlp.internal.metrics;

import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricReusableDataMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
Expand All @@ -19,6 +20,7 @@
import java.util.Collection;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -37,20 +39,25 @@ public final class OtlpStdoutMetricExporter implements MetricExporter {
private final Logger logger;
private final JsonWriter jsonWriter;
private final boolean wrapperJsonObject;
private final MemoryMode memoryMode;
private final Function<Collection<MetricData>, CompletableResultCode> marshaler;
private final AggregationTemporalitySelector aggregationTemporalitySelector;
private final DefaultAggregationSelector defaultAggregationSelector;

OtlpStdoutMetricExporter(
Logger logger,
JsonWriter jsonWriter,
boolean wrapperJsonObject,
MemoryMode memoryMode,
AggregationTemporalitySelector aggregationTemporalitySelector,
DefaultAggregationSelector defaultAggregationSelector) {
this.logger = logger;
this.jsonWriter = jsonWriter;
this.wrapperJsonObject = wrapperJsonObject;
this.memoryMode = memoryMode;
this.aggregationTemporalitySelector = aggregationTemporalitySelector;
this.defaultAggregationSelector = defaultAggregationSelector;
marshaler = createMarshaler(jsonWriter, memoryMode, wrapperJsonObject);
}

/** Returns a new {@link OtlpStdoutMetricExporterBuilder}. */
Expand All @@ -59,6 +66,28 @@ public static OtlpStdoutMetricExporterBuilder builder() {
return new OtlpStdoutMetricExporterBuilder(LOGGER).setOutput(System.out);
}

private static Function<Collection<MetricData>, CompletableResultCode> createMarshaler(
JsonWriter jsonWriter, MemoryMode memoryMode, boolean wrapperJsonObject) {
if (wrapperJsonObject) {
MetricReusableDataMarshaler reusableDataMarshaler =
new MetricReusableDataMarshaler(
memoryMode, (marshaler, numItems) -> jsonWriter.write(marshaler));
return reusableDataMarshaler::export;
} else {
return metrics -> {
// no support for low allocation marshaler
for (ResourceMetricsMarshaler marshaler : ResourceMetricsMarshaler.create(metrics)) {
CompletableResultCode resultCode = jsonWriter.write(marshaler);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;
}
}
return CompletableResultCode.ofSuccess();
};
}
}

@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return aggregationTemporalitySelector.getAggregationTemporality(instrumentType);
Expand All @@ -69,25 +98,18 @@ public Aggregation getDefaultAggregation(InstrumentType instrumentType) {
return defaultAggregationSelector.getDefaultAggregation(instrumentType);
}

@Override
public MemoryMode getMemoryMode() {
return memoryMode;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

if (wrapperJsonObject) {
MetricsRequestMarshaler request = MetricsRequestMarshaler.create(metrics);
return jsonWriter.write(request);
} else {
for (ResourceMetricsMarshaler resourceMetrics : ResourceMetricsMarshaler.create(metrics)) {
CompletableResultCode resultCode = jsonWriter.write(resourceMetrics);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;
}
}
return CompletableResultCode.ofSuccess();
}
return marshaler.apply(metrics);
}

@Override
Expand All @@ -110,6 +132,13 @@ public String toString() {
StringJoiner joiner = new StringJoiner(", ", "OtlpStdoutMetricExporter{", "}");
joiner.add("jsonWriter=" + jsonWriter);
joiner.add("wrapperJsonObject=" + wrapperJsonObject);
joiner.add("memoryMode=" + memoryMode);
joiner.add(
"aggregationTemporalitySelector="
+ AggregationTemporalitySelector.asString(aggregationTemporalitySelector));
joiner.add(
"defaultAggregationSelector="
+ DefaultAggregationSelector.asString(defaultAggregationSelector));
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.LoggerJsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.StreamJsonWriter;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
Expand Down Expand Up @@ -40,6 +41,7 @@ public final class OtlpStdoutMetricExporterBuilder {
private final Logger logger;
private JsonWriter jsonWriter;
private boolean wrapperJsonObject = true;
private MemoryMode memoryMode = MemoryMode.IMMUTABLE_DATA;

public OtlpStdoutMetricExporterBuilder(Logger logger) {
this.logger = logger;
Expand All @@ -57,6 +59,17 @@ public OtlpStdoutMetricExporterBuilder setWrapperJsonObject(boolean wrapperJsonO
return this;
}

/**
* Set the {@link MemoryMode}. If unset, defaults to {@link MemoryMode#IMMUTABLE_DATA}.
*
* <p>When memory mode is {@link MemoryMode#REUSABLE_DATA}, serialization is optimized to reduce
* memory allocation.
*/
public OtlpStdoutMetricExporterBuilder setMemoryMode(MemoryMode memoryMode) {
this.memoryMode = memoryMode;
return this;
}

/**
* Sets the exporter to use the specified output stream.
*
Expand Down Expand Up @@ -114,10 +127,15 @@ public OtlpStdoutMetricExporterBuilder setDefaultAggregationSelector(
* @return a new exporter's instance
*/
public OtlpStdoutMetricExporter build() {
if (memoryMode == MemoryMode.REUSABLE_DATA && !wrapperJsonObject) {
throw new IllegalArgumentException(
"Reusable data mode is not supported without wrapperJsonObject");
}
return new OtlpStdoutMetricExporter(
logger,
jsonWriter,
wrapperJsonObject,
memoryMode,
aggregationTemporalitySelector,
defaultAggregationSelector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public String getName() {
@Override
public MetricExporter create(StructuredConfigProperties config) {
OtlpStdoutMetricExporterBuilder builder = OtlpStdoutMetricExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
ExporterBuilderUtil.configureOtlpAggregationTemporality(
config, builder::setAggregationTemporalitySelector);
ExporterBuilderUtil.configureOtlpHistogramDefaultAggregation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public final class OtlpStdoutMetricExporterProvider implements ConfigurableMetri
@Override
public MetricExporter createExporter(ConfigProperties config) {
OtlpStdoutMetricExporterBuilder builder = OtlpStdoutMetricExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
ExporterBuilderUtil.configureOtlpAggregationTemporality(
config, builder::setAggregationTemporalitySelector);
ExporterBuilderUtil.configureOtlpHistogramDefaultAggregation(
Expand Down
Loading

0 comments on commit 1ac476b

Please sign in to comment.