From dd0888d0637ed79ad8a0c3010382f10ef45dde66 Mon Sep 17 00:00:00 2001 From: xxia Date: Wed, 13 Oct 2021 14:00:17 -0700 Subject: [PATCH 1/5] create token and link --- .../reactor/core/scheduler/SchedulerTask.java | 44 +++++++++++++++++++ .../SchedulerTask_Instrumentation.java | 22 ---------- 2 files changed, 44 insertions(+), 22 deletions(-) create mode 100644 instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java delete mode 100644 instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java new file mode 100644 index 0000000000..a4c039e6ac --- /dev/null +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java @@ -0,0 +1,44 @@ +/* + * + * * Copyright 2020 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package reactor.core.scheduler; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.agent.bridge.Transaction; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.NewField; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import reactor.core.Disposable; +import reactor.util.annotation.Nullable; + +@Weave(originalName = "reactor.core.scheduler.SchedulerTask") +final class SchedulerTask { + + @NewField + private Token token; + + SchedulerTask(Runnable task, @Nullable Disposable parent) { + Transaction tx = AgentBridge.getAgent().getTransaction(false); + if (tx != null && tx.isStarted()) { + if (token == null) { + token = tx.getToken(); + } + } + } + + // We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn + @Trace(async = true, excludeFromTransactionTrace = true) + public Void call() { + if(token != null) { + token.linkAndExpire(); + token = null; + } + return Weaver.callOriginal(); + } +} diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java deleted file mode 100644 index 86d01bced8..0000000000 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * - * * Copyright 2020 New Relic Corporation. All rights reserved. - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package reactor.core.scheduler; - -import com.newrelic.api.agent.Trace; -import com.newrelic.api.agent.weaver.Weave; -import com.newrelic.api.agent.weaver.Weaver; - -@Weave(originalName = "reactor.core.scheduler.SchedulerTask") -final class SchedulerTask_Instrumentation { - - // We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn - @Trace(async = true, excludeFromTransactionTrace = true) - public Void call() { - return Weaver.callOriginal(); - } -} From 6b4ba38adeadb5b29469cc1fb50293a9556f4076 Mon Sep 17 00:00:00 2001 From: xxia Date: Wed, 13 Oct 2021 19:02:52 -0700 Subject: [PATCH 2/5] expire token when task disposed --- .../main/java/reactor/core/scheduler/SchedulerTask.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java index a4c039e6ac..0d2197b698 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java @@ -41,4 +41,13 @@ public Void call() { } return Weaver.callOriginal(); } + + public void dispose() { + if(token != null) { + token.expire(); + token = null; + } + Weaver.callOriginal(); + } + } From d2cbeec6af60c3dff0cc760dd42da5de093f40ac Mon Sep 17 00:00:00 2001 From: jasonjkeller Date: Thu, 4 Nov 2021 11:30:03 -0700 Subject: [PATCH 3/5] Add tokenLift hook to netty --- .../SchedulerTask_Instrumentation.java | 11 ++++ .../reactor/core/scheduler/SchedulerTask.java | 53 ------------------- .../SchedulerTask_Instrumentation.java | 33 ++++++++++++ 3 files changed, 44 insertions(+), 53 deletions(-) delete mode 100644 instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java create mode 100644 instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java index 86d01bced8..d4368d3bb1 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java @@ -10,6 +10,11 @@ import com.newrelic.api.agent.Trace; import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Hooks_Instrumentation; + +import static com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.tokenLift; @Weave(originalName = "reactor.core.scheduler.SchedulerTask") final class SchedulerTask_Instrumentation { @@ -17,6 +22,12 @@ final class SchedulerTask_Instrumentation { // We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { + // Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from + // the current context and linked across threads at various points of the Flux/Mono lifecycle. + if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { + Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); + } + return Weaver.callOriginal(); } } diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java deleted file mode 100644 index 0d2197b698..0000000000 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * - * * Copyright 2020 New Relic Corporation. All rights reserved. - * * SPDX-License-Identifier: Apache-2.0 - * - */ - -package reactor.core.scheduler; - -import com.newrelic.agent.bridge.AgentBridge; -import com.newrelic.agent.bridge.Transaction; -import com.newrelic.api.agent.Token; -import com.newrelic.api.agent.Trace; -import com.newrelic.api.agent.weaver.NewField; -import com.newrelic.api.agent.weaver.Weave; -import com.newrelic.api.agent.weaver.Weaver; -import reactor.core.Disposable; -import reactor.util.annotation.Nullable; - -@Weave(originalName = "reactor.core.scheduler.SchedulerTask") -final class SchedulerTask { - - @NewField - private Token token; - - SchedulerTask(Runnable task, @Nullable Disposable parent) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted()) { - if (token == null) { - token = tx.getToken(); - } - } - } - - // We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn - @Trace(async = true, excludeFromTransactionTrace = true) - public Void call() { - if(token != null) { - token.linkAndExpire(); - token = null; - } - return Weaver.callOriginal(); - } - - public void dispose() { - if(token != null) { - token.expire(); - token = null; - } - Weaver.callOriginal(); - } - -} diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java new file mode 100644 index 0000000000..d4368d3bb1 --- /dev/null +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java @@ -0,0 +1,33 @@ +/* + * + * * Copyright 2020 New Relic Corporation. All rights reserved. + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package reactor.core.scheduler; + +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Hooks_Instrumentation; + +import static com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.tokenLift; + +@Weave(originalName = "reactor.core.scheduler.SchedulerTask") +final class SchedulerTask_Instrumentation { + + // We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn + @Trace(async = true, excludeFromTransactionTrace = true) + public Void call() { + // Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from + // the current context and linked across threads at various points of the Flux/Mono lifecycle. + if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { + Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); + } + + return Weaver.callOriginal(); + } +} From dea937bfae8128e673e6a1cd76cd93c907b7ae16 Mon Sep 17 00:00:00 2001 From: jasonjkeller Date: Fri, 5 Nov 2021 17:39:01 -0700 Subject: [PATCH 4/5] Update netty-reactor modules to fix token linking across contexts --- instrumentation/netty-reactor-0.8.0/README.md | 36 +++++++++++++++ .../reactor/netty/TokenLinkingSubscriber.java | 10 ++++- .../core/publisher/Hooks_Instrumentation.java | 6 +++ ...antPeriodicWorkerTask_Instrumentation.java | 2 + .../PeriodicWorkerTask_Instrumentation.java | 2 + .../SchedulerTask_Instrumentation.java | 12 +---- .../scheduler/Schedulers_Instrumentation.java | 45 +++++++++++++++++++ .../scheduler/WorkerTask_Instrumentation.java | 2 + .../HttpTrafficHandler_Instrumentation.java | 8 ++++ instrumentation/netty-reactor-0.9.0/README.md | 36 +++++++++++++++ .../reactor/netty/TokenLinkingSubscriber.java | 10 ++++- .../core/publisher/Hooks_Instrumentation.java | 6 +++ ...antPeriodicWorkerTask_Instrumentation.java | 2 + .../PeriodicWorkerTask_Instrumentation.java | 2 + .../SchedulerTask_Instrumentation.java | 12 +---- .../scheduler/Schedulers_Instrumentation.java | 44 ++++++++++++++++++ .../scheduler/WorkerTask_Instrumentation.java | 2 + .../HttpTrafficHandler_Instrumentation.java | 8 ++++ 18 files changed, 219 insertions(+), 26 deletions(-) create mode 100644 instrumentation/netty-reactor-0.8.0/README.md create mode 100644 instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java create mode 100644 instrumentation/netty-reactor-0.9.0/README.md create mode 100644 instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java diff --git a/instrumentation/netty-reactor-0.8.0/README.md b/instrumentation/netty-reactor-0.8.0/README.md new file mode 100644 index 0000000000..6e8c9267be --- /dev/null +++ b/instrumentation/netty-reactor-0.8.0/README.md @@ -0,0 +1,36 @@ +# Reactor Netty Instrumentation + +Instrumentation for Reactor Netty server and also some widely used Reactor Core library code. + +This module is largely responsible for instrumenting the Reactor Core library to facilitate the passing, retrieval, +and linking of `Tokens` across contexts to tie asynchronous threads together for individual `Transactions`. + +This instrumentation is dependent on other instrumentation modules to start a `Transaction`. +Typically, the `netty-n.n` modules work with this instrumentation and will start a `Transaction` (see `NettyDispatcher#channelRead`). + +Most commonly this instrumentation comes into play with SpringBoot usage, in which case the `spring` and `spring-webflux` +instrumentation modules also apply and should result in `Transactions` being renamed after the Spring controller. + +## Key Components + +* `TokenLinkingSubscriber` + Implementation of a `reactor.core.CoreSubscriber` (a `Context` aware subscriber) that can be added as + a lifecycle hook on `Flux`/`Mono` operators to propagate, retrieve, and link `Tokens` across async contexts. This is done in various places as follows: + + ```java + if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { + Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); + } + ``` + +* `Schedulers_Instrumentation` and `HttpTrafficHandler_Instrumentation` + Both of these classes serve as entry points to add the `TokenLinkingSubscriber` sub-hook. + +* Scheduler `Task`s + Reactor Core Scheduler tasks that execute on asynchronous threads. These are instrumented as points to link `Tokens`. + +## Troubleshooting + +In cases where a `Transaction` gets named `/NettyDispatcher` (or named after a security `Filter`) it usually indicates that context was lost somewhere in +reactor code and that activity on threads where other instrumentation would typically apply could not be linked to the originating `Transaction` thread. +Figuring out how to propagate and link a `Token` across the threads should resolve the issue. diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java b/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java index cf758c4804..6ff98a5e50 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java @@ -17,8 +17,14 @@ import java.util.function.BiFunction; import java.util.function.Function; -// Based on OpenTelemetry code -// https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java +/** + * Implementation of a reactor.core.CoreSubscriber (a Context aware subscriber) that can be added as + * a lifecycle hook on Flux/Mono operators to propagate, retrieve, and link Tokens across async contexts. + * + * Based on OpenTelemetry code: + * https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java + * @param + */ public class TokenLinkingSubscriber implements CoreSubscriber { private final Token token; private final Subscriber subscriber; diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java index abe1cd448c..b573bfefe9 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java @@ -14,6 +14,12 @@ @Weave(originalName = "reactor.core.publisher.Hooks") public abstract class Hooks_Instrumentation { + + /* + * Note that sub-hooks are cumulative. We want to avoid setting the same sub-hooks + * more than once, so we set this boolean to true the first time we set a sub-hook. + * if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { Hooks.onEachOperator(...) } + */ @NewField public static AtomicBoolean instrumented = new AtomicBoolean(false); } diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java index 4a624921f0..4095149fa9 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java @@ -14,6 +14,8 @@ @Weave(originalName = "reactor.core.scheduler.InstantPeriodicWorkerTask") final class InstantPeriodicWorkerTask_Instrumentation { + // We need to be able to link the Token here when executing on a supplied Scheduler + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { return Weaver.callOriginal(); diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java index 961edd6f77..988d33ce26 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java @@ -14,6 +14,8 @@ @Weave(originalName = "reactor.core.scheduler.PeriodicWorkerTask") final class PeriodicWorkerTask_Instrumentation { + // We need to be able to link the Token here when executing on a supplied Scheduler + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { return Weaver.callOriginal(); diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java index d4368d3bb1..fda06f032a 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java @@ -10,24 +10,14 @@ import com.newrelic.api.agent.Trace; import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.Weaver; -import com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber; -import reactor.core.publisher.Hooks; -import reactor.core.publisher.Hooks_Instrumentation; - -import static com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.tokenLift; @Weave(originalName = "reactor.core.scheduler.SchedulerTask") final class SchedulerTask_Instrumentation { // We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { - // Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from - // the current context and linked across threads at various points of the Flux/Mono lifecycle. - if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { - Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); - } - return Weaver.callOriginal(); } } diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java new file mode 100644 index 0000000000..29ae1e901a --- /dev/null +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package reactor.core.scheduler; + +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Hooks_Instrumentation; + +import static com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.tokenLift; + +@Weave(type = MatchType.BaseClass, originalName = "reactor.core.scheduler.Schedulers") +public abstract class Schedulers_Instrumentation { + + @Weave(type = MatchType.ExactClass, originalName = "reactor.core.scheduler.Schedulers$CachedScheduler") + static class CachedScheduler { + final Scheduler cached; + final String key; + + CachedScheduler(String key, Scheduler cached) { + /* + * Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from + * the current context and linked across threads at various points of the Flux/Mono lifecycle. + * + * When using Netty Reactor with SpringBoot this hook will be added by the HttpTrafficHandler_Instrumentation + * but when using other embedded web servers (e.g. Tomcat, Jetty, Undertow) the HttpTrafficHandler class + * doesn't get loaded and thus the hook isn't added. This ensures that the hook is added in a common code + * path before any Scheduler Tasks are spun off on new threads. + */ + if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { + Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); + } + + this.cached = Weaver.callOriginal(); + this.key = Weaver.callOriginal(); + } + + } + +} diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java index 027fe89c06..76867aca47 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java @@ -14,6 +14,8 @@ @Weave(originalName = "reactor.core.scheduler.WorkerTask") final class WorkerTask_Instrumentation { + // We need to be able to link the Token here when executing on a supplied Scheduler + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { return Weaver.callOriginal(); diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java index caeb301cde..d998d6d2e4 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java @@ -19,6 +19,14 @@ @Weave(originalName = "reactor.netty.http.server.HttpTrafficHandler") class HttpTrafficHandler_Instrumentation { public void channelRead(ChannelHandlerContext ctx, Object msg) { + + /* + * Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from + * the current context and linked across threads at various points of the Flux/Mono lifecycle. + * + * This hook will only be added when using Netty Reactor with SpringBoot. When using other embedded web + * servers (e.g. Tomcat, Jetty, Undertow) the Schedulers_Instrumentation class will add the hook. + */ if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); } diff --git a/instrumentation/netty-reactor-0.9.0/README.md b/instrumentation/netty-reactor-0.9.0/README.md new file mode 100644 index 0000000000..6e8c9267be --- /dev/null +++ b/instrumentation/netty-reactor-0.9.0/README.md @@ -0,0 +1,36 @@ +# Reactor Netty Instrumentation + +Instrumentation for Reactor Netty server and also some widely used Reactor Core library code. + +This module is largely responsible for instrumenting the Reactor Core library to facilitate the passing, retrieval, +and linking of `Tokens` across contexts to tie asynchronous threads together for individual `Transactions`. + +This instrumentation is dependent on other instrumentation modules to start a `Transaction`. +Typically, the `netty-n.n` modules work with this instrumentation and will start a `Transaction` (see `NettyDispatcher#channelRead`). + +Most commonly this instrumentation comes into play with SpringBoot usage, in which case the `spring` and `spring-webflux` +instrumentation modules also apply and should result in `Transactions` being renamed after the Spring controller. + +## Key Components + +* `TokenLinkingSubscriber` + Implementation of a `reactor.core.CoreSubscriber` (a `Context` aware subscriber) that can be added as + a lifecycle hook on `Flux`/`Mono` operators to propagate, retrieve, and link `Tokens` across async contexts. This is done in various places as follows: + + ```java + if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { + Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); + } + ``` + +* `Schedulers_Instrumentation` and `HttpTrafficHandler_Instrumentation` + Both of these classes serve as entry points to add the `TokenLinkingSubscriber` sub-hook. + +* Scheduler `Task`s + Reactor Core Scheduler tasks that execute on asynchronous threads. These are instrumented as points to link `Tokens`. + +## Troubleshooting + +In cases where a `Transaction` gets named `/NettyDispatcher` (or named after a security `Filter`) it usually indicates that context was lost somewhere in +reactor code and that activity on threads where other instrumentation would typically apply could not be linked to the originating `Transaction` thread. +Figuring out how to propagate and link a `Token` across the threads should resolve the issue. diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java b/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java index cf758c4804..6ff98a5e50 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java @@ -17,8 +17,14 @@ import java.util.function.BiFunction; import java.util.function.Function; -// Based on OpenTelemetry code -// https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java +/** + * Implementation of a reactor.core.CoreSubscriber (a Context aware subscriber) that can be added as + * a lifecycle hook on Flux/Mono operators to propagate, retrieve, and link Tokens across async contexts. + * + * Based on OpenTelemetry code: + * https://github.com/open-telemetry/opentelemetry-java-instrumentation/blob/main/instrumentation/reactor-3.1/library/src/main/java/io/opentelemetry/instrumentation/reactor/TracingSubscriber.java + * @param + */ public class TokenLinkingSubscriber implements CoreSubscriber { private final Token token; private final Subscriber subscriber; diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java index abe1cd448c..b573bfefe9 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/publisher/Hooks_Instrumentation.java @@ -14,6 +14,12 @@ @Weave(originalName = "reactor.core.publisher.Hooks") public abstract class Hooks_Instrumentation { + + /* + * Note that sub-hooks are cumulative. We want to avoid setting the same sub-hooks + * more than once, so we set this boolean to true the first time we set a sub-hook. + * if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { Hooks.onEachOperator(...) } + */ @NewField public static AtomicBoolean instrumented = new AtomicBoolean(false); } diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java index 4a624921f0..4095149fa9 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/InstantPeriodicWorkerTask_Instrumentation.java @@ -14,6 +14,8 @@ @Weave(originalName = "reactor.core.scheduler.InstantPeriodicWorkerTask") final class InstantPeriodicWorkerTask_Instrumentation { + // We need to be able to link the Token here when executing on a supplied Scheduler + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { return Weaver.callOriginal(); diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java index 961edd6f77..988d33ce26 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/PeriodicWorkerTask_Instrumentation.java @@ -14,6 +14,8 @@ @Weave(originalName = "reactor.core.scheduler.PeriodicWorkerTask") final class PeriodicWorkerTask_Instrumentation { + // We need to be able to link the Token here when executing on a supplied Scheduler + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { return Weaver.callOriginal(); diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java index d4368d3bb1..fda06f032a 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/SchedulerTask_Instrumentation.java @@ -10,24 +10,14 @@ import com.newrelic.api.agent.Trace; import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.Weaver; -import com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber; -import reactor.core.publisher.Hooks; -import reactor.core.publisher.Hooks_Instrumentation; - -import static com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.tokenLift; @Weave(originalName = "reactor.core.scheduler.SchedulerTask") final class SchedulerTask_Instrumentation { // We need to be able to link the Token here when executing on a supplied Scheduler via Mono::publishOn + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { - // Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from - // the current context and linked across threads at various points of the Flux/Mono lifecycle. - if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { - Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); - } - return Weaver.callOriginal(); } } diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java new file mode 100644 index 0000000000..9bef1d7d91 --- /dev/null +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/Schedulers_Instrumentation.java @@ -0,0 +1,44 @@ +/* + * Copyright 2021 New Relic Corporation. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +package reactor.core.scheduler; + +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Hooks_Instrumentation; + +import static com.nr.instrumentation.reactor.netty.TokenLinkingSubscriber.tokenLift; + +@Weave(type = MatchType.BaseClass, originalName = "reactor.core.scheduler.Schedulers") +public abstract class Schedulers_Instrumentation { + + @Weave(type = MatchType.ExactClass, originalName = "reactor.core.scheduler.Schedulers$CachedScheduler") + static class CachedScheduler { + final Scheduler cached; + final String stringRepresentation; + + CachedScheduler(String key, Scheduler cached) { + /* + * Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from + * the current context and linked across threads at various points of the Flux/Mono lifecycle. + * + * When using Netty Reactor with SpringBoot this hook will be added by the HttpTrafficHandler_Instrumentation + * but when using other embedded web servers (e.g. Tomcat, Jetty, Undertow) the HttpTrafficHandler class + * doesn't get loaded and thus the hook isn't added. This ensures that the hook is added in a common code + * path before any Scheduler Tasks are spun off on new threads. + */ + if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { + Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); + } + + this.cached = Weaver.callOriginal(); + this.stringRepresentation = Weaver.callOriginal(); + } + } + +} diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java index 027fe89c06..76867aca47 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/core/scheduler/WorkerTask_Instrumentation.java @@ -14,6 +14,8 @@ @Weave(originalName = "reactor.core.scheduler.WorkerTask") final class WorkerTask_Instrumentation { + // We need to be able to link the Token here when executing on a supplied Scheduler + // A Token should be available on the thread that this task executes on if tokenLift() was added to Hooks.onEachOperator @Trace(async = true, excludeFromTransactionTrace = true) public Void call() { return Weaver.callOriginal(); diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java index caeb301cde..d998d6d2e4 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/reactor/netty/http/server/HttpTrafficHandler_Instrumentation.java @@ -19,6 +19,14 @@ @Weave(originalName = "reactor.netty.http.server.HttpTrafficHandler") class HttpTrafficHandler_Instrumentation { public void channelRead(ChannelHandlerContext ctx, Object msg) { + + /* + * Add tokenLift hook if it hasn't already been added. This allows for tokens to be retrieved from + * the current context and linked across threads at various points of the Flux/Mono lifecycle. + * + * This hook will only be added when using Netty Reactor with SpringBoot. When using other embedded web + * servers (e.g. Tomcat, Jetty, Undertow) the Schedulers_Instrumentation class will add the hook. + */ if (!Hooks_Instrumentation.instrumented.getAndSet(true)) { Hooks.onEachOperator(TokenLinkingSubscriber.class.getName(), tokenLift()); } From 49f8c108fda286818dce2ae19809e2ab3e028531 Mon Sep 17 00:00:00 2001 From: jasonjkeller Date: Mon, 15 Nov 2021 15:40:02 -0800 Subject: [PATCH 5/5] Update comment about token being created by spring-webflux instrumentation of ServerWebExchange --- .../java/com/nr/instrumentation/TokenLinkingSubscriber.java | 2 +- .../instrumentation/reactor/netty/TokenLinkingSubscriber.java | 2 +- .../instrumentation/reactor/netty/TokenLinkingSubscriber.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/instrumentation/netty-reactor-0.7.0/src/main/java/com/nr/instrumentation/TokenLinkingSubscriber.java b/instrumentation/netty-reactor-0.7.0/src/main/java/com/nr/instrumentation/TokenLinkingSubscriber.java index f103e732a5..186e520b35 100644 --- a/instrumentation/netty-reactor-0.7.0/src/main/java/com/nr/instrumentation/TokenLinkingSubscriber.java +++ b/instrumentation/netty-reactor-0.7.0/src/main/java/com/nr/instrumentation/TokenLinkingSubscriber.java @@ -26,7 +26,7 @@ public class TokenLinkingSubscriber implements CoreSubscriber { public TokenLinkingSubscriber(Subscriber subscriber, Context ctx) { this.subscriber = subscriber; this.context = ctx; - // newrelic-token is added by spring-webflux-5.1 instrumentation + // newrelic-token is added by spring-webflux-5.1 instrumentation of ServerWebExchange this.token = ctx.getOrDefault("newrelic-token", null); } diff --git a/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java b/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java index 6ff98a5e50..541977f380 100644 --- a/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java +++ b/instrumentation/netty-reactor-0.8.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java @@ -33,7 +33,7 @@ public class TokenLinkingSubscriber implements CoreSubscriber { public TokenLinkingSubscriber(Subscriber subscriber, Context ctx) { this.subscriber = subscriber; this.context = ctx; - // newrelic-token is added by spring-webflux instrumentation + // newrelic-token is added by spring-webflux instrumentation of ServerWebExchange this.token = ctx.getOrDefault("newrelic-token", null); } diff --git a/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java b/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java index 6ff98a5e50..541977f380 100644 --- a/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java +++ b/instrumentation/netty-reactor-0.9.0/src/main/java/com/nr/instrumentation/reactor/netty/TokenLinkingSubscriber.java @@ -33,7 +33,7 @@ public class TokenLinkingSubscriber implements CoreSubscriber { public TokenLinkingSubscriber(Subscriber subscriber, Context ctx) { this.subscriber = subscriber; this.context = ctx; - // newrelic-token is added by spring-webflux instrumentation + // newrelic-token is added by spring-webflux instrumentation of ServerWebExchange this.token = ctx.getOrDefault("newrelic-token", null); }