-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Mutiny 1.6.0 upgrade and capture Vert.x contexts across all Mutiny sc…
…hedulers Fixes #25818 This is part of a coordinated fix across Quarkus and Mutiny where scheduler wrapping would cause Vert.x context propagation not to be done. Some changes have been adapted from the draft code from @luneo7 in the discussions of: - #26242 - #25818 See the matching changes in Mutiny: smallrye/smallrye-mutiny#955
- Loading branch information
Showing
7 changed files
with
363 additions
and
15 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
71 changes: 71 additions & 0 deletions
71
...ny/runtime/src/main/java/io/quarkus/mutiny/runtime/ContextualRunnableScheduledFuture.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
package io.quarkus.mutiny.runtime; | ||
|
||
import java.util.concurrent.Delayed; | ||
import java.util.concurrent.ExecutionException; | ||
import java.util.concurrent.RunnableScheduledFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
import org.jboss.threads.ContextHandler; | ||
|
||
class ContextualRunnableScheduledFuture<V> implements RunnableScheduledFuture<V> { | ||
private final RunnableScheduledFuture<V> runnable; | ||
private final Object context; | ||
private final ContextHandler<Object> contextHandler; | ||
|
||
public ContextualRunnableScheduledFuture(ContextHandler<Object> contextHandler, Object context, | ||
RunnableScheduledFuture<V> runnable) { | ||
this.contextHandler = contextHandler; | ||
this.context = context; | ||
this.runnable = runnable; | ||
} | ||
|
||
@Override | ||
public boolean isPeriodic() { | ||
return runnable.isPeriodic(); | ||
} | ||
|
||
@Override | ||
public long getDelay(TimeUnit unit) { | ||
return runnable.getDelay(unit); | ||
} | ||
|
||
@Override | ||
public int compareTo(Delayed o) { | ||
return runnable.compareTo(o); | ||
} | ||
|
||
@Override | ||
public void run() { | ||
if (contextHandler != null) { | ||
contextHandler.runWith(runnable, context); | ||
} else { | ||
runnable.run(); | ||
} | ||
} | ||
|
||
@Override | ||
public boolean cancel(boolean mayInterruptIfRunning) { | ||
return runnable.cancel(mayInterruptIfRunning); | ||
} | ||
|
||
@Override | ||
public boolean isCancelled() { | ||
return runnable.isCancelled(); | ||
} | ||
|
||
@Override | ||
public boolean isDone() { | ||
return runnable.isDone(); | ||
} | ||
|
||
@Override | ||
public V get() throws InterruptedException, ExecutionException { | ||
return runnable.get(); | ||
} | ||
|
||
@Override | ||
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { | ||
return runnable.get(timeout, unit); | ||
} | ||
} |
131 changes: 121 additions & 10 deletions
131
extensions/mutiny/runtime/src/main/java/io/quarkus/mutiny/runtime/MutinyInfrastructure.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
24 changes: 24 additions & 0 deletions
24
...tiny/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/BlockingService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
package io.quarkus.it.resteasy.mutiny.regression.bug25818; | ||
|
||
import javax.enterprise.context.ApplicationScoped; | ||
|
||
import io.vertx.core.Context; | ||
import io.vertx.core.Vertx; | ||
|
||
@ApplicationScoped | ||
public class BlockingService { | ||
|
||
public String getBlocking() { | ||
try { | ||
Thread.sleep(250); | ||
} catch (InterruptedException e) { | ||
throw new RuntimeException(e); | ||
} | ||
Context context = Vertx.currentContext(); | ||
if (context == null) { | ||
return "~~ context is null ~~"; | ||
} else { | ||
return "hello-" + context.getLocal("hello-target"); | ||
} | ||
} | ||
} |
83 changes: 83 additions & 0 deletions
83
...y/src/main/java/io/quarkus/it/resteasy/mutiny/regression/bug25818/ReproducerResource.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package io.quarkus.it.resteasy.mutiny.regression.bug25818; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
import javax.inject.Inject; | ||
import javax.ws.rs.GET; | ||
import javax.ws.rs.Path; | ||
import javax.ws.rs.Produces; | ||
import javax.ws.rs.core.MediaType; | ||
|
||
import org.jboss.logging.Logger; | ||
|
||
import io.smallrye.mutiny.Uni; | ||
import io.smallrye.mutiny.infrastructure.Infrastructure; | ||
import io.vertx.core.Context; | ||
import io.vertx.core.Vertx; | ||
|
||
@Path("/reproducer/25818") | ||
public class ReproducerResource { | ||
|
||
private final Logger logger = Logger.getLogger(ReproducerResource.class); | ||
|
||
@Inject | ||
BlockingService service; | ||
|
||
private void addToContext() { | ||
Vertx.currentContext().putLocal("hello-target", "you"); | ||
} | ||
|
||
@GET | ||
@Path("/worker-pool") | ||
@Produces(MediaType.TEXT_PLAIN) | ||
public Uni<String> workerPool() { | ||
logger.info("worker pool endpoint"); | ||
addToContext(); | ||
return Uni.createFrom() | ||
.item(service::getBlocking) | ||
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); | ||
} | ||
|
||
@GET | ||
@Path("/default-executor") | ||
@Produces(MediaType.TEXT_PLAIN) | ||
public Uni<String> defaultExecutor() { | ||
logger.info("default executor endpoint"); | ||
addToContext(); | ||
return Uni.createFrom() | ||
.item(service::getBlocking) | ||
.runSubscriptionOn(Infrastructure.getDefaultExecutor()); | ||
} | ||
|
||
@GET | ||
@Path("/worker-pool-submit") | ||
public Uni<String> workerPoolSubmit() { | ||
Vertx.currentContext().putLocal("yolo", "yolo"); | ||
return Uni.createFrom().emitter(emitter -> { | ||
Infrastructure.getDefaultWorkerPool().submit(() -> { | ||
Context ctx = Vertx.currentContext(); | ||
if (ctx != null) { | ||
emitter.complete("yolo -> " + ctx.getLocal("yolo")); | ||
} else { | ||
emitter.complete("Context was null"); | ||
} | ||
}); | ||
}); | ||
} | ||
|
||
@GET | ||
@Path("/worker-pool-schedule") | ||
public Uni<String> workerPoolSchedule() { | ||
Vertx.currentContext().putLocal("yolo", "yolo"); | ||
return Uni.createFrom().emitter(emitter -> { | ||
Infrastructure.getDefaultWorkerPool().schedule(() -> { | ||
Context ctx = Vertx.currentContext(); | ||
if (ctx != null) { | ||
emitter.complete("yolo -> " + ctx.getLocal("yolo")); | ||
} else { | ||
emitter.complete("Context was null"); | ||
} | ||
}, 25, TimeUnit.MILLISECONDS); | ||
}); | ||
} | ||
} |
Oops, something went wrong.