diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index 38365edd091ff..f0167a7222400 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -133,7 +133,7 @@
4.1.74.Final
1.0.3
3.5.0.Final
- 1.5.0
+ 1.6.0
3.1.0
1.8.0
1.1.8.4
diff --git a/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java
index 3d606f5fd71bb..fa197fbdc7e64 100644
--- a/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java
+++ b/extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java
@@ -1,8 +1,8 @@
package io.quarkus.mutiny.runtime;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
@@ -11,32 +11,120 @@
import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.smallrye.mutiny.infrastructure.MutinyScheduler;
@Recorder
public class MutinyInfrastructure {
public static final String VERTX_EVENT_LOOP_THREAD_PREFIX = "vert.x-eventloop-thread-";
- public void configureMutinyInfrastructure(ExecutorService exec, ShutdownContext shutdownContext) {
- //mutiny leaks a ScheduledExecutorService if you don't do this
+ public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext) {
+ // Mutiny leaks a ScheduledExecutorService if we don't do this
Infrastructure.getDefaultWorkerPool().shutdown();
- Infrastructure.setDefaultExecutor(new Executor() {
+
+ // Since executor is not a ScheduledExecutorService and Mutiny needs one for scheduling we have to adapt one around the provided executor
+ MutinyScheduler mutinyScheduler = new MutinyScheduler(executor);
+ Infrastructure.setDefaultExecutor(new ScheduledExecutorService() {
+
+ @Override
+ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) {
+ return mutinyScheduler.schedule(command, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) {
+ return mutinyScheduler.schedule(callable, delay, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
+ return mutinyScheduler.scheduleAtFixedRate(command, initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
+ return mutinyScheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit);
+ }
+
+ @Override
+ public void shutdown() {
+ mutinyScheduler.shutdown(); // ...but do not shut `executor` down
+ }
+
+ @Override
+ public List shutdownNow() {
+ return mutinyScheduler.shutdownNow();
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return mutinyScheduler.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return mutinyScheduler.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return mutinyScheduler.awaitTermination(timeout, unit);
+ }
+
+ @Override
+ public Future submit(Callable task) {
+ return executor.submit(task);
+ }
+
+ @Override
+ public Future submit(Runnable task, T result) {
+ return executor.submit(task, result);
+ }
+
+ @Override
+ public Future> submit(Runnable task) {
+ return executor.submit(task);
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks) throws InterruptedException {
+ return executor.invokeAll(tasks);
+ }
+
+ @Override
+ public List> invokeAll(Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return executor.invokeAll(tasks, timeout, unit);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks) throws InterruptedException, ExecutionException {
+ return executor.invokeAny(tasks);
+ }
+
+ @Override
+ public T invokeAny(Collection extends Callable> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return executor.invokeAny(tasks, timeout, unit);
+ }
+
@Override
public void execute(Runnable command) {
try {
- exec.execute(command);
- } catch (RejectedExecutionException e) {
- if (!exec.isShutdown() && !exec.isTerminated()) {
- throw e;
+ executor.execute(command);
+ } catch (RejectedExecutionException rejected) {
+ // Ignore submission failures on application shutdown
+ if (!executor.isShutdown() && !executor.isTerminated()) {
+ throw rejected;
}
- // Ignore the failure - the application has been shutdown.
}
}
});
+
shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
- Infrastructure.getDefaultWorkerPool().shutdown();
+ mutinyScheduler.shutdown();
}
});
}
diff --git a/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java
new file mode 100644
index 0000000000000..8d90464e7455b
--- /dev/null
+++ b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java
@@ -0,0 +1,24 @@
+package io.quarkus.it.resteasy.mutiny.regression.bug25818;
+
+import javax.enterprise.context.ApplicationScoped;
+
+import io.vertx.core.Context;
+import io.vertx.core.Vertx;
+
+@ApplicationScoped
+public class BlockingService {
+
+ public String getBlocking() {
+ try {
+ Thread.sleep(250);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ Context context = Vertx.currentContext();
+ if (context == null) {
+ return "~~ context is null ~~";
+ } else {
+ return "hello-" + context.getLocal("hello-target");
+ }
+ }
+}
diff --git a/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java
new file mode 100644
index 0000000000000..025bd3e97de27
--- /dev/null
+++ b/integration-tests/resteasy-mutiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java
@@ -0,0 +1,48 @@
+package io.quarkus.it.resteasy.mutiny.regression.bug25818;
+
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+import org.jboss.logging.Logger;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+import io.vertx.core.Vertx;
+
+@Path("/reproducer/25818")
+public class ReproducerResource {
+
+ private final Logger logger = Logger.getLogger(ReproducerResource.class);
+
+ @Inject
+ BlockingService service;
+
+ private void addToContext() {
+ Vertx.currentContext().putLocal("hello-target", "you");
+ }
+
+ @GET
+ @Path("/worker-pool")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Uni workerPool() {
+ logger.info("worker pool endpoint");
+ addToContext();
+ return Uni.createFrom()
+ .item(service::getBlocking)
+ .runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
+ }
+
+ @GET
+ @Path("/default-executor")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Uni defaultExecutor() {
+ logger.info("default executor endpoint");
+ addToContext();
+ return Uni.createFrom()
+ .item(service::getBlocking)
+ .runSubscriptionOn(Infrastructure.getDefaultExecutor());
+ }
+}
diff --git a/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java b/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java
new file mode 100644
index 0000000000000..042aa8f0a7cd8
--- /dev/null
+++ b/integration-tests/resteasy-mutiny/src/test/java/io/quarkus/it/resteasy/mutiny/RegressionTest.java
@@ -0,0 +1,35 @@
+package io.quarkus.it.resteasy.mutiny;
+
+import static io.restassured.RestAssured.get;
+import static org.hamcrest.CoreMatchers.is;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTest
+public class RegressionTest {
+
+ @Nested
+ @DisplayName("Regression tests for #25818 (see https://github.com/quarkusio/quarkus/issues/25818)")
+ public class Bug25818 {
+
+ @Test
+ public void testDefaultExecutor() {
+ get("/reproducer/25818/default-executor")
+ .then()
+ .body(is("hello-you"))
+ .statusCode(200);
+ }
+
+ @Test
+ public void testWorkerPool() {
+ get("/reproducer/25818/worker-pool")
+ .then()
+ .body(is("hello-you"))
+ .statusCode(200);
+ }
+ }
+}