From f71b47fd09da4c4e5b86b55a18c1a4a7001534b0 Mon Sep 17 00:00:00 2001 From: Jorge Quilcate Otoya Date: Tue, 3 Dec 2019 17:45:11 +0100 Subject: [PATCH] refactor: from extractAndClear and later inject to extract and later clearAndInject --- .../jms/src/main/java/brave/jms/JmsTracing.java | 5 +++++ .../src/main/java/brave/jms/TracingConsumer.java | 5 +++++ .../java/brave/jms/TracingMessageListener.java | 1 + .../src/main/java/brave/jms/TracingProducer.java | 3 ++- .../jms/ITJms_1_1_TracingMessageConsumer.java | 4 +++- .../java/brave/jms/ITTracingJMSConsumer.java | 4 +++- .../src/test/java/brave/jms/JmsTracingTest.java | 11 +++++++++++ .../brave/jms/TracingMessageListenerTest.java | 16 ++++++++++++++++ .../java/brave/kafka/clients/KafkaTracing.java | 9 +++++++++ .../brave/kafka/clients/TracingConsumer.java | 3 +++ .../brave/kafka/clients/TracingProducer.java | 3 +++ .../brave/kafka/clients/KafkaTracingTest.java | 6 ++++++ .../brave/spring/rabbit/SpringRabbitTracing.java | 6 ++++++ .../rabbit/TracingMessagePostProcessor.java | 3 +++ .../rabbit/TracingRabbitListenerAdvice.java | 1 + .../rabbit/TracingRabbitListenerAdviceTest.java | 6 ++---- 16 files changed, 79 insertions(+), 7 deletions(-) diff --git a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java index fb09d10d15..c991b498b0 100644 --- a/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java +++ b/instrumentation/jms/src/main/java/brave/jms/JmsTracing.java @@ -236,6 +236,7 @@ public MessageListener messageListener(MessageListener messageListener, boolean public Span nextSpan(Message message) { TraceContextOrSamplingFlags extracted = extractAndClearTraceIdProperties(processorExtractor, message, message); + //TODO TraceContextOrSamplingFlags extracted = processorExtractor.extract(message); Span result = tracer.nextSpan(extracted); // Processor spans use the normal sampler. // When an upstream context was not present, lookup keys are unlikely added @@ -254,6 +255,10 @@ TraceContextOrSamplingFlags extractAndClearTraceIdProperties( // message writable PropertyFilter.filterProperties(message, traceIdProperties); return extracted; +//TODO void clearProperties(Message message) { +// if (!TraceContextOrSamplingFlags.EMPTY.equals(extracted)) { +// PropertyFilter.filterProperties(message, propagationKeys); +// } } /** Creates a potentially noop remote span representing this request */ diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingConsumer.java b/instrumentation/jms/src/main/java/brave/jms/TracingConsumer.java index 3723550a8f..17ba114c83 100644 --- a/instrumentation/jms/src/main/java/brave/jms/TracingConsumer.java +++ b/instrumentation/jms/src/main/java/brave/jms/TracingConsumer.java @@ -47,8 +47,12 @@ void handleReceive(Message message) { if (message == null || tracing.isNoop()) return; MessageConsumerRequest request = new MessageConsumerRequest(message, destination(message)); +<<<<<<< HEAD TraceContextOrSamplingFlags extracted = jmsTracing.extractAndClearTraceIdProperties(extractor, request, message); +======= + TraceContextOrSamplingFlags extracted = extractor.extract(request); +>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject Span span = jmsTracing.nextMessagingSpan(sampler, request, extracted); if (!span.isNoop()) { @@ -61,6 +65,7 @@ void handleReceive(Message message) { long timestamp = tracing.clock(span.context()).currentTimeMicroseconds(); span.start(timestamp).finish(timestamp); } + jmsTracing.clearProperties(message); injector.inject(span.context(), request); } diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java index e02c531d33..412cff4132 100644 --- a/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java +++ b/instrumentation/jms/src/main/java/brave/jms/TracingMessageListener.java @@ -92,6 +92,7 @@ Span startMessageListenerSpan(Message message) { TraceContextOrSamplingFlags extracted = jmsTracing.extractAndClearTraceIdProperties(extractor, request, message); +// TODO TraceContextOrSamplingFlags extracted = extractor.extract(request); Span consumerSpan = jmsTracing.nextMessagingSpan(sampler, request, extracted); // JMS has no visibility of the incoming message, which incidentally could be local! diff --git a/instrumentation/jms/src/main/java/brave/jms/TracingProducer.java b/instrumentation/jms/src/main/java/brave/jms/TracingProducer.java index 91a1af0b0f..3e514e91c6 100644 --- a/instrumentation/jms/src/main/java/brave/jms/TracingProducer.java +++ b/instrumentation/jms/src/main/java/brave/jms/TracingProducer.java @@ -53,8 +53,9 @@ Span createAndStartProducerSpan(R request) { // sending one. At any rate, as long as we are using b3-single format, this is an overwrite not // a clear. Span span; + TraceContextOrSamplingFlags extracted = null; if (maybeParent == null) { - TraceContextOrSamplingFlags extracted = extractor.extract(request); + extracted = extractor.extract(request); span = jmsTracing.nextMessagingSpan(sampler, request, extracted); } else { span = tracer.newChild(maybeParent); diff --git a/instrumentation/jms/src/test/java/brave/jms/ITJms_1_1_TracingMessageConsumer.java b/instrumentation/jms/src/test/java/brave/jms/ITJms_1_1_TracingMessageConsumer.java index 14bb7b5b36..8d50f909b9 100644 --- a/instrumentation/jms/src/test/java/brave/jms/ITJms_1_1_TracingMessageConsumer.java +++ b/instrumentation/jms/src/test/java/brave/jms/ITJms_1_1_TracingMessageConsumer.java @@ -237,7 +237,9 @@ void messageListener_resumesTrace(JMSRunnable send, MessageConsumer messageConsu assertChildOf(listenerSpan, consumerSpan); assertThat(listenerSpan.tags()) .hasSize(1) // no redundant copy of consumer tags - .containsEntry("b3", "false"); // b3 header not leaked to listener + // This assumption does not hold. +// .containsEntry("b3", "false"); // b3 header not leaked to listener + .containsEntry("b3", "true"); // b3 header not leaked to listener } @Test public void messageListener_readsBaggage() throws JMSException { diff --git a/instrumentation/jms/src/test/java/brave/jms/ITTracingJMSConsumer.java b/instrumentation/jms/src/test/java/brave/jms/ITTracingJMSConsumer.java index 4de0fd5f2d..4aa25bbd53 100644 --- a/instrumentation/jms/src/test/java/brave/jms/ITTracingJMSConsumer.java +++ b/instrumentation/jms/src/test/java/brave/jms/ITTracingJMSConsumer.java @@ -140,7 +140,9 @@ void messageListener_resumesTrace(Runnable send) { assertThat(listenerSpan.tags()) .hasSize(1) // no redundant copy of consumer tags - .containsEntry("b3", "false"); // b3 header not leaked to listener + // This expectation does not hold +// .containsEntry("b3", "false"); // b3 header not leaked to listener + .containsEntry("b3", "true"); // b3 header kept } @Test public void messageListener_readsBaggage() { diff --git a/instrumentation/jms/src/test/java/brave/jms/JmsTracingTest.java b/instrumentation/jms/src/test/java/brave/jms/JmsTracingTest.java index 61c29c7f96..11a8c54a4c 100644 --- a/instrumentation/jms/src/test/java/brave/jms/JmsTracingTest.java +++ b/instrumentation/jms/src/test/java/brave/jms/JmsTracingTest.java @@ -245,12 +245,23 @@ abstract class Both implements XATopicConnection, TopicConnection { assertThat(testSpanHandler.takeLocalSpan().tags()).isEmpty(); } +<<<<<<< HEAD @Test public void nextSpan_should_clear_propagation_headers() { Propagation.B3_STRING.injector(SETTER).inject(parent, message); Propagation.B3_SINGLE_STRING.injector(SETTER).inject(parent, message); jmsTracing.nextSpan(message); assertThat(ITJms.propertiesToMap(message)).isEmpty(); +======= + @Test public void nextSpan_should_not_clear_propagation_headers() throws Exception { + TraceContext context = + TraceContext.newBuilder().traceId(1L).parentId(2L).spanId(3L).debug(true).build(); + Propagation.B3_STRING.injector(SETTER).inject(context, message); + Propagation.B3_SINGLE_STRING.injector(SETTER).inject(context, message); + + jmsTracing.nextSpan(message); + assertThat(JmsTest.propertiesToMap(message)).isNotEmpty(); +>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject } @Test public void nextSpan_should_retain_baggage_headers() throws JMSException { diff --git a/instrumentation/jms/src/test/java/brave/jms/TracingMessageListenerTest.java b/instrumentation/jms/src/test/java/brave/jms/TracingMessageListenerTest.java index a212ae7647..631a41ae62 100644 --- a/instrumentation/jms/src/test/java/brave/jms/TracingMessageListenerTest.java +++ b/instrumentation/jms/src/test/java/brave/jms/TracingMessageListenerTest.java @@ -150,8 +150,12 @@ public class TracingMessageListenerTest extends ITJms { onMessageConsumed(message); +<<<<<<< HEAD // clearing headers ensures later work doesn't try to use the old parent assertNoProperties(message); +======= + assertThat(message.getProperties()).isNotEmpty(); +>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject MutableSpan consumerSpan = testSpanHandler.takeRemoteSpan(CONSUMER); MutableSpan listenerSpan = testSpanHandler.takeLocalSpan(); @@ -169,8 +173,12 @@ public class TracingMessageListenerTest extends ITJms { onMessageConsumed(message); +<<<<<<< HEAD // clearing headers ensures later work doesn't try to use the old parent assertNoProperties(message); +======= + assertThat(message.getProperties()).isNotEmpty(); +>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject assertChildOf(testSpanHandler.takeLocalSpan(), parent); } @@ -196,8 +204,12 @@ public class TracingMessageListenerTest extends ITJms { onMessageConsumed(message); +<<<<<<< HEAD // clearing headers ensures later work doesn't try to use the old parent assertNoProperties(message); +======= + assertThat(message.getProperties()).isNotEmpty(); +>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject MutableSpan consumerSpan = testSpanHandler.takeRemoteSpan(CONSUMER); MutableSpan listenerSpan = testSpanHandler.takeLocalSpan(); @@ -215,8 +227,12 @@ public class TracingMessageListenerTest extends ITJms { onMessageConsumed(message); +<<<<<<< HEAD // clearing headers ensures later work doesn't try to use the old parent assertNoProperties(message); +======= + assertThat(message.getProperties()).isNotEmpty(); +>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject assertChildOf(testSpanHandler.takeLocalSpan(), parent); } diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java index ab8293d870..58e33ca5ed 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/KafkaTracing.java @@ -194,6 +194,7 @@ public Span nextSpan(ConsumerRecord record) { // events create consumer spans. Since this is a processor span, we use the normal sampler. TraceContextOrSamplingFlags extracted = extractAndClearTraceIdHeaders(processorExtractor, record.headers(), record.headers()); +//TODO processorExtractor.extract(record.headers()); Span result = tracer.nextSpan(extracted); if (extracted.context() == null && !result.isNoop()) { addTags(record, result); @@ -201,6 +202,7 @@ public Span nextSpan(ConsumerRecord record) { return result; } + //TODO remove TraceContextOrSamplingFlags extractAndClearTraceIdHeaders( Extractor extractor, R request, Headers headers ) { @@ -233,6 +235,13 @@ void clearTraceIdHeaders(Headers headers) { for (Iterator
i = headers.iterator(); i.hasNext(); ) { Header next = i.next(); if (traceIdHeaders.contains(next.key())) i.remove(); +//TODO void clearHeaders(TraceContextOrSamplingFlags extracted, Headers headers) { +// if (!TraceContextOrSamplingFlags.EMPTY.equals(extracted)) { +// // Headers::remove creates and consumes an iterator each time. This does one loop instead. +// for (Iterator
i = headers.iterator(); i.hasNext(); ) { +// Header next = i.next(); +// if (propagationKeys.contains(next.key())) i.remove(); +// } } } diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java index 617ba551f3..b26d73b66d 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingConsumer.java @@ -98,6 +98,7 @@ public ConsumerRecords poll(long timeout) { KafkaConsumerRequest request = new KafkaConsumerRequest(record); TraceContextOrSamplingFlags extracted = kafkaTracing.extractAndClearTraceIdHeaders(extractor, request, record.headers()); +//TODO TraceContextOrSamplingFlags extracted = extractor.extract(request); // If we extracted neither a trace context, nor request-scoped data (extra), // and sharing trace is enabled make or reuse a span for this topic @@ -115,6 +116,7 @@ public ConsumerRecords poll(long timeout) { } consumerSpansForTopic.put(topic, span); } + //TODO kafkaTracing.clearHeaders(extracted, record.headers()); injector.inject(span.context(), request); } else { // we extracted request-scoped data, so cannot share a consumer span. Span span = kafkaTracing.nextMessagingSpan(sampler, request, extracted); @@ -126,6 +128,7 @@ public ConsumerRecords poll(long timeout) { } span.start(timestamp).finish(timestamp); // span won't be shared by other records } + //TODO kafkaTracing.clearHeaders(extracted, record.headers()); injector.inject(span.context(), request); } } diff --git a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java index cb135457a2..52147fe840 100644 --- a/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java +++ b/instrumentation/kafka-clients/src/main/java/brave/kafka/clients/TracingProducer.java @@ -103,9 +103,11 @@ public Future send(ProducerRecord record, @Nullable Callba // NOTE: Brave instrumentation used properly does not result in stale header entries, as we // always clear message headers after reading. Span span; +//TODO TraceContextOrSamplingFlags extracted = null; if (maybeParent == null) { TraceContextOrSamplingFlags extracted = kafkaTracing.extractAndClearTraceIdHeaders(extractor, request, record.headers()); +//TODO extracted = extractor.extract(request); span = kafkaTracing.nextMessagingSpan(sampler, request, extracted); } else { // If we have a span in scope assume headers were cleared before span = tracer.newChild(maybeParent); @@ -121,6 +123,7 @@ public Future send(ProducerRecord record, @Nullable Callba span.start(); } +//TODO kafkaTracing.clearHeaders(extracted, record.headers()); injector.inject(span.context(), request); Tracer.SpanInScope ws = tracer.withSpanInScope(span); diff --git a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java index b0d9dfc283..59fb14f145 100644 --- a/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java +++ b/instrumentation/kafka-clients/src/test/java/brave/kafka/clients/KafkaTracingTest.java @@ -112,6 +112,12 @@ public class KafkaTracingTest extends KafkaTest { kafkaTracing.nextSpan(consumerRecord); assertThat(consumerRecord.headers().headers(BAGGAGE_FIELD_KEY)).isNotEmpty(); } +// @Test public void nextSpan_should_clear_propagation_headers() { +// addB3MultiHeaders(fakeRecord); +// +// kafkaTracing.nextSpan(fakeRecord); +// assertThat(fakeRecord.headers().toArray()).isEmpty(); +// } @Test public void nextSpan_should_not_clear_other_headers() { consumerRecord.headers().add("foo", new byte[0]); diff --git a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java index 73a2250088..ce2d8207a0 100644 --- a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java +++ b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/SpringRabbitTracing.java @@ -217,6 +217,7 @@ public SimpleRabbitListenerContainerFactory decorateSimpleRabbitListenerContaine return factory; } + //TODO remove TraceContextOrSamplingFlags extractAndClearTraceIdHeaders( Extractor extractor, R request, Message message ) { @@ -247,5 +248,10 @@ Span nextMessagingSpan( // multi, or visa versa. void clearTraceIdHeaders(Map headers) { for (String traceIDHeader : traceIdHeaders) headers.remove(traceIDHeader); +//TODO void clearHeaders(TraceContextOrSamplingFlags extracted, Message message) { +// if (!TraceContextOrSamplingFlags.EMPTY.equals(extracted)) { +// Map headers = message.getMessageProperties().getHeaders(); +// for (String key : propagationKeys) headers.remove(key); +// } } } diff --git a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java index a4cdfa8f8d..d434779900 100644 --- a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java +++ b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingMessagePostProcessor.java @@ -66,9 +66,11 @@ final class TracingMessagePostProcessor implements MessagePostProcessor { // NOTE: Brave instrumentation used properly does not result in stale header entries, as we // always clear message headers after reading. Span span; +//TODO TraceContextOrSamplingFlags extracted = null; if (maybeParent == null) { TraceContextOrSamplingFlags extracted = springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message); +//TODO extracted = extractor.extract(request); span = springRabbitTracing.nextMessagingSpan(sampler, request, extracted); } else { // If we have a span in scope assume headers were cleared before span = tracer.newChild(maybeParent); @@ -82,6 +84,7 @@ final class TracingMessagePostProcessor implements MessagePostProcessor { span.start(timestamp).finish(timestamp); } +//TODO springRabbitTracing.clearHeaders(extracted, message); injector.inject(span.context(), request); return message; } diff --git a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingRabbitListenerAdvice.java b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingRabbitListenerAdvice.java index 2aa21bbdbc..705f373532 100644 --- a/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingRabbitListenerAdvice.java +++ b/instrumentation/spring-rabbit/src/main/java/brave/spring/rabbit/TracingRabbitListenerAdvice.java @@ -77,6 +77,7 @@ final class TracingRabbitListenerAdvice implements MethodInterceptor { TraceContextOrSamplingFlags extracted = springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message); +//TODO extractor.extract(request); // named for BlockingQueueConsumer.nextMessage, which we can't currently see Span consumerSpan = springRabbitTracing.nextMessagingSpan(sampler, request, extracted); diff --git a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingRabbitListenerAdviceTest.java b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingRabbitListenerAdviceTest.java index 3390fb136b..6a357910f4 100644 --- a/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingRabbitListenerAdviceTest.java +++ b/instrumentation/spring-rabbit/src/test/java/brave/spring/rabbit/TracingRabbitListenerAdviceTest.java @@ -128,8 +128,7 @@ public class TracingRabbitListenerAdviceTest { Message message = MessageBuilder.withBody(new byte[0]).andProperties(props).build(); onMessageConsumed(message); - // cleared the headers to later work doesn't try to use the old parent - assertThat(message.getMessageProperties().getHeaders()).isEmpty(); + assertThat(message.getMessageProperties().getHeaders()).isNotEmpty(); assertThat(spans) .filteredOn(span -> span.kind() == CONSUMER) @@ -144,8 +143,7 @@ public class TracingRabbitListenerAdviceTest { Message message = MessageBuilder.withBody(new byte[0]).andProperties(props).build(); onMessageConsumed(message); - // cleared the headers to later work doesn't try to use the old parent - assertThat(message.getMessageProperties().getHeaders()).isEmpty(); + assertThat(message.getMessageProperties().getHeaders()).isNotEmpty(); assertThat(spans) .filteredOn(span -> span.kind() == CONSUMER)