From 0ea05a8420e9bd5d92541c3430b507f08b9a6cca Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 5 Jun 2024 10:32:19 +0300 Subject: [PATCH] Stop kotlin coroutine dispatcher from propagating context (#11500) --- .../javaagent/build.gradle.kts | 1 + ...linCoroutineDispatcherInstrumentation.java | 48 +++++++++++++++++++ ...KotlinCoroutinesInstrumentationModule.java | 5 +- .../kotlinxcoroutines/RunnableWrapper.java | 22 +++++++++ .../KotlinCoroutinesInstrumentationTest.kt | 42 ++++++++++++++-- 5 files changed, 113 insertions(+), 5 deletions(-) create mode 100644 instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutineDispatcherInstrumentation.java create mode 100644 instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/RunnableWrapper.java diff --git a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/build.gradle.kts b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/build.gradle.kts index 5035ae3f84e0..3e1be05c41ff 100644 --- a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/build.gradle.kts +++ b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/build.gradle.kts @@ -42,6 +42,7 @@ dependencies { testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0") testLibrary("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.0.0") + testLibrary("io.vertx:vertx-lang-kotlin-coroutines:3.6.0") } kotlin { diff --git a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutineDispatcherInstrumentation.java b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutineDispatcherInstrumentation.java new file mode 100644 index 000000000000..d3bd853428a0 --- /dev/null +++ b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutineDispatcherInstrumentation.java @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines; + +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.extendsClass; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +public class KotlinCoroutineDispatcherInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("kotlinx.coroutines.CoroutineDispatcher"); + } + + @Override + public ElementMatcher typeMatcher() { + return extendsClass(named("kotlinx.coroutines.CoroutineDispatcher")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("dispatch").and(takesArgument(1, Runnable.class)), + this.getClass().getName() + "$StopContextPropagationAdvice"); + } + + @SuppressWarnings("unused") + public static class StopContextPropagationAdvice { + + @Advice.OnMethodEnter + public static void enter(@Advice.Argument(value = 1, readOnly = false) Runnable runnable) { + if (runnable != null) { + runnable = RunnableWrapper.stopPropagation(runnable); + } + } + } +} diff --git a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java index 5b79a11d11b9..c1a6dbc18677 100644 --- a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java +++ b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationModule.java @@ -5,7 +5,7 @@ package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines; -import static java.util.Collections.singletonList; +import static java.util.Arrays.asList; import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; @@ -35,6 +35,7 @@ public boolean isIndyModule() { @Override public List typeInstrumentations() { - return singletonList(new KotlinCoroutinesInstrumentation()); + return asList( + new KotlinCoroutinesInstrumentation(), new KotlinCoroutineDispatcherInstrumentation()); } } diff --git a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/RunnableWrapper.java b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/RunnableWrapper.java new file mode 100644 index 000000000000..3bdc9b708c54 --- /dev/null +++ b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/RunnableWrapper.java @@ -0,0 +1,22 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.kotlinxcoroutines; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; + +public final class RunnableWrapper { + + public static Runnable stopPropagation(Runnable runnable) { + return () -> { + try (Scope ignored = Context.root().makeCurrent()) { + runnable.run(); + } + }; + } + + private RunnableWrapper() {} +} diff --git a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt index 2bdcd1106c60..9feeb198cefa 100644 --- a/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt +++ b/instrumentation/kotlinx-coroutines/kotlinx-coroutines-1.0/javaagent/src/test/kotlin/io/opentelemetry/javaagent/instrumentation/kotlinxcoroutines/KotlinCoroutinesInstrumentationTest.kt @@ -19,6 +19,8 @@ import io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRo import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo import io.opentelemetry.sdk.testing.assertj.TraceAssert import io.opentelemetry.semconv.incubating.CodeIncubatingAttributes +import io.vertx.core.Vertx +import io.vertx.kotlin.coroutines.dispatcher import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope @@ -41,6 +43,7 @@ import kotlinx.coroutines.withTimeout import kotlinx.coroutines.yield import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assumptions import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.extension.ExtensionContext @@ -63,17 +66,20 @@ class KotlinCoroutinesInstrumentationTest { companion object { val threadPool = Executors.newFixedThreadPool(2) val singleThread = Executors.newSingleThreadExecutor() + val vertx = Vertx.vertx() + + @JvmStatic + @RegisterExtension + val testing = AgentInstrumentationExtension.create() } @AfterAll fun shutdown() { threadPool.shutdown() singleThread.shutdown() + vertx.close() } - @RegisterExtension - val testing = AgentInstrumentationExtension.create() - val tracer = testing.openTelemetry.getTracer("test") @ParameterizedTest @@ -517,6 +523,7 @@ class KotlinCoroutinesInstrumentationTest { arguments(DispatcherWrapper(Dispatchers.Unconfined)), arguments(DispatcherWrapper(threadPool.asCoroutineDispatcher())), arguments(DispatcherWrapper(singleThread.asCoroutineDispatcher())), + arguments(DispatcherWrapper(vertx.dispatcher())) ) } @@ -559,4 +566,33 @@ class KotlinCoroutinesInstrumentationTest { return otelContext.makeCurrent() } } + + // regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/11411 + @ParameterizedTest + @ArgumentsSource(DispatchersSource::class) + fun `dispatch does not propagate context`(dispatcher: DispatcherWrapper) { + Assumptions.assumeTrue(dispatcher.dispatcher != Dispatchers.Unconfined) + + runTest(dispatcher) { + dispatcher.dispatcher.dispatch(coroutineContext) { + tracer.spanBuilder("dispatched").startSpan().end() + } + } + + testing.waitAndAssertTraces( + { trace -> + trace.hasSpansSatisfyingExactly({ + it.hasName("parent") + .hasNoParent() + }) + }, + { trace -> + trace.hasSpansSatisfyingExactly({ + it.hasName("dispatched") + .hasNoParent() + }) + } + ) + } }