Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding an option to manually disable Kafka headers #1448

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.Map;
Expand Down Expand Up @@ -76,7 +77,12 @@ public static AgentScope onEnter(
// Do not inject headers for batch versions below 2
// This is how similar check is being done in Kafka client itself:
// https://github.com/apache/kafka/blob/05fcfde8f69b0349216553f711fdfc3f0259c601/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L411-L412
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2) {
// Also, do not inject headers if specified by JVM option or environment variable
// This can help in mixed client environments where clients < 0.11 that do not support
// headers attempt to read messages that were produced by clients > 0.11 and the magic
// value of the broker(s) is >= 2
if (apiVersions.maxUsableProduceMagic() >= RecordBatch.MAGIC_VALUE_V2
&& Config.get().isKafkaClientPropagationEnabled()) {
try {
propagate().inject(span, record.headers(), SETTER);
} catch (final IllegalStateException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Config
import datadog.trace.bootstrap.instrumentation.api.Tags
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
Expand All @@ -15,10 +16,13 @@ import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Unroll

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

import static datadog.trace.agent.test.utils.ConfigUtils.withConfigOverride

class KafkaClientTest extends AgentTestRunner {
static final SHARED_TOPIC = "shared.topic"

Expand Down Expand Up @@ -204,4 +208,72 @@ class KafkaClientTest extends AgentTestRunner {

}

@Unroll
def "test kafka client header propagation manual config"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producerFactory = new DefaultKafkaProducerFactory<String, String>(senderProps)
def kafkaTemplate = new KafkaTemplate<String, String>(producerFactory)

// set up the Kafka consumer properties
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)

// create a Kafka consumer factory
def consumerFactory = new DefaultKafkaConsumerFactory<String, String>(consumerProperties)

// set the topic that needs to be consumed
def containerProperties
try {
// Different class names for test and latestDepTest.
containerProperties = Class.forName("org.springframework.kafka.listener.config.ContainerProperties").newInstance(SHARED_TOPIC)
} catch (ClassNotFoundException | NoClassDefFoundError e) {
containerProperties = Class.forName("org.springframework.kafka.listener.ContainerProperties").newInstance(SHARED_TOPIC)
}

// create a Kafka MessageListenerContainer
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)

// create a thread safe queue to store the received message
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()

// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record)
}
})

// start the container and underlying message listener
container.start()

// wait until the container has the required number of assigned partitions
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())

when:
String message = "Testing without headers"
withConfigOverride(Config.KAFKA_CLIENT_PROPAGATION_ENABLED, value) {
kafkaTemplate.send(SHARED_TOPIC, message)
}

then:
// check that the message was received
def received = records.poll(5, TimeUnit.SECONDS)

received.headers().iterator().hasNext() == expected

cleanup:
producerFactory.stop()
container?.stop()

where:
value | expected
"false" | false
"true" | true
String.valueOf(Config.DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED) | true

}

}

14 changes: 14 additions & 0 deletions dd-trace-api/src/main/java/datadog/trace/api/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public class Config {
public static final String PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE =
"profiling.exception.histogram.max-collection-size";

public static final String KAFKA_CLIENT_PROPAGATION_ENABLED = "kafka.client.propagation.enabled";

public static final String RUNTIME_ID_TAG = "runtime-id";
public static final String SERVICE = "service";
public static final String SERVICE_TAG = SERVICE;
Expand Down Expand Up @@ -205,6 +207,8 @@ public class Config {
public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_TOP_ITEMS = 50;
public static final int DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE = 10000;

public static final boolean DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED = true;

private static final String SPLIT_BY_SPACE_OR_COMMA_REGEX = "[,\\s]+";

private static final boolean DEFAULT_TRACE_REPORT_HOSTNAME = false;
Expand Down Expand Up @@ -330,6 +334,8 @@ private String profilingProxyPasswordMasker() {
@Getter private final int profilingExceptionHistogramTopItems;
@Getter private final int profilingExceptionHistogramMaxCollectionSize;

@Getter private final boolean kafkaClientPropagationEnabled;

// Values from an optionally provided properties file
private static Properties propertiesFromConfigFile;

Expand Down Expand Up @@ -545,6 +551,10 @@ private String profilingProxyPasswordMasker() {
PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
DEFAULT_PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE);

kafkaClientPropagationEnabled =
getBooleanSettingFromEnvironment(
KAFKA_CLIENT_PROPAGATION_ENABLED, DEFAULT_KAFKA_CLIENT_PROPAGATION_ENABLED);

// Setting this last because we have a few places where this can come from
apiKey = tmpApiKey;

Expand Down Expand Up @@ -734,6 +744,10 @@ private Config(final Properties properties, final Config parent) {
PROFILING_EXCEPTION_HISTOGRAM_MAX_COLLECTION_SIZE,
parent.profilingExceptionHistogramMaxCollectionSize);

kafkaClientPropagationEnabled =
getPropertyBooleanValue(
properties, KAFKA_CLIENT_PROPAGATION_ENABLED, parent.kafkaClientPropagationEnabled);

log.debug("New instance: {}", this);
}

Expand Down