Skip to content

Commit

Permalink
Apply feedback open-telemetry#5.
Browse files Browse the repository at this point in the history
  • Loading branch information
alesj committed Sep 30, 2021
1 parent 0912ff3 commit bd8a5d5
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
library("org.apache.kafka:kafka-clients:2.6.0")

testImplementation(project(":instrumentation:kafka-clients:kafka-clients-0.11:testing"))
testImplementation("com.fasterxml.jackson.core:jackson-databind:2.10.2")

testImplementation("org.testcontainers:kafka")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@ private TextMapPropagator propagator() {
return openTelemetry.getPropagators().getTextMapPropagator();
}

/** Returns a decorated {@link Producer} that emits spans for each sent message. */
public <K, V> Producer<K, V> wrap(Producer<K, V> producer) {
return new TracingProducer<>(producer, this);
}

/** Returns a decorated {@link Consumer} that consumes spans for each received message. */
public <K, V> Consumer<K, V> wrap(Consumer<K, V> consumer) {
return new TracingConsumer<>(consumer, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaTracingBuilder {
public final class KafkaTracingBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11";

private final OpenTelemetry openTelemetry;
Expand All @@ -28,14 +28,16 @@ public class KafkaTracingBuilder {
this.openTelemetry = Objects.requireNonNull(openTelemetry);
}

public void addProducerAttributesExtractors(
public KafkaTracingBuilder addProducerAttributesExtractors(
AttributesExtractor<ProducerRecord<?, ?>, Void> extractor) {
producerAttributesExtractors.add(extractor);
return this;
}

public void addConsumerAttributesExtractors(
public KafkaTracingBuilder addConsumerAttributesExtractors(
AttributesExtractor<ConsumerRecord<?, ?>, Void> extractor) {
consumerAttributesExtractors.add(extractor);
return this;
}

public KafkaTracing build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TracingConsumer<K, V> implements Consumer<K, V> {
class TracingConsumer<K, V> implements Consumer<K, V> {
private final Consumer<K, V> consumer;
private final KafkaTracing tracing;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class TracingProducer<K, V> implements Producer<K, V> {
class TracingProducer<K, V> implements Producer<K, V> {
private final Producer<K, V> producer;
private final KafkaTracing tracing;

Expand Down

0 comments on commit bd8a5d5

Please sign in to comment.