From f2b9351755e1a241604e67997f8587905d2c3d64 Mon Sep 17 00:00:00 2001 From: Oren Ben-Meir Date: Wed, 7 Aug 2024 16:58:17 -0400 Subject: [PATCH 1/2] Add instrumentation:kafka-clients-metrics-3.7.0 --- .../kafka-clients-metrics-3.7.0/build.gradle | 21 ++ .../kafka/CallbackWrapper.java | 41 ++++ .../com/nr/instrumentation/kafka/Metrics.java | 20 ++ .../kafka/MetricsConstants.java | 23 ++ .../kafka/MetricsScheduler.java | 101 +++++++++ .../kafka/NewRelicMetricsReporter.java | 189 ++++++++++++++++ ...umerRebalanceListener_Instrumentation.java | 38 ++++ .../KafkaConsumer_Instrumentation.java | 98 ++++++++ .../AsyncKafkaConsumer_Instrumentation.java | 36 +++ .../LegacyKafkaConsumer_Instrumentation.java | 36 +++ .../consumer/internals/PartitionAssignor.java | 17 ++ .../KafkaProducer_Instrumentation.java | 81 +++++++ .../Deserializer_Instrumentation.java | 25 +++ .../Serializer_Instrumentation.java | 25 +++ .../kafka/Kafka37MessageTest.java | 211 ++++++++++++++++++ .../instrumentation/kafka/KafkaHelper.java | 47 ++++ settings.gradle | 1 + 17 files changed, 1010 insertions(+) create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/build.gradle create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CallbackWrapper.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/Metrics.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsConstants.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsScheduler.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener_Instrumentation.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Deserializer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Serializer_Instrumentation.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java create mode 100644 instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/KafkaHelper.java diff --git a/instrumentation/kafka-clients-metrics-3.7.0/build.gradle b/instrumentation/kafka-clients-metrics-3.7.0/build.gradle new file mode 100644 index 0000000000..e5694cc339 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/build.gradle @@ -0,0 +1,21 @@ + +dependencies { + implementation(project(":agent-bridge")) + implementation("org.apache.kafka:kafka-clients:3.7.0") + + testImplementation("org.testcontainers:kafka:1.16.3") +} + +jar { + manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-metrics-3.7.0', + 'Implementation-Title-Alias': 'kafka-clients-metrics' } +} + +verifyInstrumentation { + passesOnly 'org.apache.kafka:kafka-clients:[3.7.0,)' +} + +site { + title 'Kafka' + type 'Messaging' +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CallbackWrapper.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CallbackWrapper.java new file mode 100644 index 0000000000..259f8f3d6f --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/CallbackWrapper.java @@ -0,0 +1,41 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import com.newrelic.api.agent.NewRelic; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.HashMap; +import java.util.Map; + +public class CallbackWrapper implements Callback { + + private final Callback callback; + private final String topic; + + public CallbackWrapper(Callback callback, String topic) { + this.callback = callback; + this.topic = topic; + } + + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + try { + if (exception != null) { + Map atts = new HashMap<>(); + atts.put("topic_name", topic); + NewRelic.noticeError(exception, atts); + } + } catch (Throwable t) { + } + + this.callback.onCompletion(metadata, exception); + } + +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/Metrics.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/Metrics.java new file mode 100644 index 0000000000..f26455ee44 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/Metrics.java @@ -0,0 +1,20 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +public class Metrics { + + // Serialization/Deserialization metrics + public static final String DESERIALIZATION_TIME_METRIC_BASE = "MessageBroker/Kafka/Deserialization/"; + public static final String SERIALIZATION_TIME_METRIC_BASE = "MessageBroker/Kafka/Serialization/"; + + // Rebalance metrics + public static final String REBALANCE_REVOKED_BASE = "MessageBroker/Kafka/Rebalance/Revoked/"; + public static final String REBALANCE_ASSIGNED_BASE = "MessageBroker/Kafka/Rebalance/Assigned/"; + +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsConstants.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsConstants.java new file mode 100644 index 0000000000..20b3c2b54e --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsConstants.java @@ -0,0 +1,23 @@ +/* + * + * * Copyright 2023 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ +package com.nr.instrumentation.kafka; + +import com.newrelic.api.agent.NewRelic; + +public class MetricsConstants { + public static final boolean KAFKA_METRICS_DEBUG = NewRelic.getAgent().getConfig().getValue("kafka.metrics.debug.enabled", false); + + public static final boolean METRICS_AS_EVENTS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.as_events.enabled", false); + + public static final long REPORTING_INTERVAL_IN_SECONDS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.interval", 30); + + public static final String METRIC_PREFIX = "MessageBroker/Kafka/Internal/"; + + public static final String METRICS_EVENT_TYPE = "KafkaMetrics"; + + public static final String NODE_PREFIX = "MessageBroker/Kafka/Nodes/"; +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsScheduler.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsScheduler.java new file mode 100644 index 0000000000..45afc0a0fd --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/MetricsScheduler.java @@ -0,0 +1,101 @@ +/* + * + * * Copyright 2023 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ +package com.nr.instrumentation.kafka; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import org.apache.kafka.common.metrics.KafkaMetric; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +import static com.nr.instrumentation.kafka.MetricsConstants.KAFKA_METRICS_DEBUG; +import static com.nr.instrumentation.kafka.MetricsConstants.METRICS_AS_EVENTS; +import static com.nr.instrumentation.kafka.MetricsConstants.METRICS_EVENT_TYPE; +import static com.nr.instrumentation.kafka.MetricsConstants.METRIC_PREFIX; +import static com.nr.instrumentation.kafka.MetricsConstants.REPORTING_INTERVAL_IN_SECONDS; + +public class MetricsScheduler { + private static final ScheduledExecutorService executor = createScheduledExecutor(); + private static final Map> metricReporterTasks = new ConcurrentHashMap<>(); + + private MetricsScheduler() {} + + public static void addMetricsReporter(NewRelicMetricsReporter metricsReporter) { + ScheduledFuture task = executor.scheduleAtFixedRate(new MetricsSendRunnable(metricsReporter), + 0L, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS); + metricReporterTasks.put(metricsReporter, task); + } + + public static void removeMetricsReporter(NewRelicMetricsReporter metricsReporter) { + ScheduledFuture task = metricReporterTasks.remove(metricsReporter); + task.cancel(false); + } + + private static ScheduledExecutorService createScheduledExecutor() { + return Executors.newSingleThreadScheduledExecutor(runnable -> { + final Thread thread = new Thread(runnable); + thread.setDaemon(true); + thread.setName("NewRelicMetricsReporter-Kafka"); + return thread; + }); + } + + private static class MetricsSendRunnable implements Runnable { + private final NewRelicMetricsReporter nrMetricsReporter; + + private MetricsSendRunnable(NewRelicMetricsReporter nrMetricsReporter) { + this.nrMetricsReporter = nrMetricsReporter; + } + @Override + public void run() { + try { + Map eventData = new HashMap<>(); + for (final Map.Entry metric : nrMetricsReporter.getMetrics().entrySet()) { + Object metricValue = metric.getValue().metricValue(); + if (metricValue instanceof Number) { + final float value = ((Number) metricValue).floatValue(); + if (KAFKA_METRICS_DEBUG) { + AgentBridge.getAgent().getLogger().log(Level.FINEST, "getMetric: {0} = {1}", metric.getKey(), value); + } + if (!Float.isNaN(value) && !Float.isInfinite(value)) { + if (METRICS_AS_EVENTS) { + eventData.put(metric.getKey().replace('/', '.'), value); + } else { + NewRelic.recordMetric(METRIC_PREFIX + metric.getKey(), value); + } + } + } + } + + for (NewRelicMetricsReporter.NodeMetricNames consumerNodeMetricNames : nrMetricsReporter.getNodes().values()) { + if (METRICS_AS_EVENTS) { + for (String eventName : consumerNodeMetricNames.getEventNames()) { + eventData.put(eventName, 1f); + } + } else { + for (String metricName : consumerNodeMetricNames.getMetricNames()) { + NewRelic.recordMetric(metricName, 1f); + } + } + } + + if (METRICS_AS_EVENTS) { + NewRelic.getAgent().getInsights().recordCustomEvent(METRICS_EVENT_TYPE, eventData); + } + } catch (Exception e) { + AgentBridge.getAgent().getLogger().log(Level.FINE, e, "Unable to record kafka metrics"); + } + } + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java new file mode 100644 index 0000000000..34aff98fc2 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/com/nr/instrumentation/kafka/NewRelicMetricsReporter.java @@ -0,0 +1,189 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.instrumentation.kafka; + +import com.newrelic.agent.bridge.AgentBridge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricsReporter; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.logging.Level; + +import static com.nr.instrumentation.kafka.MetricsConstants.KAFKA_METRICS_DEBUG; +import static com.nr.instrumentation.kafka.MetricsConstants.NODE_PREFIX; + +public class NewRelicMetricsReporter implements MetricsReporter { + + + private final Map metrics = new ConcurrentHashMap<>(); + + private final Map nodes; + + public NewRelicMetricsReporter() { + this.nodes = Collections.emptyMap(); + } + + public NewRelicMetricsReporter(Set nodes, Mode mode) { + this.nodes = new ConcurrentHashMap<>(nodes.size()); + for(String node: nodes) { + this.nodes.put(node, new NodeMetricNames(node, mode)); + } + } + + public Map getMetrics() { + return this.metrics; + } + + public Map getNodes() { + return nodes; + } + + @Override + public void init(final List initMetrics) { + for (KafkaMetric kafkaMetric : initMetrics) { + String metricGroupAndName = getMetricGroupAndName(kafkaMetric); + if (KAFKA_METRICS_DEBUG) { + AgentBridge.getAgent().getLogger().log(Level.FINEST, "init(): {0} = {1}", metricGroupAndName, kafkaMetric.metricName()); + } + metrics.put(metricGroupAndName, kafkaMetric); + } + MetricsScheduler.addMetricsReporter(this); + } + + @Override + public void metricChange(final KafkaMetric metric) { + String metricGroupAndName = getMetricGroupAndName(metric); + if (KAFKA_METRICS_DEBUG) { + AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricChange(): {0} = {1}", metricGroupAndName, metric.metricName()); + } + metrics.put(metricGroupAndName, metric); + } + + @Override + public void metricRemoval(final KafkaMetric metric) { + String metricGroupAndName = getMetricGroupAndName(metric); + if (KAFKA_METRICS_DEBUG) { + AgentBridge.getAgent().getLogger().log(Level.FINEST, "metricRemoval(): {0} = {1}", metricGroupAndName, metric.metricName()); + } + metrics.remove(metricGroupAndName); + } + + private String getMetricGroupAndName(final KafkaMetric metric) { + if (metric.metricName().tags().containsKey("topic")) { + String topic = metric.metricName().tags().get("topic"); + addTopicToNodeMetrics(topic); + + // Special case for handling topic names in metrics + return metric.metricName().group() + "/" + topic + "/" + metric.metricName().name(); + } + return metric.metricName().group() + "/" + metric.metricName().name(); + } + + private void addTopicToNodeMetrics(String topic) { + for (NodeMetricNames nodeMetricNames : nodes.values()) { + nodeMetricNames.addMetricNameForTopic(topic); + } + } + + @Override + public void close() { + MetricsScheduler.removeMetricsReporter(this); + metrics.clear(); + } + + @Override + public void configure(final Map configs) { + } + + /** + * This class is used to track all the metric names that are related to a specific node: + * + * - MessageBroker/Kafka/Nodes/host:port + * - MessageBroker/Kafka/Nodes/host:port/Consume/topicName + * - MessageBroker/Kafka/Nodes/host:port/Produce/topicName + * + * At initialization time we only have the node and the mode (is this a metrics reporter + * for a Kafka consumer or for a Kafka producer?). + * + * Then, as topics are discovered through the metricChange method, the topic metric names are + * generated. This is the best way we have to get track of the topics since they're not + * available when the KafkaConsumer/KafkaProducer is initialized. + * + * For KafkaConsumer, the SubscriptionState doesn't contain the topics and partitions + * at initialization time because it takes time for the rebalance to happen. + * + * For KafkaProducer, topics are dynamic since a producer could send records to any + * topic and the concept of subscription doesn't exist there. + * + * Alternatively we could get the topics from the records in KafkaProducer.doSend or + * KafkaConsumer.poll, and call NewRelicMetricsReporter.addTopicToNodeMetrics from there. + * This approach would have a small impact in performance, and getting the topics from the + * KafkaMetrics is a good enough solution. + */ + public static class NodeMetricNames { + + private final String node; + private final Mode mode; + + private final Set topics = new HashSet<>(); + + private final Set metricNames = new HashSet<>(); + private final Set eventNames = new HashSet<>(); + + public NodeMetricNames(String node, Mode mode) { + this.node = node; + this.mode = mode; + + String nodeMetricName = NODE_PREFIX + node; + metricNames.add(nodeMetricName); + eventNames.add(getEventNameForMetric(nodeMetricName)); + } + + private void addMetricNameForTopic(String topic) { + if (!topics.contains(topic)) { + String nodeTopicMetricName = NODE_PREFIX + node + "/" + mode.getMetricSegmentName() + "/" + topic; + metricNames.add(nodeTopicMetricName); + eventNames.add(getEventNameForMetric(nodeTopicMetricName)); + + topics.add(topic); + } + } + + private String getEventNameForMetric(String metricName) { + return metricName.replace('/', '.'); + } + + public Set getMetricNames() { + return metricNames; + } + + public Set getEventNames() { + return eventNames; + } + } + + public enum Mode { + CONSUMER("Consume"), + PRODUCER("Produce"); + + private final String metricSegmentName; + + Mode(String metricSegmentName) { + this.metricSegmentName = metricSegmentName; + } + + public String getMetricSegmentName() { + return metricSegmentName; + } + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener_Instrumentation.java new file mode 100644 index 0000000000..8a1ffdfbf8 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener_Instrumentation.java @@ -0,0 +1,38 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.clients.consumer; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; + +import static com.nr.instrumentation.kafka.Metrics.REBALANCE_ASSIGNED_BASE; +import static com.nr.instrumentation.kafka.Metrics.REBALANCE_REVOKED_BASE; + +@Weave(type = MatchType.Interface, originalName = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener") +public class ConsumerRebalanceListener_Instrumentation { + + public void onPartitionsRevoked(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + NewRelic.incrementCounter(REBALANCE_REVOKED_BASE + topicPartition.topic() + "/" + topicPartition.partition()); + } + Weaver.callOriginal(); + } + + public void onPartitionsAssigned(Collection partitions) { + for (TopicPartition topicPartition : partitions) { + NewRelic.incrementCounter(REBALANCE_ASSIGNED_BASE + topicPartition.topic() + "/" + topicPartition.partition()); + } + Weaver.callOriginal(); + } + +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java new file mode 100644 index 0000000000..6749a43e65 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer_Instrumentation.java @@ -0,0 +1,98 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.clients.consumer; + +import java.time.Duration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.kafka.clients.consumer.internals.ConsumerMetadata; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.Node; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageConsumeParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.NewRelicMetricsReporter; + +@Weave(originalName = "org.apache.kafka.clients.consumer.KafkaConsumer") +public class KafkaConsumer_Instrumentation { + + public ConsumerRecords poll(final long timeoutMs) { + final ConsumerRecords records; + try { + records = Weaver.callOriginal(); + } catch (Exception e) { + // Specifically ignore WakeupExceptions because they are common in non-error use cases + if (!(e instanceof WakeupException)) { + NewRelic.noticeError(e); + } + throw e; + } + + for (ConsumerRecord record : records) { + if (AgentBridge.getAgent().getTransaction(false) != null) { + MessageConsumeParameters params = MessageConsumeParameters.library("Kafka") + .destinationType(DestinationType.NAMED_TOPIC) + .destinationName(record.topic()) + .inboundHeaders(null) + .build(); + NewRelic.getAgent().getTransaction().getTracedMethod().reportAsExternal(params); + } + break; + } + return records; + } + + public ConsumerRecords poll(final Duration timeout) { + final ConsumerRecords records; + try { + records = Weaver.callOriginal(); + } catch (Exception e) { + // Specifically ignore WakeupExceptions because they are common in non-error use cases + if (!(e instanceof WakeupException)) { + NewRelic.noticeError(e); + } + throw e; + } + nrReportAsExternal(records); + return records; + } + + public void close() { + try { + Weaver.callOriginal(); + } catch (Exception e) { + NewRelic.noticeError(e); // Record an error when a consumer fails to close (most likely due to a timeout) + throw e; + } + } + + private void nrReportAsExternal(ConsumerRecords records) { + for (ConsumerRecord record : records) { + if (AgentBridge.getAgent().getTransaction(false) != null) { + MessageConsumeParameters params = MessageConsumeParameters.library("Kafka") + .destinationType(DestinationType.NAMED_TOPIC) + .destinationName(record.topic()) + .inboundHeaders(null) + .build(); + NewRelic.getAgent().getTransaction().getTracedMethod().reportAsExternal(params); + } + break; + } + } + +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java new file mode 100644 index 0000000000..cce4c5b7af --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer_Instrumentation.java @@ -0,0 +1,36 @@ +package org.apache.kafka.clients.consumer.internals; + +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.metrics.Metrics; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Weave(originalName = "org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer") +public class AsyncKafkaConsumer_Instrumentation { + private final Metrics metrics = Weaver.callOriginal(); + + private final ConsumerMetadata metadata = Weaver.callOriginal(); + + @NewField + private boolean initialized; + + @WeaveAllConstructors + public AsyncKafkaConsumer_Instrumentation() { + if (!initialized) { + List nodes = metadata.fetch().nodes(); + Set nodeNames = new HashSet<>(nodes.size()); + for (Node node : nodes) { + nodeNames.add(node.host() + ":" + node.port()); + } + metrics.addReporter(new NewRelicMetricsReporter(nodeNames, NewRelicMetricsReporter.Mode.CONSUMER)); + initialized = true; + } + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java new file mode 100644 index 0000000000..b363d5553f --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/LegacyKafkaConsumer_Instrumentation.java @@ -0,0 +1,36 @@ +package org.apache.kafka.clients.consumer.internals; + +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.NewRelicMetricsReporter; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.metrics.Metrics; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@Weave(originalName = "org.apache.kafka.clients.consumer.internals.LegacyKafkaConsumer") +public class LegacyKafkaConsumer_Instrumentation { + private final Metrics metrics = Weaver.callOriginal(); + + private final ConsumerMetadata metadata = Weaver.callOriginal(); + + @NewField + private boolean initialized; + + @WeaveAllConstructors + public LegacyKafkaConsumer_Instrumentation() { + if (!initialized) { + List nodes = metadata.fetch().nodes(); + Set nodeNames = new HashSet<>(nodes.size()); + for (Node node : nodes) { + nodeNames.add(node.host() + ":" + node.port()); + } + metrics.addReporter(new NewRelicMetricsReporter(nodeNames, NewRelicMetricsReporter.Mode.CONSUMER)); + initialized = true; + } + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java new file mode 100644 index 0000000000..72a4020c7d --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/consumer/internals/PartitionAssignor.java @@ -0,0 +1,17 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.clients.consumer.internals; + +import com.newrelic.api.agent.weaver.SkipIfPresent; + +/** + * This class was removed on Kafka 3. So this will prevent the module from applying on older versions. + */ +@SkipIfPresent +public interface PartitionAssignor { +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java new file mode 100644 index 0000000000..d5cecbc981 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/clients/producer/KafkaProducer_Instrumentation.java @@ -0,0 +1,81 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.clients.producer; + +import org.apache.kafka.clients.producer.internals.ProducerMetadata; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.Node; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.DestinationType; +import com.newrelic.api.agent.MessageProduceParameters; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.WeaveAllConstructors; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.CallbackWrapper; +import com.nr.instrumentation.kafka.NewRelicMetricsReporter; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import java.util.List; + +@Weave(originalName = "org.apache.kafka.clients.producer.KafkaProducer") +public class KafkaProducer_Instrumentation { + + private final Metrics metrics = Weaver.callOriginal(); + + private final ProducerMetadata metadata = Weaver.callOriginal(); + + @NewField + private boolean initialized; + + @WeaveAllConstructors + public KafkaProducer_Instrumentation() { + if (!initialized) { + List nodes = metadata.fetch().nodes(); + Set nodeNames = new HashSet<>(nodes.size()); + for (Node node : nodes) { + nodeNames.add(node.host() + ":" + node.port()); + } + metrics.addReporter(new NewRelicMetricsReporter(nodeNames, NewRelicMetricsReporter.Mode.PRODUCER)); + initialized = true; + } + } + + @Trace + private Future doSend(ProducerRecord record, Callback callback) { + if (callback != null) { + // Wrap the callback so we can capture metrics about messages being produced + callback = new CallbackWrapper(callback, record.topic()); + } + if (AgentBridge.getAgent().getTransaction(false) != null) { + // use null for headers so we don't try to do CAT + MessageProduceParameters params = MessageProduceParameters.library("Kafka") + .destinationType(DestinationType.NAMED_TOPIC) + .destinationName(record.topic()) + .outboundHeaders(null) + .build(); + NewRelic.getAgent().getTransaction().getTracedMethod().reportAsExternal(params); + } + + try { + return Weaver.callOriginal(); + } catch (Exception e) { + Map atts = new HashMap<>(); + atts.put("topic_name", record.topic()); + NewRelic.noticeError(e, atts); + throw e; + } + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Deserializer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Deserializer_Instrumentation.java new file mode 100644 index 0000000000..5d066a02cb --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Deserializer_Instrumentation.java @@ -0,0 +1,25 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.common.serialization; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.Metrics; + +@Weave(originalName = "org.apache.kafka.common.serialization.Deserializer", type = MatchType.Interface) +public class Deserializer_Instrumentation { + + public T deserialize(String topic, byte[] data) { + long start = System.nanoTime(); + T result = Weaver.callOriginal(); + NewRelic.recordMetric(Metrics.DESERIALIZATION_TIME_METRIC_BASE + topic, System.nanoTime() - start); + return result; + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Serializer_Instrumentation.java b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Serializer_Instrumentation.java new file mode 100644 index 0000000000..8c3001cf4c --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/main/java/org/apache/kafka/common/serialization/Serializer_Instrumentation.java @@ -0,0 +1,25 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.apache.kafka.common.serialization; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.kafka.Metrics; + +@Weave(originalName = "org.apache.kafka.common.serialization.Serializer", type = MatchType.Interface) +public class Serializer_Instrumentation { + + public byte[] serialize(String topic, T data) { + long start = System.nanoTime(); + byte[] result = Weaver.callOriginal(); + NewRelic.recordMetric(Metrics.SERIALIZATION_TIME_METRIC_BASE + topic, System.nanoTime() - start); + return result; + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java b/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java new file mode 100644 index 0000000000..46a70b2084 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java @@ -0,0 +1,211 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.agent.instrumentation.kafka; + +import com.newrelic.agent.introspec.InstrumentationTestConfig; +import com.newrelic.agent.introspec.InstrumentationTestRunner; +import static com.newrelic.agent.introspec.MetricsHelper.getUnscopedMetricCount; + +import com.newrelic.agent.introspec.TracedMetricData; +import com.newrelic.api.agent.Trace; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.junit.After; +import org.junit.Assert; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +//@Ignore("This test is flaky on GHA") +@RunWith(InstrumentationTestRunner.class) +@InstrumentationTestConfig(includePrefixes = "org.apache.kafka") +public class Kafka37MessageTest { + @Rule + public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0")); + + private final String TOPIC = "life-universe-everything"; + private final String ANOTHER_TOPIC = "vogon-poetry"; + @Before + public void before() { + kafkaContainer.start(); + } + + @After + public void after() { + kafkaContainer.stop(); + } + + @Test + public void testProducer() throws ExecutionException, InterruptedException { + Future msgsWereRead = asyncReadMessages(); + + // giving some time for the consumer to ready itself up prior to sending the messages + Thread.sleep(1000L); + sendMessages(); + + Assert.assertTrue("Messages weren't read", msgsWereRead.get()); + assertUnscopedMetrics(); + } + + /** + * @return a Future that holds whether the messages were read + */ + private Future asyncReadMessages() { + return Executors.newSingleThreadExecutor().submit(this::readMessages); + } + + /** + * @return whether messages were read + */ + @Trace(dispatcher = true) + private boolean readMessages() throws InterruptedException { + int messagesRead = 0; + try (KafkaConsumer consumer = KafkaHelper.newConsumer(kafkaContainer)) { + consumer.subscribe(Collections.singleton(TOPIC)); + + // setting a timeout so this does not drag forever if something goes wrong. + long waitUntil = System.currentTimeMillis() + 15000L; + while (waitUntil > System.currentTimeMillis()) { + ConsumerRecords records = consumer.poll(1000); + messagesRead += records.count(); + if (messagesRead == 2) { + // Sleep for a minute before closing the consumer so MetricsScheduler runs + // few times and all metrics are reported + Thread.sleep(60000L); + return true; + } + } + } + return false; + } + + @Trace(dispatcher = true) + private void sendMessages() throws ExecutionException, InterruptedException { + try (KafkaProducer producer = KafkaHelper.newProducer(kafkaContainer)) { + List> futures = Arrays.asList( + producer.send(new ProducerRecord<>(ANOTHER_TOPIC, "Oh freddled gruntbuggly")), + producer.send(new ProducerRecord<>(TOPIC, "Life, don't talk to me about life.")), + producer.send(new ProducerRecord<>(TOPIC, "Don't Panic")) + ); + for (Future future : futures) { + future.get(); + } + // Sleep for a minute before closing the producer so MetricsScheduler runs + // few times and all metrics are reported + Thread.sleep(60000L); + } + } + + private void assertUnscopedMetrics() { + // on the previous instrumentation module there are more metrics being verified. + // Kafka 3 changed a little how the values are retrieved and those metrics now return NaN, and thus are not reported. + assertUnscopedMetricExists( + // general kafka metrics, they can change from test to test, so only verifying they exist + "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/assigned-partitions", + "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/commit-rate", + "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/heartbeat-rate", + "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/join-rate", + "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/last-heartbeat-seconds-ago", + "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/sync-rate", + "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/bytes-consumed-rate", + "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-rate", + "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-consumed-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/connection-close-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/connection-count", + "MessageBroker/Kafka/Internal/consumer-metrics/connection-creation-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/incoming-byte-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/io-ratio", + "MessageBroker/Kafka/Internal/consumer-metrics/io-wait-ratio", + "MessageBroker/Kafka/Internal/consumer-metrics/network-io-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/outgoing-byte-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/request-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/response-rate", + "MessageBroker/Kafka/Internal/consumer-metrics/select-rate", + "MessageBroker/Kafka/Internal/kafka-metrics-count/count", + "MessageBroker/Kafka/Internal/producer-metrics/batch-split-rate", + "MessageBroker/Kafka/Internal/producer-metrics/buffer-available-bytes", + "MessageBroker/Kafka/Internal/producer-metrics/buffer-exhausted-rate", + "MessageBroker/Kafka/Internal/producer-metrics/buffer-total-bytes", + "MessageBroker/Kafka/Internal/producer-metrics/bufferpool-wait-ratio", + "MessageBroker/Kafka/Internal/producer-metrics/connection-close-rate", + "MessageBroker/Kafka/Internal/producer-metrics/connection-count", + "MessageBroker/Kafka/Internal/producer-metrics/connection-creation-rate", + "MessageBroker/Kafka/Internal/producer-metrics/incoming-byte-rate", + "MessageBroker/Kafka/Internal/producer-metrics/io-ratio", + "MessageBroker/Kafka/Internal/producer-metrics/io-wait-ratio", + "MessageBroker/Kafka/Internal/producer-metrics/metadata-age", + "MessageBroker/Kafka/Internal/producer-metrics/network-io-rate", + "MessageBroker/Kafka/Internal/producer-metrics/outgoing-byte-rate", + "MessageBroker/Kafka/Internal/producer-metrics/record-error-rate", + "MessageBroker/Kafka/Internal/producer-metrics/record-retry-rate", + "MessageBroker/Kafka/Internal/producer-metrics/record-send-rate", + "MessageBroker/Kafka/Internal/producer-metrics/request-rate", + "MessageBroker/Kafka/Internal/producer-metrics/requests-in-flight", + "MessageBroker/Kafka/Internal/producer-metrics/response-rate", + "MessageBroker/Kafka/Internal/producer-metrics/select-rate", + "MessageBroker/Kafka/Internal/producer-metrics/waiting-threads" + ); + + // serializer are called more often because they serialize the key and the value + assertEquals(0, getUnscopedMetricCount("MessageBroker/Kafka/Deserialization/" + TOPIC)); + assertEquals(4, getUnscopedMetricCount("MessageBroker/Kafka/Serialization/" + TOPIC)); + assertEquals(2, getUnscopedMetricCount("MessageBroker/Kafka/Topic/Produce/Named/" +TOPIC)); + + // deserializer is never called because this topic is never read from + assertEquals(0, getUnscopedMetricCount("MessageBroker/Kafka/Deserialization/" + ANOTHER_TOPIC)); + assertEquals(2, getUnscopedMetricCount("MessageBroker/Kafka/Serialization/" + ANOTHER_TOPIC)); + assertEquals(1, getUnscopedMetricCount("MessageBroker/Kafka/Topic/Produce/Named/" + ANOTHER_TOPIC)); + + // there are 2 messages in the topic, but they could be read in a single poll, or in 2 + int consumedCount = getUnscopedMetricCount("MessageBroker/Kafka/Topic/Consume/Named/" + TOPIC); + assertTrue(consumedCount >= 1); + assertTrue(consumedCount <= 2); + + assertEquals(0, getUnscopedMetricCount("MessageBroker/Kafka/Rebalance/Assigned/life-universe-everything/0")); + + // Nodes metrics + assertTrue(unscopedNodesMetricExists("MessageBroker/Kafka/Nodes/localhost:[0-9]*")); + assertTrue(unscopedNodesMetricExists("MessageBroker/Kafka/Nodes/localhost:[0-9]*/Consume/" + TOPIC)); + assertTrue(unscopedNodesMetricExists("MessageBroker/Kafka/Nodes/localhost:[0-9]*/Produce/" + TOPIC)); + assertFalse(unscopedNodesMetricExists("MessageBroker/Kafka/Nodes/localhost:[0-9]*/Consume/" + ANOTHER_TOPIC)); + assertTrue(unscopedNodesMetricExists("MessageBroker/Kafka/Nodes/localhost:[0-9]*/Produce/" + ANOTHER_TOPIC)); + } + + private void assertUnscopedMetricExists(String ... metricNames) { + int notFoundMetricCount = 0; + Set existingMetrics= InstrumentationTestRunner.getIntrospector().getUnscopedMetrics().keySet(); + for (String metricName : metricNames) { + Assert.assertTrue("metric not found: " + metricName, existingMetrics.contains(metricName)); + } + System.out.println(notFoundMetricCount + " metrics not found"); + } + + private boolean unscopedNodesMetricExists(String metricName) { + return InstrumentationTestRunner.getIntrospector().getUnscopedMetrics().keySet().stream() + .anyMatch(key -> key.matches(metricName)); + } +} diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/KafkaHelper.java b/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/KafkaHelper.java new file mode 100644 index 0000000000..a38a26c8d5 --- /dev/null +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/KafkaHelper.java @@ -0,0 +1,47 @@ +/* + * + * * Copyright 2022 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package com.nr.agent.instrumentation.kafka; + +import java.util.Properties; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.testcontainers.containers.KafkaContainer; + +public class KafkaHelper { + + public static KafkaProducer newProducer(KafkaContainer kafkaContainer) { + Properties props = getProps(kafkaContainer.getBootstrapServers()); + return new KafkaProducer<>(props); + } + + public static KafkaConsumer newConsumer(KafkaContainer kafkaContainer) { + Properties props = getProps(kafkaContainer.getBootstrapServers()); + return new KafkaConsumer<>(props); + } + + public static Properties getProps(String bootstrapServers) { + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServers); + props.put("acks", "all"); + props.put("retries", 0); + props.put("batch.size", 16384); + props.put("linger.ms", 1); + props.put("buffer.memory", 33554432); + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("group.id", "test-consumer-group"); + props.put("group.protocol", "CLASSIC"); + return props; + } + + private KafkaHelper() { + // prevents instantiations + } +} diff --git a/settings.gradle b/settings.gradle index 7c7aa71367..30d5ef5b0a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -235,6 +235,7 @@ include 'instrumentation:kafka-clients-heartbeat-2.1.0' include 'instrumentation:kafka-clients-metrics-0.10.0.0' include 'instrumentation:kafka-clients-metrics-2.0.0' include 'instrumentation:kafka-clients-metrics-3.0.0' +include 'instrumentation:kafka-clients-metrics-3.7.0' include 'instrumentation:kafka-clients-node-metrics-1.0.0' include 'instrumentation:kafka-clients-spans-0.11.0.0' include 'instrumentation:kafka-connect-metrics-1.0.0' From f2c225d6286ccfc8388a435f5f8e64741eef82aa Mon Sep 17 00:00:00 2001 From: Oren Ben-Meir Date: Wed, 7 Aug 2024 17:02:17 -0400 Subject: [PATCH 2/2] Ignore Kafka3.7 test since it is flaky --- .../com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java b/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java index 46a70b2084..ceed0a13f0 100644 --- a/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java +++ b/instrumentation/kafka-clients-metrics-3.7.0/src/test/java/com/nr/agent/instrumentation/kafka/Kafka37MessageTest.java @@ -40,7 +40,7 @@ import org.testcontainers.containers.KafkaContainer; import org.testcontainers.utility.DockerImageName; -//@Ignore("This test is flaky on GHA") +@Ignore("This test is flaky on GHA") @RunWith(InstrumentationTestRunner.class) @InstrumentationTestConfig(includePrefixes = "org.apache.kafka") public class Kafka37MessageTest {