diff --git a/functional_test/build.gradle b/functional_test/build.gradle index 931c1ac3ef..65a9ea9a12 100644 --- a/functional_test/build.gradle +++ b/functional_test/build.gradle @@ -2,8 +2,6 @@ import com.nr.builder.JarUtil evaluationDependsOn(":newrelic-agent") // This is important because we need newrelic-agent to be configured before functional_test so the correct ("unversioned") jar name gets used -apply plugin: 'scala' - jar { manifest { attributes 'Premain-Class': 'com.newrelic.agent.test.agent.FunctionalAgent' } } @@ -17,8 +15,6 @@ dependencies { implementation(project(":agent-bridge-datastore")) implementation(project(":newrelic-weaver")) - implementation("org.scala-lang:scala-library:2.10.7") - testImplementation(project(":functional_test:weave_test_impl")) // the newrelic-agent test classes diff --git a/functional_test/src/test/java/test/newrelic/test/agent/CompletableFutureTest.java b/functional_test/src/test/java/test/newrelic/test/agent/CompletableFutureTest.java index 8f4b833ab0..8bef32dccf 100644 --- a/functional_test/src/test/java/test/newrelic/test/agent/CompletableFutureTest.java +++ b/functional_test/src/test/java/test/newrelic/test/agent/CompletableFutureTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -77,37 +78,54 @@ public void testAnyOf() throws Exception { } @Trace(dispatcher = true) - public CompletableFuture doAnyOf() throws Exception { - CompletableFuture req1 = supplyAsync(new Supplier() { - @Override - public String get() { - pause(5); - NewRelic.addCustomParameter("req1", 1); - return "1"; - } + public CompletableFuture doAnyOf() { + CompletableFuture c = new CompletableFuture<>(); + CompletableFuture req1 = supplyAsync(() -> { + pause(5); + NewRelic.addCustomParameter("req1", 1); + return "1"; }); - CompletableFuture req2 = supplyAsync(new Supplier() { - @Override - public String get() { - pause(15); - NewRelic.addCustomParameter("req2", 2); - return "2"; - } + CompletableFuture req2 = supplyAsync(() -> { + pause(15); + NewRelic.addCustomParameter("req2", 2); + return "2"; }); - CompletableFuture req3 = supplyAsync(new Supplier() { - @Override - public String get() { - pause(10); - NewRelic.addCustomParameter("req3", 3); - return "3"; - } + CompletableFuture req3 = supplyAsync(() -> { + pause(10); + NewRelic.addCustomParameter("req3", 3); + return "3"; }); CompletableFuture fastest = CompletableFuture.anyOf(req1, req2, req3); assertNotNull(fastest); - return fastest; + return fastest.thenApply(o -> o); + } + + @Test + public void testNoComplete() throws InterruptedException, ExecutionException { + TransactionDataList txs = new TransactionDataList(); + ServiceFactory.getTransactionService().addTransactionListener(txs); + + CompletableFuture future = composeCompletable(); + txs.waitFor(1, 5000); // Give the transaction time to finish + assertEquals(1, txs.size()); + TransactionData txData = txs.get(0); + Map userAttributes = txData.getUserAttributes(); + assertNotNull(userAttributes); + assertEquals(0, userAttributes.size()); + } + + @Trace(dispatcher = true) + public CompletableFuture composeCompletable() { + return new CompletableFuture<>().thenCompose( + res -> supplyAsync(() -> { + pause(3000); + NewRelic.addCustomParameter("param1", 1); + return " 2nd"; + }) + ); } @Test @@ -134,29 +152,20 @@ public void testApplyEither() throws Exception { @Trace(dispatcher = true) public CompletableFuture doApplyEither() throws Exception { - CompletableFuture f1 = supplyAsync(new Supplier() { - @Override - public Integer get() { - pause(15); - NewRelic.addCustomParameter("f1", 1); - return 2; - } + CompletableFuture f1 = supplyAsync(() -> { + pause(15); + NewRelic.addCustomParameter("f1", 1); + return 2; }); - CompletableFuture f2 = supplyAsync(new Supplier() { - @Override - public Integer get() { - pause(10); - NewRelic.addCustomParameter("f2", 2); - return 2; - } + CompletableFuture f2 = supplyAsync(() -> { + pause(10); + NewRelic.addCustomParameter("f2", 2); + return 2; }); - CompletableFuture f3 = f1.applyToEither(f2, new Function() { - @Override - public Integer apply(Integer r) { - NewRelic.addCustomParameter("f3", r * r); - return r * r; // This should always be 2 * 2 - } + CompletableFuture f3 = f1.applyToEither(f2, r -> { + NewRelic.addCustomParameter("f3", r * r); + return r * r; // This should always be 2 * 2 }); return f3; @@ -187,23 +196,17 @@ public void testApplyAsync() throws Exception { public CompletableFuture doApplyAsync() throws Exception { final CountDownLatch latch = new CountDownLatch(2); CompletableFuture a = new CompletableFuture<>(); - final CompletableFuture b = a.thenApplyAsync(new Function() { - @Override - public Integer apply(Integer r) { - pause(10); - NewRelic.addCustomParameter("b", r); - latch.countDown(); - return r * r; - } + final CompletableFuture b = a.thenApplyAsync(r -> { + pause(10); + NewRelic.addCustomParameter("b", r); + latch.countDown(); + return r * r; }); - CompletableFuture f = runAsync(new Runnable() { - @Override - public void run() { - int x = b.join(); - NewRelic.addCustomParameter("f", x); - latch.countDown(); - } + runAsync(() -> { + int x = b.join(); + NewRelic.addCustomParameter("f", x); + latch.countDown(); }); a.complete(2); @@ -234,31 +237,19 @@ public void testCompletableFutureError() throws Exception { } @Trace(dispatcher = true) - public CompletableFuture doCompletableFutureError() throws Exception { - CompletableFuture f1 = supplyAsync(new Supplier() { - @Override - public Integer get() { - pause(10); - NewRelic.addCustomParameter("f1", 1); - if (true) { - throw new RuntimeException(); - } - return 1; - } + public CompletableFuture doCompletableFutureError() { + CompletableFuture f1 = supplyAsync(() -> { + pause(10); + NewRelic.addCustomParameter("f1", 1); + throw new RuntimeException(); }); - CompletableFuture f2 = f1.thenApply(new Function() { - @Override - public Integer apply(Integer r) { - NewRelic.addCustomParameter("f2", 2); - return r * r; - } - }).exceptionally(new Function() { - @Override - public Integer apply(Throwable throwable) { - NewRelic.addCustomParameter("e", 3); - return 3; - } + CompletableFuture f2 = f1.thenApply(r -> { + NewRelic.addCustomParameter("f2", 2); + return r * r; + }).exceptionally(throwable -> { + NewRelic.addCustomParameter("e", 3); + return 3; }); return f2; @@ -287,34 +278,18 @@ public void testCompletableFutureAccept() throws Exception { } @Trace(dispatcher = true) - public CompletableFuture doCompletableFutureAccept() throws Exception { - CompletableFuture f1 = supplyAsync(new Supplier() { - @Override - public Integer get() { - pause(10); - NewRelic.addCustomParameter("f1", 1); - if (false) { - throw new RuntimeException(); - } - return 2; - } + public CompletableFuture doCompletableFutureAccept() { + CompletableFuture f1 = supplyAsync(() -> { + pause(10); + NewRelic.addCustomParameter("f1", 1); + return 2; }); - CompletableFuture f2 = f1.thenApply(new Function() { - @Override - public Integer apply(Integer r) { - NewRelic.addCustomParameter("f2", r); - return r * r; - } - }); - - CompletableFuture f3 = f2.thenAccept(new Consumer() { - @Override - public void accept(Integer r) { - NewRelic.addCustomParameter("f3", r); - } + CompletableFuture f2 = f1.thenApply(r -> { + NewRelic.addCustomParameter("f2", r); + return r * r; }); - + f2.thenAccept(r -> NewRelic.addCustomParameter("f3", r)); return f2; } @@ -339,28 +314,15 @@ public void testLargeCompletableFuture() throws Exception { } @Trace(dispatcher = true) - public CompletableFuture doLargeCompletableFuture() throws Exception { - CompletableFuture last = CompletableFuture.supplyAsync(new Supplier() { - @Override - public Integer get() { - return 0; - } - }); + public CompletableFuture doLargeCompletableFuture() { + CompletableFuture last = CompletableFuture.supplyAsync(() -> 0); for (int i = 1; i < 10; i++) { final int current = i; - last = CompletableFuture.supplyAsync(new Supplier() { - @Override - public Integer get() { - return current; - } - }).thenCombine(last, new BiFunction() { - @Override - public Integer apply(Integer a, Integer b) { - Integer result = Math.max(a, b); - NewRelic.addCustomParameter("max", result); - return result; - } + last = CompletableFuture.supplyAsync(() -> current).thenCombine(last, (a, b) -> { + Integer result = Math.max(a, b); + NewRelic.addCustomParameter("max", result); + return result; }); } @@ -392,28 +354,17 @@ public void testWhenComplete() throws Exception { @Trace(dispatcher = true) public CompletableFuture doWhenComplete() throws Exception { - CompletableFuture f1 = supplyAsync(new Supplier() { - @Override - public Integer get() { - pause(10); - NewRelic.addCustomParameter("f1", 1); - return 2; - } + CompletableFuture f1 = supplyAsync(() -> { + pause(10); + NewRelic.addCustomParameter("f1", 1); + return 2; }); - CompletableFuture f2 = f1.handle(new BiFunction() { - @Override - public Integer apply(Integer res, Throwable throwable) { - Integer result = throwable == null ? res * res : 2; - NewRelic.addCustomParameter("f2", result); - return result; - } - }); - CompletableFuture f3 = f1.whenCompleteAsync(new BiConsumer() { - @Override - public void accept(Integer res, Throwable throwable) { - NewRelic.addCustomParameter("f3", res); - } + CompletableFuture f2 = f1.handle((res, throwable) -> { + Integer result = throwable == null ? res * res : 2; + NewRelic.addCustomParameter("f2", result); + return result; }); + CompletableFuture f3 = f1.whenCompleteAsync((res, throwable) -> NewRelic.addCustomParameter("f3", res)); f2.get(); f3.get(); @@ -443,29 +394,20 @@ public void testThenCompose() throws Exception { } @Trace(dispatcher = true) - public CompletableFuture doThenCompose() throws Exception { - CompletableFuture f1 = supplyAsync(new Supplier() { - @Override - public Integer get() { - pause(10); - NewRelic.addCustomParameter("f1", 1); - return 2; - } + public CompletableFuture doThenCompose() { + CompletableFuture f1 = supplyAsync(() -> { + pause(10); + NewRelic.addCustomParameter("f1", 1); + return 2; }); - CompletableFuture f2 = f1.thenComposeAsync(new Function>() { - @Override - public CompletionStage apply(final Integer r) { - NewRelic.addCustomParameter("f2", 2); - return supplyAsync(new Supplier() { - @Override - public Integer get() { - Integer result = r * r; - NewRelic.addCustomParameter("f3", result); - return result; - } - }); - } + CompletableFuture f2 = f1.thenComposeAsync(r -> { + NewRelic.addCustomParameter("f2", 2); + return supplyAsync(() -> { + Integer result = r * r; + NewRelic.addCustomParameter("f3", result); + return result; + }); }); return f2; @@ -489,36 +431,21 @@ public void testAllOf() throws Exception { @Trace(dispatcher = true) private void allOf() throws Exception { - CompletableFuture futureOne = CompletableFuture.runAsync( - new Runnable() { - @Override - public void run() { - pause(30); - NewRelic.addCustomParameter("one", "done"); - setTransactionName(); - } - } - ); + CompletableFuture futureOne = CompletableFuture.runAsync(() -> { + pause(30); + NewRelic.addCustomParameter("one", "done"); + setTransactionName(); + }); - CompletableFuture futureTwo = CompletableFuture.runAsync( - new Runnable() { - @Override - public void run() { - pause(20); - NewRelic.addCustomParameter("two", "done"); - } - } - ); + CompletableFuture futureTwo = CompletableFuture.runAsync(() -> { + pause(20); + NewRelic.addCustomParameter("two", "done"); + }); - CompletableFuture futureThree = CompletableFuture.runAsync( - new Runnable() { - @Override - public void run() { - pause(10); - NewRelic.addCustomParameter("three", "done"); - } - } - ); + CompletableFuture futureThree = CompletableFuture.runAsync(() -> { + pause(10); + NewRelic.addCustomParameter("three", "done"); + }); CompletableFuture all = CompletableFuture.allOf(futureOne, futureTwo, futureThree); all.get(); @@ -540,27 +467,16 @@ public void testAcceptEitherAsync() { @Trace(dispatcher = true) public void acceptEitherAsync() { - CompletableFuture slowFuture = CompletableFuture.supplyAsync(new Supplier() { - @Override - public String get() { - pause(100); - return "Slow"; - } + CompletableFuture slowFuture = CompletableFuture.supplyAsync(() -> { + pause(100); + return "Slow"; }); - CompletableFuture fastFuture = CompletableFuture.supplyAsync(new Supplier() { - @Override - public String get() { - return "Fast"; - } - }); + CompletableFuture fastFuture = CompletableFuture.supplyAsync(() -> "Fast"); - slowFuture.acceptEitherAsync(fastFuture, new Consumer() { - @Override - public void accept(String result) { - setTransactionName(); - NewRelic.addCustomParameter("result", result); - } + slowFuture.acceptEitherAsync(fastFuture, result -> { + setTransactionName(); + NewRelic.addCustomParameter("result", result); }); } @@ -580,30 +496,17 @@ public void testAcceptBothAsync() { @Trace(dispatcher = true) private void acceptBothAsync() { - CompletableFuture one = CompletableFuture.supplyAsync(new Supplier() { - @Override - public Integer get() { - setTransactionName(); - return 1; - } - }); - CompletableFuture two = CompletableFuture.supplyAsync(new Supplier() { - @Override - public Integer get() { - return 2; - } + CompletableFuture one = CompletableFuture.supplyAsync(() -> { + setTransactionName(); + return 1; }); + CompletableFuture two = CompletableFuture.supplyAsync(() -> 2); - one.thenAcceptBothAsync(two, new BiConsumer() { - @Override - public void accept(Integer x, Integer y) { - NewRelic.addCustomParameter("three", x + y); - } - }); + one.thenAcceptBothAsync(two, (x, y) -> NewRelic.addCustomParameter("three", x + y)); } @Test - public void testCancelFuture() throws Exception { + public void testCancelFuture() { // Make sure a cancelled future doesn't hold up the transaction cancelFuture(); pause(500); @@ -611,19 +514,16 @@ public void testCancelFuture() throws Exception { } @Trace(dispatcher = true) - private void cancelFuture() throws Exception { + private void cancelFuture() { setTransactionName(); final CountDownLatch latch = new CountDownLatch(1); - CompletableFuture one = CompletableFuture.supplyAsync(new Supplier() { - @Override - public Integer get() { - try { - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - } - return 1; + CompletableFuture one = CompletableFuture.supplyAsync(() -> { + try { + latch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { } + return 1; }); one.cancel(true); // boolean doesn't do anything @@ -632,86 +532,59 @@ public Integer get() { } @Test - public void testThenRunAsync() throws Exception { + public void testThenRunAsync() { thenRunAsync(); pause(500); AgentHelper.verifyMetrics(AgentHelper.getMetrics(), TRANSACTION_NAME); } @Trace(dispatcher = true) - private void thenRunAsync() throws Exception { - CompletableFuture.supplyAsync(new Supplier() { - @Override - public Integer get() { - pause(20); - return 0; - } - }).thenRun(new Runnable() { - @Override - public void run() { - pause(10); - setTransactionName(); - } + private void thenRunAsync() { + CompletableFuture.supplyAsync(() -> { + pause(20); + return 0; + }).thenRun(() -> { + pause(10); + setTransactionName(); }); } @Test - public void testRunAfterEitherAsync() throws Exception { + public void testRunAfterEitherAsync() { runAfterEitherAsync(); pause(500); AgentHelper.verifyMetrics(AgentHelper.getMetrics(), TRANSACTION_NAME); } @Trace(dispatcher = true) - private void runAfterEitherAsync() throws Exception { - CompletableFuture one = CompletableFuture.supplyAsync( - new Supplier() { - @Override - public Integer get() { - pause(20); - return 1; - } - } - - ); - - CompletableFuture two = CompletableFuture.supplyAsync( - new Supplier() { - @Override - public Integer get() { - pause(30); - return 2; - } - } + private void runAfterEitherAsync() { + CompletableFuture one = CompletableFuture.supplyAsync(() -> { + pause(20); + return 1; + }); - ); + CompletableFuture two = CompletableFuture.supplyAsync(() -> { + pause(30); + return 2; + }); - one.runAfterEitherAsync(two, new Runnable() { - @Override - public void run() { - setTransactionName(); - } - } - ); + one.runAfterEitherAsync(two, this::setTransactionName); } @Test - public void testHandleAsync() throws Exception { + public void testHandleAsync() { handleAsync(); pause(500); AgentHelper.verifyMetrics(AgentHelper.getMetrics(), TRANSACTION_NAME); } @Trace(dispatcher = true) - private void handleAsync() throws Exception { + private void handleAsync() { CompletableFuture one = new CompletableFuture<>(); - one.handleAsync(new BiFunction() { - @Override - public Integer apply(Integer integer, Throwable throwable) { - setTransactionName(); - return 0; - } + one.handleAsync((BiFunction) (integer, throwable) -> { + setTransactionName(); + return 0; }); one.completeExceptionally(new RuntimeException()); diff --git a/instrumentation/java.completable-future-jdk8/README.md b/instrumentation/java.completable-future-jdk8/README.md new file mode 100644 index 0000000000..fda331723f --- /dev/null +++ b/instrumentation/java.completable-future-jdk8/README.md @@ -0,0 +1,65 @@ +# java.completable-future-jdk8 + +This instrumentation weaves `java.util.concurrent.CompletableFuture` and `java.util.concurrent.CompletableFuture$Async` to +trace code execution across asynchronous boundaries. + +## How it works + +Some context on parallelism according to comments in the +[CompletableFuture source](http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/util/concurrent/CompletableFuture.java#l77): + +> * All async methods without an explicit Executor +> * argument are performed using the {@link ForkJoinPool#commonPool()} +> * (unless it does not support a parallelism level of at least two, in +> * which case, a new Thread is used). To simplify monitoring, +> * debugging, and tracking, all generated asynchronous tasks are +> * instances of the marker interface {@link +> * AsynchronousCompletionTask}. + +When `CompletableFuture.execAsync(Executor e, Async r)` is invoked, it "starts the given async task using the given executor, +unless the executor is `ForkJoinPool.commonPool` and it has been disabled, in which case starts a new thread." + +Instrumented code: + +```java + static void execAsync(Executor e, CompletableFuture_Instrumentation.Async r) { + if (noParallelism(e)) { + new Thread(new TokenAwareRunnable(r)).start(); + } else { + Executor tde = useTokenDelegateExecutor(e); + if (null != tde) { + tde.execute(r); + } + } + } +``` + +### Case 1: No Parallelism + +If there is no parallelism this instrumentation will initialize a new `Thread` with a `TokenAwareRunnable` that wraps the `CompletableFuture$Async` argument +passed to `execAsync`. The `TokenAwareRunnable` uses `TokenAndRefUtils` to get a `TokenAndRefCount`, if one exists, for the current `Thread`. Otherwise, it +creates a new `TokenAndRefCount`. + +The `TokenAndRefCount` stores a `Token` that can be used to link asynchronous `Threads` together and tracks the number of incoming references to the `Token`. +When `TokenAwareRunnable.run()` is invoked the stored `Token` is linked on the executing `Thread` and finally the `Token` is expired when `run()` completes, +allowing the `Transaction` to complete. + +### Case 2: Parallelism + +In this case a `TokenDelegateExecutor` is initialized and used to wrap the `Executor` argument that was passed to `execAsync`. When +`TokenDelegateExecutor.execute(Runnable runnable)` is invoked it will initialize and store a `TokenAwareRunnable` that wraps the `CompletableFuture$Async` +argument passed to `execAsync`. From this point on, the `TokenAwareRunnable` functions exactly as described in Case 1: No Parallelism. + +## Logging + +This instrumentation will produce entries such as the following when searching the logs for keywords `token info`: + +``` +2022-01-07T17:22:03,481-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info TokenAwareRunnable token info set +2022-01-07T17:22:03,482-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info Token info set in thread +2022-01-07T17:22:03,482-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info Clearing token info from thread +``` + +## Testing + +See the following functional tests: `newrelic-java-agent/functional_test/src/test/java/test/newrelic/test/agent/CompletableFutureTest.java` \ No newline at end of file diff --git a/instrumentation/java.completable-future-jdk8/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java b/instrumentation/java.completable-future-jdk8/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java index 02b23dedb9..500668883e 100644 --- a/instrumentation/java.completable-future-jdk8/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java +++ b/instrumentation/java.completable-future-jdk8/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java @@ -7,240 +7,51 @@ package java.util.concurrent; -import java.util.function.BiConsumer; -import java.util.function.BiFunction; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import com.newrelic.agent.bridge.AgentBridge; -import com.newrelic.agent.bridge.TracedMethod; -import com.newrelic.agent.bridge.Transaction; -import com.newrelic.api.agent.Token; -import com.newrelic.api.agent.Trace; import com.newrelic.api.agent.weaver.MatchType; -import com.newrelic.api.agent.weaver.NewField; import com.newrelic.api.agent.weaver.Weave; -import com.newrelic.api.agent.weaver.Weaver; +import util.TokenAwareRunnable; +import util.TokenDelegateExecutor; @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture") public class CompletableFuture_Instrumentation { - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncRun") - static final class AsyncRun { - - @NewField - private Token asyncToken; - - AsyncRun(Runnable fn, CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncRun", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); - } - } - - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncSupply") - static final class AsyncSupply { - - @NewField - private Token asyncToken; - - AsyncSupply(Supplier fn, CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncSupply", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); - } - } - - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncApply") - static final class AsyncApply { - - @NewField - private Token asyncToken; - - AsyncApply(T arg, Function fn, CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncApply", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); - } - } - - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncCombine") - static final class AsyncCombine { - - @NewField - private Token asyncToken; - - AsyncCombine(T arg1, U arg2, BiFunction fn, CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } + @Weave(type = MatchType.BaseClass, originalName = "java.util.concurrent.CompletableFuture$Async") + abstract static class Async extends ForkJoinTask + implements Runnable, CompletableFuture.AsynchronousCompletionTask { + public final Void getRawResult() { + return null; } - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncCombine", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); + public final void setRawResult(Void v) { } - } - - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncAccept") - static final class AsyncAccept { - - @NewField - private Token asyncToken; - AsyncAccept(T arg, Consumer fn, CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncAccept", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); + public final void run() { + exec(); } } - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncAcceptBoth") - static final class AsyncAcceptBoth { - - @NewField - private Token asyncToken; - - AsyncAcceptBoth(T arg1, U arg2, BiConsumer fn, CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncAcceptBoth", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); - } + private static boolean noParallelism(Executor e) { + return (e == ForkJoinPool.commonPool() && + ForkJoinPool.getCommonPoolParallelism() <= 1); } - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncCompose") - static final class AsyncCompose { - - @NewField - private Token asyncToken; - - AsyncCompose(T arg, Function> fn, CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncCompose", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); + private static Executor useTokenDelegateExecutor(Executor e) { + if (null == e || e instanceof TokenDelegateExecutor) { + return e; + } else { + return new TokenDelegateExecutor(e); } } - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncWhenComplete") - static final class AsyncWhenComplete { - - @NewField - private Token asyncToken; - - AsyncWhenComplete(T arg1, Throwable arg2, BiConsumer fn, - CompletableFuture dst) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - public final boolean exec() { - if (null != this.asyncToken) { - if (this.asyncToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncWhenComplete", "exec"); - } - this.asyncToken.expire(); - this.asyncToken = null; - } - return Weaver.callOriginal(); + static void execAsync(Executor e, CompletableFuture_Instrumentation.Async r) { + if (noParallelism(e)) { + new Thread(new TokenAwareRunnable(r)).start(); + } else { + Executor tde = useTokenDelegateExecutor(e); + if (null != tde) { + tde.execute(r); } + } } } diff --git a/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenAndRefUtils.java b/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenAndRefUtils.java new file mode 100644 index 0000000000..c2cf202439 --- /dev/null +++ b/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenAndRefUtils.java @@ -0,0 +1,51 @@ +package util; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.agent.bridge.Transaction; + +import java.text.MessageFormat; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; + +public class TokenAndRefUtils { + + public static AgentBridge.TokenAndRefCount getThreadTokenAndRefCount() { + AgentBridge.TokenAndRefCount tokenAndRefCount = AgentBridge.activeToken.get(); + if (tokenAndRefCount == null) { + Transaction tx = AgentBridge.getAgent().getTransaction(false); + if (tx != null) { + tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(), + AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1)); + } + } else { + tokenAndRefCount.refCount.incrementAndGet(); + } + return tokenAndRefCount; + } + + public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenAndRefCount) { + if (tokenAndRefCount != null) { + AgentBridge.activeToken.set(tokenAndRefCount); + tokenAndRefCount.token.link(); + } + } + + public static void clearThreadTokenAndRefCountAndTxn(AgentBridge.TokenAndRefCount tokenAndRefCount) { + AgentBridge.activeToken.remove(); + if (tokenAndRefCount != null && tokenAndRefCount.refCount.decrementAndGet() == 0) { + tokenAndRefCount.token.expire(); + tokenAndRefCount.token = null; + } + } + + public static void logTokenInfo(AgentBridge.TokenAndRefCount tokenAndRefCount, String msg) { + if (AgentBridge.getAgent().getLogger().isLoggable(Level.FINEST)) { + String tokenMsg = (tokenAndRefCount != null && tokenAndRefCount.token != null) + ? String.format("[%s:%s:%d]", tokenAndRefCount.token, tokenAndRefCount.token.getTransaction(), + tokenAndRefCount.refCount.get()) + : "[Empty token]"; + AgentBridge.getAgent().getLogger().log(Level.FINEST, MessageFormat.format("{0}: token info {1}", tokenMsg, msg)); + } + } + +} diff --git a/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenAwareRunnable.java b/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenAwareRunnable.java new file mode 100644 index 0000000000..0e6f1f3ee5 --- /dev/null +++ b/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenAwareRunnable.java @@ -0,0 +1,35 @@ +package util; + +import com.newrelic.agent.bridge.AgentBridge; + +import static util.TokenAndRefUtils.clearThreadTokenAndRefCountAndTxn; +import static util.TokenAndRefUtils.getThreadTokenAndRefCount; +import static util.TokenAndRefUtils.logTokenInfo; +import static util.TokenAndRefUtils.setThreadTokenAndRefCount; + +public final class TokenAwareRunnable implements Runnable { + private final Runnable delegate; + + private AgentBridge.TokenAndRefCount tokenAndRefCount; + + public TokenAwareRunnable(Runnable delegate) { + this.delegate = delegate; + //get token state from calling Thread + this.tokenAndRefCount = getThreadTokenAndRefCount(); + logTokenInfo(tokenAndRefCount, "TokenAwareRunnable token info set"); + } + + @Override + public void run() { + try { + if (delegate != null) { + logTokenInfo(tokenAndRefCount, "Token info set in thread"); + setThreadTokenAndRefCount(tokenAndRefCount); + delegate.run(); + } + } finally { + logTokenInfo(tokenAndRefCount, "Clearing token info from thread "); + clearThreadTokenAndRefCountAndTxn(tokenAndRefCount); + } + } +} diff --git a/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenDelegateExecutor.java b/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenDelegateExecutor.java new file mode 100644 index 0000000000..ab16289a1e --- /dev/null +++ b/instrumentation/java.completable-future-jdk8/src/main/java/util/TokenDelegateExecutor.java @@ -0,0 +1,17 @@ +package util; + +import java.util.concurrent.Executor; + +public class TokenDelegateExecutor implements Executor { + public final Executor delegate; + + public TokenDelegateExecutor(final Executor delegate) { + this.delegate = delegate; + } + + @Override + public void execute(Runnable runnable) { + runnable = new TokenAwareRunnable(runnable); + delegate.execute(runnable); + } +} diff --git a/instrumentation/java.completable-future-jdk8u40/README.md b/instrumentation/java.completable-future-jdk8u40/README.md new file mode 100644 index 0000000000..40592b6c2f --- /dev/null +++ b/instrumentation/java.completable-future-jdk8u40/README.md @@ -0,0 +1,28 @@ +# java.completable-future-jdk8u40 + +This instrumentation weaves `java.util.concurrent.CompletableFuture` to trace code execution across asynchronous boundaries. + +## How it works + +When `CompletableFuture` methods (e.g. `uniApplyStage`, `biApplyStage`, `orApplyStage`, `asyncRunStage`, etc) are invoked, a `TokenDelegateExecutor` +is initialized and used to wrap the `Executor` argument that was passed to executing method. When `TokenDelegateExecutor.execute(Runnable runnable)` is +invoked it will initialize and store a `TokenAwareRunnable` that wraps the `Runnable` argument passed to `Executor`. + +The `TokenAwareRunnable` uses `TokenAndRefUtils` to get a `TokenAndRefCount`, if one exists, for the current `Thread`. Otherwise, it creates +a new `TokenAndRefCount`. The `TokenAndRefCount` stores a `Token` that can be used to link asynchronous `Threads` together and tracks the number of incoming references to the `Token`. +When `TokenAwareRunnable.run()` is invoked the stored `Token` is linked on the executing `Thread` and finally the `Token` is expired when `run()` completes, +allowing the `Transaction` to complete. + +## Logging + +This instrumentation will produce entries such as the following when searching the logs for keywords `token info`: + +``` +2022-01-07T17:22:03,481-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info TokenAwareRunnable token info set +2022-01-07T17:22:03,482-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info Token info set in thread +2022-01-07T17:22:03,482-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info Clearing token info from thread +``` + +## Testing + +See the following functional tests: `newrelic-java-agent/functional_test/src/test/java/test/newrelic/test/agent/CompletableFutureTest.java` \ No newline at end of file diff --git a/instrumentation/java.completable-future-jdk8u40/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java b/instrumentation/java.completable-future-jdk8u40/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java index ebb3d62623..e0c28e82a9 100644 --- a/instrumentation/java.completable-future-jdk8u40/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java +++ b/instrumentation/java.completable-future-jdk8u40/src/main/java/java/util/concurrent/CompletableFuture_Instrumentation.java @@ -7,153 +7,111 @@ package java.util.concurrent; -import com.newrelic.agent.bridge.AgentBridge; -import com.newrelic.agent.bridge.TracedMethod; -import com.newrelic.agent.bridge.Transaction; -import com.newrelic.api.agent.Token; -import com.newrelic.api.agent.Trace; import com.newrelic.api.agent.weaver.MatchType; -import com.newrelic.api.agent.weaver.NewField; import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.Weaver; +import util.TokenDelegateExecutor; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.Supplier; @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture") public class CompletableFuture_Instrumentation { - @NewField - public Token completableToken; - - @Weave(type = MatchType.BaseClass, originalName = "java.util.concurrent.CompletableFuture$UniCompletion") - abstract static class UniCompletion { - - CompletableFuture_Instrumentation dep = Weaver.callOriginal(); - - UniCompletion(Executor executor, CompletableFuture_Instrumentation dep, CompletableFuture_Instrumentation src) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - if (dep.completableToken == null) { - dep.completableToken = tx.getToken(); - } - } - } - - @Trace(async = true, excludeFromTransactionTrace = true) - CompletableFuture_Instrumentation tryFire(int mode) { - if (null != dep.completableToken) { - if (dep.completableToken.link()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "Completion", "tryFire"); - } - } - CompletableFuture_Instrumentation future = Weaver.callOriginal(); - return future; + private static Executor useTokenDelegateExecutor(Executor e) { + if (null == e || e instanceof TokenDelegateExecutor) { + return e; + } else { + return new TokenDelegateExecutor(e); } } - /* - * The following methods are all the possible internal completion methods - * that allow us to know when this CompletableFuture is done so we can expire - * any tokens that we've created and used. - */ - - final boolean internalComplete(Object r) { - boolean result = Weaver.callOriginal(); - finishCompletableFuture(); - return result; + private CompletableFuture uniApplyStage( + Executor e, Function f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - final boolean completeNull() { - boolean result = Weaver.callOriginal(); - finishCompletableFuture(); - return result; + private CompletableFuture uniAcceptStage(Executor e, + Consumer f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - final boolean completeValue(T t) { - boolean result = Weaver.callOriginal(); - finishCompletableFuture(); - return result; + private CompletableFuture uniRunStage(Executor e, Runnable f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - final boolean completeThrowable(Throwable x) { - boolean result = Weaver.callOriginal(); - finishCompletableFuture(); - return result; + private CompletableFuture uniWhenCompleteStage( + Executor e, BiConsumer f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - final boolean completeThrowable(Throwable x, Object r) { - boolean result = Weaver.callOriginal(); - finishCompletableFuture(); - return result; + private CompletableFuture uniHandleStage( + Executor e, BiFunction f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - final boolean completeRelay(Object r) { - boolean result = Weaver.callOriginal(); - finishCompletableFuture(); - return result; + private CompletableFuture uniComposeStage( + Executor e, Function> f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - /** - * Expire any tokens that we've created and used on this CompletableFuture since it is now finished executing - */ - private void finishCompletableFuture() { - if (this.completableToken != null) { - this.completableToken.expire(); - this.completableToken = null; - } + private CompletableFuture biApplyStage( + Executor e, CompletionStage o, + BiFunction f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncRun") - static final class AsyncRun { - - @NewField - private Token asyncToken; + private CompletableFuture biAcceptStage( + Executor e, CompletionStage o, + BiConsumer f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); + } - AsyncRun(CompletableFuture_Instrumentation dep, Runnable fn) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } + private CompletableFuture biRunStage(Executor e, CompletionStage o, + Runnable f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); + } - @Trace(async = true, excludeFromTransactionTrace = true) - public void run() { - if (null != this.asyncToken) { - if (this.asyncToken.linkAndExpire()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncRun", "run"); - } - this.asyncToken = null; - } - Weaver.callOriginal(); - } + private CompletableFuture orApplyStage( + Executor e, CompletionStage o, + Function f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } - @Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture$AsyncSupply") - static final class AsyncSupply { + private CompletableFuture orAcceptStage( + Executor e, CompletionStage o, Consumer f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); + } - @NewField - private Token asyncToken; + private CompletableFuture orRunStage(Executor e, CompletionStage o, + Runnable f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); + } - AsyncSupply(CompletableFuture_Instrumentation dep, Supplier fn) { - Transaction tx = AgentBridge.getAgent().getTransaction(false); - if (tx != null && tx.isStarted() && AgentBridge.getAgent().getTracedMethod().trackChildThreads()) { - this.asyncToken = tx.getToken(); - } - } + static CompletableFuture asyncSupplyStage(Executor e, + Supplier f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); + } - @Trace(async = true, excludeFromTransactionTrace = true) - public void run() { - if (null != this.asyncToken) { - if (this.asyncToken.linkAndExpire()) { - TracedMethod tm = (TracedMethod) AgentBridge.getAgent().getTransaction().getTracedMethod(); - tm.setMetricName("Java", "CompletableFuture", "AsyncSupply", "run"); - } - this.asyncToken = null; - } - Weaver.callOriginal(); - } + static CompletableFuture asyncRunStage(Executor e, Runnable f) { + e = useTokenDelegateExecutor(e); + return Weaver.callOriginal(); } } diff --git a/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenAndRefUtils.java b/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenAndRefUtils.java new file mode 100644 index 0000000000..c2cf202439 --- /dev/null +++ b/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenAndRefUtils.java @@ -0,0 +1,51 @@ +package util; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.agent.bridge.Transaction; + +import java.text.MessageFormat; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; + +public class TokenAndRefUtils { + + public static AgentBridge.TokenAndRefCount getThreadTokenAndRefCount() { + AgentBridge.TokenAndRefCount tokenAndRefCount = AgentBridge.activeToken.get(); + if (tokenAndRefCount == null) { + Transaction tx = AgentBridge.getAgent().getTransaction(false); + if (tx != null) { + tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(), + AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1)); + } + } else { + tokenAndRefCount.refCount.incrementAndGet(); + } + return tokenAndRefCount; + } + + public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenAndRefCount) { + if (tokenAndRefCount != null) { + AgentBridge.activeToken.set(tokenAndRefCount); + tokenAndRefCount.token.link(); + } + } + + public static void clearThreadTokenAndRefCountAndTxn(AgentBridge.TokenAndRefCount tokenAndRefCount) { + AgentBridge.activeToken.remove(); + if (tokenAndRefCount != null && tokenAndRefCount.refCount.decrementAndGet() == 0) { + tokenAndRefCount.token.expire(); + tokenAndRefCount.token = null; + } + } + + public static void logTokenInfo(AgentBridge.TokenAndRefCount tokenAndRefCount, String msg) { + if (AgentBridge.getAgent().getLogger().isLoggable(Level.FINEST)) { + String tokenMsg = (tokenAndRefCount != null && tokenAndRefCount.token != null) + ? String.format("[%s:%s:%d]", tokenAndRefCount.token, tokenAndRefCount.token.getTransaction(), + tokenAndRefCount.refCount.get()) + : "[Empty token]"; + AgentBridge.getAgent().getLogger().log(Level.FINEST, MessageFormat.format("{0}: token info {1}", tokenMsg, msg)); + } + } + +} diff --git a/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenAwareRunnable.java b/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenAwareRunnable.java new file mode 100644 index 0000000000..0e6f1f3ee5 --- /dev/null +++ b/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenAwareRunnable.java @@ -0,0 +1,35 @@ +package util; + +import com.newrelic.agent.bridge.AgentBridge; + +import static util.TokenAndRefUtils.clearThreadTokenAndRefCountAndTxn; +import static util.TokenAndRefUtils.getThreadTokenAndRefCount; +import static util.TokenAndRefUtils.logTokenInfo; +import static util.TokenAndRefUtils.setThreadTokenAndRefCount; + +public final class TokenAwareRunnable implements Runnable { + private final Runnable delegate; + + private AgentBridge.TokenAndRefCount tokenAndRefCount; + + public TokenAwareRunnable(Runnable delegate) { + this.delegate = delegate; + //get token state from calling Thread + this.tokenAndRefCount = getThreadTokenAndRefCount(); + logTokenInfo(tokenAndRefCount, "TokenAwareRunnable token info set"); + } + + @Override + public void run() { + try { + if (delegate != null) { + logTokenInfo(tokenAndRefCount, "Token info set in thread"); + setThreadTokenAndRefCount(tokenAndRefCount); + delegate.run(); + } + } finally { + logTokenInfo(tokenAndRefCount, "Clearing token info from thread "); + clearThreadTokenAndRefCountAndTxn(tokenAndRefCount); + } + } +} diff --git a/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenDelegateExecutor.java b/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenDelegateExecutor.java new file mode 100644 index 0000000000..ab16289a1e --- /dev/null +++ b/instrumentation/java.completable-future-jdk8u40/src/main/java/util/TokenDelegateExecutor.java @@ -0,0 +1,17 @@ +package util; + +import java.util.concurrent.Executor; + +public class TokenDelegateExecutor implements Executor { + public final Executor delegate; + + public TokenDelegateExecutor(final Executor delegate) { + this.delegate = delegate; + } + + @Override + public void execute(Runnable runnable) { + runnable = new TokenAwareRunnable(runnable); + delegate.execute(runnable); + } +}