From ab67a7b292652965f188534e3c9289615a97702f Mon Sep 17 00:00:00 2001
From: Scott Shields
Date: Mon, 11 May 2020 18:45:44 -0400
Subject: [PATCH 1/3] Adding an option to manually disable Kafka headers

---
 .../KafkaProducerInstrumentation.java       |  6 +-
 .../src/test/groovy/KafkaClientTest.groovy  | 71 +++++++++++++++++++
 .../main/java/datadog/trace/api/Config.java | 12 ++++
 3 files changed, 88 insertions(+), 1 deletion(-)

diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index f057ac2ab28..fe5e349aa5c 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -13,6 +13,7 @@
 
 import com.google.auto.service.AutoService;
 import datadog.trace.agent.tooling.Instrumenter;
+import datadog.trace.api.Config;
 import datadog.trace.bootstrap.instrumentation.api.AgentScope;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
 import java.util.Map;
@@ -76,7 +77,10 @@ public static AgentScope onEnter(
     // Do not inject headers for batch versions below 2
     // This is how similar check is being done in Kafka client itself:
     // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
-    if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
+    // Also, do not inject headers if specified by JVM option or environment variable
+    // This can help in mixed client environments where clients < 0.11 that do not support headers attempt to read
+    // messages that were produced by clients > 0.11 and the magic value of the broker(s) is >= 2
+    if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 && Config.get().isKafkaHeadersEnabled()) {
       try {
         propagate().inject(span, record.headers(), SETTER);
       } catch (final IllegalStateException e) {
diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy
index a4434cc489f..28e850007de 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy
@@ -1,4 +1,5 @@
 import datadog.trace.agent.test.AgentTestRunner
+import datadog.trace.api.Config
 import datadog.trace.bootstrap.instrumentation.api.Tags
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -15,10 +16,13 @@
 import org.springframework.kafka.listener.MessageListener
 import org.springframework.kafka.test.rule.KafkaEmbedded
 import org.springframework.kafka.test.utils.ContainerTestUtils
 import org.springframework.kafka.test.utils.KafkaTestUtils
+import spock.lang.Unroll
 
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.TimeUnit
 
+import static datadog.trace.agent.test.utils.ConfigUtils.withConfigOverride
+
 class KafkaClientTest extends AgentTestRunner {
   static final SHARED_TOPIC = "shared.topic"
@@ -204,4 +208,71 @@ class KafkaClientTest extends AgentTestRunner {
     }
   }
 
+  @Unroll
+  def "test kafka headers manual config"() {
+    setup:
+    def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
+    def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
+    def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)
+
+    // set up the Kafka consumer properties
+    def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
+
+    // create a Kafka consumer factory
+    def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)
+
+    // set the topic that needs to be consumed
+    def containerProperties
+    try {
+      // Different class names for test and latestDepTest.
+      containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
+    } catch (ClassNotFoundException | NoClassDefFoundError e) {
+      containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
+    }
+
+    // create a Kafka MessageListenerContainer
+    def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
+
+    // create a thread safe queue to store the received message
+    def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
+
+    // setup a Kafka message listener
+    container.setupMessageListener(new MessageListener<String, String>() {
+      @Override
+      void onMessage(ConsumerRecord<String, String> record) {
+        TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
+        records.add(record)
+      }
+    })
+
+    // start the container and underlying message listener
+    container.start()
+
+    // wait until the container has the required number of assigned partitions
+    ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())
+
+    when:
+    String message = "Testing without headers"
+    withConfigOverride(Config.KAFKA_HEADERS_ENABLED, value) {
+      kafkaTemplate.send(SHARED_TOPIC, message)
+    }
+
+    then:
+    // check that the message was received
+    def received = records.poll(5, TimeUnit.SECONDS)
+
+    received.headers().iterator().hasNext() == expected
+
+    cleanup:
+    producerFactory.stop()
+    container?.stop()
+
+    where:
+    value                                                 | expected
+    "false"                                               | false
+    "true"                                                | true
+    String.valueOf(Config.DEFAULT_KAFKA_HEADERS_ENABLED) | true
+
+  }
+
 }
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/Config.java b/dd-trace-api/src/main/java/datadog/trace/api/Config.java
index 55d61bc7585..9a3d16659e7 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/Config.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/Config.java
@@ -148,6 +148,8 @@ public class Config {
   public static final String PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE =
       "profiling.exception.histogram.max-collection-size";
 
+  public static final String KAFKA_HEADERS_ENABLED = "kafka.headers.enabled";
+
   public static final String RUNTIME_ID_TAG = "runtime-id";
   public static final String SERVICE = "service";
   public static final String SERVICE_TAG = SERVICE;
@@ -205,6 +207,8 @@ public class Config {
   public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_TOP_ITEMS = 50;
   public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE = 10000;
 
+  public static final boolean DEFAULT_KAFKA_HEADERS_ENABLED = true;
+
   private static final String SPLIT_BY_SPACE_OR_COMMA_REGEX = "[,\\s]+";
 
   private static final boolean DEFAULT_TRACE_REPORT_HOSTNAME = false;
@@ -330,6 +334,8 @@ private String profilingProxyPasswordMasker() {
   @Getter private final int profilingExceptionHistogramTopItems;
   @Getter private final int profilingExceptionHistogramMaxCollectionSize;
 
+  @Getter private final boolean kafkaHeadersEnabled;
+
   // Values from an optionally provided properties file
   private static Properties propertiesFromConfigFile;
 
@@ -545,6 +551,9 @@ private String profilingProxyPasswordMasker() {
             PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
             DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE);
 
+    kafkaHeadersEnabled =
+        getBooleanSettingFromEnvironment(KAFKA_HEADERS_ENABLED, DEFAULT_KAFKA_HEADERS_ENABLED);
+
     // Setting this last because we have a few places where this can come from
     apiKey = tmpApiKey;
 
@@ -734,6 +743,9 @@ private Config(final Properties properties, final Config parent) {
             PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
             parent.profilingExceptionHistogramMaxCollectionSize);
 
+    kafkaHeadersEnabled =
+        getPropertyBooleanValue(properties, KAFKA_HEADERS_ENABLED, parent.kafkaHeadersEnabled);
+
     log.debug("New instance: {}", this);
   }
 

From 9b82831c0ee6e4181c06b00c97c831738a32c715 Mon Sep 17 00:00:00 2001
From: Scott Shields
Date: Tue, 12 May 2020 08:37:36 -0400
Subject: [PATCH 2/3] Fixing checkstyle error

---
 .../kafka_clients/KafkaProducerInstrumentation.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index fe5e349aa5c..7bcc8c8cdbf 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -78,9 +78,11 @@ public static AgentScope onEnter(
     // This is how similar check is being done in Kafka client itself:
     // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
     // Also, do not inject headers if specified by JVM option or environment variable
-    // This can help in mixed client environments where clients < 0.11 that do not support headers attempt to read
+    // This can help in mixed client environments where clients < 0.11 that do not support headers
+    // attempt to read
     // messages that were produced by clients > 0.11 and the magic value of the broker(s) is >= 2
-    if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2 && Config.get().isKafkaHeadersEnabled()) {
+    if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
+        && Config.get().isKafkaHeadersEnabled()) {
       try {
         propagate().inject(span, record.headers(), SETTER);
       } catch (final IllegalStateException e) {

From 0c8a9a8df532e3c34891dc9ccefc6a9b8ac3fb05 Mon Sep 17 00:00:00 2001
From: Scott Shields
Date: Tue, 12 May 2020 09:57:45 -0400
Subject: [PATCH 3/3] Updating naming convention

---
 .../KafkaProducerInstrumentation.java           |  8 ++++----
 .../src/test/groovy/KafkaClientTest.groovy      | 13 +++++++------
 .../src/main/java/datadog/trace/api/Config.java | 16 +++++++++-------
 3 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
index 7bcc8c8cdbf..bad5a030bc6 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java
@@ -78,11 +78,11 @@ public static AgentScope onEnter(
     // This is how similar check is being done in Kafka client itself:
     // https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
     // Also, do not inject headers if specified by JVM option or environment variable
-    // This can help in mixed client environments where clients < 0.11 that do not support headers
-    // attempt to read
-    // messages that were produced by clients > 0.11 and the magic value of the broker(s) is >= 2
+    // This can help in mixed client environments where clients < 0.11 that do not support
+    // headers attempt to read messages that were produced by clients > 0.11 and the magic
+    // value of the broker(s) is >= 2
     if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
-        && Config.get().isKafkaHeadersEnabled()) {
+        && Config.get().isKafkaClientPropagationEnabled()) {
       try {
         propagate().inject(span, record.headers(), SETTER);
       } catch (final IllegalStateException e) {
diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy
index 28e850007de..ef385f556e0 100644
--- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy
+++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy
@@ -209,7 +209,7 @@ class KafkaClientTest extends AgentTestRunner {
   }
 
   @Unroll
-  def "test kafka headers manual config"() {
+  def "test kafka client header propagation manual config"() {
     setup:
     def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
     def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
@@ -253,7 +253,7 @@ class KafkaClientTest extends AgentTestRunner {
 
     when:
     String message = "Testing without headers"
-    withConfigOverride(Config.KAFKA_HEADERS_ENABLED, value) {
+    withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, value) {
       kafkaTemplate.send(SHARED_TOPIC, message)
     }
 
@@ -268,11 +268,12 @@ class KafkaClientTest extends AgentTestRunner {
     container?.stop()
 
     where:
-    value                                                 | expected
-    "false"                                               | false
-    "true"                                                | true
-    String.valueOf(Config.DEFAULT_KAFKA_HEADERS_ENABLED) | true
+    value                                                           | expected
+    "false"                                                         | false
+    "true"                                                          | true
+    String.valueOf(Config.DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED) | true
 
   }
 
 }
+
diff --git a/dd-trace-api/src/main/java/datadog/trace/api/Config.java b/dd-trace-api/src/main/java/datadog/trace/api/Config.java
index 9a3d16659e7..0d8c9bb799f 100644
--- a/dd-trace-api/src/main/java/datadog/trace/api/Config.java
+++ b/dd-trace-api/src/main/java/datadog/trace/api/Config.java
@@ -148,7 +148,7 @@ public class Config {
   public static final String PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE =
       "profiling.exception.histogram.max-collection-size";
 
-  public static final String KAFKA_HEADERS_ENABLED = "kafka.headers.enabled";
+  public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled";
"kafka.client.propagation.enabled"; public static final String RUNTIME_ID_TAG = "runtime-id"; public static final String SERVICE = "service"; @@ -207,7 +207,7 @@ public class Config { public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_TOP_ITEMS = 50; public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE = 10000; - public static final boolean DEFAULT_KAFKA_HEADERS_ENABLED = true; + public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true; private static final String SPLIT_BY_SPACE_OR_COMMA_REGEX = "[,\\s]+"; @@ -334,7 +334,7 @@ private String profilingProxyPasswordMasker() { @Getter private final int profilingExceptionHistogramTopItems; @Getter private final int profilingExceptionHistogramMaxCollectionSize; - @Getter private final boolean kafkaHeadersEnabled; + @Getter private final boolean kafkaClientPropagationEnabled; // Values from an optionally provided properties file private static Properties propertiesFromConfigFile; @@ -551,8 +551,9 @@ private String profilingProxyPasswordMasker() { PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE, DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE); - kafkaHeadersEnabled = - getBooleanSettingFromEnvironment(KAFKA_HEADERS_ENABLED, DEFAULT_KAFKA_HEADERS_ENABLED); + kafkaClientPropagationEnabled = + getBooleanSettingFromEnvironment( + KAFKA_CLIENT_PROPAGATION_ENABLED, DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED); // Setting this last because we have a few places where this can come from apiKey = tmpApiKey; @@ -743,8 +744,9 @@ private Config(final Properties properties, final Config parent) { PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE, parent.profilingExceptionHistogramMaxCollectionSize); - kafkaHeadersEnabled = - getPropertyBooleanValue(properties, KAFKA_HEADERS_ENABLED, parent.kafkaHeadersEnabled); + kafkaClientPropagationEnabled = + getPropertyBooleanValue( + properties, KAFKA_CLIENT_PROPAGATION_ENABLED, parent.kafkaClientPropagationEnabled); log.debug("New instance: {}", this); }