Skip to content

Commit

Permalink
Replace binary propagation with UTF8 encoded text propagation (#3300)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKunz authored Sep 6, 2023
1 parent 4c248af commit 30f5f3d
Show file tree
Hide file tree
Showing 47 changed files with 1,224 additions and 954 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
* Add support for Elasticsearch client 8.9 - {pull}3283[#3283]
* Added `baggage_to_attach` config option to allow automatic lifting of baggage into transaction, span and error attributes - {pull}3288[#3288], {pull}3289[#3289]
* Exclude elasticsearch 8.10 and newer clients from instrumentation because they natively support OpenTelemetry - {pull}3303[#3303]
* Switched to OpenTelemetry compatible context propagation for Kafka - {pull}3300[#3300]
[[release-notes-1.x]]
=== Java Agent version 1.x
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import co.elastic.apm.agent.context.ClosableLifecycleListenerAdapter;
import co.elastic.apm.agent.context.LifecycleListener;
import co.elastic.apm.agent.impl.baggage.Baggage;
import co.elastic.apm.agent.impl.baggage.W3CBaggagePropagation;
import co.elastic.apm.agent.impl.error.ErrorCapture;
import co.elastic.apm.agent.impl.metadata.MetaDataFuture;
import co.elastic.apm.agent.impl.sampling.ProbabilitySampler;
Expand All @@ -55,8 +56,7 @@
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Scope;
import co.elastic.apm.agent.tracer.dispatch.BinaryHeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.HeaderGetter;
import co.elastic.apm.agent.tracer.reference.ReferenceCounted;
import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap;
import co.elastic.apm.agent.util.DependencyInjectingServiceLoader;
Expand All @@ -69,7 +69,9 @@
import java.io.Closeable;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -91,6 +93,14 @@ public class ElasticApmTracer implements Tracer {
private static final WeakMap<ClassLoader, ServiceInfo> serviceInfoByClassLoader = WeakConcurrent.buildMap();

private static final Map<Class<?>, Class<? extends ConfigurationOptionProvider>> configs = new HashMap<>();
public static final Set<String> TRACE_HEADER_NAMES;

static {
Set<String> headerNames = new HashSet<>();
headerNames.addAll(TraceContext.TRACE_TEXTUAL_HEADERS);
headerNames.add(W3CBaggagePropagation.BAGGAGE_HEADER_NAME);
TRACE_HEADER_NAMES = Collections.unmodifiableSet(headerNames);
}

private static volatile boolean classloaderCheckOk = false;

Expand Down Expand Up @@ -277,49 +287,29 @@ public Transaction startRootTransaction(Sampler sampler, long epochMicros, Bagga

@Override
@Nullable
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader) {
return startChildTransaction(headerCarrier, textHeadersGetter, sampler, -1, initiatingClassLoader);
}

@Override
@Nullable
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicros) {
return startChildTransaction(headerCarrier, textHeadersGetter, sampler, epochMicros, initiatingClassLoader);
public <T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, @Nullable ClassLoader initiatingClassLoader) {
return startChildTransaction(headerCarrier, headersGetter, sampler, -1, initiatingClassLoader);
}

@Override
@Nullable
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, Sampler sampler,
long epochMicros, @Nullable ClassLoader initiatingClassLoader) {
return startChildTransaction(headerCarrier, textHeadersGetter, sampler, epochMicros, currentContext().getBaggage(), initiatingClassLoader);
}

private <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, Sampler sampler,
long epochMicros, Baggage baseBaggage, @Nullable ClassLoader initiatingClassLoader) {
Transaction transaction = null;
if (isRunning()) {
transaction = createTransaction().start(TraceContext.<C>getFromTraceContextTextHeaders(), headerCarrier,
textHeadersGetter, epochMicros, sampler, baseBaggage);
afterTransactionStart(initiatingClassLoader, transaction);
}
return transaction;
public <T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, @Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicros) {
return startChildTransaction(headerCarrier, headersGetter, sampler, epochMicros, initiatingClassLoader);
}

@Override
@Nullable
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, BinaryHeaderGetter<C> binaryHeadersGetter, @Nullable ClassLoader initiatingClassLoader) {
return startChildTransaction(headerCarrier, binaryHeadersGetter, sampler, -1, initiatingClassLoader);
public <T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, Sampler sampler,
long epochMicros, @Nullable ClassLoader initiatingClassLoader) {
return startChildTransaction(headerCarrier, headersGetter, sampler, epochMicros, currentContext().getBaggage(), initiatingClassLoader);
}

@Override
@Nullable
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, BinaryHeaderGetter<C> binaryHeadersGetter,
Sampler sampler, long epochMicros, @Nullable ClassLoader initiatingClassLoader) {
private <T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, Sampler sampler,
long epochMicros, Baggage baseBaggage, @Nullable ClassLoader initiatingClassLoader) {
Transaction transaction = null;
if (isRunning()) {
Baggage baseBaggage = currentContext().getBaggage();
transaction = createTransaction().start(TraceContext.<C>getFromTraceContextBinaryHeaders(), headerCarrier,
binaryHeadersGetter, epochMicros, sampler, baseBaggage);
transaction = createTransaction().start(headerCarrier,
headersGetter, epochMicros, sampler, baseBaggage);
afterTransactionStart(initiatingClassLoader, transaction);
}
return transaction;
Expand Down Expand Up @@ -969,6 +959,6 @@ public <T extends co.elastic.apm.agent.tracer.Tracer> T require(Class<T> type) {

@Override
public Set<String> getTraceHeaderNames() {
return TraceContext.TRACE_TEXTUAL_HEADERS;
return TRACE_HEADER_NAMES;
}
}
33 changes: 5 additions & 28 deletions apm-agent-core/src/main/java/co/elastic/apm/agent/impl/Tracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import co.elastic.apm.agent.impl.transaction.ElasticContext;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.tracer.dispatch.BinaryHeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.HeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -59,18 +57,18 @@ public interface Tracer extends co.elastic.apm.agent.tracer.Tracer {

@Override
@Nullable
<C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader);
<T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, @Nullable ClassLoader initiatingClassLoader);

@Nullable
<C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicros);
<T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, @Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicros);

/**
* Starts a transaction as a child of the context headers obtained through the provided {@link HeaderGetter}.
* If the created transaction cannot be started as a child transaction (for example - if no parent context header is
* available), then it will be started as the root transaction of the trace.
*
* @param headerCarrier the Object from which context headers can be obtained, typically a request or a message
* @param textHeadersGetter provides the trace context headers required in order to create a child transaction
* @param headersGetter provides the trace context headers required in order to create a child transaction
* @param sampler the {@link Sampler} instance which is responsible for determining the sampling decision if this is a root transaction
* @param epochMicros the start timestamp
* @param initiatingClassLoader the class loader corresponding to the service which initiated the creation of the transaction.
Expand All @@ -79,30 +77,9 @@ public interface Tracer extends co.elastic.apm.agent.tracer.Tracer {
* @return a transaction which is a child of the provided parent if the agent is currently RUNNING; null otherwise
*/
@Nullable
<C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, Sampler sampler,
long epochMicros, @Nullable ClassLoader initiatingClassLoader);
<T, C> Transaction startChildTransaction(@Nullable C headerCarrier, HeaderGetter<T, C> headersGetter, Sampler sampler,
long epochMicros, @Nullable ClassLoader initiatingClassLoader);

@Override
@Nullable
<C> Transaction startChildTransaction(@Nullable C headerCarrier, BinaryHeaderGetter<C> binaryHeadersGetter, @Nullable ClassLoader initiatingClassLoader);

/**
* Starts a transaction as a child of the context headers obtained through the provided {@link HeaderGetter}.
* If the created transaction cannot be started as a child transaction (for example - if no parent context header is
* available), then it will be started as the root transaction of the trace.
*
* @param headerCarrier the Object from which context headers can be obtained, typically a request or a message
* @param binaryHeadersGetter provides the trace context headers required in order to create a child transaction
* @param sampler the {@link Sampler} instance which is responsible for determining the sampling decision if this is a root transaction
* @param epochMicros the start timestamp
* @param initiatingClassLoader the class loader corresponding to the service which initiated the creation of the transaction.
* Used to determine the service name and to load application-scoped classes like the {@link org.slf4j.MDC},
* for log correlation.
* @return a transaction which is a child of the provided parent if the agent is currently RUNNING; null otherwise
*/
@Nullable
<C> Transaction startChildTransaction(@Nullable C headerCarrier, BinaryHeaderGetter<C> binaryHeadersGetter,
Sampler sampler, long epochMicros, @Nullable ClassLoader initiatingClassLoader);

@Override
ElasticContext<?> currentContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import co.elastic.apm.agent.impl.transaction.AbstractSpan;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -61,6 +62,11 @@ public class Baggage implements co.elastic.apm.agent.tracer.Baggage {
*/
private volatile String cachedSerializedW3CHeader = null;

/**
* Cached UTF8-representation of {@link #cachedSerializedW3CHeader}.
*/
byte[] cachedSerializedW3CHeaderUtf8 = null;

private Baggage(Map<String, String> baggage, Map<String, String> baggageMetadata) {
this.baggage = baggage;
this.baggageMetadata = baggageMetadata;
Expand Down Expand Up @@ -102,6 +108,20 @@ String getCachedSerializedW3CHeader() {
return this.cachedSerializedW3CHeader;
}


byte[] getCachedSerializedW3CHeaderUtf8() {
if (cachedSerializedW3CHeader == null) {
throw new IllegalStateException("cached string header must be set first");
}
if (cachedSerializedW3CHeader.isEmpty()) {
return null;
}
if (cachedSerializedW3CHeaderUtf8 == null) {
cachedSerializedW3CHeaderUtf8 = cachedSerializedW3CHeader.getBytes(StandardCharsets.UTF_8);
}
return this.cachedSerializedW3CHeaderUtf8;
}

public void storeBaggageInAttributes(AbstractSpan<?> span, List<WildcardMatcher> keyFilter) {
if (baggage.isEmpty() || keyFilter.isEmpty()) {
// early out to prevent unnecessarily allocating an iterator
Expand Down Expand Up @@ -143,6 +163,24 @@ private String getKeyWithAttributePrefix(String key) {
return keyWithPrefix;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

Baggage baggage1 = (Baggage) o;

if (!baggage.equals(baggage1.baggage)) return false;
return baggageMetadata.equals(baggage1.baggageMetadata);
}

@Override
public int hashCode() {
int result = baggage.hashCode();
result = 31 * result + baggageMetadata.hashCode();
return result;
}

public static class Builder {

public Builder(Baggage parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import co.elastic.apm.agent.impl.baggage.otel.Parser;
import co.elastic.apm.agent.impl.baggage.otel.PercentEscaper;
import co.elastic.apm.agent.tracer.dispatch.HeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.HeaderSetter;
import co.elastic.apm.agent.tracer.dispatch.TextHeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.TextHeaderSetter;
import co.elastic.apm.agent.tracer.dispatch.UTF8ByteHeaderGetter;
import co.elastic.apm.agent.tracer.dispatch.UTF8ByteHeaderSetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;

public class W3CBaggagePropagation {

Expand All @@ -36,7 +40,7 @@ public class W3CBaggagePropagation {

public static final String BAGGAGE_HEADER_NAME = "baggage";

private static final HeaderGetter.HeaderConsumer<String, Baggage.Builder> PARSING_CONSUMER = new HeaderGetter.HeaderConsumer<String, Baggage.Builder>() {
private static final HeaderGetter.HeaderConsumer<String, Baggage.Builder> STRING_PARSING_CONSUMER = new HeaderGetter.HeaderConsumer<String, Baggage.Builder>() {
@Override
public void accept(@Nullable String headerValue, Baggage.Builder state) {
if (headerValue != null) {
Expand All @@ -49,22 +53,71 @@ public void accept(@Nullable String headerValue, Baggage.Builder state) {
}
};

public static <C> void parse(C carrier, TextHeaderGetter<C> headerGetter, Baggage.Builder into) {
headerGetter.forEach(BAGGAGE_HEADER_NAME, carrier, into, PARSING_CONSUMER);

private static final HeaderGetter.HeaderConsumer<byte[], Baggage.Builder> UTF8_BYTES_PARSING_CONSUMER = new HeaderGetter.HeaderConsumer<byte[], Baggage.Builder>() {
@Override
public void accept(@Nullable byte[] headerValue, Baggage.Builder state) {
if (headerValue != null) {
try {
STRING_PARSING_CONSUMER.accept(new String(headerValue, StandardCharsets.UTF_8), state);
} catch (Exception e) {
logger.error("Failed to decode baggage header bytes as UTF8", e);
}
}
}
};

@SuppressWarnings("unchecked")
public static <T, C> void parse(C carrier, HeaderGetter<T, C> headerGetter, Baggage.Builder into) {
HeaderGetter.HeaderConsumer<T, Baggage.Builder> consumer;
if (headerGetter instanceof TextHeaderGetter) {
consumer = (HeaderGetter.HeaderConsumer<T, Baggage.Builder>) STRING_PARSING_CONSUMER;
} else if (headerGetter instanceof UTF8ByteHeaderGetter) {
consumer = (HeaderGetter.HeaderConsumer<T, Baggage.Builder>) UTF8_BYTES_PARSING_CONSUMER;
} else {
throw new IllegalArgumentException("HeaderGetter must be either a TextHeaderGetter or UTF8ByteHeaderGetter: " + headerGetter.getClass().getName());
}
headerGetter.forEach(BAGGAGE_HEADER_NAME, carrier, into, consumer);
}

public static <C> void propagate(Baggage baggage, C carrier, TextHeaderSetter<C> setter) {
public static <T, C> void propagate(Baggage baggage, C carrier, HeaderSetter<T, C> setter) {
if (baggage.isEmpty()) {
return;
}
T header = getTextHeader(baggage, setter);
if (header != null) {
setter.setHeader(BAGGAGE_HEADER_NAME, header, carrier);
}
}

@SuppressWarnings("unchecked")
private static <T> T getTextHeader(Baggage baggage, HeaderSetter<T, ?> setter) {
if (setter instanceof TextHeaderSetter) {
return (T) getTextHeaderString(baggage);
} else if (setter instanceof UTF8ByteHeaderSetter) {
return (T) getTextHeaderUtf8Bytes(baggage);
} else {
throw new IllegalArgumentException("HeaderSetter must be either a TextHeaderSetter or UTF8ByteHeaderSetter: " + setter.getClass().getName());
}
}

@Nullable
private static byte[] getTextHeaderUtf8Bytes(Baggage baggage) {
getTextHeaderString(baggage); //ensures that the string-header is computed and cached
return baggage.getCachedSerializedW3CHeaderUtf8();
}

@Nullable
private static String getTextHeaderString(Baggage baggage) {
String header = baggage.getCachedSerializedW3CHeader();
if (header == null) {
header = encodeToHeader(baggage);
baggage.setCachedSerializedW3CHeader(header);
}
if (!header.isEmpty()) {
setter.setHeader(BAGGAGE_HEADER_NAME, encodeToHeader(baggage), carrier);
if (header.isEmpty()) {
return null;
}
return header;
}


Expand Down
Loading

0 comments on commit 30f5f3d

Please sign in to comment.