Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: From extractAndClear .. inject approach to extract .. clearAndInject #1047

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions instrumentation/jms/src/main/java/brave/jms/JmsTracing.java
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ public MessageListener messageListener(MessageListener messageListener, boolean
public Span nextSpan(Message message) {
TraceContextOrSamplingFlags extracted =
extractAndClearTraceIdProperties(processorExtractor, message, message);
//TODO TraceContextOrSamplingFlags extracted = processorExtractor.extract(message);
Span result = tracer.nextSpan(extracted); // Processor spans use the normal sampler.

// When an upstream context was not present, lookup keys are unlikely added
Expand All @@ -254,6 +255,10 @@ <R> TraceContextOrSamplingFlags extractAndClearTraceIdProperties(
// message writable
PropertyFilter.filterProperties(message, traceIdProperties);
return extracted;
//TODO void clearProperties(Message message) {
// if (!TraceContextOrSamplingFlags.EMPTY.equals(extracted)) {
// PropertyFilter.filterProperties(message, propagationKeys);
// }
}

/** Creates a potentially noop remote span representing this request */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ void handleReceive(Message message) {
if (message == null || tracing.isNoop()) return;
MessageConsumerRequest request = new MessageConsumerRequest(message, destination(message));

<<<<<<< HEAD
TraceContextOrSamplingFlags extracted =
jmsTracing.extractAndClearTraceIdProperties(extractor, request, message);
=======
TraceContextOrSamplingFlags extracted = extractor.extract(request);
>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject
Span span = jmsTracing.nextMessagingSpan(sampler, request, extracted);

if (!span.isNoop()) {
Expand All @@ -61,6 +65,7 @@ void handleReceive(Message message) {
long timestamp = tracing.clock(span.context()).currentTimeMicroseconds();
span.start(timestamp).finish(timestamp);
}
jmsTracing.clearProperties(message);
injector.inject(span.context(), request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Span startMessageListenerSpan(Message message) {

TraceContextOrSamplingFlags extracted =
jmsTracing.extractAndClearTraceIdProperties(extractor, request, message);
// TODO TraceContextOrSamplingFlags extracted = extractor.extract(request);
Span consumerSpan = jmsTracing.nextMessagingSpan(sampler, request, extracted);

// JMS has no visibility of the incoming message, which incidentally could be local!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ Span createAndStartProducerSpan(R request) {
// sending one. At any rate, as long as we are using b3-single format, this is an overwrite not
// a clear.
Span span;
TraceContextOrSamplingFlags extracted = null;
if (maybeParent == null) {
TraceContextOrSamplingFlags extracted = extractor.extract(request);
extracted = extractor.extract(request);
span = jmsTracing.nextMessagingSpan(sampler, request, extracted);
} else {
span = tracer.newChild(maybeParent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ void messageListener_resumesTrace(JMSRunnable send, MessageConsumer messageConsu
assertChildOf(listenerSpan, consumerSpan);
assertThat(listenerSpan.tags())
.hasSize(1) // no redundant copy of consumer tags
.containsEntry("b3", "false"); // b3 header not leaked to listener
// This assumption does not hold.
// .containsEntry("b3", "false"); // b3 header not leaked to listener
.containsEntry("b3", "true"); // b3 header not leaked to listener
}

@Test public void messageListener_readsBaggage() throws JMSException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,9 @@ void messageListener_resumesTrace(Runnable send) {

assertThat(listenerSpan.tags())
.hasSize(1) // no redundant copy of consumer tags
.containsEntry("b3", "false"); // b3 header not leaked to listener
// This expectation does not hold
// .containsEntry("b3", "false"); // b3 header not leaked to listener
.containsEntry("b3", "true"); // b3 header kept
}

@Test public void messageListener_readsBaggage() {
Expand Down
11 changes: 11 additions & 0 deletions instrumentation/jms/src/test/java/brave/jms/JmsTracingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,23 @@ abstract class Both implements XATopicConnection, TopicConnection {
assertThat(testSpanHandler.takeLocalSpan().tags()).isEmpty();
}

<<<<<<< HEAD
@Test public void nextSpan_should_clear_propagation_headers() {
Propagation.B3_STRING.injector(SETTER).inject(parent, message);
Propagation.B3_SINGLE_STRING.injector(SETTER).inject(parent, message);

jmsTracing.nextSpan(message);
assertThat(ITJms.propertiesToMap(message)).isEmpty();
=======
@Test public void nextSpan_should_not_clear_propagation_headers() throws Exception {
TraceContext context =
TraceContext.newBuilder().traceId(1L).parentId(2L).spanId(3L).debug(true).build();
Propagation.B3_STRING.injector(SETTER).inject(context, message);
Propagation.B3_SINGLE_STRING.injector(SETTER).inject(context, message);

jmsTracing.nextSpan(message);
assertThat(JmsTest.propertiesToMap(message)).isNotEmpty();
>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject
}

@Test public void nextSpan_should_retain_baggage_headers() throws JMSException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ public class TracingMessageListenerTest extends ITJms {

onMessageConsumed(message);

<<<<<<< HEAD
// clearing headers ensures later work doesn't try to use the old parent
assertNoProperties(message);
=======
assertThat(message.getProperties()).isNotEmpty();
>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject

MutableSpan consumerSpan = testSpanHandler.takeRemoteSpan(CONSUMER);
MutableSpan listenerSpan = testSpanHandler.takeLocalSpan();
Expand All @@ -169,8 +173,12 @@ public class TracingMessageListenerTest extends ITJms {

onMessageConsumed(message);

<<<<<<< HEAD
// clearing headers ensures later work doesn't try to use the old parent
assertNoProperties(message);
=======
assertThat(message.getProperties()).isNotEmpty();
>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject

assertChildOf(testSpanHandler.takeLocalSpan(), parent);
}
Expand All @@ -196,8 +204,12 @@ public class TracingMessageListenerTest extends ITJms {

onMessageConsumed(message);

<<<<<<< HEAD
// clearing headers ensures later work doesn't try to use the old parent
assertNoProperties(message);
=======
assertThat(message.getProperties()).isNotEmpty();
>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject

MutableSpan consumerSpan = testSpanHandler.takeRemoteSpan(CONSUMER);
MutableSpan listenerSpan = testSpanHandler.takeLocalSpan();
Expand All @@ -215,8 +227,12 @@ public class TracingMessageListenerTest extends ITJms {

onMessageConsumed(message);

<<<<<<< HEAD
// clearing headers ensures later work doesn't try to use the old parent
assertNoProperties(message);
=======
assertThat(message.getProperties()).isNotEmpty();
>>>>>>> baba417ef... refactor: from extractAndClear and later inject to extract and later clearAndInject

assertChildOf(testSpanHandler.takeLocalSpan(), parent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,15 @@ public Span nextSpan(ConsumerRecord<?, ?> record) {
// events create consumer spans. Since this is a processor span, we use the normal sampler.
TraceContextOrSamplingFlags extracted =
extractAndClearTraceIdHeaders(processorExtractor, record.headers(), record.headers());
//TODO processorExtractor.extract(record.headers());
Span result = tracer.nextSpan(extracted);
if (extracted.context() == null && !result.isNoop()) {
addTags(record, result);
}
return result;
}

//TODO remove
<R> TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(
Extractor<R> extractor, R request, Headers headers
) {
Expand Down Expand Up @@ -233,6 +235,13 @@ void clearTraceIdHeaders(Headers headers) {
for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
Header next = i.next();
if (traceIdHeaders.contains(next.key())) i.remove();
//TODO void clearHeaders(TraceContextOrSamplingFlags extracted, Headers headers) {
// if (!TraceContextOrSamplingFlags.EMPTY.equals(extracted)) {
// // Headers::remove creates and consumes an iterator each time. This does one loop instead.
// for (Iterator<Header> i = headers.iterator(); i.hasNext(); ) {
// Header next = i.next();
// if (propagationKeys.contains(next.key())) i.remove();
// }
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
KafkaConsumerRequest request = new KafkaConsumerRequest(record);
TraceContextOrSamplingFlags extracted =
kafkaTracing.extractAndClearTraceIdHeaders(extractor, request, record.headers());
//TODO TraceContextOrSamplingFlags extracted = extractor.extract(request);

// If we extracted neither a trace context, nor request-scoped data (extra),
// and sharing trace is enabled make or reuse a span for this topic
Expand All @@ -115,6 +116,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
}
consumerSpansForTopic.put(topic, span);
}
//TODO kafkaTracing.clearHeaders(extracted, record.headers());
injector.inject(span.context(), request);
} else { // we extracted request-scoped data, so cannot share a consumer span.
Span span = kafkaTracing.nextMessagingSpan(sampler, request, extracted);
Expand All @@ -126,6 +128,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
}
span.start(timestamp).finish(timestamp); // span won't be shared by other records
}
//TODO kafkaTracing.clearHeaders(extracted, record.headers());
injector.inject(span.context(), request);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, @Nullable Callba
// NOTE: Brave instrumentation used properly does not result in stale header entries, as we
// always clear message headers after reading.
Span span;
//TODO TraceContextOrSamplingFlags extracted = null;
if (maybeParent == null) {
TraceContextOrSamplingFlags extracted =
kafkaTracing.extractAndClearTraceIdHeaders(extractor, request, record.headers());
//TODO extracted = extractor.extract(request);
span = kafkaTracing.nextMessagingSpan(sampler, request, extracted);
} else { // If we have a span in scope assume headers were cleared before
span = tracer.newChild(maybeParent);
Expand All @@ -121,6 +123,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record, @Nullable Callba
span.start();
}

//TODO kafkaTracing.clearHeaders(extracted, record.headers());
injector.inject(span.context(), request);

Tracer.SpanInScope ws = tracer.withSpanInScope(span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public class KafkaTracingTest extends KafkaTest {
kafkaTracing.nextSpan(consumerRecord);
assertThat(consumerRecord.headers().headers(BAGGAGE_FIELD_KEY)).isNotEmpty();
}
// @Test public void nextSpan_should_clear_propagation_headers() {
// addB3MultiHeaders(fakeRecord);
//
// kafkaTracing.nextSpan(fakeRecord);
// assertThat(fakeRecord.headers().toArray()).isEmpty();
// }

@Test public void nextSpan_should_not_clear_other_headers() {
consumerRecord.headers().add("foo", new byte[0]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ public SimpleRabbitListenerContainerFactory decorateSimpleRabbitListenerContaine
return factory;
}

//TODO remove
<R> TraceContextOrSamplingFlags extractAndClearTraceIdHeaders(
Extractor<R> extractor, R request, Message message
) {
Expand Down Expand Up @@ -247,5 +248,10 @@ Span nextMessagingSpan(
// multi, or visa versa.
void clearTraceIdHeaders(Map<String, Object> headers) {
for (String traceIDHeader : traceIdHeaders) headers.remove(traceIDHeader);
//TODO void clearHeaders(TraceContextOrSamplingFlags extracted, Message message) {
// if (!TraceContextOrSamplingFlags.EMPTY.equals(extracted)) {
// Map<String, Object> headers = message.getMessageProperties().getHeaders();
// for (String key : propagationKeys) headers.remove(key);
// }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ final class TracingMessagePostProcessor implements MessagePostProcessor {
// NOTE: Brave instrumentation used properly does not result in stale header entries, as we
// always clear message headers after reading.
Span span;
//TODO TraceContextOrSamplingFlags extracted = null;
if (maybeParent == null) {
TraceContextOrSamplingFlags extracted =
springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
//TODO extracted = extractor.extract(request);
span = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
} else { // If we have a span in scope assume headers were cleared before
span = tracer.newChild(maybeParent);
Expand All @@ -82,6 +84,7 @@ final class TracingMessagePostProcessor implements MessagePostProcessor {
span.start(timestamp).finish(timestamp);
}

//TODO springRabbitTracing.clearHeaders(extracted, message);
injector.inject(span.context(), request);
return message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ final class TracingRabbitListenerAdvice implements MethodInterceptor {

TraceContextOrSamplingFlags extracted =
springRabbitTracing.extractAndClearTraceIdHeaders(extractor, request, message);
//TODO extractor.extract(request);

// named for BlockingQueueConsumer.nextMessage, which we can't currently see
Span consumerSpan = springRabbitTracing.nextMessagingSpan(sampler, request, extracted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,7 @@ public class TracingRabbitListenerAdviceTest {
Message message = MessageBuilder.withBody(new byte[0]).andProperties(props).build();
onMessageConsumed(message);

// cleared the headers to later work doesn't try to use the old parent
assertThat(message.getMessageProperties().getHeaders()).isEmpty();
assertThat(message.getMessageProperties().getHeaders()).isNotEmpty();

assertThat(spans)
.filteredOn(span -> span.kind() == CONSUMER)
Expand All @@ -144,8 +143,7 @@ public class TracingRabbitListenerAdviceTest {
Message message = MessageBuilder.withBody(new byte[0]).andProperties(props).build();
onMessageConsumed(message);

// cleared the headers to later work doesn't try to use the old parent
assertThat(message.getMessageProperties().getHeaders()).isEmpty();
assertThat(message.getMessageProperties().getHeaders()).isNotEmpty();

assertThat(spans)
.filteredOn(span -> span.kind() == CONSUMER)
Expand Down