Skip to content

Commit

Permalink
Fix inconsistent behaviour in REST Client reactive when using contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
Sgitario committed Apr 24, 2023
1 parent a26e071 commit 2725325
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.jboss.resteasy.reactive.common.core.BlockingNotAllowedException;
import org.jboss.shrinkwrap.api.asset.StringAsset;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

Expand Down Expand Up @@ -75,7 +74,6 @@ public void setup() {
EVENT_LOOP_THREAD_USED_BY_MAPPER.set(false);
}

@Disabled("This test randomly fails because https://github.com/quarkusio/quarkus/issues/32839")
@Test
public void shouldUseEventLoopByDefault() {
assertThrows(BlockingNotAllowedException.class, clientUsingNotBlockingExceptionMapper::nonBlocking);
Expand All @@ -91,9 +89,8 @@ public void shouldUseWorkerThreadIfExceptionMapperIsAnnotatedWithBlocking() {

@Test
public void shouldUseWorkerThreadOnlyIfExceptionMapperIsAnnotatedWithBlockingIsUsed() {
// To be uncommented after https://github.com/quarkusio/quarkus/issues/32839 is fixed:
// assertThrows(BlockingNotAllowedException.class, clientUsingBothExceptionMappers::nonBlocking);
// assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isTrue();
assertThrows(BlockingNotAllowedException.class, clientUsingBothExceptionMappers::nonBlocking);
assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isTrue();

RuntimeException exception = assertThrows(RuntimeException.class, clientUsingBothExceptionMappers::blocking);
assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isFalse();
Expand All @@ -102,9 +99,8 @@ public void shouldUseWorkerThreadOnlyIfExceptionMapperIsAnnotatedWithBlockingIsU

@Test
public void shouldUseWorkerThreadWhenClientIsInjected() {
// To be uncommented after https://github.com/quarkusio/quarkus/issues/32839 is fixed:
// given().get("/client/non-blocking").then().statusCode(500);
// assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isTrue();
given().get("/client/non-blocking").then().statusCode(500);
assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isTrue();

given().get("/client/blocking").then().statusCode(500);
assertThat(EVENT_LOOP_THREAD_USED_BY_MAPPER.get()).isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

Expand All @@ -28,7 +27,6 @@
import org.jboss.resteasy.reactive.client.api.LoggingScope;
import org.jboss.resteasy.reactive.client.api.QuarkusRestClientProperties;
import org.jboss.resteasy.reactive.client.impl.AsyncInvokerImpl;
import org.jboss.resteasy.reactive.client.impl.ClientRequestContextImpl;
import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext;
import org.jboss.resteasy.reactive.client.impl.multipart.PausableHttpPostRequestEncoder;
import org.jboss.resteasy.reactive.client.impl.multipart.QuarkusMultipartForm;
Expand All @@ -46,7 +44,6 @@
import io.smallrye.mutiny.Uni;
import io.smallrye.stork.api.ServiceInstance;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
Expand Down Expand Up @@ -88,30 +85,7 @@ public void handle(RestClientRequestContext requestContext) {
return;
}
requestContext.suspend();
Uni<HttpClientRequest> future = createRequest(requestContext)
.runSubscriptionOn(new Executor() {
@Override
public void execute(Runnable command) {
Context current = Vertx.currentContext();
ClientRequestContextImpl clientRequestContext = requestContext.getClientRequestContext();
Context captured = null;
if (clientRequestContext != null) {
captured = clientRequestContext.getContext();
}
if (current == captured || captured == null) {
// No need to switch to another context.
command.run();
} else {
// Switch back to the captured context
captured.runOnContext(new Handler<Void>() {
@Override
public void handle(Void ignored) {
command.run();
}
});
}
}
});
Uni<HttpClientRequest> future = createRequest(requestContext);

// DNS failures happen before we send the request
future.subscribe().with(new Consumer<>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.jboss.resteasy.reactive.client.handlers;

import java.util.concurrent.Executor;

import org.jboss.resteasy.reactive.client.impl.ClientRequestContextImpl;
import org.jboss.resteasy.reactive.client.impl.RestClientRequestContext;
import org.jboss.resteasy.reactive.client.spi.ClientRestHandler;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;

/**
* This handler ensures that the context to use is the same as the client request context, which is important to keep the
* request context in sync when updating the response.
*/
public class ClientSwitchToRequestContextRestHandler implements ClientRestHandler {
@Override
public void handle(RestClientRequestContext requestContext) throws Exception {
Context current = Vertx.currentContext();
ClientRequestContextImpl clientRequestContext = requestContext.getClientRequestContext();
if (clientRequestContext == null) {
return;
}

Context captured = clientRequestContext.getContext();
if (captured != null && current != captured) {
requestContext.suspend();
requestContext.resume(new Executor() {
@Override
public void execute(Runnable command) {
captured.runOnContext(new Handler<Void>() {
@Override
public void handle(Void unused) {
command.run();
}
});
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.jboss.resteasy.reactive.client.handlers.ClientResponseFilterRestHandler;
import org.jboss.resteasy.reactive.client.handlers.ClientSendRequestHandler;
import org.jboss.resteasy.reactive.client.handlers.ClientSetResponseEntityRestHandler;
import org.jboss.resteasy.reactive.client.handlers.ClientSwitchToRequestContextRestHandler;
import org.jboss.resteasy.reactive.client.handlers.PreResponseFilterHandler;
import org.jboss.resteasy.reactive.client.spi.ClientRestHandler;
import org.jboss.resteasy.reactive.client.spi.MultipartResponseData;
Expand All @@ -25,6 +26,7 @@ class HandlerChain {

private static final ClientRestHandler[] EMPTY_REST_HANDLERS = new ClientRestHandler[0];

private final ClientRestHandler clientSwitchToRequestContextRestHandler;
private final ClientRestHandler clientSendHandler;
private final ClientRestHandler clientSetResponseEntityRestHandler;
private final ClientRestHandler clientResponseCompleteRestHandler;
Expand All @@ -34,6 +36,7 @@ class HandlerChain {

public HandlerChain(boolean followRedirects, LoggingScope loggingScope,
Map<Class<?>, MultipartResponseData> multipartData, ClientLogger clientLogger) {
this.clientSwitchToRequestContextRestHandler = new ClientSwitchToRequestContextRestHandler();
this.clientSendHandler = new ClientSendRequestHandler(followRedirects, loggingScope, clientLogger, multipartData);
this.clientSetResponseEntityRestHandler = new ClientSetResponseEntityRestHandler();
this.clientResponseCompleteRestHandler = new ClientResponseCompleteRestHandler();
Expand All @@ -49,7 +52,9 @@ ClientRestHandler[] createHandlerChain(ConfigurationImpl configuration) {
List<ClientRequestFilter> requestFilters = configuration.getRequestFilters();
List<ClientResponseFilter> responseFilters = configuration.getResponseFilters();
if (requestFilters.isEmpty() && responseFilters.isEmpty()) {
return new ClientRestHandler[] { clientSendHandler, clientSetResponseEntityRestHandler,
return new ClientRestHandler[] { clientSwitchToRequestContextRestHandler,
clientSendHandler,
clientSetResponseEntityRestHandler,
clientResponseCompleteRestHandler };
}
List<ClientRestHandler> result = new ArrayList<>(
Expand All @@ -60,6 +65,7 @@ ClientRestHandler[] createHandlerChain(ConfigurationImpl configuration) {
for (int i = 0; i < requestFilters.size(); i++) {
result.add(new ClientRequestFilterRestHandler(requestFilters.get(i)));
}
result.add(clientSwitchToRequestContextRestHandler);
result.add(clientSendHandler);
result.add(clientSetResponseEntityRestHandler);
result.add(new PreResponseFilterHandler());
Expand Down

0 comments on commit 2725325

Please sign in to comment.