Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix native-image generation of reactive applications #8012

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,47 +1,27 @@
package datadog.trace.bootstrap.instrumentation.decorator;

import static java.util.Collections.singletonList;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;

/**
* This decorator handles asynchronous result types, finishing spans only when the async calls are
* complete. The different async types are supported using {@link AsyncResultSupportExtension} that
* should be registered using {@link #registerExtension(AsyncResultSupportExtension)} first.
* complete. The different async types are supported using {@link AsyncResultExtension} that should
* be registered using {@link AsyncResultExtensions#register(AsyncResultExtension)} first.
*/
public abstract class AsyncResultDecorator extends BaseDecorator {
private static final CopyOnWriteArrayList<AsyncResultSupportExtension> EXTENSIONS =
new CopyOnWriteArrayList<>(
singletonList(new JavaUtilConcurrentAsyncResultSupportExtension()));

private static final ClassValue<AsyncResultSupportExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultSupportExtension>() {
private static final ClassValue<AsyncResultExtension> EXTENSION_CLASS_VALUE =
new ClassValue<AsyncResultExtension>() {
@Override
protected AsyncResultSupportExtension computeValue(Class<?> type) {
return EXTENSIONS.stream()
protected AsyncResultExtension computeValue(Class<?> type) {
return AsyncResultExtensions.registered().stream()
.filter(extension -> extension.supports(type))
.findFirst()
.orElse(null);
}
};

/**
* Registers an extension to add supported async types.
*
* @param extension The extension to register.
*/
public static void registerExtension(AsyncResultSupportExtension extension) {
if (extension != null) {
EXTENSIONS.add(extension);
}
}

/**
* Look for asynchronous result and decorate it with span finisher. If the result is not
* asynchronous, it will be return unmodified and span will be finished.
Expand All @@ -53,7 +33,7 @@ public static void registerExtension(AsyncResultSupportExtension extension) {
*/
public Object wrapAsyncResultOrFinishSpan(
final Object result, final Class<?> methodReturnType, final AgentSpan span) {
AsyncResultSupportExtension extension;
AsyncResultExtension extension;
if (result != null && (extension = EXTENSION_CLASS_VALUE.get(methodReturnType)) != null) {
Object applied = extension.apply(result, span);
if (applied != null) {
Expand All @@ -64,63 +44,4 @@ public Object wrapAsyncResultOrFinishSpan(
span.finish();
return result;
}

/**
* This interface defines asynchronous result type support extension. It allows deferring the
* support implementations where types are available on classpath.
*/
public interface AsyncResultSupportExtension {
/**
* Checks whether this extensions support a result type.
*
* @param result The result type to check.
* @return {@code true} if the type is supported by this extension, {@code false} otherwise.
*/
boolean supports(Class<?> result);

/**
* Applies the extension to the async result.
*
* @param result The async result.
* @param span The related span.
* @return The result object to return (can be the original result if not modified), or {@code
* null} if the extension could not be applied.
*/
Object apply(Object result, AgentSpan span);
}

private static class JavaUtilConcurrentAsyncResultSupportExtension
implements AsyncResultSupportExtension {
@Override
public boolean supports(Class<?> result) {
return CompletableFuture.class.isAssignableFrom(result)
|| CompletionStage.class.isAssignableFrom(result);
}

@Override
public Object apply(Object result, AgentSpan span) {
if (result instanceof CompletableFuture<?>) {
CompletableFuture<?> completableFuture = (CompletableFuture<?>) result;
if (!completableFuture.isDone() && !completableFuture.isCancelled()) {
return completableFuture.whenComplete(finishSpan(span));
}
} else if (result instanceof CompletionStage<?>) {
CompletionStage<?> completionStage = (CompletionStage<?>) result;
return completionStage.whenComplete(finishSpan(span));
}
return null;
}

private <T> BiConsumer<T, Throwable> finishSpan(AgentSpan span) {
return (o, throwable) -> {
if (throwable != null) {
span.addThrowable(
throwable instanceof ExecutionException || throwable instanceof CompletionException
? throwable.getCause()
: throwable);
}
span.finish();
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;

/**
* This interface defines asynchronous result type support extension. It allows deferring the
* support implementations where types are available on classpath.
*/
public interface AsyncResultExtension {
/**
* Checks whether this extensions support a result type.
*
* @param result The result type to check.
* @return {@code true} if the type is supported by this extension, {@code false} otherwise.
*/
boolean supports(Class<?> result);

/**
* Applies the extension to the async result.
*
* @param result The async result.
* @param span The related span.
* @return The result object to return (can be the original result if not modified), or {@code
* null} if the extension could not be applied.
*/
Object apply(Object result, AgentSpan span);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package datadog.trace.bootstrap.instrumentation.java.concurrent;

import static java.util.Collections.singletonList;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;

public final class AsyncResultExtensions {
private static final List<AsyncResultExtension> EXTENSIONS =
new CopyOnWriteArrayList<>(singletonList(new CompletableAsyncResultExtension()));

/**
* Registers an extension to add supported async types.
*
* @param extension The extension to register.
*/
public static void register(AsyncResultExtension extension) {
if (extension != null) {
EXTENSIONS.add(extension);
}
}

/** Returns the list of currently registered extensions. */
public static List<AsyncResultExtension> registered() {
return EXTENSIONS;
}

static final class CompletableAsyncResultExtension implements AsyncResultExtension {
@Override
public boolean supports(Class<?> result) {
return CompletableFuture.class.isAssignableFrom(result)
|| CompletionStage.class.isAssignableFrom(result);
}

@Override
public Object apply(Object result, AgentSpan span) {
if (result instanceof CompletableFuture<?>) {
CompletableFuture<?> completableFuture = (CompletableFuture<?>) result;
if (!completableFuture.isDone() && !completableFuture.isCancelled()) {
return completableFuture.whenComplete(finishSpan(span));
}
} else if (result instanceof CompletionStage<?>) {
CompletionStage<?> completionStage = (CompletionStage<?>) result;
return completionStage.whenComplete(finishSpan(span));
}
return null;
}

private <T> BiConsumer<T, Throwable> finishSpan(AgentSpan span) {
return (o, throwable) -> {
if (throwable != null) {
span.addThrowable(
throwable instanceof ExecutionException || throwable instanceof CompletionException
? throwable.getCause()
: throwable);
}
span.finish();
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) String[
+ "datadog.trace.bootstrap.benchmark.StaticEventLogger:build_time,"
+ "datadog.trace.bootstrap.blocking.BlockingExceptionHandler:build_time,"
+ "datadog.trace.bootstrap.InstrumentationErrors:build_time,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions:rerun,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.ConcurrentState:build_time,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter:build_time,"
+ "datadog.trace.bootstrap.instrumentation.java.concurrent.QueueTimeHelper:build_time,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@

import com.google.common.util.concurrent.ListenableFuture;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public class GuavaAsyncResultSupportExtension
implements AsyncResultDecorator.AsyncResultSupportExtension {
public class GuavaAsyncResultExtension implements AsyncResultExtension {
static {
AsyncResultDecorator.registerExtension(new GuavaAsyncResultSupportExtension());
AsyncResultExtensions.register(new GuavaAsyncResultExtension());
}

/**
* Register the extension as an {@link AsyncResultDecorator.AsyncResultSupportExtension} using
* static class initialization.<br>
* Register the extension as an {@link AsyncResultExtension} using static class initialization.
* <br>
* It uses an empty static method call to ensure the class loading and the one-time-only static
* class initialization. This will ensure this extension will only be registered once to the
* {@link AsyncResultDecorator}.
* class initialization. This will ensure this extension will only be registered once under {@link
* AsyncResultExtensions}.
*/
public static void initialize() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public String instrumentedType() {
@Override
public String[] helperClassNames() {
return new String[] {
this.packageName + ".GuavaAsyncResultSupportExtension",
this.packageName + ".GuavaAsyncResultExtension",
};
}

Expand All @@ -57,7 +57,7 @@ public void methodAdvice(MethodTransformer transformer) {
public static class AbstractFutureAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void init() {
GuavaAsyncResultSupportExtension.initialize();
GuavaAsyncResultExtension.initialize();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

class GuavaAsyncResultSupportExtensionTest extends AgentTestRunner {
class GuavaAsyncResultExtensionTest extends AgentTestRunner {
@Override
void configurePreAgent() {
super.configurePreAgent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ public Map<String, String> contextStore() {
@Override
public String[] helperClassNames() {
return new String[] {
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension",
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension$WrappedPublisher",
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension$WrappedSubscriber",
this.packageName + ".ReactiveStreamsAsyncResultSupportExtension$WrappedSubscription",
this.packageName + ".ReactiveStreamsAsyncResultExtension",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedPublisher",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscriber",
this.packageName + ".ReactiveStreamsAsyncResultExtension$WrappedSubscription",
};
}

Expand All @@ -82,7 +82,7 @@ public void methodAdvice(MethodTransformer transformer) {
public static class PublisherAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void init() {
ReactiveStreamsAsyncResultSupportExtension.initialize();
ReactiveStreamsAsyncResultExtension.initialize();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package datadog.trace.instrumentation.reactivestreams;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator;
import datadog.trace.bootstrap.instrumentation.decorator.AsyncResultDecorator.AsyncResultSupportExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class ReactiveStreamsAsyncResultSupportExtension implements AsyncResultSupportExtension {
public class ReactiveStreamsAsyncResultExtension implements AsyncResultExtension {
static {
AsyncResultDecorator.registerExtension(new ReactiveStreamsAsyncResultSupportExtension());
AsyncResultExtensions.register(new ReactiveStreamsAsyncResultExtension());
}

/**
* Register the extension as an {@link AsyncResultSupportExtension} using static class
* initialization.<br>
* Register the extension as an {@link AsyncResultExtension} using static class initialization.
* <br>
* It uses an empty static method call to ensure the class loading and the one-time-only static
* class initialization. This will ensure this extension will only be registered once to the
* {@link AsyncResultDecorator}.
* class initialization. This will ensure this extension will only be registered once under {@link
* AsyncResultExtensions}.
*/
public static void initialize() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import datadog.trace.bootstrap.instrumentation.api.Tags

import java.util.concurrent.CountDownLatch

class ReactiveStreamsAsyncResultSupportExtensionTest extends AgentTestRunner {
class ReactiveStreamsAsyncResultExtensionTest extends AgentTestRunner {
@Override
void configurePreAgent() {
super.configurePreAgent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import spock.lang.Shared
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors

class ReactorAsyncResultSupportExtensionTest extends AgentTestRunner {
class ReactorAsyncResultExtensionTest extends AgentTestRunner {
@Override
void configurePreAgent() {
super.configurePreAgent()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public BlockingPublisherInstrumentation() {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".ReactorAsyncResultSupportExtension",
packageName + ".ReactorAsyncResultExtension",
};
}

Expand Down Expand Up @@ -83,7 +83,7 @@ public static void after(@Advice.Enter final AgentScope scope) {
public static class AsyncExtensionInstallAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void init() {
ReactorAsyncResultSupportExtension.initialize();
ReactorAsyncResultExtension.initialize();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also tried to kick it in on the typeInitializer but it did not work

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same - I have some ideas to improve this kind of pre-initialization, but need to run further experiments.

So far one approach which shows promise is to trigger their initialization during the transformation step, similar to how we inject the helpers. However this needs some additional support to work with native-image because the initialization is then not tied to the stock class-loading mechanism, but the transformation mechanism on top of it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 #8028

}
}
}
Loading
Loading