Skip to content

Commit

Permalink
Change the default thread model for RR
Browse files Browse the repository at this point in the history
- If an async type is returned (CompletionStage, Uni, Multi etc) then it
  remains non blocking
- If the method is a Kotlin suspended method it remains non-blocking
- All other methods default to blocking, unless explicitly specified
- @(Non)Blocking on an application class works as normal, and will
  override the method signature based defaults
  • Loading branch information
stuartwdouglas committed Jul 30, 2021
1 parent 3582f13 commit 45d7dbb
Show file tree
Hide file tree
Showing 23 changed files with 137 additions and 35 deletions.
42 changes: 31 additions & 11 deletions docs/src/main/asciidoc/resteasy-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -963,16 +963,28 @@ The event-loop threads (also called IO threads) are responsible for actually per
operations in an asynchronous way, and to trigger any listener interested in the completion of those
IO operations.

By default, RESTEasy Reactive will run endpoint methods on the event-loop threads, on the assumption that
they are going to be fast and only invoke non-blocking operations.

This is the model of execution that leads to best performance if your endpoints do not do any blocking
operation (such as blocking IO, blocking on an asynchronous operation, or sleeping).

If your endpoint method needs to do any of those blocking operations, you should add the
By default, the thread RESTEasy Reactive will run endpoint methods on depends on the signature of the method.
If a method returns one of the following types then it is considered non-blocking, and will be run on the IO thread
by default:

- `io.smallrye.mutiny.Uni`
- `io.smallrye.mutiny.Multi`
- `java.util.concurrent.CompletionStage`
- `org.reactivestreams.Publisher`
- Kotlin `suspended` methods

This 'best guess' approach means that the majority of operations will run on the correct thread by default. If you are
writing reactive code then your method will generally return one of these types, and will be executed on the IO thread.
If you are writing blocking code your methods will generally return the result directly, and these will be run on a worker
thread.

You can override this behaviour using the
https://javadoc.io/doc/io.smallrye.common/smallrye-common-annotation/1.5.0/io/smallrye/common/annotation/Blocking.html[`@Blocking`]
annotation on your endpoint and it will instead be invoked on a worker thread. Your endpoint method
code can remain exactly the same, and it will be allowed to block:
and
https://javadoc.io/doc/io.smallrye.common/smallrye-common-annotation/1.5.0/io/smallrye/common/annotation/NonBlocking.html[`@NonBlocking`]
annotations. This can be applied at the method, class or `javax.ws.rs.core.Application` level.

The example below will override the default behaviour and always run on a worker thread, even though it returns a `Uni`.

[source,java]
----
Expand All @@ -988,10 +1000,10 @@ public class Endpoint {
@Blocking
@GET
public String blockingHello() throws InterruptedException {
public Uni<String> blockingHello() throws InterruptedException {
// do a blocking operation
Thread.sleep(1000);
return "Yaaaawwwwnnnnnn…";
return Uni.createFrom().item("Yaaaawwwwnnnnnn…");
}
}
----
Expand Down Expand Up @@ -1027,6 +1039,14 @@ If a method or class is annotated with `javax.transaction.Transactional` then it
method. This is because JTA is a blocking technology, and is generally used with other blocking technology such as
Hibernate and JDBC. An explicit `@Blocking` or `@NonBlocking` on the class will override this behaviour.

==== Overriding the default behaviour

If you want to override the default behaviour you can annotate a `javax.ws.rs.core.Application` subclass in your application
with `@Blocking` or `@NonBlocking`, and this will set the default for every method that does not have an explicit annotation.

Behaviour can still be overridden on a class or method level by annotating them directly, however all endpoints without
an annotation will now follow the default, no matter their method signature.

=== Exception mapping

If your application needs to return non-nominal HTTP codes in error cases, the best is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ void setupClientProxies(JaxrsClientReactiveRecorder recorder,
.setInjectableBeans(new HashMap<>())
.setFactoryCreator(new QuarkusFactoryCreator(recorder, beanContainerBuildItem.getValue()))
.setAdditionalWriters(additionalWriters)
.setDefaultBlocking(applicationResultBuildItem.getResult().isBlocking())
.setDefaultBlocking(applicationResultBuildItem.getResult().getBlockingDefault())
.setHasRuntimeConverters(false)
.setDefaultProduces(defaultProducesType)
.setSmartDefaultProduces(disableSmartDefaultProduces.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@

import io.quarkus.resteasy.reactive.jackson.CustomSerialization;
import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Multi;

@Path("/simple")
@NonBlocking
public class SimpleJsonResource extends SuperClass<Person> {

@ServerExceptionMapper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
import javax.ws.rs.core.Response;

import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Multi;

@Path("/simple")
@NonBlocking
public class SimpleJsonResource extends SuperClass<Person> {

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ public ParameterExtractor handleCustomParameter(Type paramType, Map<DotName, Ann
}
return null;
}

@Override
public boolean isMethodSignatureAsync(MethodInfo info) {
for (var param : info.parameters()) {
if (param.name().equals(CONTINUATION)) {
return true;
}
}
return false;
}
});
}

Expand Down Expand Up @@ -167,6 +177,11 @@ public List<HandlerChainCustomizer> scan(MethodInfo method, ClassInfo actualEndp
}
return Collections.emptyList();
}

@Override
public boolean isMethodSignatureAsync(MethodInfo info) {
return info.returnType().name().equals(FLOW);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public void setupEndpoints(Capabilities capabilities, BeanArchiveIndexBuildItem
.setHttpAnnotationToMethod(result.getHttpAnnotationToMethod())
.setInjectableBeans(injectableBeans)
.setAdditionalWriters(additionalWriters)
.setDefaultBlocking(appResult.isBlocking())
.setDefaultBlocking(appResult.getBlockingDefault())
.setHasRuntimeConverters(!paramConverterProviders.getParamConverterProviders().isEmpty())
.setClassLevelExceptionMappers(
classLevelExceptionMappers.isPresent() ? classLevelExceptionMappers.get().getMappers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public ParameterExtractor handleCustomParameter(Type paramType, Map<DotName, Ann
}
return null;
}

@Override
public boolean isMethodSignatureAsync(MethodInfo info) {
for (var param : info.parameters()) {
if (param.name().equals(SERVER_WEB_SOCKET)) {
return true;
}
}
return false;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.restassured.http.Headers;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Uni;

public class ValidNonBlockingFiltersTest {

Expand Down Expand Up @@ -79,8 +80,8 @@ public Response blocking(@Context HttpHeaders headers) {

@Path("nonblocking")
@GET
public Response nonblocking(@Context HttpHeaders headers) {
return getResponse(headers);
public Uni<Response> nonblocking(@Context HttpHeaders headers) {
return Uni.createFrom().item(getResponse(headers));
}

private Response getResponse(HttpHeaders headers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.NonBlocking;

@Path("/multipart")
public class MultipartResource {
Expand All @@ -24,6 +25,7 @@ public class MultipartResource {
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Path("/simple/{times}")
@NonBlocking
public String simple(@MultipartForm FormData formData, Integer times) {
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should not have dispatched");
Expand All @@ -41,7 +43,7 @@ public String simple(@MultipartForm FormData formData, Integer times) {
@Path("/blocking")
public Response blocking(@DefaultValue("1") @RestQuery Integer times, FormData formData) throws IOException {
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should not have dispatched");
throw new RuntimeException("should have dispatched");
}
return Response.ok(formData.getName() + " - " + times * formData.getNum() + " - " + formData.getStatus())
.header("html-size", formData.getHtmlPart().size())
Expand All @@ -58,8 +60,8 @@ public Response blocking(@DefaultValue("1") @RestQuery Integer times, FormData f
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Path("/same-name")
public String sameName(FormDataSameFileName formData) {
if (BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should not have dispatched");
if (!BlockingOperationControl.isBlockingAllowed()) {
throw new RuntimeException("should have dispatched");
}
return formData.status + " - " + formData.getHtmlFiles().size() + " - " + formData.txtFiles.size() + " - "
+ formData.xmlFiles.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
import org.jboss.resteasy.reactive.multipart.FileUpload;

import io.quarkus.runtime.BlockingOperationControl;
import io.smallrye.common.annotation.NonBlocking;

@Path("/multipart-all")
public class MultipartResourceWithAllUploads {

@POST
@Produces(MediaType.TEXT_PLAIN)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@NonBlocking
@Path("/simple/{times}")
public String simple(@MultipartForm FormDataWithAllUploads formData, Integer times) {
if (BlockingOperationControl.isBlockingAllowed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;
import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.NonBlocking;

public class InterfaceWithImplTest {

Expand All @@ -33,6 +34,7 @@ public void test() {
}

@Path("/hello")
@NonBlocking
public interface Greeting {

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import org.eclipse.microprofile.rest.client.inject.RestClient;

import io.smallrye.common.annotation.Blocking;
import io.smallrye.common.annotation.NonBlocking;

@Path("/non-blocking")
@NonBlocking
public class CallMakingResource {

@RestClient
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package org.jboss.resteasy.reactive.common.processor;

public enum BlockingDefault {
/**
* The nature of the method is determined by the return type
*/
AUTOMATIC,
BLOCKING,
NON_BLOCKING
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public abstract class EndpointIndexer<T extends EndpointIndexer<T, PARAM, METHOD
protected final AdditionalReaders additionalReaders;
private final Map<DotName, String> httpAnnotationToMethod;
private final AdditionalWriters additionalWriters;
private final boolean defaultBlocking;
private final BlockingDefault defaultBlocking;
private final Map<DotName, Map<String, String>> classLevelExceptionMappers;
private final Function<String, BeanFactory<Object>> factoryCreator;
private final Consumer<Map.Entry<MethodInfo, ResourceMethod>> resourceMethodCallback;
Expand Down Expand Up @@ -511,7 +511,9 @@ private ResourceMethod createResourceMethod(ClassInfo currentClassInfo, ClassInf
MethodInfo actualMethodInfo = actualEndpointInfo.method(currentMethodInfo.name(),
currentMethodInfo.parameters().toArray(new Type[0]));
if (actualMethodInfo != null) {
blocking = isBlocking(actualMethodInfo, blocking);
//we don't pass AUTOMATIC here, as the method signature would be the same, so the same determination
//would be reached for a default
blocking = isBlocking(actualMethodInfo, blocking ? BlockingDefault.BLOCKING : BlockingDefault.NON_BLOCKING);
}
}

Expand Down Expand Up @@ -554,7 +556,7 @@ protected void handleClientSubResource(ResourceMethod resourceMethod, MethodInfo

}

private boolean isBlocking(MethodInfo info, boolean defaultValue) {
private boolean isBlocking(MethodInfo info, BlockingDefault defaultValue) {
Map.Entry<AnnotationTarget, AnnotationInstance> blockingAnnotation = getInheritableAnnotation(info, BLOCKING);
Map.Entry<AnnotationTarget, AnnotationInstance> nonBlockingAnnotation = getInheritableAnnotation(info,
NON_BLOCKING);
Expand All @@ -575,7 +577,16 @@ private boolean isBlocking(MethodInfo info, boolean defaultValue) {
if (transactional != null) {
return true;
}
return defaultValue;
if (defaultValue == BlockingDefault.BLOCKING) {
return true;
} else if (defaultValue == BlockingDefault.NON_BLOCKING) {
return false;
}
return doesMethodHaveBlockingSignature(info);
}

protected boolean doesMethodHaveBlockingSignature(MethodInfo info) {
return true;
}

protected void handleMultipart(ClassInfo multipartClassInfo) {
Expand Down Expand Up @@ -1075,7 +1086,7 @@ public Set<String> nameBindingNames(MethodInfo methodInfo, Set<String> forClass)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static abstract class Builder<T extends EndpointIndexer<T, ?, METHOD>, B extends Builder<T, B, METHOD>, METHOD extends ResourceMethod> {
private Function<String, BeanFactory<Object>> factoryCreator = new ReflectionBeanFactoryCreator();
private boolean defaultBlocking;
private BlockingDefault defaultBlocking = BlockingDefault.AUTOMATIC;
private IndexView index;
private Map<String, String> existingConverters;
private Map<DotName, String> scannedResourcePaths;
Expand All @@ -1088,7 +1099,7 @@ public static abstract class Builder<T extends EndpointIndexer<T, ?, METHOD>, B
private Map<DotName, Map<String, String>> classLevelExceptionMappers;
private Consumer<Map.Entry<MethodInfo, ResourceMethod>> resourceMethodCallback;

public B setDefaultBlocking(boolean defaultBlocking) {
public B setDefaultBlocking(BlockingDefault defaultBlocking) {
this.defaultBlocking = defaultBlocking;
return (B) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Set;
import javax.ws.rs.core.Application;
import org.jboss.jandex.ClassInfo;
import org.jboss.resteasy.reactive.common.processor.BlockingDefault;
import org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames;

public final class ApplicationScanningResult {
Expand All @@ -14,11 +15,11 @@ public final class ApplicationScanningResult {
final boolean filterClasses;
final Application application;
final ClassInfo selectedAppClass;
final boolean blocking;
final BlockingDefault blocking;

public ApplicationScanningResult(Set<String> allowedClasses, Set<String> singletonClasses, Set<String> excludedClasses,
Set<String> globalNameBindings, boolean filterClasses, Application application,
ClassInfo selectedAppClass, boolean blocking) {
ClassInfo selectedAppClass, BlockingDefault blocking) {
this.allowedClasses = allowedClasses;
this.singletonClasses = singletonClasses;
this.excludedClasses = excludedClasses;
Expand Down Expand Up @@ -83,7 +84,7 @@ public ClassInfo getSelectedAppClass() {
return selectedAppClass;
}

public boolean isBlocking() {
public BlockingDefault getBlockingDefault() {
return blocking;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.jboss.jandex.IndexView;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.Type;
import org.jboss.resteasy.reactive.common.processor.BlockingDefault;
import org.jboss.resteasy.reactive.common.processor.NameBindingUtil;
import org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames;

Expand Down Expand Up @@ -57,7 +58,7 @@ public static ApplicationScanningResult scanForApplicationClass(IndexView index,
boolean filterClasses = !excludedClasses.isEmpty();
Application application = null;
ClassInfo selectedAppClass = null;
boolean blocking = false;
BlockingDefault blocking = BlockingDefault.AUTOMATIC;
for (ClassInfo applicationClassInfo : applications) {
if (Modifier.isAbstract(applicationClassInfo.flags())) {
continue;
Expand Down Expand Up @@ -94,9 +95,9 @@ public static ApplicationScanningResult scanForApplicationClass(IndexView index,
throw new RuntimeException("Unable to handle class: " + applicationClass, e);
}
if (applicationClassInfo.classAnnotation(ResteasyReactiveDotNames.BLOCKING) != null) {
blocking = true;
blocking = BlockingDefault.BLOCKING;
} else if (applicationClassInfo.classAnnotation(ResteasyReactiveDotNames.NON_BLOCKING) != null) {
blocking = false;
blocking = BlockingDefault.NON_BLOCKING;
}
}
if (selectedAppClass != null) {
Expand Down
Loading

0 comments on commit 45d7dbb

Please sign in to comment.