Skip to content

Commit

Permalink
Closing a kafka producer/consumer should not disable metrics from oth…
Browse files Browse the repository at this point in the history
…er consumers/producers (#11975)
  • Loading branch information
laurit authored Aug 14, 2024
1 parent f7f7d39 commit 31e5eea
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand Down Expand Up @@ -70,6 +71,13 @@ public abstract class AbstractOpenTelemetryMetricsReporterTest {
private static KafkaProducer<byte[], byte[]> producer;
private static KafkaConsumer<byte[], byte[]> consumer;

private static final List<OpenTelemetryMetricsReporter> metricsReporters =
new CopyOnWriteArrayList<>();

static {
OpenTelemetryMetricsReporter.setListener(metricsReporters::add);
}

@BeforeEach
void beforeAll() {
// only start the kafka container the first time this runs
Expand All @@ -90,14 +98,16 @@ void beforeAll() {

@AfterAll
static void afterAll() {
kafka.stop();
producer.close();
consumer.close();
kafka.stop();
}

@AfterEach
void tearDown() {
OpenTelemetryMetricsReporter.resetForTest();
for (OpenTelemetryMetricsReporter metricsReporter : metricsReporters) {
metricsReporter.resetForTest();
}
}

protected abstract InstrumentationExtension testing();
Expand Down Expand Up @@ -186,6 +196,14 @@ private static long countOpenTelemetryMetricsReporters(List<MetricsReporter> met

@Test
void observeMetrics() {
// Firstly create new producer and consumer and close them. This is done tp verify that metrics
// are still produced after closing one producer/consumer. See
// https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11880
KafkaProducer<byte[], byte[]> producer2 = new KafkaProducer<>(producerConfig());
KafkaConsumer<byte[], byte[]> consumer2 = new KafkaConsumer<>(consumerConfig());
producer2.close();
consumer2.close();

produceRecords();
consumeRecords();

Expand Down Expand Up @@ -405,7 +423,9 @@ private static void printMappingTable() {
Map<String, List<KafkaMetricId>> kafkaMetricsByGroup =
TestMetricsReporter.seenMetrics.stream().collect(groupingBy(KafkaMetricId::getGroup));
List<RegisteredObservable> registeredObservables =
OpenTelemetryMetricsReporter.getRegisteredObservables();
metricsReporters.stream()
.flatMap(metricsReporter -> metricsReporter.getRegisteredObservables().stream())
.collect(toList());
// Iterate through groups in alpha order
for (String group : kafkaMetricsByGroup.keySet().stream().sorted().collect(toList())) {
List<KafkaMetricId> kafkaMetricIds =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,35 @@ public final class OpenTelemetryMetricsReporter implements MetricsReporter {

private static final Logger logger =
Logger.getLogger(OpenTelemetryMetricsReporter.class.getName());
private volatile Meter meter;
private static volatile Listener listener;

private static final Object lock = new Object();
private volatile Meter meter;
private final Object lock = new Object();

@GuardedBy("lock")
private static final List<RegisteredObservable> registeredObservables = new ArrayList<>();
private final List<RegisteredObservable> registeredObservables = new ArrayList<>();

/**
* Reset for test by resetting the {@link #meter} to {@code null} and closing all registered
* instruments.
*/
static void resetForTest() {
void resetForTest() {
closeAllInstruments();
}

// Visible for test
static List<RegisteredObservable> getRegisteredObservables() {
List<RegisteredObservable> getRegisteredObservables() {
synchronized (lock) {
return new ArrayList<>(registeredObservables);
}
}

public OpenTelemetryMetricsReporter() {
if (listener != null) {
listener.metricsReporterCreated(this);
}
}

@Override
public void init(List<KafkaMetric> metrics) {
metrics.forEach(this::metricChange);
Expand Down Expand Up @@ -131,7 +138,7 @@ public void close() {
closeAllInstruments();
}

private static void closeAllInstruments() {
private void closeAllInstruments() {
synchronized (lock) {
for (Iterator<RegisteredObservable> it = registeredObservables.iterator(); it.hasNext(); ) {
closeInstrument(it.next().getObservable());
Expand Down Expand Up @@ -177,4 +184,14 @@ private static <T> T getProperty(Map<String, ?> configs, String key, Class<T> re
}
return (T) value;
}

// Visible for test
static void setListener(Listener listener) {
OpenTelemetryMetricsReporter.listener = listener;
}

// used for testing
interface Listener {
void metricsReporterCreated(OpenTelemetryMetricsReporter metricsReporter);
}
}

0 comments on commit 31e5eea

Please sign in to comment.