diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java index f53fe5048387..141498e7836f 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/testing/src/main/java/io/opentelemetry/instrumentation/kafka/internal/AbstractOpenTelemetryMetricsReporterTest.java @@ -32,6 +32,7 @@ import java.util.Optional; import java.util.Random; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -70,6 +71,13 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest { private static KafkaProducer producer; private static KafkaConsumer consumer; + private static final List metricsReporters = + new CopyOnWriteArrayList<>(); + + static { + OpenTelemetryMetricsReporter.setListener(metricsReporters::add); + } + @BeforeEach void beforeAll() { // only start the kafka container the first time this runs @@ -90,14 +98,16 @@ void beforeAll() { @AfterAll static void afterAll() { - kafka.stop(); producer.close(); consumer.close(); + kafka.stop(); } @AfterEach void tearDown() { - OpenTelemetryMetricsReporter.resetForTest(); + for (OpenTelemetryMetricsReporter metricsReporter : metricsReporters) { + metricsReporter.resetForTest(); + } } protected abstract InstrumentationExtension testing(); @@ -186,6 +196,14 @@ private static long countOpenTelemetryMetricsReporters(List met @Test void observeMetrics() { + // Firstly create new producer and consumer and close them. This is done tp verify that metrics + // are still produced after closing one producer/consumer. See + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11880 + KafkaProducer producer2 = new KafkaProducer<>(producerConfig()); + KafkaConsumer consumer2 = new KafkaConsumer<>(consumerConfig()); + producer2.close(); + consumer2.close(); + produceRecords(); consumeRecords(); @@ -405,7 +423,9 @@ private static void printMappingTable() { Map> kafkaMetricsByGroup = TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup)); List registeredObservables = - OpenTelemetryMetricsReporter.getRegisteredObservables(); + metricsReporters.stream() + .flatMap(metricsReporter -> metricsReporter.getRegisteredObservables().stream()) + .collect(toList()); // Iterate through groups in alpha order for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) { List kafkaMetricIds = diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java index 1c98642c3a4b..f58aa345581b 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/internal/OpenTelemetryMetricsReporter.java @@ -41,28 +41,35 @@ public final class OpenTelemetryMetricsReporter implements MetricsReporter { private static final Logger logger = Logger.getLogger(OpenTelemetryMetricsReporter.class.getName()); - private volatile Meter meter; + private static volatile Listener listener; - private static final Object lock = new Object(); + private volatile Meter meter; + private final Object lock = new Object(); @GuardedBy("lock") - private static final List registeredObservables = new ArrayList<>(); + private final List registeredObservables = new ArrayList<>(); /** * Reset for test by resetting the {@link #meter} to {@code null} and closing all registered * instruments. */ - static void resetForTest() { + void resetForTest() { closeAllInstruments(); } // Visible for test - static List getRegisteredObservables() { + List getRegisteredObservables() { synchronized (lock) { return new ArrayList<>(registeredObservables); } } + public OpenTelemetryMetricsReporter() { + if (listener != null) { + listener.metricsReporterCreated(this); + } + } + @Override public void init(List metrics) { metrics.forEach(this::metricChange); @@ -131,7 +138,7 @@ public void close() { closeAllInstruments(); } - private static void closeAllInstruments() { + private void closeAllInstruments() { synchronized (lock) { for (Iterator it = registeredObservables.iterator(); it.hasNext(); ) { closeInstrument(it.next().getObservable()); @@ -177,4 +184,14 @@ private static T getProperty(Map configs, String key, Class re } return (T) value; } + + // Visible for test + static void setListener(Listener listener) { + OpenTelemetryMetricsReporter.listener = listener; + } + + // used for testing + interface Listener { + void metricsReporterCreated(OpenTelemetryMetricsReporter metricsReporter); + } }