Skip to content

Commit

Permalink
Implemented in-process baggage propagation (#3248)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKunz authored Jul 26, 2023
1 parent d75d9e7 commit 9bdb3aa
Show file tree
Hide file tree
Showing 18 changed files with 310 additions and 210 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
[float]
===== Features
* Added W3C baggage propagation - {pull}3236[#3236]
* Added W3C baggage propagation - {pull}3236[#3236], {pull}3248[#3248]
* Added support for baggage in OpenTelemetry bridge - {pull}3249[#3249]
[[release-notes-1.x]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
package co.elastic.apm.agent.concurrent;

import co.elastic.apm.agent.common.ThreadUtils;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.sdk.DynamicTransformer;
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
import co.elastic.apm.agent.sdk.state.GlobalState;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.ElasticContext;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Tracer;
import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap;
Expand All @@ -42,7 +43,7 @@
@GlobalState
public class JavaConcurrent {

private static final ReferenceCountedMap<Object, AbstractSpan<?>> contextMap = GlobalTracer.get().newReferenceCountedMap();
private static final ReferenceCountedMap<Object, ElasticContext<?>> contextMap = GlobalTracer.get().newReferenceCountedMap();

private static final List<Class<? extends ElasticApmInstrumentation>> RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION = Collections.
<Class<? extends ElasticApmInstrumentation>>singletonList(RunnableCallableForkJoinTaskInstrumentation.class);
Expand Down Expand Up @@ -75,24 +76,25 @@ private static boolean shouldAvoidContextPropagation(@Nullable Object executable
* Retrieves the context mapped to the provided task and activates it on the current thread.
* It is the responsibility of the caller to deactivate the returned context at the right time.
* If the mapped context is already the active span of this thread, this method returns {@code null}.
* @param o a task for which running there may be a context to activate
*
* @param o a task for which running there may be a context to activate
* @param tracer the tracer
* @return the context mapped to the provided task or {@code null} if such does not exist or if the mapped context
* is already the active one on the current thread.
*/
@Nullable
public static AbstractSpan<?> restoreContext(Object o, Tracer tracer) {
public static ElasticContext<?> restoreContext(Object o, Tracer tracer) {
// When an Executor executes directly on the current thread we need to enable this thread for context propagation again
needsContext.set(Boolean.TRUE);

// we cannot remove yet, as this decrements the reference count, which may cause already ended spans to be recycled ahead of time
AbstractSpan<?> context = contextMap.get(o);
ElasticContext<?> context = contextMap.get(o);
if (context == null) {
return null;
}

try {
if (tracer.getActive() != context) {
if (tracer.currentContext() != context) {
return context.activate();
} else {
return null;
Expand All @@ -111,8 +113,8 @@ public static Runnable withContext(@Nullable Runnable runnable, Tracer tracer) {
return runnable;
}
needsContext.set(Boolean.FALSE);
AbstractSpan<?> active = tracer.getActive();
if (active == null) {
ElasticContext<?> active = tracer.currentContext();
if (active.isEmpty()) {
return runnable;
}
if (isLambda(runnable)) {
Expand All @@ -122,11 +124,13 @@ public static Runnable withContext(@Nullable Runnable runnable, Tracer tracer) {
return runnable;
}

private static void captureContext(Object task, AbstractSpan<?> active) {
private static void captureContext(Object task, ElasticContext<?> active) {
DynamicTransformer.ensureInstrumented(task.getClass(), RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION);
contextMap.put(task, active);
// Do no discard branches leading to async operations so not to break span references
active.setNonDiscardable();
if (active.getSpan() != null) {
active.getSpan().setNonDiscardable();
}
}

/**
Expand All @@ -138,8 +142,8 @@ public static <T> Callable<T> withContext(@Nullable Callable<T> callable, Tracer
return callable;
}
needsContext.set(Boolean.FALSE);
AbstractSpan<?> active = tracer.getActive();
if (active == null) {
ElasticContext<?> active = tracer.currentContext();
if (active.isEmpty()) {
return callable;
}
if (isLambda(callable)) {
Expand All @@ -155,8 +159,8 @@ public static <T> ForkJoinTask<T> withContext(@Nullable ForkJoinTask<T> task, Tr
return task;
}
needsContext.set(Boolean.FALSE);
AbstractSpan<?> active = tracer.getActive();
if (active == null) {
ElasticContext<?> active = tracer.currentContext();
if (active.isEmpty()) {
return task;
}
captureContext(task, active);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Tracer;
import co.elastic.apm.agent.tracer.ElasticContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -80,8 +81,8 @@ public static Object onEnter(@Advice.This Object thiz) {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false)
public static void onExit(@Advice.Thrown Throwable thrown,
@Nullable @Advice.Enter Object context) {
if (context instanceof AbstractSpan) {
((AbstractSpan<?>) context).deactivate();
if (context != null) {
((ElasticContext<?>) context).deactivate();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,30 @@
package co.elastic.apm.agent.concurrent;

import co.elastic.apm.agent.AbstractInstrumentationTest;
import co.elastic.apm.agent.impl.baggage.BaggageContext;
import co.elastic.apm.agent.impl.transaction.ElasticContext;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.impl.transaction.Transaction;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat;

@RunWith(Parameterized.class)
public class ExecutorInstrumentationTest extends AbstractInstrumentationTest {

private final Executor executor;
private Transaction transaction;

public ExecutorInstrumentationTest(Supplier<ExecutorService> supplier) {
executor = supplier.get();
Expand All @@ -51,10 +53,6 @@ public static Iterable<Supplier<Executor>> data() {
return Arrays.asList(SimpleAsyncTaskExecutor::new, SyncTaskExecutor::new);
}

@Before
public void setUp() {
transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
}

@After
public void tearDown() {
Expand All @@ -63,14 +61,70 @@ public void tearDown() {

@Test
public void testExecutorExecute_Transaction() {
executor.execute(this::createAsyncSpan);
assertOnlySpanIsChildOfOnlyTransaction();
Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
executor.execute(() -> createAsyncSpan(transaction));
try {
// wait for the async operation to end
assertThat(reporter.getFirstSpan(1000)).isNotNull();
} finally {
transaction.deactivate().end();
}
assertThat(reporter.getTransactions()).hasSize(1);
assertThat(reporter.getSpans()).hasSize(1);
assertThat(reporter.getFirstSpan().isChildOf(reporter.getFirstTransaction())).isTrue();
}

@Test
public void testBaggagePropagationWithTransaction() throws InterruptedException {
Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
BaggageContext transactionWithBaggage = tracer.currentContext().withUpdatedBaggage()
.put("foo", "bar")
.buildContext()
.activate();

AtomicReference<ElasticContext<?>> propagatedContext = new AtomicReference<>();
CountDownLatch doneLatch = new CountDownLatch(1);
executor.execute(() -> {
propagatedContext.set(tracer.currentContext());
doneLatch.countDown();
});
transactionWithBaggage.deactivate();
transaction.deactivate().end();
doneLatch.await();

assertThat(propagatedContext.get().getBaggage())
.hasSize(1)
.containsEntry("foo", "bar");
assertThat(propagatedContext.get().getTransaction()).isSameAs(transaction);
}

@Test
public void testBaggagePropagationWithoutTransaction() throws InterruptedException {
BaggageContext transactionWithBaggage = tracer.currentContext().withUpdatedBaggage()
.put("foo", "bar")
.buildContext()
.activate();

AtomicReference<ElasticContext<?>> propagatedContext = new AtomicReference<>();
CountDownLatch doneLatch = new CountDownLatch(1);
executor.execute(() -> {
propagatedContext.set(tracer.currentContext());
doneLatch.countDown();
});
transactionWithBaggage.deactivate();
doneLatch.await();

assertThat(propagatedContext.get().getBaggage())
.hasSize(1)
.containsEntry("foo", "bar");
assertThat(propagatedContext.get().getTransaction()).isNull();
}

@Test
public void testExecutorExecute_Span() {
Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
Span nonAsyncSpan = transaction.createSpan().withName("NonAsync").activate();
executor.execute(this::createAsyncSpan);
executor.execute(() -> createAsyncSpan(transaction));
try {
// wait for the async operation to end
assertThat(reporter.getFirstSpan(1000)).isNotNull();
Expand All @@ -84,20 +138,9 @@ public void testExecutorExecute_Span() {
assertThat(reporter.getFirstSpan().isChildOf(nonAsyncSpan)).isTrue();
}

private void assertOnlySpanIsChildOfOnlyTransaction() {
try {
// wait for the async operation to end
assertThat(reporter.getFirstSpan(1000)).isNotNull();
} finally {
transaction.deactivate().end();
}
assertThat(reporter.getTransactions()).hasSize(1);
assertThat(reporter.getSpans()).hasSize(1);
assertThat(reporter.getFirstSpan().isChildOf(reporter.getFirstTransaction())).isTrue();
}

private void createAsyncSpan() {
assertThat(tracer.currentTransaction()).isEqualTo(transaction);
private void createAsyncSpan(Transaction expectedCurrent) {
assertThat(tracer.currentTransaction()).isEqualTo(expectedCurrent);
tracer.getActive().createSpan().withName("Async").end();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
package co.elastic.apm.agent.concurrent;

import co.elastic.apm.agent.AbstractInstrumentationTest;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.tracer.ElasticContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
Expand All @@ -39,34 +38,39 @@ public class ForkJoinPoolTest extends AbstractInstrumentationTest {

private ForkJoinPool pool;
private Transaction transaction;
private ElasticContext<?> txWithBaggage;

@BeforeEach
void setUp() {
pool = new ForkJoinPool();
transaction = tracer.startRootTransaction(null).withName("transaction").activate();
txWithBaggage = tracer.currentContext().withUpdatedBaggage().put("foo", "bar").buildContext().activate();
}

@AfterEach
void tearDown() {
assertThat(tracer.getActive()).isEqualTo(transaction);
assertThat(tracer.currentContext()).isSameAs(txWithBaggage);
txWithBaggage.deactivate();
transaction.deactivate().end();

}

@Test
void testExecute() throws Exception {
final ForkJoinTask<? extends AbstractSpan<?>> task = newTask(() -> tracer.getActive());
final ForkJoinTask<ElasticContext<?>> task = newTask(() -> tracer.currentContext());
pool.execute(task);
assertThat(task.get()).isEqualTo(transaction);
assertThat(task.get()).isSameAs(txWithBaggage);
}

@Test
void testSubmit() throws Exception {
assertThat(pool.submit(newTask(() -> tracer.getActive())).get()).isEqualTo(transaction);
assertThat(pool.submit(newTask(() -> tracer.currentContext())).get()).isSameAs(txWithBaggage);
}

@Test
void testInvoke() throws Exception {
assertThat(pool.invoke(newTask(() -> tracer.getActive()))).isEqualTo(transaction);
assertThat(pool.invoke(newTask(() -> tracer.currentContext()))).isSameAs(txWithBaggage);
}

@Test
Expand All @@ -76,19 +80,19 @@ void testCompletableFuture() throws Exception {
// Preferences | Build, Execution, Deployment | Debugger | Async Stack Traces
// and uncheck the Instrumenting Agent checkbox
assertThat(CompletableFuture
.supplyAsync(() -> Objects.requireNonNull(tracer.getActive()))
.thenApplyAsync(active -> tracer.getActive())
.supplyAsync(() -> tracer.currentContext())
.thenApplyAsync(active -> tracer.currentContext())
.get())
.isEqualTo(transaction);
.isSameAs(txWithBaggage);
}

@Test
void testParallelStream() {
assertThat(Stream.of("foo", "bar", "baz")
.parallel()
.<AbstractSpan<?>>map(s -> tracer.getActive())
.<ElasticContext<?>>map(s -> tracer.currentContext())
.distinct())
.containsExactly(transaction);
.containsExactly(txWithBaggage);
}

public static class AdaptedSupplier<V> extends ForkJoinTask<V> implements Runnable {
Expand Down
Loading

0 comments on commit 9bdb3aa

Please sign in to comment.