Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stdout memory mode #6774

Merged
merged 4 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@

package io.opentelemetry.exporter.logging.otlp.internal.logs;

import io.opentelemetry.exporter.internal.otlp.logs.LogsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.LogReusableDataMarshaler;
import io.opentelemetry.exporter.internal.otlp.logs.ResourceLogsMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.Collection;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -33,11 +35,16 @@
private final Logger logger;
private final JsonWriter jsonWriter;
private final boolean wrapperJsonObject;
private final MemoryMode memoryMode;
private final Function<Collection<LogRecordData>, CompletableResultCode> marshaler;

OtlpStdoutLogRecordExporter(Logger logger, JsonWriter jsonWriter, boolean wrapperJsonObject) {
OtlpStdoutLogRecordExporter(
Logger logger, JsonWriter jsonWriter, boolean wrapperJsonObject, MemoryMode memoryMode) {
this.logger = logger;
this.jsonWriter = jsonWriter;
this.wrapperJsonObject = wrapperJsonObject;
this.memoryMode = memoryMode;
marshaler = createMarshaler(jsonWriter, memoryMode, wrapperJsonObject);
}

/** Returns a new {@link OtlpStdoutLogRecordExporterBuilder}. */
Expand All @@ -46,25 +53,35 @@
return new OtlpStdoutLogRecordExporterBuilder(LOGGER).setOutput(System.out);
}

private static Function<Collection<LogRecordData>, CompletableResultCode> createMarshaler(
JsonWriter jsonWriter, MemoryMode memoryMode, boolean wrapperJsonObject) {
if (wrapperJsonObject) {
LogReusableDataMarshaler reusableDataMarshaler =
new LogReusableDataMarshaler(
memoryMode, (marshaler, numItems) -> jsonWriter.write(marshaler));
return reusableDataMarshaler::export;
} else {
return logs -> {
// no support for low allocation marshaler
for (ResourceLogsMarshaler marshaler : ResourceLogsMarshaler.create(logs)) {
CompletableResultCode resultCode = jsonWriter.write(marshaler);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;

Check warning on line 70 in exporters/logging-otlp/src/main/java/io/opentelemetry/exporter/logging/otlp/internal/logs/OtlpStdoutLogRecordExporter.java

View check run for this annotation

Codecov / codecov/patch

exporters/logging-otlp/src/main/java/io/opentelemetry/exporter/logging/otlp/internal/logs/OtlpStdoutLogRecordExporter.java#L70

Added line #L70 was not covered by tests
}
}
return CompletableResultCode.ofSuccess();
};
}
}

@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

if (wrapperJsonObject) {
LogsRequestMarshaler request = LogsRequestMarshaler.create(logs);
return jsonWriter.write(request);
} else {
for (ResourceLogsMarshaler resourceLogs : ResourceLogsMarshaler.create(logs)) {
CompletableResultCode resultCode = jsonWriter.write(resourceLogs);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;
}
}
return CompletableResultCode.ofSuccess();
}
return marshaler.apply(logs);
}

@Override
Expand All @@ -87,6 +104,7 @@
StringJoiner joiner = new StringJoiner(", ", "OtlpStdoutLogRecordExporter{", "}");
joiner.add("jsonWriter=" + jsonWriter);
joiner.add("wrapperJsonObject=" + wrapperJsonObject);
joiner.add("memoryMode=" + memoryMode);
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.LoggerJsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.StreamJsonWriter;
import io.opentelemetry.sdk.common.export.MemoryMode;
import java.io.OutputStream;
import java.util.logging.Logger;

Expand All @@ -27,6 +28,7 @@ public final class OtlpStdoutLogRecordExporterBuilder {
private final Logger logger;
private JsonWriter jsonWriter;
private boolean wrapperJsonObject = true;
private MemoryMode memoryMode = MemoryMode.IMMUTABLE_DATA;

public OtlpStdoutLogRecordExporterBuilder(Logger logger) {
this.logger = logger;
Expand All @@ -44,6 +46,17 @@ public OtlpStdoutLogRecordExporterBuilder setWrapperJsonObject(boolean wrapperJs
return this;
}

/**
* Set the {@link MemoryMode}. If unset, defaults to {@link MemoryMode#IMMUTABLE_DATA}.
*
* <p>When memory mode is {@link MemoryMode#REUSABLE_DATA}, serialization is optimized to reduce
* memory allocation.
*/
public OtlpStdoutLogRecordExporterBuilder setMemoryMode(MemoryMode memoryMode) {
this.memoryMode = memoryMode;
return this;
}

/**
* Sets the exporter to use the specified output stream.
*
Expand Down Expand Up @@ -71,6 +84,10 @@ public OtlpStdoutLogRecordExporterBuilder setOutput(Logger logger) {
* @return a new exporter's instance
*/
public OtlpStdoutLogRecordExporter build() {
return new OtlpStdoutLogRecordExporter(logger, jsonWriter, wrapperJsonObject);
if (memoryMode == MemoryMode.REUSABLE_DATA && !wrapperJsonObject) {
throw new IllegalArgumentException(
"Reusable data mode is not supported without wrapperJsonObject");
}
return new OtlpStdoutLogRecordExporter(logger, jsonWriter, wrapperJsonObject, memoryMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.exporter.logging.otlp.internal.logs;

import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
Expand All @@ -31,6 +32,7 @@ public String getName() {
@Override
public LogRecordExporter create(StructuredConfigProperties config) {
OtlpStdoutLogRecordExporterBuilder builder = OtlpStdoutLogRecordExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.exporter.logging.otlp.internal.logs;

import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
Expand All @@ -20,6 +21,7 @@ public final class OtlpStdoutLogRecordExporterProvider
@Override
public LogRecordExporter createExporter(ConfigProperties config) {
OtlpStdoutLogRecordExporterBuilder builder = OtlpStdoutLogRecordExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@

package io.opentelemetry.exporter.logging.otlp.internal.metrics;

import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricReusableDataMarshaler;
import io.opentelemetry.exporter.internal.otlp.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
Expand All @@ -19,6 +20,7 @@
import java.util.Collection;
import java.util.StringJoiner;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -37,20 +39,25 @@
private final Logger logger;
private final JsonWriter jsonWriter;
private final boolean wrapperJsonObject;
private final MemoryMode memoryMode;
private final Function<Collection<MetricData>, CompletableResultCode> marshaler;
private final AggregationTemporalitySelector aggregationTemporalitySelector;
private final DefaultAggregationSelector defaultAggregationSelector;

OtlpStdoutMetricExporter(
Logger logger,
JsonWriter jsonWriter,
boolean wrapperJsonObject,
MemoryMode memoryMode,
AggregationTemporalitySelector aggregationTemporalitySelector,
DefaultAggregationSelector defaultAggregationSelector) {
this.logger = logger;
this.jsonWriter = jsonWriter;
this.wrapperJsonObject = wrapperJsonObject;
this.memoryMode = memoryMode;
this.aggregationTemporalitySelector = aggregationTemporalitySelector;
this.defaultAggregationSelector = defaultAggregationSelector;
marshaler = createMarshaler(jsonWriter, memoryMode, wrapperJsonObject);
}

/** Returns a new {@link OtlpStdoutMetricExporterBuilder}. */
Expand All @@ -59,6 +66,28 @@
return new OtlpStdoutMetricExporterBuilder(LOGGER).setOutput(System.out);
}

private static Function<Collection<MetricData>, CompletableResultCode> createMarshaler(
JsonWriter jsonWriter, MemoryMode memoryMode, boolean wrapperJsonObject) {
if (wrapperJsonObject) {
MetricReusableDataMarshaler reusableDataMarshaler =
new MetricReusableDataMarshaler(
memoryMode, (marshaler, numItems) -> jsonWriter.write(marshaler));
return reusableDataMarshaler::export;
} else {
return metrics -> {
// no support for low allocation marshaler
for (ResourceMetricsMarshaler marshaler : ResourceMetricsMarshaler.create(metrics)) {
CompletableResultCode resultCode = jsonWriter.write(marshaler);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;

Check warning on line 83 in exporters/logging-otlp/src/main/java/io/opentelemetry/exporter/logging/otlp/internal/metrics/OtlpStdoutMetricExporter.java

View check run for this annotation

Codecov / codecov/patch

exporters/logging-otlp/src/main/java/io/opentelemetry/exporter/logging/otlp/internal/metrics/OtlpStdoutMetricExporter.java#L83

Added line #L83 was not covered by tests
}
}
return CompletableResultCode.ofSuccess();
};
}
}

@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return aggregationTemporalitySelector.getAggregationTemporality(instrumentType);
Expand All @@ -69,25 +98,18 @@
return defaultAggregationSelector.getDefaultAggregation(instrumentType);
}

@Override
public MemoryMode getMemoryMode() {
return memoryMode;
}

@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
if (isShutdown.get()) {
return CompletableResultCode.ofFailure();
}

if (wrapperJsonObject) {
MetricsRequestMarshaler request = MetricsRequestMarshaler.create(metrics);
return jsonWriter.write(request);
} else {
for (ResourceMetricsMarshaler resourceMetrics : ResourceMetricsMarshaler.create(metrics)) {
CompletableResultCode resultCode = jsonWriter.write(resourceMetrics);
if (!resultCode.isSuccess()) {
// already logged
return resultCode;
}
}
return CompletableResultCode.ofSuccess();
}
return marshaler.apply(metrics);
}

@Override
Expand All @@ -110,6 +132,13 @@
StringJoiner joiner = new StringJoiner(", ", "OtlpStdoutMetricExporter{", "}");
joiner.add("jsonWriter=" + jsonWriter);
joiner.add("wrapperJsonObject=" + wrapperJsonObject);
joiner.add("memoryMode=" + memoryMode);
joiner.add(
"aggregationTemporalitySelector="
+ AggregationTemporalitySelector.asString(aggregationTemporalitySelector));
joiner.add(
"defaultAggregationSelector="
+ DefaultAggregationSelector.asString(defaultAggregationSelector));
zeitlinger marked this conversation as resolved.
Show resolved Hide resolved
return joiner.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.exporter.logging.otlp.internal.writer.JsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.LoggerJsonWriter;
import io.opentelemetry.exporter.logging.otlp.internal.writer.StreamJsonWriter;
import io.opentelemetry.sdk.common.export.MemoryMode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.export.AggregationTemporalitySelector;
import io.opentelemetry.sdk.metrics.export.DefaultAggregationSelector;
Expand Down Expand Up @@ -40,6 +41,7 @@ public final class OtlpStdoutMetricExporterBuilder {
private final Logger logger;
private JsonWriter jsonWriter;
private boolean wrapperJsonObject = true;
private MemoryMode memoryMode = MemoryMode.IMMUTABLE_DATA;

public OtlpStdoutMetricExporterBuilder(Logger logger) {
this.logger = logger;
Expand All @@ -57,6 +59,17 @@ public OtlpStdoutMetricExporterBuilder setWrapperJsonObject(boolean wrapperJsonO
return this;
}

/**
* Set the {@link MemoryMode}. If unset, defaults to {@link MemoryMode#IMMUTABLE_DATA}.
*
* <p>When memory mode is {@link MemoryMode#REUSABLE_DATA}, serialization is optimized to reduce
* memory allocation.
*/
public OtlpStdoutMetricExporterBuilder setMemoryMode(MemoryMode memoryMode) {
this.memoryMode = memoryMode;
return this;
}

/**
* Sets the exporter to use the specified output stream.
*
Expand Down Expand Up @@ -114,10 +127,15 @@ public OtlpStdoutMetricExporterBuilder setDefaultAggregationSelector(
* @return a new exporter's instance
*/
public OtlpStdoutMetricExporter build() {
if (memoryMode == MemoryMode.REUSABLE_DATA && !wrapperJsonObject) {
throw new IllegalArgumentException(
"Reusable data mode is not supported without wrapperJsonObject");
}
return new OtlpStdoutMetricExporter(
logger,
jsonWriter,
wrapperJsonObject,
memoryMode,
aggregationTemporalitySelector,
defaultAggregationSelector);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public String getName() {
@Override
public MetricExporter create(StructuredConfigProperties config) {
OtlpStdoutMetricExporterBuilder builder = OtlpStdoutMetricExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
ExporterBuilderUtil.configureOtlpAggregationTemporality(
config, builder::setAggregationTemporalitySelector);
ExporterBuilderUtil.configureOtlpHistogramDefaultAggregation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public final class OtlpStdoutMetricExporterProvider implements ConfigurableMetri
@Override
public MetricExporter createExporter(ConfigProperties config) {
OtlpStdoutMetricExporterBuilder builder = OtlpStdoutMetricExporter.builder();
ExporterBuilderUtil.configureExporterMemoryMode(config, builder::setMemoryMode);
ExporterBuilderUtil.configureOtlpAggregationTemporality(
config, builder::setAggregationTemporalitySelector);
ExporterBuilderUtil.configureOtlpHistogramDefaultAggregation(
Expand Down
Loading
Loading