Skip to content

Commit

Permalink
Apply feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Sep 28, 2021
1 parent a16d121 commit ae7aad5
Show file tree
Hide file tree
Showing 15 changed files with 85 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.consumerReceiveInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.spanFromContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.producerInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,23 @@
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafka;

import static io.opentelemetry.instrumentation.kafka.KafkaUtils.buildConsumerProcessInstrumenter;
import static io.opentelemetry.instrumentation.kafka.KafkaUtils.buildConsumerReceiveInstrumenter;
import static io.opentelemetry.instrumentation.kafka.KafkaUtils.buildProducerInstrumenter;
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.KafkaUtils;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class KafkaSingletons {
public static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";
public static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11.javaagent";

private static final Instrumenter<ProducerRecord<?, ?>, Void> PRODUCER_INSTRUMENTER =
buildProducerInstrumenter(INSTRUMENTATION_NAME);
KafkaUtils.buildProducerInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ReceivedRecords, Void> CONSUMER_RECEIVE_INSTRUMENTER =
buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME);
KafkaUtils.buildConsumerReceiveInstrumenter(INSTRUMENTATION_NAME);
private static final Instrumenter<ConsumerRecord<?, ?>, Void> CONSUMER_PROCESS_INSTRUMENTER =
buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);
KafkaUtils.buildConsumerProcessInstrumenter(INSTRUMENTATION_NAME);

public static Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter() {
return PRODUCER_INSTRUMENTER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.producerInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.producerInstrumenter;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.instrumentation.kafka.KafkaSingletons.consumerProcessInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerProcessInstrumenter;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package io.opentelemetry.instrumentation.kafkaclients
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition

import java.time.Duration
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -192,7 +191,7 @@ class KafkaClientDefaultTest extends KafkaClientPropagationBaseTest {
when: "receive messages"
awaitUntilConsumerIsReady()
def consumerRecords = consumer.poll(Duration.ofSeconds(5).toMillis())
def recordsInPartition = consumerRecords.records(new TopicPartition(SHARED_TOPIC, partition))
def recordsInPartition = consumerRecords.records(topicPartition)
recordsInPartition.size() == 1
// iterate over records to generate spans
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.kafka.KafkaHeadersGetter;
import io.opentelemetry.instrumentation.kafka.KafkaHeadersSetter;
import io.opentelemetry.instrumentation.kafka.KafkaSingletons;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import io.opentelemetry.instrumentation.kafka.Timer;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -28,58 +24,49 @@
import org.slf4j.LoggerFactory;

public class KafkaTracing {
public static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11.library";

private static final Logger logger = LoggerFactory.getLogger(KafkaTracing.class);

private static final TextMapGetter<Headers> GETTER = new KafkaHeadersGetter();

private static final TextMapSetter<Headers> SETTER = new KafkaHeadersSetter();

private Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter =
KafkaSingletons.producerInstrumenter();
private Instrumenter<ReceivedRecords, Void> consumerReceiveInstrumenter =
KafkaSingletons.consumerReceiveInstrumenter();
private Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter =
KafkaSingletons.consumerProcessInstrumenter();
private final Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter;
private final Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter;

KafkaTracing() {}
KafkaTracing(
Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter,
Instrumenter<ConsumerRecord<?, ?>, Void> consumerProcessInstrumenter) {
this.producerInstrumenter = producerInstrumenter;
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
}

public static KafkaTracingBuilder newBuilder(OpenTelemetry openTelemetry) {
public static KafkaTracingBuilder create(OpenTelemetry openTelemetry) {
return new KafkaTracingBuilder(openTelemetry);
}

private static TextMapPropagator propagator() {
return GlobalOpenTelemetry.getPropagators().getTextMapPropagator();
}

void setProducerInstrumenter(Instrumenter<ProducerRecord<?, ?>, Void> producerInstrumenter) {
this.producerInstrumenter = Objects.requireNonNull(producerInstrumenter);
}

void setConsumerReceiveInstrumenter(Instrumenter<ReceivedRecords, Void> consumerInstrumenter) {
this.consumerReceiveInstrumenter = Objects.requireNonNull(consumerInstrumenter);
}

void setConsumerProcessInstrumenter(
Instrumenter<ConsumerRecord<?, ?>, Void> consumerInstrumenter) {
this.consumerProcessInstrumenter = Objects.requireNonNull(consumerInstrumenter);
}

/**
* Build and inject span into record. Return Runnable handle to end the current span.
*
* @param record the producer record to inject span info.
* @return runnable to close the current span
*/
public <K, V> Runnable buildAndInjectSpan(ProducerRecord<K, V> record) {
if (!producerInstrumenter.shouldStart(Context.current(), record)) {
<K, V> Runnable buildAndInjectSpan(ProducerRecord<K, V> record) {
Context currentContext = Context.current();

if (!producerInstrumenter.shouldStart(currentContext, record)) {
return () -> {};
}

Context current = producerInstrumenter.start(Context.current(), record);
Context current = producerInstrumenter.start(currentContext, record);
try (Scope ignored = current.makeCurrent()) {
try {
propagator().inject(Context.current(), record.headers(), SETTER);
propagator().inject(current, record.headers(), SETTER);
} catch (Throwable t) {
// it can happen if headers are read only (when record is sent second time)
logger.error("failed to inject span context. sending record second time?", t);
Expand All @@ -89,37 +76,18 @@ public <K, V> Runnable buildAndInjectSpan(ProducerRecord<K, V> record) {
return () -> producerInstrumenter.end(current, record, null, null);
}

public <K, V> void buildAndFinishSpan(ConsumerRecords<K, V> records) {
ReceivedRecords receivedRecords = ReceivedRecords.create(records, Timer.start());
<K, V> void buildAndFinishSpan(ConsumerRecords<K, V> records) {
Context currentContext = Context.current();
for (ConsumerRecord<K, V> record : records) {
Context linkedContext = propagator().extract(currentContext, record.headers(), GETTER);
currentContext.with(Span.fromContext(linkedContext));

if (!consumerReceiveInstrumenter.shouldStart(Context.current(), receivedRecords)) {
return;
}

Context context = consumerReceiveInstrumenter.start(Context.current(), receivedRecords);
try (Scope ignored = context.makeCurrent()) {
for (ConsumerRecord<K, V> record : records) {
buildAndFinishChildSpan(record);
if (!consumerProcessInstrumenter.shouldStart(currentContext, record)) {
continue;
}
consumerReceiveInstrumenter.end(context, receivedRecords, null, null);
} catch (RuntimeException e) {
consumerReceiveInstrumenter.end(context, receivedRecords, null, e);
}
}

private <K, V> void buildAndFinishChildSpan(ConsumerRecord<K, V> record) {
Context linkedContext = propagator().extract(Context.current(), record.headers(), GETTER);
Context.current().with(Span.fromContext(linkedContext));

if (!consumerProcessInstrumenter.shouldStart(Context.current(), record)) {
return;
Context current = consumerProcessInstrumenter.start(currentContext, record);
consumerProcessInstrumenter.end(current, record, null, null);
}

Context current = consumerProcessInstrumenter.start(Context.current(), record);
consumerProcessInstrumenter.end(current, record, null, null);

// Inject created span context into record headers for extraction by client to continue span
// chain
propagator().inject(current, record.headers(), SETTER); // TODO -- OK?
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.kafka.KafkaSingletons;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.kafka.KafkaUtils;
import io.opentelemetry.instrumentation.kafka.ReceivedRecords;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
Expand All @@ -20,8 +19,6 @@ public class KafkaTracingBuilder {
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<ProducerRecord<?, ?>, Void>> producerExtractors =
new ArrayList<>();
private final List<AttributesExtractor<ReceivedRecords, Void>> consumerReceiveExtractors =
new ArrayList<>();
private final List<AttributesExtractor<ConsumerRecord<?, ?>, Void>> consumerProcessExtractors =
new ArrayList<>();

Expand All @@ -33,33 +30,22 @@ public void addProducerExtractors(AttributesExtractor<ProducerRecord<?, ?>, Void
producerExtractors.add(extractor);
}

public void addConsumerReceiveExtractors(AttributesExtractor<ReceivedRecords, Void> extractor) {
consumerReceiveExtractors.add(extractor);
}

public void addConsumerProcessExtractors(
AttributesExtractor<ConsumerRecord<?, ?>, Void> extractor) {
consumerProcessExtractors.add(extractor);
}

@SuppressWarnings("unchecked")
public KafkaTracing build() {
KafkaTracing tracing = new KafkaTracing();
tracing.setProducerInstrumenter(
return new KafkaTracing(
KafkaUtils.buildProducerInstrumenter(
KafkaSingletons.INSTRUMENTATION_NAME,
openTelemetry,
producerExtractors.toArray(new AttributesExtractor[0])));
tracing.setConsumerReceiveInstrumenter(
KafkaUtils.buildConsumerReceiveInstrumenter(
KafkaSingletons.INSTRUMENTATION_NAME,
KafkaTracing.INSTRUMENTATION_NAME,
openTelemetry,
consumerReceiveExtractors.toArray(new AttributesExtractor[0])));
tracing.setConsumerProcessInstrumenter(
KafkaUtils.buildConsumerProcessInstrumenter(
KafkaSingletons.INSTRUMENTATION_NAME,
producerExtractors.toArray(new AttributesExtractor[0])),
KafkaUtils.buildConsumerOperationInstrumenter(
KafkaTracing.INSTRUMENTATION_NAME,
openTelemetry,
MessageOperation.RECEIVE,
consumerProcessExtractors.toArray(new AttributesExtractor[0])));
return tracing;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.kafkaclients;

import io.opentelemetry.api.GlobalOpenTelemetry;

abstract class KafkaTracingHolder {

private KafkaTracing tracing;

public synchronized KafkaTracing getTracing() {
if (tracing == null) {
tracing = KafkaTracing.create(GlobalOpenTelemetry.get()).build();
}
return tracing;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@

package io.opentelemetry.instrumentation.kafkaclients;

import io.opentelemetry.api.GlobalOpenTelemetry;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
public class TracingConsumerInterceptor<K, V> extends KafkaTracingHolder
implements ConsumerInterceptor<K, V> {

@Override
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get()).build();
tracing.buildAndFinishSpan(records);
getTracing().buildAndFinishSpan(records);
return records;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,16 @@

package io.opentelemetry.instrumentation.kafkaclients;

import io.opentelemetry.api.GlobalOpenTelemetry;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K, V> {
public class TracingProducerInterceptor<K, V> extends KafkaTracingHolder
implements ProducerInterceptor<K, V> {
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
KafkaTracing tracing = KafkaTracing.newBuilder(GlobalOpenTelemetry.get()).build();
tracing.buildAndInjectSpan(producerRecord).run();
getTracing().buildAndInjectSpan(producerRecord).run();
return producerRecord;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
hasNoParent()
}
}
trace(2, 2) {
trace(2, 1) {
span(0) {
name SHARED_TOPIC + " receive"
kind CONSUMER
Expand All @@ -93,17 +93,6 @@ class InterceptorsTest extends KafkaClientBaseTest implements LibraryTestTrait {
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "receive"
}
}
span(1) {
name SHARED_TOPIC + " process"
kind CONSUMER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "kafka"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" SHARED_TOPIC
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_KAFKA_PARTITION.key}" { it >= 0 }
"kafka.offset" Long
Expand Down
Loading

0 comments on commit ae7aad5

Please sign in to comment.