Skip to content

Commit

Permalink
Testing Kafka instrumentation
Browse files Browse the repository at this point in the history
also prevents the Kafka 2 instrumentation from applying to Kafka 3.
  • Loading branch information
meiao authored May 24, 2022
1 parent 77ed361 commit 508eb9d
Show file tree
Hide file tree
Showing 5 changed files with 274 additions and 1 deletion.
3 changes: 3 additions & 0 deletions instrumentation/kafka-clients-metrics-0.10.0.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// Dependencies for the kafka-clients-metrics instrumentation module.
dependencies {
    implementation(project(":agent-bridge"))
    // NOTE(review): the module directory says 0.10.0.0 but this compiles against
    // kafka-clients 0.11.0.0 — confirm the intended baseline version.
    implementation("org.apache.kafka:kafka-clients:0.11.0.0")

    // Testcontainers starts a real Kafka broker for the instrumentation tests below.
    testImplementation("org.testcontainers:testcontainers:1.16.3")
    testImplementation("org.testcontainers:kafka:1.16.3")
}

jar {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
*
* * Copyright 2022 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.kafka;

import com.newrelic.agent.introspec.InstrumentationTestConfig;
import com.newrelic.agent.introspec.InstrumentationTestRunner;
import static com.newrelic.agent.introspec.MetricsHelper.getUnscopedMetricCount;
import com.newrelic.api.agent.Trace;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(InstrumentationTestRunner.class)
@InstrumentationTestConfig(includePrefixes = "org.apache.kafka")
public class Kafka2MessageTest {

    @Rule
    public KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    // TOPIC receives 2 messages and is consumed; ANOTHER_TOPIC receives 1 message
    // and is never consumed, so its consume/deserialization metrics stay at 0.
    private static final String TOPIC = "life-universe-everything";
    private static final String ANOTHER_TOPIC = "vogon-poetry";

    @Before
    public void before() {
        kafkaContainer.start();
    }

    @After
    public void after() {
        kafkaContainer.stop();
    }

    /**
     * End-to-end check: consumes in a background thread, produces three messages,
     * then verifies the Kafka instrumentation reported the expected unscoped metrics.
     */
    @Test
    public void testProducer() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Future holds whether the expected messages were read.
            Future<Boolean> msgsWereRead = executor.submit(this::readMessages);

            // giving some time for the consumer to ready itself up prior to sending the messages
            Thread.sleep(1000L);
            sendMessages();

            Assert.assertTrue("Messages weren't read", msgsWereRead.get());
            assertUnscopedMetrics();
        } finally {
            // The executor was previously never shut down, leaking its worker thread.
            executor.shutdown();
        }
    }

    /**
     * Polls TOPIC until at least 2 messages were read or the timeout elapses.
     *
     * @return whether the expected messages were read
     */
    @Trace(dispatcher = true)
    private boolean readMessages() {
        int messagesRead = 0;
        try (KafkaConsumer<String, String> consumer = KafkaHelper.newConsumer(kafkaContainer)) {
            consumer.subscribe(Collections.singleton(TOPIC));

            // setting a timeout so this does not drag forever if something goes wrong.
            long waitUntil = System.currentTimeMillis() + 5000L;
            while (waitUntil > System.currentTimeMillis()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                messagesRead += records.count();
                // >= rather than == so an unexpected extra record cannot make this
                // loop run out the full timeout.
                if (messagesRead >= 2) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Sends 1 message to ANOTHER_TOPIC and 2 messages to TOPIC, waiting for all acks.
     */
    @Trace(dispatcher = true)
    private void sendMessages() throws ExecutionException, InterruptedException {
        try (KafkaProducer<String, String> producer = KafkaHelper.newProducer(kafkaContainer)) {
            List<Future<RecordMetadata>> futures = Arrays.asList(
                    producer.send(new ProducerRecord<>(ANOTHER_TOPIC, "Oh freddled gruntbuggly")),
                    producer.send(new ProducerRecord<>(TOPIC, "Life, don't talk to me about life.")),
                    producer.send(new ProducerRecord<>(TOPIC, "Don't Panic"))
            );
            for (Future<RecordMetadata> future : futures) {
                future.get();
            }
        }
    }

    private void assertUnscopedMetrics() {
        assertUnscopedMetricExists(
                // general kafka metrics, they can change from test to test, so only verifying they exist
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/assigned-partitions",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/commit-latency-avg",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/commit-rate",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/heartbeat-rate",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/join-rate",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/join-time-avg",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/last-heartbeat-seconds-ago",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/sync-rate",
                "MessageBroker/Kafka/Internal/consumer-coordinator-metrics/sync-time-avg",
                "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/bytes-consumed-rate",
                "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-latency-avg",
                "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-rate",
                "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-size-avg",
                "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/fetch-throttle-time-avg",
                "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-consumed-rate",
                "MessageBroker/Kafka/Internal/consumer-fetch-manager-metrics/records-per-request-avg",
                "MessageBroker/Kafka/Internal/consumer-metrics/connection-close-rate",
                "MessageBroker/Kafka/Internal/consumer-metrics/connection-count",
                "MessageBroker/Kafka/Internal/consumer-metrics/connection-creation-rate",
                "MessageBroker/Kafka/Internal/consumer-metrics/incoming-byte-rate",
                "MessageBroker/Kafka/Internal/consumer-metrics/io-ratio",
                "MessageBroker/Kafka/Internal/consumer-metrics/io-time-ns-avg",
                "MessageBroker/Kafka/Internal/consumer-metrics/io-wait-ratio",
                "MessageBroker/Kafka/Internal/consumer-metrics/io-wait-time-ns-avg",
                "MessageBroker/Kafka/Internal/consumer-metrics/network-io-rate",
                "MessageBroker/Kafka/Internal/consumer-metrics/outgoing-byte-rate",
                "MessageBroker/Kafka/Internal/consumer-metrics/request-rate",
                "MessageBroker/Kafka/Internal/consumer-metrics/request-size-avg",
                "MessageBroker/Kafka/Internal/consumer-metrics/response-rate",
                "MessageBroker/Kafka/Internal/consumer-metrics/select-rate",
                "MessageBroker/Kafka/Internal/kafka-metrics-count/count",
                "MessageBroker/Kafka/Internal/producer-metrics/batch-size-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/batch-split-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/buffer-available-bytes",
                "MessageBroker/Kafka/Internal/producer-metrics/buffer-exhausted-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/buffer-total-bytes",
                "MessageBroker/Kafka/Internal/producer-metrics/bufferpool-wait-ratio",
                "MessageBroker/Kafka/Internal/producer-metrics/compression-rate-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/connection-close-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/connection-count",
                "MessageBroker/Kafka/Internal/producer-metrics/connection-creation-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/incoming-byte-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/io-ratio",
                "MessageBroker/Kafka/Internal/producer-metrics/io-time-ns-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/io-wait-ratio",
                "MessageBroker/Kafka/Internal/producer-metrics/io-wait-time-ns-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/metadata-age",
                "MessageBroker/Kafka/Internal/producer-metrics/network-io-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/outgoing-byte-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/produce-throttle-time-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/record-error-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/record-queue-time-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/record-retry-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/record-send-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/record-size-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/records-per-request-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/request-latency-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/request-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/request-size-avg",
                "MessageBroker/Kafka/Internal/producer-metrics/requests-in-flight",
                "MessageBroker/Kafka/Internal/producer-metrics/response-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/select-rate",
                "MessageBroker/Kafka/Internal/producer-metrics/waiting-threads"
        );

        assertEquals(4, getUnscopedMetricCount("MessageBroker/Kafka/Deserialization/" + TOPIC));
        assertEquals(8, getUnscopedMetricCount("MessageBroker/Kafka/Serialization/" + TOPIC));
        assertEquals(2, getUnscopedMetricCount("MessageBroker/Kafka/Topic/Produce/Named/" + TOPIC));

        assertEquals(0, getUnscopedMetricCount("MessageBroker/Kafka/Deserialization/" + ANOTHER_TOPIC));
        assertEquals(4, getUnscopedMetricCount("MessageBroker/Kafka/Serialization/" + ANOTHER_TOPIC));
        assertEquals(1, getUnscopedMetricCount("MessageBroker/Kafka/Topic/Produce/Named/" + ANOTHER_TOPIC));

        // there are 2 messages in the topic, but they could be read in a single poll, or in 2
        int consumedCount = getUnscopedMetricCount("MessageBroker/Kafka/Topic/Consume/Named/" + TOPIC);
        assertTrue(consumedCount >= 1);
        assertTrue(consumedCount <= 2);

        // was hard-coded to "life-universe-everything"; use the TOPIC constant for consistency
        assertEquals(1, getUnscopedMetricCount("MessageBroker/Kafka/Rebalance/Assigned/" + TOPIC + "/0"));
    }

    // Asserts that every named metric exists among the introspector's unscoped metrics.
    private void assertUnscopedMetricExists(String... metricNames) {
        Set<String> existingMetrics = InstrumentationTestRunner.getIntrospector().getUnscopedMetrics().keySet();
        for (String metricName : metricNames) {
            Assert.assertTrue("metric not found: " + metricName, existingMetrics.contains(metricName));
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
*
* * Copyright 2022 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package com.nr.agent.instrumentation.kafka;

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.testcontainers.containers.KafkaContainer;

/**
 * Test utility that builds Kafka clients wired to a Testcontainers broker.
 */
public class KafkaHelper {

    private KafkaHelper() {
        // prevents instantiations
    }

    /** Creates a String/String producer pointed at the given container's broker. */
    public static KafkaProducer<String, String> newProducer(KafkaContainer kafkaContainer) {
        return new KafkaProducer<>(getProps(kafkaContainer.getBootstrapServers()));
    }

    /** Creates a String/String consumer pointed at the given container's broker. */
    public static KafkaConsumer<String, String> newConsumer(KafkaContainer kafkaContainer) {
        return new KafkaConsumer<>(getProps(kafkaContainer.getBootstrapServers()));
    }

    /**
     * Builds a combined producer/consumer configuration for the given bootstrap servers.
     * Unrelated keys are ignored by each client type, so one Properties serves both.
     */
    public static Properties getProps(String bootstrapServers) {
        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("batch.size", 16384);
        config.put("linger.ms", 1);
        config.put("buffer.memory", 33554432);
        config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        config.put("group.id", "test-consumer-group");
        return config;
    }
}
2 changes: 1 addition & 1 deletion instrumentation/kafka-clients-metrics-2.0.0/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jar {
}

verifyInstrumentation {
    // Upper bound added so this module verifies only against Kafka 2.x;
    // it must not apply to Kafka 3 (see the ConsumerRecord_Instrumentation weave).
    passesOnly 'org.apache.kafka:kafka-clients:[2.0.0,3.0.0)'
}

site {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
*
* * Copyright 2022 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.apache.kafka.clients.consumer;

import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;

/**
 * This class is here to prevent this module from applying to Kafka 3.
 *
 * Weaving {@code ConsumerRecord.checksum()} makes the weaver require that method
 * to exist on the target; NOTE(review): this presumes checksum() was removed in
 * Kafka 3.0 — confirm against the Kafka 3.0 release notes.
 */
@Weave(originalName = "org.apache.kafka.clients.consumer.ConsumerRecord")
public class ConsumerRecord_Instrumentation {

    // Behavior-neutral weave: delegates straight to the original implementation.
    public long checksum() {
        return Weaver.callOriginal();
    }
}

0 comments on commit 508eb9d

Please sign in to comment.