From 97e101b526ab82db414e6180e6bd3c20d3d27c85 Mon Sep 17 00:00:00 2001 From: Jose Date: Sun, 23 Apr 2023 08:46:08 +0200 Subject: [PATCH] Support blocking exception mappers in REST Client Reactive Fix https://github.com/quarkusio/quarkus/issues/30312 --- .../main/asciidoc/rest-client-reactive.adoc | 62 +++++ .../ClientExceptionMapperHandler.java | 20 +- .../client/reactive/deployment/DotNames.java | 3 + .../RestClientReactiveProcessor.java | 21 ++ .../error/BlockingExceptionMapperTest.java | 253 ++++++++++++++++++ .../ClientUseWorkerExecutorRestHandler.java | 41 +++ .../MicroProfileRestClientResponseFilter.java | 51 +++- .../reactive/runtime/RestClientRecorder.java | 10 + .../scanning/ClientEndpointIndexer.java | 18 +- .../reactive/common/processor/JandexUtil.java | 35 +++ .../startup/RuntimeResourceDeployment.java | 2 +- 11 files changed, 495 insertions(+), 21 deletions(-) create mode 100644 extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/error/BlockingExceptionMapperTest.java create mode 100644 extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/ClientUseWorkerExecutorRestHandler.java diff --git a/docs/src/main/asciidoc/rest-client-reactive.adoc b/docs/src/main/asciidoc/rest-client-reactive.adoc index c25f0f7dffdff..f4ae3aef37ec6 100644 --- a/docs/src/main/asciidoc/rest-client-reactive.adoc +++ b/docs/src/main/asciidoc/rest-client-reactive.adoc @@ -985,6 +985,68 @@ Naturally this handling is per REST Client. `@ClientExceptionMapper` uses the de NOTE: Methods annotated with `@ClientExceptionMapper` can also take a `java.lang.reflect.Method` parameter which is useful if the exception mapping code needs to know the REST Client method that was invoked and caused the exception mapping code to engage. +=== Using @Blocking annotation in exception mappers + +In cases that warrant using `InputStream` as the return type of REST Client method (such as when large amounts of data need to be read): + +[source, java] +---- +@Path("/echo") +@RegisterRestClient +public interface EchoClient { + + @GET + InputStream get(); +} +---- + +This will work as expected, but if you try to read this InputStream object in a custom exception mapper, you will receive a `BlockingNotAllowedException` exception. This is because `ResponseExceptionMapper` classes are run on the Event Loop thread executor by default - which does not allow to perform IO operations. + +To make your exception mapper blocking, you can annotate the exception mapper with the `@Blocking` annotation: + +[source, java] +---- +@Provider +@Blocking <1> +public class MyResponseExceptionMapper implements ResponseExceptionMapper { + + @Override + public RuntimeException toThrowable(Response response) { + if (response.getStatus() == 500) { + response.readEntity(String.class); <2> + return new RuntimeException("The remote service responded with HTTP 500"); + } + return null; + } +} +---- + +<1> With the `@Blocking` annotation, the MyResponseExceptionMapper exception mapper will be executed in the worker thread pool. +<2> Reading the entity is now allowed because we're executing the mapper on the worker thread pool. + +Note that you can also use the `@Blocking` annotation when using @ClientExceptionMapper: + +[source, java] +---- +@Path("/echo") +@RegisterRestClient +public interface EchoClient { + + @GET + InputStream get(); + + @ClientExceptionMapper + @Blocking + static RuntimeException toException(Response response) { + if (response.getStatus() == 500) { + response.readEntity(String.class); + return new RuntimeException("The remote service responded with HTTP 500"); + } + return null; + } +} +---- + [[multipart]] == Multipart Form support diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/ClientExceptionMapperHandler.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/ClientExceptionMapperHandler.java index 583aa2cfa6883..ae97af6d7c3b3 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/ClientExceptionMapperHandler.java +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/ClientExceptionMapperHandler.java @@ -90,12 +90,6 @@ GeneratedClassResult generateResponseExceptionMapper(AnnotationInstance instance throw new IllegalStateException(message); } - StringBuilder sigBuilder = new StringBuilder(); - sigBuilder.append(targetMethod.name()).append("_").append(targetMethod.returnType().name().toString()); - for (Type i : targetMethod.parameterTypes()) { - sigBuilder.append(i.name().toString()); - } - int priority = Priorities.USER; AnnotationValue priorityAnnotationValue = instance.value("priority"); if (priorityAnnotationValue != null) { @@ -103,8 +97,7 @@ GeneratedClassResult generateResponseExceptionMapper(AnnotationInstance instance } ClassInfo restClientInterfaceClassInfo = targetMethod.declaringClass(); - String generatedClassName = restClientInterfaceClassInfo.name().toString() + "_" + targetMethod.name() + "_" - + "ResponseExceptionMapper" + "_" + HashUtil.sha1(sigBuilder.toString()); + String generatedClassName = getGeneratedClassName(targetMethod); try (ClassCreator cc = ClassCreator.builder().classOutput(classOutput).className(generatedClassName) .interfaces(ResteasyReactiveResponseExceptionMapper.class).build()) { MethodCreator toThrowable = cc.getMethodCreator("toThrowable", Throwable.class, Response.class, @@ -143,6 +136,17 @@ GeneratedClassResult generateResponseExceptionMapper(AnnotationInstance instance return new GeneratedClassResult(restClientInterfaceClassInfo.name().toString(), generatedClassName, priority); } + public static String getGeneratedClassName(MethodInfo methodInfo) { + StringBuilder sigBuilder = new StringBuilder(); + sigBuilder.append(methodInfo.name()).append("_").append(methodInfo.returnType().name().toString()); + for (Type i : methodInfo.parameterTypes()) { + sigBuilder.append(i.name().toString()); + } + + return methodInfo.declaringClass().name().toString() + "_" + methodInfo.name() + "_" + + "ResponseExceptionMapper" + "_" + HashUtil.sha1(sigBuilder.toString()); + } + private static boolean ignoreAnnotation(MethodInfo methodInfo) { // ignore the annotation if it's placed on a Kotlin companion class // this is not a problem since the Kotlin compiler will also place the annotation the static method interface method diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/DotNames.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/DotNames.java index 1d0b3f5d45d2c..df1a36f223dec 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/DotNames.java +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/DotNames.java @@ -10,6 +10,7 @@ import org.eclipse.microprofile.rest.client.annotation.RegisterClientHeaders; import org.eclipse.microprofile.rest.client.annotation.RegisterProvider; import org.eclipse.microprofile.rest.client.annotation.RegisterProviders; +import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper; import org.jboss.jandex.DotName; import io.quarkus.rest.client.reactive.ClientExceptionMapper; @@ -32,6 +33,8 @@ public class DotNames { public static final DotName CLIENT_EXCEPTION_MAPPER = DotName.createSimple(ClientExceptionMapper.class.getName()); public static final DotName CLIENT_REDIRECT_HANDLER = DotName.createSimple(ClientRedirectHandler.class.getName()); + public static final DotName RESPONSE_EXCEPTION_MAPPER = DotName.createSimple(ResponseExceptionMapper.class.getName()); + static final DotName METHOD = DotName.createSimple(Method.class.getName()); private DotNames() { diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java index 9f4d51271242b..d456cbea6cdb4 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/main/java/io/quarkus/rest/client/reactive/deployment/RestClientReactiveProcessor.java @@ -10,9 +10,12 @@ import static io.quarkus.rest.client.reactive.deployment.DotNames.REGISTER_CLIENT_HEADERS; import static io.quarkus.rest.client.reactive.deployment.DotNames.REGISTER_PROVIDER; import static io.quarkus.rest.client.reactive.deployment.DotNames.REGISTER_PROVIDERS; +import static io.quarkus.rest.client.reactive.deployment.DotNames.RESPONSE_EXCEPTION_MAPPER; import static java.util.Arrays.asList; import static java.util.stream.Collectors.*; import static org.jboss.resteasy.reactive.common.processor.EndpointIndexer.CDI_WRAPPER_SUFFIX; +import static org.jboss.resteasy.reactive.common.processor.JandexUtil.isImplementorOf; +import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.BLOCKING; import static org.jboss.resteasy.reactive.common.processor.scanning.ResteasyReactiveScanner.BUILTIN_HTTP_ANNOTATIONS_TO_METHOD; import java.lang.annotation.RetentionPolicy; @@ -497,6 +500,24 @@ void addRestClientBeans(Capabilities capabilities, } } } + + Set blockingClassNames = new HashSet<>(); + Set registerBlockingClasses = new HashSet<>(index.getAnnotations(BLOCKING)); + for (AnnotationInstance registerBlockingClass : registerBlockingClasses) { + AnnotationTarget target = registerBlockingClass.target(); + if (target.kind() == AnnotationTarget.Kind.CLASS + && isImplementorOf(index, target.asClass(), RESPONSE_EXCEPTION_MAPPER)) { + // Watch for @Blocking annotations in classes that implements ResponseExceptionMapper: + blockingClassNames.add(target.asClass().toString()); + } else if (target.kind() == AnnotationTarget.Kind.METHOD + && target.asMethod().annotation(CLIENT_EXCEPTION_MAPPER) != null) { + // Watch for @Blocking annotations in methods that are also annotated with @ClientExceptionMapper: + blockingClassNames.add(ClientExceptionMapperHandler.getGeneratedClassName(target.asMethod())); + } + } + + recorder.setBlockingClassNames(blockingClassNames); + if (LaunchMode.current() == LaunchMode.DEVELOPMENT) { recorder.setConfigKeys(configKeys); } diff --git a/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/error/BlockingExceptionMapperTest.java b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/error/BlockingExceptionMapperTest.java new file mode 100644 index 0000000000000..47b82d5993330 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/deployment/src/test/java/io/quarkus/rest/client/reactive/error/BlockingExceptionMapperTest.java @@ -0,0 +1,253 @@ +package io.quarkus.rest.client.reactive.error; + +import static io.quarkus.rest.client.reactive.RestClientTestUtil.setUrlForClass; +import static io.restassured.RestAssured.given; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; + +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; + +import org.eclipse.microprofile.rest.client.RestClientBuilder; +import org.eclipse.microprofile.rest.client.annotation.RegisterProvider; +import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper; +import org.eclipse.microprofile.rest.client.inject.RegisterRestClient; +import org.eclipse.microprofile.rest.client.inject.RestClient; +import org.jboss.resteasy.reactive.common.core.BlockingNotAllowedException; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.rest.client.reactive.ClientExceptionMapper; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.common.annotation.Blocking; +import io.vertx.core.Context; + +public class BlockingExceptionMapperTest { + + private static final AtomicBoolean EVENT_LOOP_THREAD_USED_BY_MAPPER = new AtomicBoolean(); + private static final int STATUS_FOR_BLOCKING_MAPPER = 501; + private static final int STATUS_FOR_NON_BLOCKING_MAPPER = 500; + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .withApplicationRoot((jar) -> jar + .addClasses(Client.class, + ClientUsingNotBlockingExceptionMapper.class, + ClientUsingBlockingExceptionMapper.class, + ClientUsingBlockingExceptionMapperWithAnnotation.class, + ClientUsingBothExceptionMappers.class, + NotBlockingExceptionMapper.class, + BlockingExceptionMapper.class, + ClientResource.class, + Resource.class) + .addAsResource( + new StringAsset(setUrlForClass(ClientUsingNotBlockingExceptionMapper.class) + "\n" + + setUrlForClass(ClientUsingBlockingExceptionMapper.class) + "\n" + + setUrlForClass(ClientUsingBlockingExceptionMapperWithAnnotation.class) + "\n" + + setUrlForClass(ClientUsingBothExceptionMappers.class) + "\n"), + "application.properties")); + + public static final String ERROR_MESSAGE = "The entity was not found"; + + @RestClient + ClientUsingNotBlockingExceptionMapper clientUsingNotBlockingExceptionMapper; + + @RestClient + ClientUsingBlockingExceptionMapper clientUsingBlockingExceptionMapper; + + @RestClient + ClientUsingBlockingExceptionMapperWithAnnotation clientUsingBlockingExceptionMapperWithAnnotation; + + @RestClient + ClientUsingBothExceptionMappers clientUsingBothExceptionMappers; + + @BeforeEach + public void setup() { + EVENT_LOOP_THREAD_USED_BY_MAPPER.set(false); + } + + @Disabled("This test randomly fails because https://github.com/quarkusio/quarkus/issues/32839") + @Test + public void shouldUseEventLoopByDefault() { + assertThrows(BlockingNotAllowedException.class, clientUsingNotBlockingExceptionMapper::nonBlocking); + assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isTrue(); + } + + @Test + public void shouldUseWorkerThreadIfExceptionMapperIsAnnotatedWithBlocking() { + RuntimeException exception = assertThrows(RuntimeException.class, clientUsingBlockingExceptionMapper::blocking); + assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isFalse(); + assertThat(exception.getMessage()).isEqualTo(ERROR_MESSAGE); + } + + @Test + public void shouldUseWorkerThreadOnlyIfExceptionMapperIsAnnotatedWithBlockingIsUsed() { + // To be uncommented after https://github.com/quarkusio/quarkus/issues/32839 is fixed: + // assertThrows(BlockingNotAllowedException.class, clientUsingBothExceptionMappers::nonBlocking); + // assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isTrue(); + + RuntimeException exception = assertThrows(RuntimeException.class, clientUsingBothExceptionMappers::blocking); + assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isFalse(); + assertThat(exception.getMessage()).isEqualTo(ERROR_MESSAGE); + } + + @Test + public void shouldUseWorkerThreadWhenClientIsInjected() { + // To be uncommented after https://github.com/quarkusio/quarkus/issues/32839 is fixed: + // given().get("/client/non-blocking").then().statusCode(500); + // assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isTrue(); + + given().get("/client/blocking").then().statusCode(500); + assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isFalse(); + } + + @Test + public void shouldUseWorkerThreadIfExceptionMapperIsAnnotatedWithBlockingAndUsingClientExceptionMapper() { + RuntimeException exception = assertThrows(RuntimeException.class, + clientUsingBlockingExceptionMapperWithAnnotation::blocking); + assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isFalse(); + assertThat(exception.getMessage()).isEqualTo(ERROR_MESSAGE); + } + + @Test + public void shouldUseWorkerThreadUsingProgrammaticApproach() { + var client = RestClientBuilder.newBuilder() + .baseUri(UriBuilder.fromUri("http://localhost:8081").build()) + .register(BlockingExceptionMapper.class) + .build(Client.class); + + RuntimeException exception = assertThrows(RuntimeException.class, client::blocking); + assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isFalse(); + assertThat(exception.getMessage()).isEqualTo(ERROR_MESSAGE); + } + + @Path("/error") + @RegisterRestClient + public interface Client { + @GET + @Path("/blocking") + InputStream blocking(); + } + + @Path("/error") + @RegisterRestClient + @RegisterProvider(NotBlockingExceptionMapper.class) + public interface ClientUsingNotBlockingExceptionMapper { + + @GET + @Path("/non-blocking") + InputStream nonBlocking(); + } + + @Path("/error") + @RegisterRestClient + @RegisterProvider(BlockingExceptionMapper.class) + public interface ClientUsingBlockingExceptionMapper { + @GET + @Path("/blocking") + InputStream blocking(); + } + + @Path("/error") + @RegisterRestClient + @RegisterProvider(NotBlockingExceptionMapper.class) + @RegisterProvider(BlockingExceptionMapper.class) + public interface ClientUsingBothExceptionMappers { + @GET + @Path("/blocking") + InputStream blocking(); + + @GET + @Path("/non-blocking") + InputStream nonBlocking(); + } + + @Path("/error") + @RegisterRestClient + public interface ClientUsingBlockingExceptionMapperWithAnnotation { + @GET + @Path("/blocking") + InputStream blocking(); + + @Blocking + @ClientExceptionMapper + static RuntimeException map(Response response) { + EVENT_LOOP_THREAD_USED_BY_MAPPER.set(Context.isOnEventLoopThread()); + return new RuntimeException(response.readEntity(String.class)); + } + } + + public static class NotBlockingExceptionMapper implements ResponseExceptionMapper { + + @Override + public boolean handles(int status, MultivaluedMap headers) { + return status == STATUS_FOR_NON_BLOCKING_MAPPER; + } + + @Override + public Exception toThrowable(Response response) { + EVENT_LOOP_THREAD_USED_BY_MAPPER.set(Context.isOnEventLoopThread()); + // Reading InputStream in the Event Loop throws the BlockingNotAllowedException exception + response.readEntity(String.class); + return null; + } + } + + @Blocking + public static class BlockingExceptionMapper implements ResponseExceptionMapper { + @Override + public boolean handles(int status, MultivaluedMap headers) { + return status == STATUS_FOR_BLOCKING_MAPPER; + } + + @Override + public Exception toThrowable(Response response) { + EVENT_LOOP_THREAD_USED_BY_MAPPER.set(Context.isOnEventLoopThread()); + return new RuntimeException(response.readEntity(String.class)); + } + } + + @Path("/error") + public static class Resource { + + @GET + @Path("/blocking") + public Response blocking() { + return Response.status(STATUS_FOR_BLOCKING_MAPPER).entity(ERROR_MESSAGE).build(); + } + + @GET + @Path("/non-blocking") + public Response nonBlocking() { + return Response.status(STATUS_FOR_NON_BLOCKING_MAPPER).entity(ERROR_MESSAGE).build(); + } + } + + @Path("/client") + public static class ClientResource { + + @RestClient + ClientUsingBothExceptionMappers clientUsingBothExceptionMappers; + + @GET + @Path("/blocking") + public void callBlocking() { + clientUsingBothExceptionMappers.blocking(); + } + + @GET + @Path("/non-blocking") + public void callNonBlocking() { + clientUsingBothExceptionMappers.nonBlocking(); + } + } +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/ClientUseWorkerExecutorRestHandler.java b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/ClientUseWorkerExecutorRestHandler.java new file mode 100644 index 0000000000000..d8cf3b8305253 --- /dev/null +++ b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/ClientUseWorkerExecutorRestHandler.java @@ -0,0 +1,41 @@ +package io.quarkus.rest.client.reactive.runtime; + +import java.util.concurrent.Executor; +import java.util.function.Supplier; + +import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext; +import org.jboss.resteasy.reactive.client.spi.ClientRestHandler; + +import io.quarkus.runtime.ExecutorRecorder; +import io.vertx.core.Context; + +/** + * This is added by the Reactive Rest Client if the `@Blocking` annotation is used in some scenarios. For example, when users + * provide a custom ResponseExceptionMapper that is annotates with the `@Blocking` annotation. + * + * Then this handler is applied, the execution of the next handlers will use the worker thread pool. + */ +public class ClientUseWorkerExecutorRestHandler implements ClientRestHandler { + + private volatile Executor executor; + private final Supplier supplier = new Supplier() { + @Override + public Executor get() { + return ExecutorRecorder.getCurrent(); + } + }; + + @Override + public void handle(RestClientRequestContext requestContext) throws Exception { + if (!Context.isOnEventLoopThread()) { + return; //already dispatched + } + + if (executor == null) { + executor = supplier.get(); + } + + requestContext.suspend(); + requestContext.resume(executor); + } +} diff --git a/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/MicroProfileRestClientResponseFilter.java b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/MicroProfileRestClientResponseFilter.java index 8e78ea877f6df..1d369c71e5a47 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/MicroProfileRestClientResponseFilter.java +++ b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/MicroProfileRestClientResponseFilter.java @@ -1,6 +1,7 @@ package io.quarkus.rest.client.reactive.runtime; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import jakarta.ws.rs.client.ClientRequestContext; @@ -11,10 +12,14 @@ import org.jboss.resteasy.reactive.client.handlers.ClientResponseCompleteRestHandler; import org.jboss.resteasy.reactive.client.impl.ClientRequestContextImpl; import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext; +import org.jboss.resteasy.reactive.client.spi.ClientRestHandler; import org.jboss.resteasy.reactive.common.core.UnwrappableException; import org.jboss.resteasy.reactive.common.jaxrs.ResponseImpl; +import io.vertx.core.Context; + public class MicroProfileRestClientResponseFilter implements ClientResponseFilter { + private static final ClientRestHandler[] EMPTY_CLIENT_REST_HANDLERS = new ClientRestHandler[0]; private final List> exceptionMappers; public MicroProfileRestClientResponseFilter(List> exceptionMappers) { @@ -29,21 +34,47 @@ public MicroProfileRestClientResponseFilter(List> exc public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext) throws IOException { for (ResponseExceptionMapper exceptionMapper : exceptionMappers) { if (exceptionMapper.handles(responseContext.getStatus(), responseContext.getHeaders())) { - // we have an exception mapper, we don't need the response anymore, we can map it to response right away (I hope :D) RestClientRequestContext restClientContext = ((ClientRequestContextImpl) requestContext) .getRestClientRequestContext(); - ResponseImpl response = ClientResponseCompleteRestHandler.mapToResponse(restClientContext, false); - Throwable throwable; - if (exceptionMapper instanceof ResteasyReactiveResponseExceptionMapper) { - throwable = ((ResteasyReactiveResponseExceptionMapper) exceptionMapper).toThrowable(response, - restClientContext); + + boolean requiresBlocking = RestClientRecorder.isClassBlocking(exceptionMapper.getClass()); + if (Context.isOnEventLoopThread() && requiresBlocking) { + switchToWorkerThreadPoolAndRetry(restClientContext); + break; } else { - throwable = exceptionMapper.toThrowable(response); - } - if (throwable != null) { - throw new UnwrappableException(throwable); + // we have an exception mapper, we don't need the response anymore, we can map it to response right away (I hope :D) + ResponseImpl response = ClientResponseCompleteRestHandler.mapToResponse(restClientContext, false); + Throwable throwable; + if (exceptionMapper instanceof ResteasyReactiveResponseExceptionMapper) { + throwable = ((ResteasyReactiveResponseExceptionMapper) exceptionMapper).toThrowable(response, + restClientContext); + } else { + throwable = exceptionMapper.toThrowable(response); + } + if (throwable != null) { + throw new UnwrappableException(throwable); + } } } } } + + private void switchToWorkerThreadPoolAndRetry(RestClientRequestContext restClientContext) { + int position = restClientContext.getPosition(); + + List nextHandlers = new ArrayList<>(2 + restClientContext.getHandlers().length - position); + nextHandlers.add(new ClientUseWorkerExecutorRestHandler()); + nextHandlers.add(currentHandler(restClientContext)); + + while (position < restClientContext.getHandlers().length) { + nextHandlers.add(restClientContext.getHandlers()[position]); + position++; + } + + restClientContext.restart(nextHandlers.toArray(EMPTY_CLIENT_REST_HANDLERS), true); + } + + private ClientRestHandler currentHandler(RestClientRequestContext restClientContext) { + return restClientContext.getHandlers()[restClientContext.getPosition() - 1]; + } } diff --git a/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/RestClientRecorder.java b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/RestClientRecorder.java index e310af1d53137..6049e880488f1 100644 --- a/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/RestClientRecorder.java +++ b/extensions/resteasy-reactive/rest-client-reactive/runtime/src/main/java/io/quarkus/rest/client/reactive/runtime/RestClientRecorder.java @@ -1,6 +1,7 @@ package io.quarkus.rest.client.reactive.runtime; import java.util.Map; +import java.util.Set; import org.eclipse.microprofile.rest.client.spi.RestClientBuilderResolver; @@ -9,15 +10,24 @@ @Recorder public class RestClientRecorder { private static volatile Map configKeys; + private static volatile Set blockingClassNames; public void setConfigKeys(Map configKeys) { RestClientRecorder.configKeys = configKeys; } + public void setBlockingClassNames(Set blockingClassNames) { + RestClientRecorder.blockingClassNames = blockingClassNames; + } + public static Map getConfigKeys() { return configKeys; } + public static boolean isClassBlocking(Class exceptionMapperClass) { + return blockingClassNames.contains(exceptionMapperClass.getName()); + } + public void setRestClientBuilderResolver() { RestClientBuilderResolver.setInstance(new BuilderResolver()); } diff --git a/independent-projects/resteasy-reactive/client/processor/src/main/java/org/jboss/resteasy/reactive/client/processor/scanning/ClientEndpointIndexer.java b/independent-projects/resteasy-reactive/client/processor/src/main/java/org/jboss/resteasy/reactive/client/processor/scanning/ClientEndpointIndexer.java index e390280be4816..eaba90c22749c 100644 --- a/independent-projects/resteasy-reactive/client/processor/src/main/java/org/jboss/resteasy/reactive/client/processor/scanning/ClientEndpointIndexer.java +++ b/independent-projects/resteasy-reactive/client/processor/src/main/java/org/jboss/resteasy/reactive/client/processor/scanning/ClientEndpointIndexer.java @@ -8,6 +8,7 @@ import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.JSONP_JSON_STRUCTURE; import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.JSONP_JSON_VALUE; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -44,6 +45,8 @@ public class ClientEndpointIndexer extends EndpointIndexer { static final DotName CONTINUATION = DotName.createSimple("kotlin.coroutines.Continuation"); + static final DotName CLIENT_EXCEPTION_MAPPER = DotName + .createSimple("io.quarkus.rest.client.reactive.ClientExceptionMapper"); private final String[] defaultProduces; private final String[] defaultProducesNegotiated; @@ -86,8 +89,19 @@ public MaybeRestClientInterface createClientProxy(ClassInfo classInfo, } private void warnForUnsupportedAnnotations(ClassInfo classInfo) { - if ((classInfo.annotationsMap().get(ResteasyReactiveDotNames.BLOCKING) != null) - || (classInfo.annotationsMap().get(ResteasyReactiveDotNames.NON_BLOCKING) != null)) { + List offendingBlockingAnnotations = new ArrayList<>(); + + List blockingAnnotations = classInfo.annotationsMap().get(ResteasyReactiveDotNames.BLOCKING); + if (blockingAnnotations != null) { + for (AnnotationInstance blockingAnnotation : blockingAnnotations) { + // If the `@Blocking` annotation is used along with the `@ClientExceptionMapper`, we support it. + if (blockingAnnotation.target().annotation(CLIENT_EXCEPTION_MAPPER) == null) { + offendingBlockingAnnotations.add(blockingAnnotation); + } + } + } + if (!offendingBlockingAnnotations.isEmpty() + || classInfo.annotationsMap().get(ResteasyReactiveDotNames.NON_BLOCKING) != null) { log.warn( "'@Blocking' and '@NonBlocking' annotations are not necessary (or supported) on REST Client interfaces. Offending class is '" + classInfo.name() diff --git a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/JandexUtil.java b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/JandexUtil.java index 1e2f2f6fc29ad..fbc85b77b380e 100644 --- a/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/JandexUtil.java +++ b/independent-projects/resteasy-reactive/common/processor/src/main/java/org/jboss/resteasy/reactive/common/processor/JandexUtil.java @@ -364,4 +364,39 @@ public static boolean isSubclassOf(IndexView index, ClassInfo info, DotName pare return isSubclassOf(index, superClass, parentName); } + /** + * Returns true if the given Jandex ClassInfo is a subclass of or inherits the given name. + * + * @param index the index to use to look up super classes. + * @param info the ClassInfo we want to check. + * @param name the name of the superclass or interface we want to find. + * @throws RuntimeException if one of the superclasses is not indexed. + */ + public static boolean isImplementorOf(IndexView index, ClassInfo info, DotName name) { + // Check interfaces + List interfaceNames = info.interfaceNames(); + for (DotName interfaceName : interfaceNames) { + if (interfaceName.equals(name)) { + return true; + } + } + + // Check direct hierarchy + if (info.superName().equals(DOTNAME_OBJECT) || info.superName().equals(DOTNAME_RECORD)) { + return false; + } + if (info.superName().equals(name)) { + return true; + } + + // climb up the hierarchy of classes + Type superType = info.superClassType(); + ClassInfo superClass = index.getClassByName(superType.name()); + if (superClass == null) { + // this can happens if the parent is not inside the Jandex index + throw new RuntimeException("The class " + superType.name() + " is not inside the Jandex index"); + } + return isImplementorOf(index, superClass, name); + } + } diff --git a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java index d166fc9aebbed..a03c161fa1106 100644 --- a/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java +++ b/independent-projects/resteasy-reactive/server/runtime/src/main/java/org/jboss/resteasy/reactive/server/core/startup/RuntimeResourceDeployment.java @@ -211,7 +211,7 @@ public RuntimeResource buildResourceMethod(ResourceClass clazz, // when a method is blocking, we also want all the request filters to run on the worker thread // because they can potentially set thread local variables - //we don't need to run this for Servlet and other runtimes that default to blocking + // we don't need to run this for Servlet and other runtimes that default to blocking Optional blockingHandlerIndex = Optional.empty(); if (!defaultBlocking) { if (method.isBlocking()) {