Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Completable Future JDK11 #1908

Merged
merged 10 commits into from
May 23, 2024
44 changes: 44 additions & 0 deletions instrumentation/java.completable-future-jdk11/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# java.completable-future-jdk11

This instrumentation weaves `java.util.concurrent.CompletableFuture` to trace code execution across asynchronous boundaries.

## JDK11 Updates

This instrumentation applies to JRE 11 and higher. This was necessary in order to add support for the method
`completeAsync` (introduced with Java 9).

JREs 8, 9 and 10 use the previous version of this module, `java.completable-future-jdk8u40`.

The instrumentation is otherwise the same as the `java.completable-future-jdk8u40`, and
works as described below.

## How it works

When `CompletableFuture` methods (e.g. `uniApplyStage`, `biApplyStage`, `orApplyStage`, `asyncRunStage`, etc) are invoked, a `TokenDelegateExecutor`
is initialized and used to wrap the `Executor` argument that was passed to executing method. When `TokenDelegateExecutor.execute(Runnable runnable)` is
invoked it will initialize and store a `TokenAwareRunnable` that wraps the `Runnable` argument passed to `Executor`.

The `TokenAwareRunnable` uses `TokenAndRefUtils` to get a `TokenAndRefCount`, if one exists, for the current `Thread`. Otherwise, it creates
a new `TokenAndRefCount`. The `TokenAndRefCount` stores a `Token` that can be used to link asynchronous `Threads` together and tracks the number of incoming references to the `Token`.
When `TokenAwareRunnable.run()` is invoked the stored `Token` is linked on the executing `Thread` and finally the `Token` is expired when `run()` completes,
allowing the `Transaction` to complete.

## Logging

This instrumentation will produce entries such as the following when searching the logs for keywords `token info`:

```
2022-01-07T17:22:03,481-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info TokenAwareRunnable token info set
2022-01-07T17:22:03,482-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info Token info set in thread
2022-01-07T17:22:03,482-0800 [53655 270] com.newrelic FINEST: [Empty token]: token info Clearing token info from thread
```

## Testing

Like all other modules instrumenting JDK classes, this module does not have instrumentation tests within the module
(this is a limitation of the introspector). Additionally, because this module is built with Java 11, it cannot otherwise
be tested in the functional tests.

To work around these limitations, tests for this module have been placed in the `java-agent-integration-tests` project.
Their content is similar to the existing CompletableFuture tests in
`newrelic-java-agent/functional_test/src/test/java/test/newrelic/test/agent/CompletableFutureTest.java`.
26 changes: 26 additions & 0 deletions instrumentation/java.completable-future-jdk11/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
dependencies {
implementation(project(":agent-bridge"))
}

// This instrumentation module should not use the bootstrap classpath


jar {
manifest { attributes 'Implementation-Title': 'com.newrelic.instrumentation.java.completable-future-jdk11' }
}

verifyInstrumentation {
verifyClasspath = false // We don't want to verify classpath since these are JDK classes
}

java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(11))
}
}

site {
title 'Java Completable futures'
type 'Other'
versionOverride '[11,)'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/

package nr.java.util.concurrent;

import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import util.TokenDelegateExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@Weave(type = MatchType.ExactClass, originalName = "java.util.concurrent.CompletableFuture")
public class CompletableFuture_Instrumentation<T> {

private static Executor useTokenDelegateExecutor(Executor e) {
if (null == e || e instanceof TokenDelegateExecutor) {
return e;
} else {
return new TokenDelegateExecutor(e);
}
}

private <V> CompletableFuture<V> uniApplyStage(
Executor e, Function<? super T, ? extends V> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private CompletableFuture<Void> uniAcceptStage(Executor e,
Consumer<? super T> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private CompletableFuture<T> uniWhenCompleteStage(
Executor e, BiConsumer<? super T, ? super Throwable> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private <V> CompletableFuture<V> uniHandleStage(
Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private <V> CompletableFuture<V> uniComposeStage(
Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private <U, V> CompletableFuture<V> biApplyStage(
Executor e, CompletionStage<U> o,
BiFunction<? super T, ? super U, ? extends V> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private <U> CompletableFuture<Void> biAcceptStage(
Executor e, CompletionStage<U> o,
BiConsumer<? super T, ? super U> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
Runnable f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private <U extends T, V> CompletableFuture<V> orApplyStage(
Executor e, CompletionStage<U> o,
Function<? super T, ? extends V> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private <U extends T> CompletableFuture<Void> orAcceptStage(
Executor e, CompletionStage<U> o, Consumer<? super T> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
Runnable f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
Supplier<U> f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}

//available since JDK 9
public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, Executor e) {
e = useTokenDelegateExecutor(e);
return Weaver.callOriginal();
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package util;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.Transaction;

import java.text.MessageFormat;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;

public class TokenAndRefUtils {

public static AgentBridge.TokenAndRefCount getThreadTokenAndRefCount() {
AgentBridge.TokenAndRefCount tokenAndRefCount = AgentBridge.activeToken.get();
if (tokenAndRefCount == null) {
Transaction tx = AgentBridge.getAgent().getTransaction(false);
if (tx != null) {
tokenAndRefCount = new AgentBridge.TokenAndRefCount(tx.getToken(),
AgentBridge.getAgent().getTracedMethod(), new AtomicInteger(1));
}
} else {
tokenAndRefCount.refCount.incrementAndGet();
}
return tokenAndRefCount;
}

public static Transaction getTransaction(AgentBridge.TokenAndRefCount tokenAndRefCount) {
if(tokenAndRefCount != null && tokenAndRefCount.token != null) {
return (Transaction) tokenAndRefCount.token.getTransaction();
} else {
return null;
}
}

public static void setThreadTokenAndRefCount(AgentBridge.TokenAndRefCount tokenAndRefCount, Transaction transaction) {
if (tokenAndRefCount != null && tokenAndRefCount.token != null) {
AgentBridge.activeToken.set(tokenAndRefCount);
tokenAndRefCount.token.link();
} else if(tokenAndRefCount != null && transaction != null) {
tokenAndRefCount.token = transaction.getToken();
tokenAndRefCount.token.link();
tokenAndRefCount.refCount = new AtomicInteger(1);
}
}

public static void clearThreadTokenAndRefCountAndTxn(AgentBridge.TokenAndRefCount tokenAndRefCount) {
AgentBridge.activeToken.remove();
if (tokenAndRefCount != null && tokenAndRefCount.refCount.decrementAndGet() == 0) {
tokenAndRefCount.token.expire();
tokenAndRefCount.token = null;
}
}

public static void logTokenInfo(AgentBridge.TokenAndRefCount tokenAndRefCount, String msg) {
if (AgentBridge.getAgent().getLogger().isLoggable(Level.FINEST)) {
String tokenMsg = (tokenAndRefCount != null && tokenAndRefCount.token != null)
? String.format("[%s:%s:%d]", tokenAndRefCount.token, tokenAndRefCount.token.getTransaction(),
tokenAndRefCount.refCount.get())
: "[Empty token]";
AgentBridge.getAgent().getLogger().log(Level.FINEST, MessageFormat.format("{0}: token info {1}", tokenMsg, msg));
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package util;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.agent.bridge.Transaction;

import static util.TokenAndRefUtils.*;

public final class TokenAwareRunnable implements Runnable {
private final Runnable delegate;

private AgentBridge.TokenAndRefCount tokenAndRefCount;
private Transaction transaction;

public TokenAwareRunnable(Runnable delegate) {
this.delegate = delegate;
//get token state from calling Thread
this.tokenAndRefCount = getThreadTokenAndRefCount();
this.transaction = getTransaction(tokenAndRefCount);
logTokenInfo(tokenAndRefCount, "TokenAwareRunnable token info set");
}

@Override
public void run() {
try {
if (delegate != null) {
logTokenInfo(tokenAndRefCount, "Token info set in thread");
setThreadTokenAndRefCount(tokenAndRefCount, transaction);
delegate.run();
}
} finally {
logTokenInfo(tokenAndRefCount, "Clearing token info from thread ");
clearThreadTokenAndRefCountAndTxn(tokenAndRefCount);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package util;

import java.util.concurrent.Executor;

public class TokenDelegateExecutor implements Executor {
public final Executor delegate;

public TokenDelegateExecutor(final Executor delegate) {
this.delegate = delegate;
}

@Override
public void execute(Runnable runnable) {
runnable = new TokenAwareRunnable(runnable);
delegate.execute(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package skip;

import com.newrelic.api.agent.weaver.Weave;

/**
* * This Weave class instructs JREs 11 and up to skip this instrumentation module,
* * and use java.completable-future-jdk11 instead.
* * javax.security.auth.Policy was chosen because it was removed in Java 11.
*/
@Weave(originalName = "javax.security.auth.Policy")
public class Skip_SecurityPolicy {
//This does nothing
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public final class WeaveUtils {
private static int getRuntimeMaxSupportedClassVersion() {
try {
double jvmSpecVersion = Double.parseDouble(System.getProperty("java.specification.version"));
if (jvmSpecVersion >= 11) {
if (jvmSpecVersion >= 9) {
return (int) jvmSpecVersion + CLASS_FILE_VERSION_OFFSET;
} else if (jvmSpecVersion >= 1.8) {
return 52;
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ include 'instrumentation:hystrix-1.4'
include 'instrumentation:jakarta.xml'
include 'instrumentation:java.completable-future-jdk8'
include 'instrumentation:java.completable-future-jdk8u40'
include 'instrumentation:java.completable-future-jdk11'
include 'instrumentation:java.logging-jdk8'
include 'instrumentation:java.process'
include 'instrumentation:java-io'
Expand Down
Loading