Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure kafka configuration remains serializable #7754

Merged
merged 2 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
Expand Down Expand Up @@ -75,7 +76,8 @@ public static void enhanceConfig(Map<? super String, Object> config) {
OpenTelemetryMetricsReporter.class.getName(),
(class1, class2) -> class1 + "," + class2);
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, GlobalOpenTelemetry.get());
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(GlobalOpenTelemetry.get()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use a supplier here that always returns GlobalOpenTelemetry.get()? that way it will "actually be serializable" in case that's a real thing for kafka config 🤷‍♂️

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also the library instrumentation where there is no way to restore the OpenTelemetry instance. I thought that we can let the reporter of this issue test it. If it is needed then he'll get a NPE and we'll know that we have to do something for both javaagent and library instrumentation.

config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
INSTRUMENTATION_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.internal.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter;
import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
Expand Down Expand Up @@ -162,7 +163,9 @@ public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
config.put(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
OpenTelemetryMetricsReporter.class.getName());
config.put(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, openTelemetry);
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER,
new OpenTelemetrySupplier(openTelemetry));
config.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME,
KafkaTelemetryBuilder.INSTRUMENTATION_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
import io.opentelemetry.instrumentation.kafkaclients.KafkaTelemetry;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -37,21 +42,21 @@ void badConfig() {
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE);
producerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage("Missing required configuration property: opentelemetry.instance");
.hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier");
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
producerConfig.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo");
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo");
new KafkaProducer<>(producerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry");
"Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier");
assertThatThrownBy(
() -> {
Map<String, Object> producerConfig = producerConfig();
Expand All @@ -77,21 +82,21 @@ void badConfig() {
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE);
consumerConfig.remove(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER);
new KafkaConsumer<>(consumerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage("Missing required configuration property: opentelemetry.instance");
.hasRootCauseMessage("Missing required configuration property: opentelemetry.supplier");
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
consumerConfig.put(
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTANCE, "foo");
OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, "foo");
new KafkaConsumer<>(consumerConfig).close();
})
.hasRootCauseInstanceOf(IllegalStateException.class)
.hasRootCauseMessage(
"Configuration property opentelemetry.instance is not instance of OpenTelemetry");
"Configuration property opentelemetry.supplier is not instance of OpenTelemetrySupplier");
assertThatThrownBy(
() -> {
Map<String, Object> consumerConfig = consumerConfig();
Expand All @@ -113,4 +118,23 @@ void badConfig() {
.hasRootCauseMessage(
"Configuration property opentelemetry.instrumentation_name is not instance of String");
}

@Test
void serializableConfig() throws IOException, ClassNotFoundException {
testSerialize(producerConfig());
testSerialize(consumerConfig());
}

@SuppressWarnings("unchecked")
private static Map<String, Object> testSerialize(Map<String, Object> map)
throws IOException, ClassNotFoundException {
ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream();
try (ObjectOutputStream outputStream = new ObjectOutputStream(byteOutputStream)) {
outputStream.writeObject(map);
}
try (ObjectInputStream inputStream =
new ObjectInputStream(new ByteArrayInputStream(byteOutputStream.toByteArray()))) {
return (Map<String, Object>) inputStream.readObject();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*/
public final class OpenTelemetryMetricsReporter implements MetricsReporter {

public static final String CONFIG_KEY_OPENTELEMETRY_INSTANCE = "opentelemetry.instance";
public static final String CONFIG_KEY_OPENTELEMETRY_SUPPLIER = "opentelemetry.supplier";
public static final String CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME =
"opentelemetry.instrumentation_name";

Expand Down Expand Up @@ -150,8 +150,9 @@ private static void closeInstrument(AutoCloseable observable) {

@Override
public void configure(Map<String, ?> configs) {
OpenTelemetry openTelemetry =
getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTANCE, OpenTelemetry.class);
OpenTelemetrySupplier openTelemetrySupplier =
getProperty(configs, CONFIG_KEY_OPENTELEMETRY_SUPPLIER, OpenTelemetrySupplier.class);
OpenTelemetry openTelemetry = openTelemetrySupplier.get();
String instrumentationName =
getProperty(configs, CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, String.class);
String instrumentationVersion =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka.internal;

import io.opentelemetry.api.OpenTelemetry;
import java.io.Serializable;
import java.util.Objects;
import java.util.function.Supplier;

/**
* Wrapper for OpenTelemetry that can be injected into kafka configuration without breaking
* serialization. https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/7597
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class OpenTelemetrySupplier implements Supplier<OpenTelemetry>, Serializable {
private static final long serialVersionUID = 1L;
private final transient OpenTelemetry openTelemetry;

public OpenTelemetrySupplier(OpenTelemetry openTelemetry) {
Objects.requireNonNull(openTelemetry);
this.openTelemetry = openTelemetry;
}

@Override
public OpenTelemetry get() {
return openTelemetry;
}
}