diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java index 4756a7e3727a..95d6413dc4f5 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/KafkaSingletons.java @@ -6,7 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkaclients; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.KafkaUtils; +import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder; import io.opentelemetry.instrumentation.kafka.ReceivedRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; @@ -15,11 +15,11 @@ public final class KafkaSingletons { private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11"; private static final Instrumenter, Void> PRODUCER_INSTRUMENTER = - KafkaUtils.buildProducerInstrumenter(INSTRUMENTATION_NAME); + KafkaInstrumenterBuilder.buildProducerInstrumenter(INSTRUMENTATION_NAME); private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER = - KafkaUtils.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME); + KafkaInstrumenterBuilder.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME); private static final Instrumenter, Void> CONSUMER_PROCESS_INSTRUMENTER = - KafkaUtils.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME); + KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME); public static Instrumenter, Void> producerInstrumenter() { return PRODUCER_INSTRUMENTER; diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracing.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracing.java index 1f7ff44b36fd..7514aeafecd9 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracing.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracing.java @@ -5,7 +5,6 @@ package io.opentelemetry.instrumentation.kafkaclients; -import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; @@ -14,7 +13,7 @@ import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.KafkaHeadersGetter; +import io.opentelemetry.instrumentation.kafka.KafkaConsumerRecordGetter; import io.opentelemetry.instrumentation.kafka.KafkaHeadersSetter; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -23,19 +22,22 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaTracing { +public final class KafkaTracing { private static final Logger logger = LoggerFactory.getLogger(KafkaTracing.class); - private static final TextMapGetter GETTER = new KafkaHeadersGetter(); + private static final TextMapGetter> GETTER = new KafkaConsumerRecordGetter(); private static final TextMapSetter SETTER = new KafkaHeadersSetter(); + private final OpenTelemetry openTelemetry; private final Instrumenter, Void> producerInstrumenter; private final Instrumenter, Void> consumerProcessInstrumenter; KafkaTracing( + OpenTelemetry openTelemetry, Instrumenter, Void> producerInstrumenter, Instrumenter, Void> consumerProcessInstrumenter) { + this.openTelemetry = openTelemetry; this.producerInstrumenter = producerInstrumenter; this.consumerProcessInstrumenter = consumerProcessInstrumenter; } @@ -50,21 +52,20 @@ public static KafkaTracingBuilder newBuilder(OpenTelemetry openTelemetry) { return new KafkaTracingBuilder(openTelemetry); } - private static TextMapPropagator propagator() { - return GlobalOpenTelemetry.getPropagators().getTextMapPropagator(); + private TextMapPropagator propagator() { + return openTelemetry.getPropagators().getTextMapPropagator(); } /** * Build and inject span into record. Return Runnable handle to end the current span. * * @param record the producer record to inject span info. - * @return runnable to close the current span */ - Runnable buildAndInjectSpan(ProducerRecord record) { + void buildAndInjectSpan(ProducerRecord record) { Context currentContext = Context.current(); if (!producerInstrumenter.shouldStart(currentContext, record)) { - return () -> {}; + return; } Context current = producerInstrumenter.start(currentContext, record); @@ -77,20 +78,20 @@ Runnable buildAndInjectSpan(ProducerRecord record) { } } - return () -> producerInstrumenter.end(current, record, null, null); + producerInstrumenter.end(current, record, null, null); } void buildAndFinishSpan(ConsumerRecords records) { Context currentContext = Context.current(); for (ConsumerRecord record : records) { - Context linkedContext = propagator().extract(currentContext, record.headers(), GETTER); - currentContext.with(Span.fromContext(linkedContext)); + Context linkedContext = propagator().extract(currentContext, record, GETTER); + Context newContext = currentContext.with(Span.fromContext(linkedContext)); - if (!consumerProcessInstrumenter.shouldStart(currentContext, record)) { + if (!consumerProcessInstrumenter.shouldStart(newContext, record)) { continue; } - Context current = consumerProcessInstrumenter.start(currentContext, record); + Context current = consumerProcessInstrumenter.start(newContext, record); consumerProcessInstrumenter.end(current, record, null, null); } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingBuilder.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingBuilder.java index cc0d1882efbd..7beaedbc78ac 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingBuilder.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingBuilder.java @@ -8,7 +8,7 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.kafka.KafkaUtils; +import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -21,8 +21,8 @@ public class KafkaTracingBuilder { private final OpenTelemetry openTelemetry; private final List, Void>> producerAttributesExtractors = new ArrayList<>(); - private final List, Void>> - consumerProcessAttributesExtractors = new ArrayList<>(); + private final List, Void>> consumerAttributesExtractors = + new ArrayList<>(); KafkaTracingBuilder(OpenTelemetry openTelemetry) { this.openTelemetry = Objects.requireNonNull(openTelemetry); @@ -33,22 +33,20 @@ public void addProducerAttributesExtractors( producerAttributesExtractors.add(extractor); } - public void addConsumerAttributesProcessExtractors( + public void addConsumerAttributesExtractors( AttributesExtractor, Void> extractor) { - consumerProcessAttributesExtractors.add(extractor); + consumerAttributesExtractors.add(extractor); } - @SuppressWarnings("unchecked") public KafkaTracing build() { return new KafkaTracing( - KafkaUtils.buildProducerInstrumenter( - INSTRUMENTATION_NAME, - openTelemetry, - producerAttributesExtractors.toArray(new AttributesExtractor[0])), - KafkaUtils.buildConsumerOperationInstrumenter( + openTelemetry, + KafkaInstrumenterBuilder.buildProducerInstrumenter( + INSTRUMENTATION_NAME, openTelemetry, producerAttributesExtractors), + KafkaInstrumenterBuilder.buildConsumerOperationInstrumenter( INSTRUMENTATION_NAME, openTelemetry, MessageOperation.RECEIVE, - consumerProcessAttributesExtractors.toArray(new AttributesExtractor[0]))); + consumerAttributesExtractors)); } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingHolder.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingHolder.java index ea45acd51af7..30b6bef9fac5 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingHolder.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/KafkaTracingHolder.java @@ -9,12 +9,5 @@ abstract class KafkaTracingHolder { - private KafkaTracing tracing; - - public synchronized KafkaTracing getTracing() { - if (tracing == null) { - tracing = KafkaTracing.create(GlobalOpenTelemetry.get()); - } - return tracing; - } + static final KafkaTracing tracing = KafkaTracing.create(GlobalOpenTelemetry.get()); } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumerInterceptor.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumerInterceptor.java index 05d3492a4298..1e9285ecfebf 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumerInterceptor.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingConsumerInterceptor.java @@ -16,7 +16,7 @@ public class TracingConsumerInterceptor extends KafkaTracingHolder @Override public ConsumerRecords onConsume(ConsumerRecords records) { - getTracing().buildAndFinishSpan(records); + tracing.buildAndFinishSpan(records); return records; } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducerInterceptor.java b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducerInterceptor.java index ea9b906d2d1a..af9029cd3083 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducerInterceptor.java +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/TracingProducerInterceptor.java @@ -14,7 +14,7 @@ public class TracingProducerInterceptor extends KafkaTracingHolder implements ProducerInterceptor { @Override public ProducerRecord onSend(ProducerRecord producerRecord) { - getTracing().buildAndInjectSpan(producerRecord).run(); + tracing.buildAndInjectSpan(producerRecord); return producerRecord; } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy index 54ddd8d71b17..d40571a6ecf6 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-0.11/library/src/test/groovy/io/opentelemetry/instrumentation/kafkaclients/InterceptorsTest.groovy @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.kafkaclients import io.opentelemetry.instrumentation.test.LibraryTestTrait +import io.opentelemetry.sdk.trace.data.SpanData import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.ProducerConfig @@ -57,9 +58,12 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait { assert record.key() == null } - assertTraces(3) { + def traces = waitForTraces(2) + println "traces = $traces" + + assertTraces(2) { traces.sort(orderByRootSpanKind(INTERNAL, PRODUCER, CONSUMER)) - trace(0, 2) { + trace(0, 3) { span(0) { name "parent" kind INTERNAL @@ -75,19 +79,10 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait { "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" } } - } - trace(1, 1) { - span(0) { - name "producer callback" - kind INTERNAL - hasNoParent() - } - } - trace(2, 1) { - span(0) { + span(2) { name SHARED_TOPIC + " receive" kind CONSUMER - hasNoParent() + hasLink(span(1)) attributes { "${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka" "${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC @@ -100,6 +95,13 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait { } } } + trace(1, 1) { + span(0) { + name "producer callback" + kind INTERNAL + hasNoParent() + } + } } } } diff --git a/instrumentation/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationBaseTest.groovy b/instrumentation/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationBaseTest.groovy index 78e175e69f7c..b927f375affd 100644 --- a/instrumentation/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationBaseTest.groovy +++ b/instrumentation/kafka-clients/kafka-clients-0.11/testing/src/main/groovy/io/opentelemetry/instrumentation/kafkaclients/KafkaClientPropagationBaseTest.groovy @@ -26,6 +26,7 @@ abstract class KafkaClientPropagationBaseTest extends KafkaClientBaseTest implem awaitUntilConsumerIsReady() // check that the message was received def records = consumer.poll(Duration.ofSeconds(5).toMillis()) + records.count() == 1 for (record in records) { assert record.headers().iterator().hasNext() == propagationEnabled } diff --git a/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaUtils.java b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaInstrumenterBuilder.java similarity index 86% rename from instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaUtils.java rename to instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaInstrumenterBuilder.java index 2555e7e791e1..5c88d3e52345 100644 --- a/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaUtils.java +++ b/instrumentation/kafka-clients/kafka-clients-common/library/src/main/java/io/opentelemetry/instrumentation/kafka/KafkaInstrumenterBuilder.java @@ -16,21 +16,22 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; +import java.util.Collections; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; -@SuppressWarnings("unchecked") -public final class KafkaUtils { +public final class KafkaInstrumenterBuilder { public static Instrumenter, Void> buildProducerInstrumenter( String instrumentationName) { - return buildProducerInstrumenter(instrumentationName, GlobalOpenTelemetry.get()); + return buildProducerInstrumenter( + instrumentationName, GlobalOpenTelemetry.get(), Collections.emptyList()); } public static Instrumenter, Void> buildProducerInstrumenter( String instrumentationName, OpenTelemetry openTelemetry, - AttributesExtractor, Void>... extractors) { + Iterable, Void>> extractors) { KafkaProducerAttributesExtractor attributesExtractor = new KafkaProducerAttributesExtractor(); SpanNameExtractor> spanNameExtractor = MessagingSpanNameExtractor.create(attributesExtractor); @@ -45,13 +46,14 @@ public final class KafkaUtils { public static Instrumenter buildConsumerReceiveInstrumenter( String instrumentationName) { - return buildConsumerReceiveInstrumenter(instrumentationName, GlobalOpenTelemetry.get()); + return buildConsumerReceiveInstrumenter( + instrumentationName, GlobalOpenTelemetry.get(), Collections.emptyList()); } public static Instrumenter buildConsumerReceiveInstrumenter( String instrumentationName, OpenTelemetry openTelemetry, - AttributesExtractor... extractors) { + Iterable> extractors) { KafkaReceiveAttributesExtractor attributesExtractor = new KafkaReceiveAttributesExtractor(); SpanNameExtractor spanNameExtractor = MessagingSpanNameExtractor.create(attributesExtractor); @@ -68,14 +70,17 @@ public static Instrumenter buildConsumerReceiveInstrument public static Instrumenter, Void> buildConsumerProcessInstrumenter( String instrumentationName) { return buildConsumerOperationInstrumenter( - instrumentationName, GlobalOpenTelemetry.get(), MessageOperation.PROCESS); + instrumentationName, + GlobalOpenTelemetry.get(), + MessageOperation.PROCESS, + Collections.emptyList()); } public static Instrumenter, Void> buildConsumerOperationInstrumenter( String instrumentationName, OpenTelemetry openTelemetry, MessageOperation operation, - AttributesExtractor, Void>... extractors) { + Iterable, Void>> extractors) { KafkaConsumerAttributesExtractor attributesExtractor = new KafkaConsumerAttributesExtractor(operation); SpanNameExtractor> spanNameExtractor = @@ -103,5 +108,5 @@ public static Instrumenter buildConsumerReceiveInstrument } } - private KafkaUtils() {} + private KafkaInstrumenterBuilder() {} } diff --git a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java index d3563308e92d..a0761dfe677d 100644 --- a/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java +++ b/instrumentation/kafka-streams-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkastreams/KafkaStreamsSingletons.java @@ -6,7 +6,7 @@ package io.opentelemetry.javaagent.instrumentation.kafkastreams; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.kafka.KafkaUtils; +import io.opentelemetry.instrumentation.kafka.KafkaInstrumenterBuilder; import org.apache.kafka.clients.consumer.ConsumerRecord; public final class KafkaStreamsSingletons { @@ -16,7 +16,7 @@ public final class KafkaStreamsSingletons { private static final Instrumenter, Void> INSTRUMENTER = buildInstrumenter(); private static Instrumenter, Void> buildInstrumenter() { - return KafkaUtils.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME); + return KafkaInstrumenterBuilder.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME); } public static Instrumenter, Void> instrumenter() { diff --git a/settings.gradle.kts b/settings.gradle.kts index 035df2b71864..7d4861081dd4 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -220,7 +220,6 @@ include(":instrumentation:jsp-2.3:javaagent") include(":instrumentation:kafka-clients:kafka-clients-0.11:javaagent") include(":instrumentation:kafka-clients:kafka-clients-0.11:library") include(":instrumentation:kafka-clients:kafka-clients-0.11:testing") -include(":instrumentation:kafka-clients:kafka-clients-2.4.0-testing") include(":instrumentation:kafka-clients:kafka-clients-common:library") include(":instrumentation:kafka-streams-0.11:javaagent") include(":instrumentation:kotlinx-coroutines:javaagent")