Skip to content

Commit

Permalink
Support blocking exception mappers in REST Client Reactive
Browse files Browse the repository at this point in the history
  • Loading branch information
Sgitario committed Apr 18, 2023
1 parent 0036931 commit 2c4c97f
Show file tree
Hide file tree
Showing 11 changed files with 363 additions and 12 deletions.
62 changes: 62 additions & 0 deletions docs/src/main/asciidoc/rest-client-reactive.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,68 @@ Naturally this handling is per REST Client. `@ClientExceptionMapper` uses the de

NOTE: Methods annotated with `@ClientExceptionMapper` can also take a `java.lang.reflect.Method` parameter which is useful if the exception mapping code needs to know the REST Client method that was invoked and caused the exception mapping code to engage.

=== Using @Blocking annotation in exception mappers

In scenarios to handle large files, the client might require to return directly the InputStream object:

[source, java]
----
@Path("/echo")
@RegisterRestClient
public interface EchoClient {
@GET
InputStream get();
}
----

This will work as expected, but if you try to read this InputStream object in a custom exception mapper, you will receive a BlockingNotAllowedException exception. This is because the REST Client Reactive uses the Event Loop thread executor by default which does not allow to perform IO operations.

To make your exception mapper blocking, you can annotate the exception mapper with the `@Blocking` annotation:

[source, java]
----
@Provider
@Blocking <1>
public class MyResponseExceptionMapper implements ResponseExceptionMapper<RuntimeException> {
@Override
public RuntimeException toThrowable(Response response) {
if (response.getStatus() == 500) {
response.readEntity(String.class); <2>
return new RuntimeException("The remote service responded with HTTP 500");
}
return null;
}
}
----

<1> With the `@Blocking` annotation, the MyResponseExceptionMapper exception mapper will be executed in the worker thread pool.
<2> Reading the entity is now allowed because we're executing the mapper on the worker thread pool.

Note that you can also use the `@Blocking` annotation when using @ClientExceptionMapper:

[source, java]
----
@Path("/echo")
@RegisterRestClient
public interface EchoClient {
@GET
InputStream get();
@ClientExceptionMapper
@Blocking
static RuntimeException toException(Response response) {
if (response.getStatus() == 500) {
response.readEntity(String.class);
return new RuntimeException("The remote service responded with HTTP 500");
}
return null;
}
}
----

[[multipart]]
== Multipart Form support

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,14 @@ GeneratedClassResult generateResponseExceptionMapper(AnnotationInstance instance
throw new IllegalStateException(message);
}

StringBuilder sigBuilder = new StringBuilder();
sigBuilder.append(targetMethod.name()).append("_").append(targetMethod.returnType().name().toString());
for (Type i : targetMethod.parameterTypes()) {
sigBuilder.append(i.name().toString());
}

int priority = Priorities.USER;
AnnotationValue priorityAnnotationValue = instance.value("priority");
if (priorityAnnotationValue != null) {
priority = priorityAnnotationValue.asInt();
}

ClassInfo restClientInterfaceClassInfo = targetMethod.declaringClass();
String generatedClassName = restClientInterfaceClassInfo.name().toString() + "_" + targetMethod.name() + "_"
+ "ResponseExceptionMapper" + "_" + HashUtil.sha1(sigBuilder.toString());
String generatedClassName = getGeneratedClassName(targetMethod);
try (ClassCreator cc = ClassCreator.builder().classOutput(classOutput).className(generatedClassName)
.interfaces(ResteasyReactiveResponseExceptionMapper.class).build()) {
MethodCreator toThrowable = cc.getMethodCreator("toThrowable", Throwable.class, Response.class,
Expand Down Expand Up @@ -143,6 +136,17 @@ GeneratedClassResult generateResponseExceptionMapper(AnnotationInstance instance
return new GeneratedClassResult(restClientInterfaceClassInfo.name().toString(), generatedClassName, priority);
}

public static String getGeneratedClassName(MethodInfo methodInfo) {
StringBuilder sigBuilder = new StringBuilder();
sigBuilder.append(methodInfo.name()).append("_").append(methodInfo.returnType().name().toString());
for (Type i : methodInfo.parameterTypes()) {
sigBuilder.append(i.name().toString());
}

return methodInfo.declaringClass().name().toString() + "_" + methodInfo.name() + "_"
+ "ResponseExceptionMapper" + "_" + HashUtil.sha1(sigBuilder.toString());
}

private static boolean ignoreAnnotation(MethodInfo methodInfo) {
// ignore the annotation if it's placed on a Kotlin companion class
// this is not a problem since the Kotlin compiler will also place the annotation the static method interface method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.eclipse.microprofile.rest.client.annotation.RegisterClientHeaders;
import org.eclipse.microprofile.rest.client.annotation.RegisterProvider;
import org.eclipse.microprofile.rest.client.annotation.RegisterProviders;
import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper;
import org.jboss.jandex.DotName;

import io.quarkus.rest.client.reactive.ClientExceptionMapper;
Expand All @@ -32,6 +33,8 @@ public class DotNames {
public static final DotName CLIENT_EXCEPTION_MAPPER = DotName.createSimple(ClientExceptionMapper.class.getName());
public static final DotName CLIENT_REDIRECT_HANDLER = DotName.createSimple(ClientRedirectHandler.class.getName());

public static final DotName RESPONSE_EXCEPTION_MAPPER = DotName.createSimple(ResponseExceptionMapper.class.getName());

static final DotName METHOD = DotName.createSimple(Method.class.getName());

private DotNames() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import static io.quarkus.rest.client.reactive.deployment.DotNames.REGISTER_CLIENT_HEADERS;
import static io.quarkus.rest.client.reactive.deployment.DotNames.REGISTER_PROVIDER;
import static io.quarkus.rest.client.reactive.deployment.DotNames.REGISTER_PROVIDERS;
import static io.quarkus.rest.client.reactive.deployment.DotNames.RESPONSE_EXCEPTION_MAPPER;
import static java.util.Arrays.asList;
import static java.util.stream.Collectors.*;
import static org.jboss.resteasy.reactive.common.processor.EndpointIndexer.CDI_WRAPPER_SUFFIX;
import static org.jboss.resteasy.reactive.common.processor.JandexUtil.isImplementorOf;
import static org.jboss.resteasy.reactive.common.processor.ResteasyReactiveDotNames.BLOCKING;
import static org.jboss.resteasy.reactive.common.processor.scanning.ResteasyReactiveScanner.BUILTIN_HTTP_ANNOTATIONS_TO_METHOD;

import java.lang.annotation.RetentionPolicy;
Expand Down Expand Up @@ -497,6 +500,24 @@ void addRestClientBeans(Capabilities capabilities,
}
}
}

Set<String> blockingClassNames = new HashSet<>();
Set<AnnotationInstance> registerBlockingClasses = new HashSet<>(index.getAnnotations(BLOCKING));
for (AnnotationInstance registerBlockingClass : registerBlockingClasses) {
AnnotationTarget target = registerBlockingClass.target();
if (target.kind() == AnnotationTarget.Kind.CLASS
&& isImplementorOf(index, target.asClass(), RESPONSE_EXCEPTION_MAPPER)) {
// Watch for @Blocking annotations in classes that implements ResponseExceptionMapper:
blockingClassNames.add(target.asClass().toString());
} else if (target.kind() == AnnotationTarget.Kind.METHOD
&& target.asMethod().annotation(CLIENT_EXCEPTION_MAPPER) != null) {
// Watch for @Blocking annotations in methods that are also annotated with @ClientExceptionMapper:
blockingClassNames.add(ClientExceptionMapperHandler.getGeneratedClassName(target.asMethod()));
}
}

recorder.setBlockingClassNames(blockingClassNames);

if (LaunchMode.current() == LaunchMode.DEVELOPMENT) {
recorder.setConfigKeys(configKeys);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package io.quarkus.rest.client.reactive.error;

import static io.quarkus.rest.client.reactive.RestClientTestUtil.setUrlForClass;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.UriBuilder;

import org.eclipse.microprofile.rest.client.RestClientBuilder;
import org.eclipse.microprofile.rest.client.annotation.RegisterProvider;
import org.eclipse.microprofile.rest.client.ext.ResponseExceptionMapper;
import org.eclipse.microprofile.rest.client.inject.RegisterRestClient;
import org.eclipse.microprofile.rest.client.inject.RestClient;
import org.jboss.resteasy.reactive.common.core.BlockingNotAllowedException;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.rest.client.reactive.ClientExceptionMapper;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.Blocking;
import io.vertx.core.Context;

public class BlockingExceptionMapperTest {

private static final AtomicBoolean EVENT_LOOP_THREAD_USED = new AtomicBoolean();

@RegisterExtension
static final QuarkusUnitTest TEST = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(Client.class,
ClientUsingNotBlockingExceptionMapper.class,
ClientUsingBlockingExceptionMapper.class,
ClientUsingBlockingExceptionMapperWithAnnotation.class,
NotBlockingExceptionMapper.class,
BlockingExceptionMapper.class,
Resource.class)
.addAsResource(
new StringAsset(setUrlForClass(ClientUsingNotBlockingExceptionMapper.class) + "\n"
+ setUrlForClass(ClientUsingBlockingExceptionMapper.class) + "\n"
+ setUrlForClass(ClientUsingBlockingExceptionMapperWithAnnotation.class) + "\n"),
"application.properties"));

public static final String ERROR_MESSAGE = "The entity was not found";

@RestClient
ClientUsingNotBlockingExceptionMapper clientUsingNotBlockingExceptionMapper;

@RestClient
ClientUsingBlockingExceptionMapper clientUsingBlockingExceptionMapper;

@RestClient
ClientUsingBlockingExceptionMapperWithAnnotation clientUsingBlockingExceptionMapperWithAnnotation;

@BeforeEach
public void setup() {
EVENT_LOOP_THREAD_USED.set(false);
}

@Test
public void shouldUseEventLoopByDefault() {
assertThrows(BlockingNotAllowedException.class, clientUsingNotBlockingExceptionMapper::get);
assertThat(EVENT_LOOP_THREAD_USED.get()).isTrue();
}

@Test
public void shouldUseWorkerThreadIfExceptionMapperIsAnnotatedWithBlocking() {
RuntimeException exception = assertThrows(RuntimeException.class, clientUsingBlockingExceptionMapper::get);
assertThat(EVENT_LOOP_THREAD_USED.get()).isFalse();
assertThat(exception.getMessage()).isEqualTo(ERROR_MESSAGE);
}

@Test
public void shouldUseWorkerThreadIfExceptionMapperIsAnnotatedWithBlockingAndUsingClientExceptionMapper() {
RuntimeException exception = assertThrows(RuntimeException.class,
clientUsingBlockingExceptionMapperWithAnnotation::get);
assertThat(EVENT_LOOP_THREAD_USED.get()).isFalse();
assertThat(exception.getMessage()).isEqualTo(ERROR_MESSAGE);
}

@Test
public void shouldUseWorkerThreadUsingProgrammaticApproach() {
var client = RestClientBuilder.newBuilder()
.baseUri(UriBuilder.fromUri("http://localhost:8081").build())
.register(BlockingExceptionMapper.class)
.build(Client.class);

RuntimeException exception = assertThrows(RuntimeException.class, client::get);
assertThat(EVENT_LOOP_THREAD_USED.get()).isFalse();
assertThat(exception.getMessage()).isEqualTo(ERROR_MESSAGE);
}

@Path("/error")
@RegisterRestClient
public interface Client {
@GET
InputStream get();
}

@Path("/error")
@RegisterRestClient
@RegisterProvider(NotBlockingExceptionMapper.class)
public interface ClientUsingNotBlockingExceptionMapper {
@GET
InputStream get();
}

@Path("/error")
@RegisterRestClient
@RegisterProvider(BlockingExceptionMapper.class)
public interface ClientUsingBlockingExceptionMapper {
@GET
InputStream get();
}

@Path("/error")
@RegisterRestClient
public interface ClientUsingBlockingExceptionMapperWithAnnotation {
@GET
InputStream get();

@Blocking
@ClientExceptionMapper
static RuntimeException map(Response response) {
EVENT_LOOP_THREAD_USED.set(Context.isOnEventLoopThread());
return new RuntimeException(response.readEntity(String.class));
}
}

public static class NotBlockingExceptionMapper implements ResponseExceptionMapper<Exception> {
@Override
public Exception toThrowable(Response response) {
EVENT_LOOP_THREAD_USED.set(Context.isOnEventLoopThread());
// Reading InputStream in the Event Loop throws the BlockingNotAllowedException exception
response.readEntity(String.class);
return null;
}
}

@Blocking
public static class BlockingExceptionMapper implements ResponseExceptionMapper<Exception> {
@Override
public Exception toThrowable(Response response) {
EVENT_LOOP_THREAD_USED.set(Context.isOnEventLoopThread());
return new RuntimeException(response.readEntity(String.class));
}
}

@Path("/error")
public static class Resource {
@GET
public Response returnError() {
return Response.status(500).entity(ERROR_MESSAGE).build();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.quarkus.rest.client.reactive.runtime;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import jakarta.ws.rs.client.ClientRequestContext;
import jakarta.ws.rs.client.ClientResponseContext;
import jakarta.ws.rs.client.ClientResponseFilter;

import org.jboss.resteasy.reactive.client.impl.ClientRequestContextImpl;
import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext;

import io.quarkus.runtime.ExecutorRecorder;
import io.vertx.core.Context;

public class ClientBlockingResponseFilter implements ClientResponseFilter {

private volatile Executor executor;
private final Supplier<Executor> supplier = () -> ExecutorRecorder.getCurrent();

@Override
public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext)
throws IOException {
if (!Context.isOnEventLoopThread()) {
return; //already dispatched
}

if (executor == null) {
executor = supplier.get();
}

RestClientRequestContext restClientContext = ((ClientRequestContextImpl) requestContext).getRestClientRequestContext();
restClientContext.suspend();
restClientContext.resume(executor);
}
}
Loading

0 comments on commit 2c4c97f

Please sign in to comment.