diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingTransformer.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingTransformer.java
new file mode 100644
index 0000000000..4dedf127d4
--- /dev/null
+++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingTransformer.java
@@ -0,0 +1,14 @@
+package brave.kafka.streams;
+
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+abstract class AbstractTracingTransformer<K, V, R> implements
+    Transformer<K, V, R> {
+
+  @Override public void init(ProcessorContext context) {
+  }
+
+  @Override public void close() {
+  }
+}
diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingValueTransformer.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingValueTransformer.java
new file mode 100644
index 0000000000..d8eb6eaba7
--- /dev/null
+++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingValueTransformer.java
@@ -0,0 +1,14 @@
+package brave.kafka.streams;
+
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+abstract class AbstractTracingValueTransformer<V, VR> implements
+    ValueTransformer<V, VR> {
+
+  @Override public void init(ProcessorContext context) {
+  }
+
+  @Override public void close() {
+  }
+}
diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingValueTransformerWithKey.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingValueTransformerWithKey.java
new file mode 100644
index 0000000000..7c420993a0
--- /dev/null
+++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/AbstractTracingValueTransformerWithKey.java
@@ -0,0 +1,14 @@
+package brave.kafka.streams;
+
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.processor.ProcessorContext;
+
+abstract class AbstractTracingValueTransformerWithKey<K, V, VR> implements
+    ValueTransformerWithKey<K, V, VR> {
+
+  @Override public void init(ProcessorContext context) {
+  }
+
+  @Override public void close() {
+  }
+}
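The three adapters above are package-private and only supply no-op init(ProcessorContext) and close() methods. They exist so the DSL helpers added to KafkaStreamsTracing below can wrap a single lambda in an anonymous subclass without repeating that boilerplate. A minimal sketch of the pattern the helpers use internally (the String/KeyValue type arguments are chosen purely for illustration):

    // Only transform() is supplied; init() and close() fall back to the no-ops above.
    AbstractTracingTransformer<String, String, KeyValue<String, String>> passThrough =
        new AbstractTracingTransformer<String, String, KeyValue<String, String>>() {
          @Override public KeyValue<String, String> transform(String key, String value) {
            return KeyValue.pair(key, value);
          }
        };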
diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java
index b81512c93b..f965c7465c 100644
--- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java
+++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/KafkaStreamsTracing.java
@@ -7,16 +7,24 @@
 import brave.propagation.TraceContext;
 import brave.propagation.TraceContextOrSamplingFlags;
 import java.util.Properties;
+import java.util.function.BiConsumer;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapperWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
@@ -38,8 +46,8 @@ public static KafkaStreamsTracing create(Tracing tracing) {
 
   /**
    * Creates a {@link KafkaStreams} instance with a tracing-enabled {@link KafkaClientSupplier}. All
-   * Topology Sources and Sinks (including internal Topics) will create Spans on records
-   * processed (i.e. send or consumed).
+   * Topology Sources and Sinks (including internal Topics) will create Spans on records processed
+   * (i.e. send or consumed).
    *
    * Use this instead of {@link KafkaStreams} constructor.
    *
@@ -69,8 +77,8 @@ public KafkaStreams kafkaStreams(Topology topology, Properties streamsConfig) {
    *
    * @see TracingKafkaClientSupplier
    */
-  public <K, V> ProcessorSupplier<K, V> processor(String name, Processor<K, V> processor) {
-    return new TracingProcessorSupplier<>(this, name, processor);
+  public <K, V> ProcessorSupplier<K, V> processor(String spanName, Processor<K, V> processor) {
+    return new TracingProcessorSupplier<>(this, spanName, processor);
   }
 
   /**
@@ -84,9 +92,9 @@ public <K, V> ProcessorSupplier<K, V> processor(String name, Processor<K, V> pro
    *        .to(outputTopic);
    * }</pre>
    */
-  public <K, V, R> TransformerSupplier<K, V, R> transformer(String name,
+  public <K, V, R> TransformerSupplier<K, V, R> transformer(String spanName,
       Transformer<K, V, R> transformer) {
-    return new TracingTransformerSupplier<>(this, name, transformer);
+    return new TracingTransformerSupplier<>(this, spanName, transformer);
   }
 
   /**
@@ -100,9 +108,9 @@ public <K, V, R> TransformerSupplier<K, V, R> transformer(String name,
    *        .to(outputTopic);
    * }</pre>
    */
-  public <V, VR> ValueTransformerSupplier<V, VR> valueTransformer(String name,
+  public <V, VR> ValueTransformerSupplier<V, VR> valueTransformer(String spanName,
       ValueTransformer<V, VR> valueTransformer) {
-    return new TracingValueTransformerSupplier<>(this, name, valueTransformer);
+    return new TracingValueTransformerSupplier<>(this, spanName, valueTransformer);
   }
 
   /**
@@ -116,9 +124,140 @@ public <V, VR> ValueTransformerSupplier<V, VR> valueTransformer(String name,
    *        .to(outputTopic);
    * }</pre>
    */
-  public <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKey(String name,
+  public <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKey(String spanName,
       ValueTransformerWithKey<K, V, VR> valueTransformerWithKey) {
-    return new TracingValueTransformerWithKeySupplier<>(this, name, valueTransformerWithKey);
+    return new TracingValueTransformerWithKeySupplier<>(this, spanName, valueTransformerWithKey);
+  }
+
+  /**
+   * Create a foreach processor, similar to {@link KStream#foreach(ForeachAction)}, where its action
+   * will be recorded in a new span with the indicated name.
+   *
+   * <p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .process(kafkaStreamsTracing.foreach("myForeach", (k, v) -> ...));
+   * }</pre>
+   */
+  public <K, V> ProcessorSupplier<K, V> foreach(String spanName, ForeachAction<K, V> action) {
+    return new TracingProcessorSupplier<>(this, spanName, new AbstractProcessor<K, V>() {
+      @Override public void process(K key, V value) {
+        action.apply(key, value);
+      }
+    });
+  }
+
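The javadoc above elides the action body. A type-complete sketch of the same wiring, assuming a StreamsBuilder named builder, a KafkaStreamsTracing named kafkaStreamsTracing, and a String-keyed topic (all illustrative names), would look like:

    // Terminal traced action: each record is handled inside a span named "log-event".
    builder.stream("sensor-events", Consumed.with(Serdes.String(), Serdes.String()))
        .process(kafkaStreamsTracing.foreach("log-event",
            (key, value) -> System.out.println(key + "=" + value)));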

+  /**
+   * Create a peek transformer, similar to {@link KStream#peek(ForeachAction)}, where its action
+   * will be recorded in a new span with the indicated name.
+   *
+   * <p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .transform(kafkaStreamsTracing.peek("myPeek", (k, v) -> ...))
+   *        .to(outputTopic);
+   * }</pre>
+   */
+  public <K, V> TransformerSupplier<K, V, KeyValue<K, V>> peek(String spanName, ForeachAction<K, V> action) {
+    return new TracingTransformerSupplier<>(this, spanName, new AbstractTracingTransformer<K, V, KeyValue<K, V>>() {
+      @Override public KeyValue<K, V> transform(K key, V value) {
+        action.apply(key, value);
+        return KeyValue.pair(key, value);
+      }
+    });
+  }
+
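As with foreach, a concrete sketch of peek (topic names and the side effect are illustrative): the record passes through unchanged while the action runs inside the new span.

    builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
        .transform(kafkaStreamsTracing.peek("order-received",
            (key, value) -> System.out.println("received order " + key)))
        .to("orders-received");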

+  /**
+   * Create a mark transformer, similar to {@link KStream#peek(ForeachAction)}, but no action is executed.
+   * Instead, only a span is created to represent an event as part of the stream process.
+   *
+   * <p>A common scenario for this transformer is to mark the beginning and end of a step (or set of steps)
+   * in a stream process.
+   *
+   * <p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .transform(kafkaStreamsTracing.mark("beginning-complex-transformation"))
+   *        .map(complexTransformation1)
+   *        .filter(predicate)
+   *        .map(complexTransformation2)
+   *        .transform(kafkaStreamsTracing.mark("end-complex-transformation"))
+   *        .to(outputTopic);
+   * }</pre>
+   */
+  public <K, V> TransformerSupplier<K, V, KeyValue<K, V>> mark(String spanName) {
+    return new TracingTransformerSupplier<>(this, spanName, new AbstractTracingTransformer<K, V, KeyValue<K, V>>() {
+      @Override public KeyValue<K, V> transform(K key, V value) {
+        return KeyValue.pair(key, value);
+      }
+    });
+  }
+
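A hedged sketch of how two mark calls bracket a heavier stage, following the javadoc pattern (topic names and the intermediate step are illustrative):

    builder.stream("invoices", Consumed.with(Serdes.String(), Serdes.String()))
        .transform(kafkaStreamsTracing.mark("begin-enrichment"))
        .mapValues(value -> value.toUpperCase()) // stands in for an expensive transformation
        .transform(kafkaStreamsTracing.mark("end-enrichment"))
        .to("invoices-enriched");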

+  /**
+   * Create a map transformer, similar to {@link KStream#map(KeyValueMapper)}, where its mapper action
+   * will be recorded in a new span with the indicated name.
+   *
+   * <p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .transform(kafkaStreamsTracing.map("myMap", (k, v) -> ...))
+   *        .to(outputTopic);
+   * }</pre>
+   */
+  public <K, V, KR, VR> TransformerSupplier<K, V, KeyValue<KR, VR>> map(String spanName,
+      KeyValueMapper<K, V, KeyValue<KR, VR>> mapper) {
+    return new TracingTransformerSupplier<>(this, spanName,
+        new AbstractTracingTransformer<K, V, KeyValue<KR, VR>>() {
+          @Override public KeyValue<KR, VR> transform(K key, V value) {
+            return mapper.apply(key, value);
+          }
+        });
+  }
+
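A concrete version of the map example, assuming String keys and values (names are illustrative); the mapper runs inside the span and may change both key and value:

    builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
        .transform(kafkaStreamsTracing.map("swap-key-value",
            (key, value) -> KeyValue.pair(value, key)))
        .to("clicks-swapped", Produced.with(Serdes.String(), Serdes.String()));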

+  /**
+   * Create a mapValues transformer, similar to {@link KStream#mapValues(ValueMapperWithKey)}, where its mapper action
+   * will be recorded in a new span with the indicated name.
+   *
+   * <p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .transformValues(kafkaStreamsTracing.mapValues("myMapValues", (k, v) -> ...))
+   *        .to(outputTopic);
+   * }</pre>
+   */
+  public <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> mapValues(String spanName,
+      ValueMapperWithKey<K, V, VR> mapper) {
+    return new TracingValueTransformerWithKeySupplier<>(this, spanName,
+        new AbstractTracingValueTransformerWithKey<K, V, VR>() {
+          @Override public VR transform(K readOnlyKey, V value) {
+            return mapper.apply(readOnlyKey, value);
+          }
+        });
+  }
+
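Note that the result goes through transformValues, not transform, because only the value changes; the key is read-only. A concrete sketch with illustrative names:

    builder.stream("prices", Consumed.with(Serdes.String(), Serdes.String()))
        .transformValues(kafkaStreamsTracing.mapValues("tag-with-key",
            (key, value) -> key + ":" + value))
        .to("prices-tagged");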

+  /**
+   * Create a mapValues transformer, similar to {@link KStream#mapValues(ValueMapper)}, where its mapper action
+   * will be recorded in a new span with the indicated name.
+   *
+   * <p>Simple example using Kafka Streams DSL:
+   * <pre>{@code
+   * StreamsBuilder builder = new StreamsBuilder();
+   * builder.stream(inputTopic)
+   *        .transformValues(kafkaStreamsTracing.mapValues("myMapValues", v -> ...))
+   *        .to(outputTopic);
+   * }</pre>
+   */
+  public <V, VR> ValueTransformerSupplier<V, VR> mapValues(String spanName, ValueMapper<V, VR> mapper) {
+    return new TracingValueTransformerSupplier<>(this, spanName, new AbstractTracingValueTransformer<V, VR>() {
+      @Override public VR transform(V value) {
+        return mapper.apply(value);
+      }
+    });
   }
 
   static void addTags(ProcessorContext processorContext, SpanCustomizer result) {
diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java
index 707fd218e6..ae017d6691 100644
--- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java
+++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessor.java
@@ -9,16 +9,16 @@ class TracingProcessor<K, V> implements Processor<K, V> {
 
   final KafkaStreamsTracing kafkaStreamsTracing;
   final Tracer tracer;
-  final String name;
+  final String spanName;
   final Processor<K, V> delegateProcessor;
 
   ProcessorContext processorContext;
 
   TracingProcessor(KafkaStreamsTracing kafkaStreamsTracing,
-      String name, Processor<K, V> delegateProcessor) {
+      String spanName, Processor<K, V> delegateProcessor) {
     this.kafkaStreamsTracing = kafkaStreamsTracing;
     this.tracer = kafkaStreamsTracing.tracing.tracer();
-    this.name = name;
+    this.spanName = spanName;
     this.delegateProcessor = delegateProcessor;
   }
 
@@ -32,7 +32,7 @@ public void init(ProcessorContext processorContext) {
   public void process(K k, V v) {
     Span span = kafkaStreamsTracing.nextSpan(processorContext);
     if (!span.isNoop()) {
-      span.name(name);
+      span.name(spanName);
       span.start();
     }
 
diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorSupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorSupplier.java
index 451a380f52..bd6d66fdd2 100644
--- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorSupplier.java
+++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingProcessorSupplier.java
@@ -6,19 +6,19 @@
 class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
 
   final KafkaStreamsTracing kafkaStreamsTracing;
-  final String name;
+  final String spanName;
   final Processor<K, V> delegateProcessor;
 
   TracingProcessorSupplier(KafkaStreamsTracing kafkaStreamsTracing,
-      String name,
+      String spanName,
       Processor<K, V> delegateProcessor) {
     this.kafkaStreamsTracing = kafkaStreamsTracing;
-    this.name = name;
+    this.spanName = spanName;
     this.delegateProcessor = delegateProcessor;
   }
 
   /** This wraps process method to enable tracing.
*/ @Override public Processor get() { - return new TracingProcessor<>(kafkaStreamsTracing, name, delegateProcessor); + return new TracingProcessor<>(kafkaStreamsTracing, spanName, delegateProcessor); } } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformer.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformer.java index f33a1766bd..601e5698cb 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformer.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformer.java @@ -9,16 +9,16 @@ class TracingTransformer implements Transformer { final KafkaStreamsTracing kafkaStreamsTracing; final Tracer tracer; - final String name; + final String spanName; final Transformer delegateTransformer; ProcessorContext processorContext; - TracingTransformer(KafkaStreamsTracing kafkaStreamsTracing, String name, + TracingTransformer(KafkaStreamsTracing kafkaStreamsTracing, String spanName, Transformer delegateTransformer) { this.kafkaStreamsTracing = kafkaStreamsTracing; this.tracer = kafkaStreamsTracing.tracing.tracer(); - this.name = name; + this.spanName = spanName; this.delegateTransformer = delegateTransformer; } @@ -32,7 +32,7 @@ public void init(ProcessorContext processorContext) { public R transform(K k, V v) { Span span = kafkaStreamsTracing.nextSpan(processorContext); if (!span.isNoop()) { - span.name(name); + span.name(spanName); span.start(); } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformerSupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformerSupplier.java index d5e3445d84..ccf43b8bbd 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformerSupplier.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingTransformerSupplier.java @@ -5,19 +5,19 @@ class TracingTransformerSupplier implements TransformerSupplier { final KafkaStreamsTracing kafkaStreamsTracing; - final String name; + final String spanName; final Transformer delegateTransformer; TracingTransformerSupplier(KafkaStreamsTracing kafkaStreamsTracing, - String name, + String spanName, Transformer delegateTransformer) { this.kafkaStreamsTracing = kafkaStreamsTracing; - this.name = name; + this.spanName = spanName; this.delegateTransformer = delegateTransformer; } /** This wraps transform method to enable tracing. 
*/ @Override public Transformer get() { - return new TracingTransformer<>(kafkaStreamsTracing, name, delegateTransformer); + return new TracingTransformer<>(kafkaStreamsTracing, spanName, delegateTransformer); } } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformer.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformer.java index f33126232d..1fa4728e03 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformer.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformer.java @@ -9,16 +9,16 @@ class TracingValueTransformer implements ValueTransformer { final KafkaStreamsTracing kafkaStreamsTracing; final Tracer tracer; - final String name; + final String spanName; final ValueTransformer delegateTransformer; ProcessorContext processorContext; - TracingValueTransformer(KafkaStreamsTracing kafkaStreamsTracing, String name, + TracingValueTransformer(KafkaStreamsTracing kafkaStreamsTracing, String spanName, ValueTransformer delegateTransformer) { this.kafkaStreamsTracing = kafkaStreamsTracing; this.tracer = kafkaStreamsTracing.tracing.tracer(); - this.name = name; + this.spanName = spanName; this.delegateTransformer = delegateTransformer; } @@ -32,7 +32,7 @@ public void init(ProcessorContext processorContext) { public VR transform(V v) { Span span = kafkaStreamsTracing.nextSpan(processorContext); if (!span.isNoop()) { - span.name(name); + span.name(spanName); span.start(); } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerSupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerSupplier.java index 449afe4278..9961a58d5e 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerSupplier.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerSupplier.java @@ -5,19 +5,19 @@ class TracingValueTransformerSupplier implements ValueTransformerSupplier { final KafkaStreamsTracing kafkaStreamsTracing; - final String name; + final String spanName; final ValueTransformer delegateTransformer; TracingValueTransformerSupplier(KafkaStreamsTracing kafkaStreamsTracing, - String name, + String spanName, ValueTransformer delegateTransformer) { this.kafkaStreamsTracing = kafkaStreamsTracing; - this.name = name; + this.spanName = spanName; this.delegateTransformer = delegateTransformer; } /** This wraps transform method to enable tracing. 
*/ @Override public ValueTransformer get() { - return new TracingValueTransformer<>(kafkaStreamsTracing, name, delegateTransformer); + return new TracingValueTransformer<>(kafkaStreamsTracing, spanName, delegateTransformer); } } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKey.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKey.java index d55552988f..7a0ab75dc6 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKey.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKey.java @@ -9,16 +9,16 @@ class TracingValueTransformerWithKey implements ValueTransformerWithKe final KafkaStreamsTracing kafkaStreamsTracing; final Tracer tracer; - final String name; + final String spanName; final ValueTransformerWithKey delegateTransformer; ProcessorContext processorContext; - TracingValueTransformerWithKey(KafkaStreamsTracing kafkaStreamsTracing, String name, + TracingValueTransformerWithKey(KafkaStreamsTracing kafkaStreamsTracing, String spanName, ValueTransformerWithKey delegateTransformer) { this.kafkaStreamsTracing = kafkaStreamsTracing; this.tracer = kafkaStreamsTracing.tracing.tracer(); - this.name = name; + this.spanName = spanName; this.delegateTransformer = delegateTransformer; } @@ -32,7 +32,7 @@ public void init(ProcessorContext processorContext) { public VR transform(K k, V v) { Span span = kafkaStreamsTracing.nextSpan(processorContext); if (!span.isNoop()) { - span.name(name); + span.name(spanName); span.start(); } diff --git a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKeySupplier.java b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKeySupplier.java index 6a3c64e74f..dbedd7bf4e 100644 --- a/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKeySupplier.java +++ b/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingValueTransformerWithKeySupplier.java @@ -6,19 +6,19 @@ class TracingValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier { final KafkaStreamsTracing kafkaStreamsTracing; - final String name; + final String spanName; final ValueTransformerWithKey delegateTransformer; TracingValueTransformerWithKeySupplier(KafkaStreamsTracing kafkaStreamsTracing, - String name, + String spanName, ValueTransformerWithKey delegateTransformer) { this.kafkaStreamsTracing = kafkaStreamsTracing; - this.name = name; + this.spanName = spanName; this.delegateTransformer = delegateTransformer; } /** This wraps transform method to enable tracing. 
*/ @Override public ValueTransformerWithKey get() { - return new TracingValueTransformerWithKey<>(kafkaStreamsTracing, name, delegateTransformer); + return new TracingValueTransformerWithKey<>(kafkaStreamsTracing, spanName, delegateTransformer); } } diff --git a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java index c42333a870..9f3341bd6f 100644 --- a/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java +++ b/instrumentation/kafka-streams/src/test/java/brave/kafka/streams/ITKafkaStreamsTracing.java @@ -8,6 +8,7 @@ import com.github.charithe.kafka.KafkaJunitRule; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -44,6 +45,7 @@ import org.junit.rules.TestRule; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import zipkin2.Annotation; import zipkin2.Span; import static org.assertj.core.api.Assertions.assertThat; @@ -180,6 +182,106 @@ public void process(String key, String value) { streams.cleanUp(); } + @Test + public void should_create_spans_from_stream_with_tracing_peek() throws Exception { + String inputTopic = testName.getMethodName() + "-input"; + String outputTopic = testName.getMethodName() + "-output"; + + long now = System.currentTimeMillis(); + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .transform(kafkaStreamsTracing.peek("peek-1", (key, value) -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + tracing.tracer().currentSpan().annotate(now, "test"); + })) + .to(outputTopic); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + producer = createTracingProducer(); + producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get(); + + waitForStreamToRun(streams); + + Span spanInput = takeSpan(), spanProcessor = takeSpan(), spanOutput = takeSpan(); + + assertThat(spanInput.kind().name()).isEqualTo(brave.Span.Kind.CONSUMER.name()); + assertThat(spanInput.traceId()).isEqualTo(spanProcessor.traceId()); + assertThat(spanProcessor.traceId()).isEqualTo(spanOutput.traceId()); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + assertThat(spanProcessor.annotations()).contains(Annotation.create(now, "test")); + + streams.close(); + streams.cleanUp(); + } + + @Test + public void should_create_spans_from_stream_with_tracing_mark() throws Exception { + String inputTopic = testName.getMethodName() + "-input"; + String outputTopic = testName.getMethodName() + "-output"; + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .transform(kafkaStreamsTracing.mark("mark-1")) + .to(outputTopic); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + producer = createTracingProducer(); + producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get(); + + waitForStreamToRun(streams); + + Span spanInput = takeSpan(), spanProcessor = takeSpan(), spanOutput = takeSpan(); + + assertThat(spanInput.kind().name()).isEqualTo(brave.Span.Kind.CONSUMER.name()); + assertThat(spanInput.traceId()).isEqualTo(spanProcessor.traceId()); + 
assertThat(spanProcessor.traceId()).isEqualTo(spanOutput.traceId()); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + + streams.close(); + streams.cleanUp(); + } + + @Test + public void should_create_spans_from_stream_with_tracing_foreach() throws Exception { + String inputTopic = testName.getMethodName() + "-input"; + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .process(kafkaStreamsTracing.foreach("foreach-1", (key, value) -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + })); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + producer = createTracingProducer(); + producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get(); + + waitForStreamToRun(streams); + + Span spanInput = takeSpan(), spanProcessor = takeSpan(); + + assertThat(spanInput.kind().name()).isEqualTo(brave.Span.Kind.CONSUMER.name()); + assertThat(spanInput.traceId()).isEqualTo(spanProcessor.traceId()); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + + streams.close(); + streams.cleanUp(); + } + @Test public void should_create_spans_from_stream_without_tracing_and_tracing_processor() throws Exception { @@ -386,6 +488,117 @@ public void close() { streams.cleanUp(); } + @Test + public void should_create_spans_from_stream_with_tracing_map() throws Exception { + String inputTopic = testName.getMethodName() + "-input"; + String outputTopic = testName.getMethodName() + "-output"; + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .transform(kafkaStreamsTracing.map("map-1", (key, value) -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return KeyValue.pair(key, value); + })) + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + producer = createTracingProducer(); + producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get(); + + waitForStreamToRun(streams); + + Span spanInput = takeSpan(), spanProcessor = takeSpan(), spanOutput = takeSpan(); + + assertThat(spanInput.kind().name()).isEqualTo(brave.Span.Kind.CONSUMER.name()); + assertThat(spanInput.traceId()).isEqualTo(spanProcessor.traceId()); + assertThat(spanProcessor.traceId()).isEqualTo(spanOutput.traceId()); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + assertThat(spanOutput.tags()).containsEntry("kafka.topic", outputTopic); + + streams.close(); + streams.cleanUp(); + } + + @Test + public void should_create_spans_from_stream_with_tracing_mapValues() throws Exception { + String inputTopic = testName.getMethodName() + "-input"; + String outputTopic = testName.getMethodName() + "-output"; + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .transformValues(kafkaStreamsTracing.mapValues("mapValue-1", value -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return value; + })) + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + producer = createTracingProducer(); + producer.send(new 
ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get(); + + waitForStreamToRun(streams); + + Span spanInput = takeSpan(), spanProcessor = takeSpan(), spanOutput = takeSpan(); + + assertThat(spanInput.kind().name()).isEqualTo(brave.Span.Kind.CONSUMER.name()); + assertThat(spanInput.traceId()).isEqualTo(spanProcessor.traceId()); + assertThat(spanProcessor.traceId()).isEqualTo(spanOutput.traceId()); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + assertThat(spanOutput.tags()).containsEntry("kafka.topic", outputTopic); + + streams.close(); + streams.cleanUp(); + } + + @Test + public void should_create_spans_from_stream_with_tracing_mapValues_withKey() throws Exception { + String inputTopic = testName.getMethodName() + "-input"; + String outputTopic = testName.getMethodName() + "-output"; + + StreamsBuilder builder = new StreamsBuilder(); + builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())) + .transformValues(kafkaStreamsTracing.mapValues("mapValue-1", (key, value) -> { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return value; + })) + .to(outputTopic, Produced.with(Serdes.String(), Serdes.String())); + Topology topology = builder.build(); + + KafkaStreams streams = buildKafkaStreams(topology); + + producer = createTracingProducer(); + producer.send(new ProducerRecord<>(inputTopic, TEST_KEY, TEST_VALUE)).get(); + + waitForStreamToRun(streams); + + Span spanInput = takeSpan(), spanProcessor = takeSpan(), spanOutput = takeSpan(); + + assertThat(spanInput.kind().name()).isEqualTo(brave.Span.Kind.CONSUMER.name()); + assertThat(spanInput.traceId()).isEqualTo(spanProcessor.traceId()); + assertThat(spanProcessor.traceId()).isEqualTo(spanOutput.traceId()); + assertThat(spanInput.tags()).containsEntry("kafka.topic", inputTopic); + assertThat(spanOutput.tags()).containsEntry("kafka.topic", outputTopic); + + streams.close(); + streams.cleanUp(); + } + @Test public void should_create_spans_from_stream_without_tracing_with_tracing_valueTransformer() throws Exception {
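The tests above exercise each helper against an embedded cluster. For reference, a hedged end-to-end sketch of how an application could wire the same pieces together; the service name, topics, application id, and bootstrap address are all illustrative, and the pattern simply mirrors the tests:

    import brave.Tracing;
    import brave.kafka.streams.KafkaStreamsTracing;
    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Produced;

    public class TracedStreamApp {
      public static void main(String[] args) {
        Tracing tracing = Tracing.newBuilder().localServiceName("stream-app").build();
        KafkaStreamsTracing kafkaStreamsTracing = KafkaStreamsTracing.create(tracing);

        // Traced topology: mark the boundaries of the step and trace the value mapping.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            .transform(kafkaStreamsTracing.mark("begin-processing"))
            .transformValues(kafkaStreamsTracing.mapValues("uppercase", v -> v.toUpperCase()))
            .transform(kafkaStreamsTracing.mark("end-processing"))
            .to("output", Produced.with(Serdes.String(), Serdes.String()));
        Topology topology = builder.build();

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "traced-stream-app");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // kafkaStreams(...) wires in the tracing-enabled KafkaClientSupplier from this change.
        KafkaStreams streams = kafkaStreamsTracing.kafkaStreams(topology, config);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
      }
    }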