Skip to content

Commit

Permalink
Mutiny 1.6.0 upgrade and capture Vert.x contexts across all Mutiny sc…
Browse files Browse the repository at this point in the history
…hedulers

Fixes quarkusio#25818

This is part of a coordinated fix across Quarkus and Mutiny where scheduler wrapping would cause Vert.x context propagation not to be done.

Some changes have been adapted from the draft code from @luneo7 in the discussions of:

- quarkusio#26242
- quarkusio#25818

See the matching changes in Mutiny:  smallrye/smallrye-mutiny#955
  • Loading branch information
jponge committed Jun 22, 2022
1 parent 52db148 commit a3e04ed
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package io.quarkus.mutiny.deployment;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

import org.jboss.threads.ContextHandler;

import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ContextHandlerBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.mutiny.runtime.MutinyInfrastructure;
Expand All @@ -13,10 +17,13 @@ public class MutinyProcessor {

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
public void runtimeInit(ExecutorBuildItem executorBuildItem, MutinyInfrastructure recorder,
ShutdownContextBuildItem shutdownContext) {
public void runtimeInit(ExecutorBuildItem executorBuildItem,
MutinyInfrastructure recorder,
ShutdownContextBuildItem shutdownContext,
Optional<ContextHandlerBuildItem> contextHandler) {
ExecutorService executor = executorBuildItem.getExecutorProxy();
recorder.configureMutinyInfrastructure(executor, shutdownContext);
ContextHandler<Object> handler = contextHandler.map(ContextHandlerBuildItem::contextHandler).orElse(null);
recorder.configureMutinyInfrastructure(executor, shutdownContext, handler);
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.quarkus.mutiny.runtime;

import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.jboss.threads.ContextHandler;

class ContextualRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
private final RunnableScheduledFuture<V> runnable;
private final Object context;
private final ContextHandler<Object> contextHandler;

public ContextualRunnableScheduledFuture(ContextHandler<Object> contextHandler, Object context,
RunnableScheduledFuture<V> runnable) {
this.contextHandler = contextHandler;
this.context = context;
this.runnable = runnable;
}

@Override
public boolean isPeriodic() {
return runnable.isPeriodic();
}

@Override
public long getDelay(TimeUnit unit) {
return runnable.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
return runnable.compareTo(o);
}

@Override
public void run() {
if (contextHandler != null) {
contextHandler.runWith(runnable, context);
} else {
runnable.run();
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return runnable.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return runnable.isCancelled();
}

@Override
public boolean isDone() {
return runnable.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return runnable.get();
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return runnable.get(timeout, unit);
}
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,153 @@
package io.quarkus.mutiny.runtime;

import java.util.concurrent.Executor;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;

import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.infrastructure.MutinyScheduler;

@Recorder
public class MutinyInfrastructure {

public static final String VERTX_EVENT_LOOP_THREAD_PREFIX = "vert.x-eventloop-thread-";

public void configureMutinyInfrastructure(ExecutorService exec, ShutdownContext shutdownContext) {
//mutiny leaks a ScheduledExecutorService if you don't do this
public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext,
ContextHandler<Object> contextHandler) {
// Mutiny leaks a ScheduledExecutorService if we don't do this
Infrastructure.getDefaultWorkerPool().shutdown();
Infrastructure.setDefaultExecutor(new Executor() {

// Since executor is not a ScheduledExecutorService and Mutiny needs one for scheduling we have to adapt one around the provided executor
MutinyScheduler mutinyScheduler = new MutinyScheduler(executor) {
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
Object context = (contextHandler != null) ? contextHandler.captureContext() : null;
return super.decorateTask(runnable, new ContextualRunnableScheduledFuture<>(contextHandler, context, task));
}

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
Object context = (contextHandler != null) ? contextHandler.captureContext() : null;
return super.decorateTask(callable, new ContextualRunnableScheduledFuture<>(contextHandler, context, task));
}
};
Infrastructure.setDefaultExecutor(new ScheduledExecutorService() {

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return mutinyScheduler.schedule(command, delay, unit);
}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return mutinyScheduler.schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return mutinyScheduler.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return mutinyScheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

@Override
public void shutdown() {
mutinyScheduler.shutdown(); // ...but do not shut `executor` down
}

@Override
public List<Runnable> shutdownNow() {
return mutinyScheduler.shutdownNow();
}

@Override
public boolean isShutdown() {
return mutinyScheduler.isShutdown();
}

@Override
public boolean isTerminated() {
return mutinyScheduler.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return mutinyScheduler.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executor.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return executor.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return executor.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return executor.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return executor.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return executor.invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
try {
exec.execute(command);
} catch (RejectedExecutionException e) {
if (!exec.isShutdown() && !exec.isTerminated()) {
throw e;
executor.execute(command);
} catch (RejectedExecutionException rejected) {
// Ignore submission failures on application shutdown
if (!executor.isShutdown() && !executor.isTerminated()) {
throw rejected;
}
// Ignore the failure - the application has been shutdown.
}
}
});

shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Infrastructure.getDefaultWorkerPool().shutdown();
mutinyScheduler.shutdown();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,14 +546,15 @@ public Object captureContext() {

@Override
public void runWith(Runnable task, Object context) {
if (context != null) {
ContextInternal currentContext = (ContextInternal) Vertx.currentContext();
if (context != null && context != currentContext) {
// Only do context handling if it's non null
final ContextInternal vertxContext = (ContextInternal) context;
vertxContext.beginDispatch();
try {
task.run();
} finally {
vertxContext.endDispatch(null);
vertxContext.endDispatch(currentContext);
}
} else {
task.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.it.resteasy.mutiny.regression.bug25818;

import javax.enterprise.context.ApplicationScoped;

import io.vertx.core.Context;
import io.vertx.core.Vertx;

@ApplicationScoped
public class BlockingService {

public String getBlocking() {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Context context = Vertx.currentContext();
if (context == null) {
return "~~ context is null ~~";
} else {
return "hello-" + context.getLocal("hello-target");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.quarkus.it.resteasy.mutiny.regression.bug25818;

import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

@Path("/reproducer/25818")
public class ReproducerResource {

private final Logger logger = Logger.getLogger(ReproducerResource.class);

@Inject
BlockingService service;

private void addToContext() {
Vertx.currentContext().putLocal("hello-target", "you");
}

@GET
@Path("/worker-pool")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> workerPool() {
logger.info("worker pool endpoint");
addToContext();
return Uni.createFrom()
.item(service::getBlocking)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}

@GET
@Path("/default-executor")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> defaultExecutor() {
logger.info("default executor endpoint");
addToContext();
return Uni.createFrom()
.item(service::getBlocking)
.runSubscriptionOn(Infrastructure.getDefaultExecutor());
}

@GET
@Path("/worker-pool-submit")
public Uni<String> workerPoolSubmit() {
Vertx.currentContext().putLocal("yolo", "yolo");
return Uni.createFrom().emitter(emitter -> {
Infrastructure.getDefaultWorkerPool().submit(() -> {
Context ctx = Vertx.currentContext();
if (ctx != null) {
emitter.complete("yolo -> " + ctx.getLocal("yolo"));
} else {
emitter.complete("Context was null");
}
});
});
}

@GET
@Path("/worker-pool-schedule")
public Uni<String> workerPoolSchedule() {
Vertx.currentContext().putLocal("yolo", "yolo");
return Uni.createFrom().emitter(emitter -> {
Infrastructure.getDefaultWorkerPool().schedule(() -> {
Context ctx = Vertx.currentContext();
if (ctx != null) {
emitter.complete("yolo -> " + ctx.getLocal("yolo"));
} else {
emitter.complete("Context was null");
}
}, 25, TimeUnit.MILLISECONDS);
});
}
}
Loading

0 comments on commit a3e04ed

Please sign in to comment.