Skip to content

Commit

Permalink
Fix for #25818 and upgrade to Mutiny 1.6.0
Browse files Browse the repository at this point in the history
This is part of a coordinated fix across Quarkus and Mutiny where scheduler wrapping would cause Vert.x context propagation not to be done.

See smallrye/smallrye-mutiny#955

Fixes #25818
  • Loading branch information
jponge committed Jun 20, 2022
1 parent 863d237 commit ad6bfc9
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 13 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@
<netty.version>4.1.74.Final</netty.version>
<reactive-streams.version>1.0.3</reactive-streams.version>
<jboss-logging.version>3.5.0.Final</jboss-logging.version>
<mutiny.version>1.5.0</mutiny.version>
<mutiny.version>1.6.0</mutiny.version>
<kafka3.version>3.1.0</kafka3.version>
<lz4.version>1.8.0</lz4.version> <!-- dependency of the kafka-clients that could be overridden by other imported BOMs in the platform -->
<snappy.version>1.1.8.4</snappy.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package io.quarkus.mutiny.runtime;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

Expand All @@ -11,32 +11,120 @@
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.infrastructure.MutinyScheduler;

@Recorder
public class MutinyInfrastructure {

public static final String VERTX_EVENT_LOOP_THREAD_PREFIX = "vert.x-eventloop-thread-";

public void configureMutinyInfrastructure(ExecutorService exec, ShutdownContext shutdownContext) {
//mutiny leaks a ScheduledExecutorService if you don't do this
public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext) {
// Mutiny leaks a ScheduledExecutorService if we don't do this
Infrastructure.getDefaultWorkerPool().shutdown();
Infrastructure.setDefaultExecutor(new Executor() {

// Since executor is not a ScheduledExecutorService and Mutiny needs one for scheduling we have to adapt one around the provided executor
MutinyScheduler mutinyScheduler = new MutinyScheduler(executor);
Infrastructure.setDefaultExecutor(new ScheduledExecutorService() {

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return mutinyScheduler.schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return mutinyScheduler.schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return mutinyScheduler.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return mutinyScheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

@Override
public void shutdown() {
mutinyScheduler.shutdown(); // ...but do not shut `executor` down
}

@Override
public List<Runnable> shutdownNow() {
return mutinyScheduler.shutdownNow();
}

@Override
public boolean isShutdown() {
return mutinyScheduler.isShutdown();
}

@Override
public boolean isTerminated() {
return mutinyScheduler.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return mutinyScheduler.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executor.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return executor.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return executor.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return executor.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return executor.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return executor.invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
try {
exec.execute(command);
} catch (RejectedExecutionException e) {
if (!exec.isShutdown() && !exec.isTerminated()) {
throw e;
executor.execute(command);
} catch (RejectedExecutionException rejected) {
// Ignore submission failures on application shutdown
if (!executor.isShutdown() && !executor.isTerminated()) {
throw rejected;
}
// Ignore the failure - the application has been shutdown.
}
}
});

shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Infrastructure.getDefaultWorkerPool().shutdown();
mutinyScheduler.shutdown();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.it.resteasy.mutiny.regression.bug25818;

import javax.enterprise.context.ApplicationScoped;

import io.vertx.core.Context;
import io.vertx.core.Vertx;

@ApplicationScoped
public class BlockingService {

public String getBlocking() {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Context context = Vertx.currentContext();
if (context == null) {
return "~~ context is null ~~";
} else {
return "hello-" + context.getLocal("hello-target");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.quarkus.it.resteasy.mutiny.regression.bug25818;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Vertx;

@Path("/reproducer/25818")
public class ReproducerResource {

private final Logger logger = Logger.getLogger(ReproducerResource.class);

@Inject
BlockingService service;

private void addToContext() {
Vertx.currentContext().putLocal("hello-target", "you");
}

@GET
@Path("/worker-pool")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> workerPool() {
logger.info("worker pool endpoint");
addToContext();
return Uni.createFrom()
.item(service::getBlocking)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}

@GET
@Path("/default-executor")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> defaultExecutor() {
logger.info("default executor endpoint");
addToContext();
return Uni.createFrom()
.item(service::getBlocking)
.runSubscriptionOn(Infrastructure.getDefaultExecutor());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.it.resteasy.mutiny;

import static io.restassured.RestAssured.get;
import static org.hamcrest.CoreMatchers.is;

import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

import io.quarkus.test.junit.QuarkusTest;

@QuarkusTest
public class RegressionTest {

@Nested
@DisplayName("Regression tests for #25818 (see https://github.com/quarkusio/quarkus/issues/25818)")
public class Bug25818 {

@Test
public void testDefaultExecutor() {
get("/reproducer/25818/default-executor")
.then()
.body(is("hello-you"))
.statusCode(200);
}

@Test
public void testWorkerPool() {
get("/reproducer/25818/worker-pool")
.then()
.body(is("hello-you"))
.statusCode(200);
}
}
}

0 comments on commit ad6bfc9

Please sign in to comment.