Skip to content

Commit

Permalink
Apply feedback, #4.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Sep 29, 2021
1 parent 4c16d1c commit 4b56eb2
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.KafkaUtils;
import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -15,11 +15,11 @@ public final class KafkaSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
KafkaUtils.buildProducerInstrumenter(INSTRUMENTATION_NAME);
KafkaInstrumenterBuilder.buildProducerInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ReceivedRecords, Void> CONSUMER_RECEIVE_INSTRUMENTER =
KafkaUtils.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME);
KafkaInstrumenterBuilder.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER =
KafkaUtils.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);
KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package io.opentelemetry.instrumentation.kafkaclients;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
Expand All @@ -14,7 +13,7 @@
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.instrumentation.kafka.KafkaConsumerRecordGetter;
import io.opentelemetry.instrumentation.kafka.KafkaHeadersSetter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
Expand All @@ -23,19 +22,22 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaTracing {
public final class KafkaTracing {
private static final Logger logger = LoggerFactory.getLogger(KafkaTracing.class);

private static final TextMapGetter<Headers> GETTER = new KafkaHeadersGetter();
private static final TextMapGetter<ConsumerRecord<?, ?>> GETTER = new KafkaConsumerRecordGetter();

private static final TextMapSetter<Headers> SETTER = new KafkaHeadersSetter();

private final OpenTelemetry openTelemetry;
private final Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter;
private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;

KafkaTracing(
OpenTelemetry openTelemetry,
Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter,
Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter) {
this.openTelemetry = openTelemetry;
this.producerInstrumenter = producerInstrumenter;
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
}
Expand All @@ -50,21 +52,20 @@ public static KafkaTracingBuilder newBuilder(OpenTelemetry openTelemetry) {
return new KafkaTracingBuilder(openTelemetry);
}

private static TextMapPropagator propagator() {
return GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
private TextMapPropagator propagator() {
return openTelemetry.getPropagators().getTextMapPropagator();
}

/**
* Build and inject span into record. Return Runnable handle to end the current span.
*
* @param record the producer record to inject span info.
* @return runnable to close the current span
*/
<K, V> Runnable buildAndInjectSpan(ProducerRecord<K, V> record) {
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record) {
Context currentContext = Context.current();

if (!producerInstrumenter.shouldStart(currentContext, record)) {
return () -> {};
return;
}

Context current = producerInstrumenter.start(currentContext, record);
Expand All @@ -77,20 +78,20 @@ <K, V> Runnable buildAndInjectSpan(ProducerRecord<K, V> record) {
}
}

return () -> producerInstrumenter.end(current, record, null, null);
producerInstrumenter.end(current, record, null, null);
}

<K, V> void buildAndFinishSpan(ConsumerRecords<K, V> records) {
Context currentContext = Context.current();
for (ConsumerRecord<K, V> record : records) {
Context linkedContext = propagator().extract(currentContext, record.headers(), GETTER);
currentContext.with(Span.fromContext(linkedContext));
Context linkedContext = propagator().extract(currentContext, record, GETTER);
Context newContext = currentContext.with(Span.fromContext(linkedContext));

if (!consumerProcessInstrumenter.shouldStart(currentContext, record)) {
if (!consumerProcessInstrumenter.shouldStart(newContext, record)) {
continue;
}

Context current = consumerProcessInstrumenter.start(currentContext, record);
Context current = consumerProcessInstrumenter.start(newContext, record);
consumerProcessInstrumenter.end(current, record, null, null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.kafka.KafkaUtils;
import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -21,8 +21,8 @@ public class KafkaTracingBuilder {
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerAttributesExtractors =
new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>>
consumerProcessAttributesExtractors = new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>> consumerAttributesExtractors =
new ArrayList<>();

KafkaTracingBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = Objects.requireNonNull(openTelemetry);
Expand All @@ -33,22 +33,20 @@ public void addProducerAttributesExtractors(
producerAttributesExtractors.add(extractor);
}

public void addConsumerAttributesProcessExtractors(
public void addConsumerAttributesExtractors(
AttributesExtractor<ConsumerRecord<?, ?>, Void> extractor) {
consumerProcessAttributesExtractors.add(extractor);
consumerAttributesExtractors.add(extractor);
}

@SuppressWarnings("unchecked")
public KafkaTracing build() {
return new KafkaTracing(
KafkaUtils.buildProducerInstrumenter(
INSTRUMENTATION_NAME,
openTelemetry,
producerAttributesExtractors.toArray(new AttributesExtractor[0])),
KafkaUtils.buildConsumerOperationInstrumenter(
openTelemetry,
KafkaInstrumenterBuilder.buildProducerInstrumenter(
INSTRUMENTATION_NAME, openTelemetry, producerAttributesExtractors),
KafkaInstrumenterBuilder.buildConsumerOperationInstrumenter(
INSTRUMENTATION_NAME,
openTelemetry,
MessageOperation.RECEIVE,
consumerProcessAttributesExtractors.toArray(new AttributesExtractor[0])));
consumerAttributesExtractors));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,5 @@

abstract class KafkaTracingHolder {

private KafkaTracing tracing;

public synchronized KafkaTracing getTracing() {
if (tracing == null) {
tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
}
return tracing;
}
static final KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get());
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class TracingConsumerInterceptor<K, V> extends KafkaTracingHolder

@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
getTracing().buildAndFinishSpan(records);
tracing.buildAndFinishSpan(records);
return records;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class TracingProducerInterceptor<K, V> extends KafkaTracingHolder
implements ProducerInterceptor<K, V> {
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
getTracing().buildAndInjectSpan(producerRecord).run();
tracing.buildAndInjectSpan(producerRecord);
return producerRecord;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.kafkaclients

import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
Expand Down Expand Up @@ -57,9 +58,12 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
assert record.key() == null
}

assertTraces(3) {
def traces = waitForTraces(2)
println "traces = $traces"

assertTraces(2) {
traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER))
trace(0, 2) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
Expand All @@ -75,19 +79,10 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
}
}
}
trace(1, 1) {
span(0) {
name "producer callback"
kind INTERNAL
hasNoParent()
}
}
trace(2, 1) {
span(0) {
span(2) {
name SHARED_TOPIC + " receive"
kind CONSUMER
hasNoParent()
hasLink(span(1))
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
Expand All @@ -100,6 +95,13 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
}
}
}
trace(1, 1) {
span(0) {
name "producer callback"
kind INTERNAL
hasNoParent()
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest implem
awaitUntilConsumerIsReady()
// check that the message was received
def records = consumer.poll(Duration.ofSeconds(5).toMillis())
records.count() == 1
for (record in records) {
assert record.headers().iterator().hasNext() == propagationEnabled
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

@SuppressWarnings("unchecked")
public final class KafkaUtils {
public final class KafkaInstrumenterBuilder {

public static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter(
String instrumentationName) {
return buildProducerInstrumenter(instrumentationName, GlobalOpenTelemetry.get());
return buildProducerInstrumenter(
instrumentationName, GlobalOpenTelemetry.get(), Collections.emptyList());
}

public static Instrumenter<ProducerRecord<?, ?>, Void> buildProducerInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
AttributesExtractor<ProducerRecord<?, ?>, Void>... extractors) {
Iterable<AttributesExtractor<ProducerRecord<?, ?>, Void>> extractors) {
KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor();
SpanNameExtractor<ProducerRecord<?, ?>> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
Expand All @@ -45,13 +46,14 @@ public final class KafkaUtils {

public static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter(
String instrumentationName) {
return buildConsumerReceiveInstrumenter(instrumentationName, GlobalOpenTelemetry.get());
return buildConsumerReceiveInstrumenter(
instrumentationName, GlobalOpenTelemetry.get(), Collections.emptyList());
}

public static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
AttributesExtractor<ReceivedRecords, Void>... extractors) {
Iterable<AttributesExtractor<ReceivedRecords, Void>> extractors) {
KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor();
SpanNameExtractor<ReceivedRecords> spanNameExtractor =
MessagingSpanNameExtractor.create(attributesExtractor);
Expand All @@ -68,14 +70,17 @@ public static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrument
public static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerProcessInstrumenter(
String instrumentationName) {
return buildConsumerOperationInstrumenter(
instrumentationName, GlobalOpenTelemetry.get(), MessageOperation.PROCESS);
instrumentationName,
GlobalOpenTelemetry.get(),
MessageOperation.PROCESS,
Collections.emptyList());
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> buildConsumerOperationInstrumenter(
String instrumentationName,
OpenTelemetry openTelemetry,
MessageOperation operation,
AttributesExtractor<ConsumerRecord<?, ?>, Void>... extractors) {
Iterable<AttributesExtractor<ConsumerRecord<?, ?>, Void>> extractors) {
KafkaConsumerAttributesExtractor attributesExtractor =
new KafkaConsumerAttributesExtractor(operation);
SpanNameExtractor<ConsumerRecord<?, ?>> spanNameExtractor =
Expand Down Expand Up @@ -103,5 +108,5 @@ public static Instrumenter<ReceivedRecords, Void> buildConsumerReceiveInstrument
}
}

private KafkaUtils() {}
private KafkaInstrumenterBuilder() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams;

import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.KafkaUtils;
import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public final class KafkaStreamsSingletons {
Expand All @@ -16,7 +16,7 @@ public final class KafkaStreamsSingletons {
private static final Instrumenter<ConsumerRecord<?, ?>, Void> INSTRUMENTER = buildInstrumenter();

private static Instrumenter<ConsumerRecord<?, ?>, Void> buildInstrumenter() {
return KafkaUtils.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);
return KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);
}

public static Instrumenter<ConsumerRecord<?, ?>, Void> instrumenter() {
Expand Down
1 change: 0 additions & 1 deletion settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ include(":instrumentation:jsp-2.3:javaagent")
include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent")
include(":instrumentation:kafka-clients:kafka-clients-0.11:library")
include(":instrumentation:kafka-clients:kafka-clients-0.11:testing")
include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing")
include(":instrumentation:kafka-clients:kafka-clients-common:library")
include(":instrumentation:kafka-streams-0.11:javaagent")
include(":instrumentation:kotlinx-coroutines:javaagent")
Expand Down

0 comments on commit 4b56eb2

Please sign in to comment.