Skip to content

Commit

Permalink
Merge pull request #2001 from newrelic/kafka-clients-metrics-3.7.0
Browse files Browse the repository at this point in the history
Add instrumentation:kafka-clients-metrics-3.7.0
  • Loading branch information
obenkenobi authored Aug 12, 2024
2 parents 9338ecc + f2c225d commit aa6cccf
Show file tree
Hide file tree
Showing 17 changed files with 1,010 additions and 0 deletions.
21 changes: 21 additions & 0 deletions instrumentation/kafka-clients-metrics-3.7.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

// Module dependencies: the agent bridge plus the lowest kafka-clients version this module instruments.
dependencies {
implementation(project(":agent-bridge"))
implementation("org.apache.kafka:kafka-clients:3.7.0")

// Testcontainers Kafka is used to spin up a real broker for instrumentation tests.
testImplementation("org.testcontainers:kafka:1.16.3")
}

// The Implementation-Title identifies this instrumentation module to the agent;
// the alias groups it with the other kafka-clients-metrics version modules.
jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-clients-metrics-3.7.0',
'Implementation-Title-Alias': 'kafka-clients-metrics' }
}

// Weave verification: this module must apply to kafka-clients 3.7.0 and all later versions.
verifyInstrumentation {
passesOnly 'org.apache.kafka:kafka-clients:[3.7.0,)'
}

// Documentation-site categorization for this module.
site {
title 'Kafka'
type 'Messaging'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
*
* * Copyright 2022 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

import com.newrelic.api.agent.NewRelic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.HashMap;
import java.util.Map;

/**
 * Wraps a producer {@link Callback} so that send failures are reported to New Relic
 * as noticed errors (tagged with the destination topic name) before delegating to
 * the application's original callback.
 */
public class CallbackWrapper implements Callback {

    private final Callback callback;
    private final String topic;

    /**
     * @param callback the application's original callback to delegate to
     * @param topic    the topic the record was sent to; attached to noticed errors
     */
    public CallbackWrapper(Callback callback, String topic) {
        this.callback = callback;
        this.topic = topic;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        try {
            if (exception != null) {
                Map<String, Object> atts = new HashMap<>();
                atts.put("topic_name", topic);
                NewRelic.noticeError(exception, atts);
            }
        } catch (Throwable t) {
            // Instrumentation must never break the application's callback. Instead of
            // swallowing the failure silently, record it at FINEST for supportability.
            NewRelic.getAgent().getLogger().log(java.util.logging.Level.FINEST, t,
                    "Unable to notice error for kafka producer callback");
        }

        // Always invoke the application's callback, even if error reporting failed.
        this.callback.onCompletion(metadata, exception);
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
*
* * Copyright 2022 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.instrumentation.kafka;

/**
 * Metric name prefixes used by the kafka-clients instrumentation when recording
 * serialization/deserialization timing and consumer-group rebalance events.
 * The topic (and, for rebalances, partition) is appended to each base by callers.
 */
public class Metrics {

    // Serialization/Deserialization metrics
    public static final String DESERIALIZATION_TIME_METRIC_BASE = "MessageBroker/Kafka/Deserialization/";
    public static final String SERIALIZATION_TIME_METRIC_BASE = "MessageBroker/Kafka/Serialization/";

    // Rebalance metrics
    public static final String REBALANCE_REVOKED_BASE = "MessageBroker/Kafka/Rebalance/Revoked/";
    public static final String REBALANCE_ASSIGNED_BASE = "MessageBroker/Kafka/Rebalance/Assigned/";

    // Constants holder; prevent instantiation.
    private Metrics() {
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.instrumentation.kafka;

import com.newrelic.api.agent.NewRelic;

/**
 * Agent-configuration-driven constants for the Kafka client-metrics reporter.
 * Config values are read once at class initialization from the agent config
 * (e.g. {@code kafka.metrics.debug.enabled} in newrelic.yml or system properties).
 */
public class MetricsConstants {
    // When true, every harvested metric value is logged at FINEST.
    public static final boolean KAFKA_METRICS_DEBUG = NewRelic.getAgent().getConfig().getValue("kafka.metrics.debug.enabled", false);

    // When true, metrics are sent as a custom event instead of timeslice metrics.
    public static final boolean METRICS_AS_EVENTS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.as_events.enabled", false);

    // How often the scheduler harvests and reports Kafka metrics (default 30s).
    public static final long REPORTING_INTERVAL_IN_SECONDS = NewRelic.getAgent().getConfig().getValue("kafka.metrics.interval", 30);

    public static final String METRIC_PREFIX = "MessageBroker/Kafka/Internal/";

    public static final String METRICS_EVENT_TYPE = "KafkaMetrics";

    public static final String NODE_PREFIX = "MessageBroker/Kafka/Nodes/";

    // Constants holder; prevent instantiation.
    private MetricsConstants() {
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
*
* * Copyright 2023 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.instrumentation.kafka;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import org.apache.kafka.common.metrics.KafkaMetric;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import static com.nr.instrumentation.kafka.MetricsConstants.KAFKA_METRICS_DEBUG;
import static com.nr.instrumentation.kafka.MetricsConstants.METRICS_AS_EVENTS;
import static com.nr.instrumentation.kafka.MetricsConstants.METRICS_EVENT_TYPE;
import static com.nr.instrumentation.kafka.MetricsConstants.METRIC_PREFIX;
import static com.nr.instrumentation.kafka.MetricsConstants.REPORTING_INTERVAL_IN_SECONDS;

/**
 * Schedules periodic harvesting of Kafka client metrics on a single shared daemon
 * thread. Each registered {@link NewRelicMetricsReporter} gets a fixed-rate task
 * that reports its metrics either as timeslice metrics or as a custom event,
 * depending on agent configuration.
 */
public class MetricsScheduler {
    private static final ScheduledExecutorService executor = createScheduledExecutor();
    private static final Map<NewRelicMetricsReporter, ScheduledFuture<?>> metricReporterTasks = new ConcurrentHashMap<>();

    private MetricsScheduler() {}

    /**
     * Starts reporting for the given reporter immediately and then at the
     * configured fixed interval.
     */
    public static void addMetricsReporter(NewRelicMetricsReporter metricsReporter) {
        ScheduledFuture<?> task = executor.scheduleAtFixedRate(new MetricsSendRunnable(metricsReporter),
                0L, REPORTING_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
        metricReporterTasks.put(metricsReporter, task);
    }

    /**
     * Stops reporting for the given reporter. Safe to call for a reporter that
     * was never registered (or was already removed); in that case this is a no-op.
     */
    public static void removeMetricsReporter(NewRelicMetricsReporter metricsReporter) {
        ScheduledFuture<?> task = metricReporterTasks.remove(metricsReporter);
        // Guard against NPE when the reporter was never added or already removed.
        if (task != null) {
            task.cancel(false);
        }
    }

    // Single daemon thread so the scheduler never prevents JVM shutdown.
    private static ScheduledExecutorService createScheduledExecutor() {
        return Executors.newSingleThreadScheduledExecutor(runnable -> {
            final Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("NewRelicMetricsReporter-Kafka");
            return thread;
        });
    }

    /** One harvest cycle: reads the reporter's current metrics and reports them. */
    private static class MetricsSendRunnable implements Runnable {
        private final NewRelicMetricsReporter nrMetricsReporter;

        private MetricsSendRunnable(NewRelicMetricsReporter nrMetricsReporter) {
            this.nrMetricsReporter = nrMetricsReporter;
        }
        @Override
        public void run() {
            try {
                Map<String, Object> eventData = new HashMap<>();
                for (final Map.Entry<String, KafkaMetric> metric : nrMetricsReporter.getMetrics().entrySet()) {
                    Object metricValue = metric.getValue().metricValue();
                    if (metricValue instanceof Number) {
                        final float value = ((Number) metricValue).floatValue();
                        if (KAFKA_METRICS_DEBUG) {
                            AgentBridge.getAgent().getLogger().log(Level.FINEST, "getMetric: {0} = {1}", metric.getKey(), value);
                        }
                        // NaN/Infinity would corrupt aggregated metric data; skip them.
                        if (!Float.isNaN(value) && !Float.isInfinite(value)) {
                            if (METRICS_AS_EVENTS) {
                                eventData.put(metric.getKey().replace('/', '.'), value);
                            } else {
                                NewRelic.recordMetric(METRIC_PREFIX + metric.getKey(), value);
                            }
                        }
                    }
                }

                // Report node connectivity as presence markers (value 1).
                for (NewRelicMetricsReporter.NodeMetricNames consumerNodeMetricNames : nrMetricsReporter.getNodes().values()) {
                    if (METRICS_AS_EVENTS) {
                        for (String eventName : consumerNodeMetricNames.getEventNames()) {
                            eventData.put(eventName, 1f);
                        }
                    } else {
                        for (String metricName : consumerNodeMetricNames.getMetricNames()) {
                            NewRelic.recordMetric(metricName, 1f);
                        }
                    }
                }

                if (METRICS_AS_EVENTS) {
                    NewRelic.getAgent().getInsights().recordCustomEvent(METRICS_EVENT_TYPE, eventData);
                }
            } catch (Exception e) {
                // A failed harvest must not kill the scheduled task; log and retry next cycle.
                AgentBridge.getAgent().getLogger().log(Level.FINE, e, "Unable to record kafka metrics");
            }
        }
    }
}
Loading

0 comments on commit aa6cccf

Please sign in to comment.