Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Mutiny schedulers context propagation bug #26242

Merged
merged 1 commit into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package io.quarkus.mutiny.deployment;

import java.util.Optional;
import java.util.concurrent.ExecutorService;

import org.jboss.threads.ContextHandler;

import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.annotations.ExecutionTime;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ContextHandlerBuildItem;
import io.quarkus.deployment.builditem.ExecutorBuildItem;
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.mutiny.runtime.MutinyInfrastructure;
Expand All @@ -13,10 +17,13 @@ public class MutinyProcessor {

@BuildStep
@Record(ExecutionTime.RUNTIME_INIT)
public void runtimeInit(ExecutorBuildItem executorBuildItem, MutinyInfrastructure recorder,
ShutdownContextBuildItem shutdownContext) {
public void runtimeInit(ExecutorBuildItem executorBuildItem,
MutinyInfrastructure recorder,
ShutdownContextBuildItem shutdownContext,
Optional<ContextHandlerBuildItem> contextHandler) {
ExecutorService executor = executorBuildItem.getExecutorProxy();
recorder.configureMutinyInfrastructure(executor, shutdownContext);
ContextHandler<Object> handler = contextHandler.map(ContextHandlerBuildItem::contextHandler).orElse(null);
recorder.configureMutinyInfrastructure(executor, shutdownContext, handler);
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.quarkus.mutiny.runtime;

import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.jboss.threads.ContextHandler;

class ContextualRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> {
private final RunnableScheduledFuture<V> runnable;
private final Object context;
private final ContextHandler<Object> contextHandler;

public ContextualRunnableScheduledFuture(ContextHandler<Object> contextHandler, Object context,
RunnableScheduledFuture<V> runnable) {
this.contextHandler = contextHandler;
this.context = context;
this.runnable = runnable;
}

@Override
public boolean isPeriodic() {
return runnable.isPeriodic();
}

@Override
public long getDelay(TimeUnit unit) {
return runnable.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
return runnable.compareTo(o);
}

@Override
public void run() {
if (contextHandler != null) {
contextHandler.runWith(runnable, context);
} else {
runnable.run();
}
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return runnable.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return runnable.isCancelled();
}

@Override
public boolean isDone() {
return runnable.isDone();
}

@Override
public V get() throws InterruptedException, ExecutionException {
return runnable.get();
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return runnable.get(timeout, unit);
}
}
Original file line number Diff line number Diff line change
@@ -1,42 +1,153 @@
package io.quarkus.mutiny.runtime;

import java.util.concurrent.Executor;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

import org.jboss.logging.Logger;
import org.jboss.threads.ContextHandler;

import io.quarkus.runtime.ShutdownContext;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.infrastructure.MutinyScheduler;

@Recorder
public class MutinyInfrastructure {

public static final String VERTX_EVENT_LOOP_THREAD_PREFIX = "vert.x-eventloop-thread-";

public void configureMutinyInfrastructure(ExecutorService exec, ShutdownContext shutdownContext) {
//mutiny leaks a ScheduledExecutorService if you don't do this
public void configureMutinyInfrastructure(ExecutorService executor, ShutdownContext shutdownContext,
ContextHandler<Object> contextHandler) {
// Mutiny leaks a ScheduledExecutorService if we don't do this
Infrastructure.getDefaultWorkerPool().shutdown();
Infrastructure.setDefaultExecutor(new Executor() {

// Since executor is not a ScheduledExecutorService and Mutiny needs one for scheduling we have to adapt one around the provided executor
MutinyScheduler mutinyScheduler = new MutinyScheduler(executor) {
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
Object context = (contextHandler != null) ? contextHandler.captureContext() : null;
return super.decorateTask(runnable, new ContextualRunnableScheduledFuture<>(contextHandler, context, task));
}

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) {
Object context = (contextHandler != null) ? contextHandler.captureContext() : null;
return super.decorateTask(callable, new ContextualRunnableScheduledFuture<>(contextHandler, context, task));
}
};
Infrastructure.setDefaultExecutor(new ScheduledExecutorService() {

@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return mutinyScheduler.schedule(command, delay, unit);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we schedule something, inside a route handler the Vert.x context won't be propagated, just when execute is called, that would mean schedule it to run right away... If someone use the Infrastructure.getDefaultWorkerPool and call any of the schedule methods it won't propagate the vert.x context.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying a variation of your code (we won't add another parameter to Infrastructure in Mutiny) but I get an error testing the Mutiny extension:

Errors:
[ERROR]   MutinyTest » Runtime java.lang.RuntimeException: io.quarkus.builder.ChainBuildException: No producers for required item class io.quarkus.deployment.builditem.ContextHandlerBuildItem

Other than that I have added test cases calling submit and schedule, context does propagate with your changes to the Vert.x extension.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping in an Optional<...>

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, in the mutiny processor the runtimeInit needs to take an Optional<ContextHandlerBuildItem> contextHandlerBuildItem and need to update the code when there is no context handler

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making it runtimeInit accept optional:

image

Creating the right MutinyScheduler if context handler is not null:
image

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed it to my branch, if you are basing on something there: main...luneo7:mutiny-executors-simple

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already have something like this locally 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just let a null handler being forwarded, and just call Runnable::run() when null in CRSF

}

@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return mutinyScheduler.schedule(callable, delay, unit);
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return mutinyScheduler.scheduleAtFixedRate(command, initialDelay, period, unit);
}

@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return mutinyScheduler.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

@Override
public void shutdown() {
mutinyScheduler.shutdown(); // ...but do not shut `executor` down
}

@Override
public List<Runnable> shutdownNow() {
return mutinyScheduler.shutdownNow();
}

@Override
public boolean isShutdown() {
return mutinyScheduler.isShutdown();
}

@Override
public boolean isTerminated() {
return mutinyScheduler.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return mutinyScheduler.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executor.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executor.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return executor.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return executor.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return executor.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return executor.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return executor.invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
try {
exec.execute(command);
} catch (RejectedExecutionException e) {
if (!exec.isShutdown() && !exec.isTerminated()) {
throw e;
executor.execute(command);
} catch (RejectedExecutionException rejected) {
// Ignore submission failures on application shutdown
if (!executor.isShutdown() && !executor.isTerminated()) {
throw rejected;
}
// Ignore the failure - the application has been shutdown.
}
}
});

shutdownContext.addLastShutdownTask(new Runnable() {
@Override
public void run() {
Infrastructure.getDefaultWorkerPool().shutdown();
mutinyScheduler.shutdown();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,14 +546,15 @@ public Object captureContext() {

@Override
public void runWith(Runnable task, Object context) {
if (context != null) {
ContextInternal currentContext = (ContextInternal) Vertx.currentContext();
if (context != null && context != currentContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And now the philosophical question: should we distinguish root context and duplicated context?

I do not believe so, but I think we should consider the question.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicated contexts do not share the context-local data, so I would say we are running with a different context.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those context are mostly duplicated contexts. When you duplicate a root context, their local data will also by propagated to the duplicate context, the only thing is that changing this local data won't change the root local data.
When I added this check ’if not null and if different from current’ was just a safe guard if trying to run the context handler twice in the same context, so we don't dispatch the vertx context multiple times. Since the executor will span a thread that is not a vertx thread, when you try to get the current vertx context it will always be null, unless the context handler was run multiple times in that thread.
Don't think that we need to distinguish root and duplicated contexts.

// Only do context handling if it's non-null
final ContextInternal vertxContext = (ContextInternal) context;
vertxContext.beginDispatch();
try {
task.run();
} finally {
vertxContext.endDispatch(null);
vertxContext.endDispatch(currentContext);
}
} else {
task.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.it.resteasy.mutiny.regression.bug25818;

import javax.enterprise.context.ApplicationScoped;

import io.vertx.core.Context;
import io.vertx.core.Vertx;

@ApplicationScoped
public class BlockingService {

public String getBlocking() {
try {
Thread.sleep(250);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Context context = Vertx.currentContext();
if (context == null) {
return "~~ context is null ~~";
} else {
return "hello-" + context.getLocal("hello-target");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.quarkus.it.resteasy.mutiny.regression.bug25818;

import java.util.concurrent.TimeUnit;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.logging.Logger;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

@Path("/reproducer/25818")
public class ReproducerResource {

private final Logger logger = Logger.getLogger(ReproducerResource.class);

@Inject
BlockingService service;

private void addToContext() {
Vertx.currentContext().putLocal("hello-target", "you");
}

@GET
@Path("/worker-pool")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> workerPool() {
logger.info("worker pool endpoint");
addToContext();
return Uni.createFrom()
.item(service::getBlocking)
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
}

@GET
@Path("/default-executor")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> defaultExecutor() {
logger.info("default executor endpoint");
addToContext();
return Uni.createFrom()
.item(service::getBlocking)
.runSubscriptionOn(Infrastructure.getDefaultExecutor());
}

@GET
@Path("/worker-pool-submit")
public Uni<String> workerPoolSubmit() {
Vertx.currentContext().putLocal("yolo", "yolo");
return Uni.createFrom().emitter(emitter -> {
Infrastructure.getDefaultWorkerPool().submit(() -> {
Context ctx = Vertx.currentContext();
if (ctx != null) {
emitter.complete("yolo -> " + ctx.getLocal("yolo"));
} else {
emitter.complete("Context was null");
}
});
});
}

@GET
@Path("/worker-pool-schedule")
public Uni<String> workerPoolSchedule() {
Vertx.currentContext().putLocal("yolo", "yolo");
return Uni.createFrom().emitter(emitter -> {
Infrastructure.getDefaultWorkerPool().schedule(() -> {
Context ctx = Vertx.currentContext();
if (ctx != null) {
emitter.complete("yolo -> " + ctx.getLocal("yolo"));
} else {
emitter.complete("Context was null");
}
}, 25, TimeUnit.MILLISECONDS);
});
}
}
Loading