From ae9184f61f8766a6978870af219754fbfab3b48a Mon Sep 17 00:00:00 2001 From: Martin Kouba Date: Tue, 10 Oct 2023 10:07:58 +0200 Subject: [PATCH] Vert.x EventBus: use codec selector for non-concrete types - fixes #36172 --- docs/src/main/asciidoc/vertx-reference.adoc | 6 +- .../deployment/EventBusCodecProcessor.java | 79 ++++++++++++++----- .../LocalCodecSelectorTypesBuildItem.java | 22 ++++++ .../vertx/deployment/VertxProcessor.java | 23 ++++-- .../io/quarkus/vertx/EventBusCodecTest.java | 36 ++++++++- .../io/quarkus/vertx/MutinyCodecTest.java | 9 ++- .../VertxEventBusConsumerRecorder.java | 27 ++++++- 7 files changed, 163 insertions(+), 39 deletions(-) create mode 100644 extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/LocalCodecSelectorTypesBuildItem.java diff --git a/docs/src/main/asciidoc/vertx-reference.adoc b/docs/src/main/asciidoc/vertx-reference.adoc index ba0ff5be38f0ca..fdcc107d726fba 100644 --- a/docs/src/main/asciidoc/vertx-reference.adoc +++ b/docs/src/main/asciidoc/vertx-reference.adoc @@ -712,9 +712,11 @@ Read xref:./virtual-threads.adoc[the virtual thread guide] for more details. === Use codecs -The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses codecs to _serialize_ and _deserialize_ objects. +The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/#_message_codecs[codecs] to _serialize_ and _deserialize_ message objects. Quarkus provides a default codec for local delivery. -So you can exchange objects as follows: +This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with `@ConsumeEvent` whete `ConsumeEvent#local() == true` (which is the default). + +So that you can exchange the message objects as follows: [source, java] ---- diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusCodecProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusCodecProcessor.java index 8dd0ee0830df54..4eb061c4eb11f4 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusCodecProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/EventBusCodecProcessor.java @@ -5,9 +5,11 @@ import static io.quarkus.vertx.deployment.VertxConstants.LOCAL_EVENT_BUS_CODEC; import static io.quarkus.vertx.deployment.VertxConstants.UNI; +import java.lang.reflect.Modifier; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -17,11 +19,13 @@ import org.jboss.jandex.AnnotationInstance; import org.jboss.jandex.AnnotationTarget; import org.jboss.jandex.AnnotationValue; +import org.jboss.jandex.ClassInfo; import org.jboss.jandex.DotName; import org.jboss.jandex.IndexView; import org.jboss.jandex.MethodInfo; import org.jboss.jandex.ParameterizedType; import org.jboss.jandex.Type; +import org.jboss.jandex.Type.Kind; import org.jboss.logging.Logger; import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem; @@ -46,21 +50,24 @@ public void registerCodecs( BeanArchiveIndexBuildItem beanArchiveIndexBuildItem, CombinedIndexBuildItem combinedIndex, BuildProducer messageCodecs, - BuildProducer reflectiveClass) { + BuildProducer reflectiveClass, + BuildProducer localCodecSelectorTypes) { final IndexView index = beanArchiveIndexBuildItem.getIndex(); Collection consumeEventAnnotationInstances = index.getAnnotations(CONSUME_EVENT); Map codecByTypes = new HashMap<>(); + Set selectorTypes = new HashSet<>(); + for (AnnotationInstance consumeEventAnnotationInstance : consumeEventAnnotationInstances) { AnnotationTarget typeTarget = consumeEventAnnotationInstance.target(); if (typeTarget.kind() != AnnotationTarget.Kind.METHOD) { - throw new UnsupportedOperationException("@ConsumeEvent annotation must target a method"); + throw new IllegalStateException("@ConsumeEvent annotation must target a method"); } - + AnnotationValue local = consumeEventAnnotationInstance.value("local"); + boolean isLocal = local == null || local.asBoolean(); MethodInfo method = typeTarget.asMethod(); - Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method); - Type codecTargetFromParameter = extractPayloadTypeFromParameter(method); + Type codecTargetFromParameter = extractPayloadTypeFromParameter(method); // If the @ConsumeEvent set the codec, use this codec. It applies to the parameter AnnotationValue codec = consumeEventAnnotationInstance.value("codec"); if (codec != null && codec.asClass().kind() == Type.Kind.CLASS) { @@ -68,29 +75,46 @@ public void registerCodecs( throw new IllegalStateException("Invalid `codec` argument in @ConsumeEvent - no parameter"); } codecByTypes.put(codecTargetFromParameter.name(), codec.asClass().asClassType().name()); - } else if (codecTargetFromParameter != null) { - // Codec is not set, check if we have a built-in codec - if (!hasBuiltInCodec(codecTargetFromParameter)) { - // Ensure local delivery. - AnnotationValue local = consumeEventAnnotationInstance.value("local"); - if (local != null && !local.asBoolean()) { - throw new UnsupportedOperationException( - "The generic message codec can only be used for local delivery," - + ", implement your own event bus codec for " + codecTargetFromParameter.name() - .toString()); - } else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) { + } else if (codecTargetFromParameter != null && !hasBuiltInCodec(codecTargetFromParameter)) { + // Codec is not set and built-in codecs cannot be used + if (!isLocal) { + throw new IllegalStateException( + "The Local Message Codec can only be used for local delivery," + + " you will need to implement a message codec for " + codecTargetFromParameter.name() + .toString() + + " and make use of @ConsumeEvent#codec()"); + } else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) { + if (isConcreteClass(codecTargetFromParameter, index)) { + // The default codec makes only sense for concrete classes LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromParameter); codecByTypes.put(codecTargetFromParameter.name(), LOCAL_EVENT_BUS_CODEC); + } else { + LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromParameter); + selectorTypes.add(codecTargetFromParameter.name()); } } } - if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType) - && !codecByTypes.containsKey(codecTargetFromReturnType.name())) { - - LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType); - codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC); + Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method); + if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType)) { + if (!isLocal) { + throw new IllegalStateException( + "The Local Message Codec can only be used for local delivery," + + " you will need to modify the method to consume io.vertx.core.eventbus.Message, implement a message codec for " + + codecTargetFromReturnType.name() + .toString() + + " and make use of io.vertx.core.eventbus.DeliveryOptions"); + } else if (!codecByTypes.containsKey(codecTargetFromReturnType.name())) { + if (isConcreteClass(codecTargetFromReturnType, index)) { + // The default codec makes only sense for concrete classes + LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType); + codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC); + } else { + LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromReturnType); + selectorTypes.add(codecTargetFromReturnType.name()); + } + } } } @@ -133,6 +157,9 @@ public void accept(String name) { reflectiveClass.produce(ReflectiveClassBuildItem.builder(name).methods().build()); } }); + + localCodecSelectorTypes.produce(new LocalCodecSelectorTypesBuildItem( + selectorTypes.stream().map(Object::toString).collect(Collectors.toSet()))); } private static final List BUILT_IN_CODECS = Arrays.asList( @@ -220,4 +247,14 @@ private static boolean hasBuiltInCodec(Type type) { private static boolean isMessageClass(ParameterizedType type) { return VertxConstants.isMessage(type.name()); } + + private static boolean isConcreteClass(Type type, IndexView index) { + if (type != null && type.kind() == Kind.CLASS) { + ClassInfo clazz = index.getClassByName(type.name()); + if (clazz != null) { + return !clazz.isInterface() && !Modifier.isAbstract(clazz.flags()); + } + } + return false; + } } diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/LocalCodecSelectorTypesBuildItem.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/LocalCodecSelectorTypesBuildItem.java new file mode 100644 index 00000000000000..a8781f1270be8b --- /dev/null +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/LocalCodecSelectorTypesBuildItem.java @@ -0,0 +1,22 @@ +package io.quarkus.vertx.deployment; + +import java.util.Set; + +import io.quarkus.builder.item.SimpleBuildItem; + +/** + * Carries all types for which the {@link io.quarkus.vertx.LocalEventBusCodec} should be selected automatically. + */ +public final class LocalCodecSelectorTypesBuildItem extends SimpleBuildItem { + + private final Set types; + + LocalCodecSelectorTypesBuildItem(Set types) { + this.types = types; + } + + public Set getTypes() { + return types; + } + +} diff --git a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java index 0fd069a0777e0e..2d69d4a6d7be99 100644 --- a/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java +++ b/extensions/vertx/deployment/src/main/java/io/quarkus/vertx/deployment/VertxProcessor.java @@ -4,6 +4,7 @@ import static io.quarkus.vertx.deployment.VertxConstants.isMessage; import static io.quarkus.vertx.deployment.VertxConstants.isMessageHeaders; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -45,7 +46,6 @@ import io.quarkus.deployment.builditem.ShutdownContextBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem; -import io.quarkus.deployment.recording.RecorderContext; import io.quarkus.gizmo.ClassOutput; import io.quarkus.vertx.ConsumeEvent; import io.quarkus.vertx.core.deployment.CoreVertxBuildItem; @@ -74,7 +74,7 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec BuildProducer generatedClass, AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown, BuildProducer serviceStart, BuildProducer reflectiveClass, - List codecs, RecorderContext recorderContext) { + List codecs, LocalCodecSelectorTypesBuildItem localCodecSelectorTypes) { Map messageConsumerConfigurations = new HashMap<>(); ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true); for (EventConsumerBusinessMethodItem businessMethod : messageConsumerBusinessMethods) { @@ -87,15 +87,20 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec reflectiveClass.produce(ReflectiveClassBuildItem.builder(invokerClass).build()); } + ClassLoader tccl = Thread.currentThread().getContextClassLoader(); Map, Class> codecByClass = new HashMap<>(); for (MessageCodecBuildItem messageCodecItem : codecs) { - codecByClass.put(recorderContext.classProxy(messageCodecItem.getType()), - recorderContext.classProxy(messageCodecItem.getCodec())); + codecByClass.put(tryLoad(messageCodecItem.getType(), tccl), tryLoad(messageCodecItem.getCodec(), tccl)); + } + + List> selectorTypes = new ArrayList<>(); + for (String name : localCodecSelectorTypes.getTypes()) { + selectorTypes.add(tryLoad(name, tccl)); } recorder.configureVertx(vertx.getVertx(), messageConsumerConfigurations, launchMode.getLaunchMode(), - shutdown, codecByClass); + shutdown, codecByClass, selectorTypes); serviceStart.produce(new ServiceStartBuildItem("vertx")); return new VertxBuildItem(recorder.forceStart(vertx.getVertx())); } @@ -190,4 +195,12 @@ void faultToleranceIntegration(Capabilities capabilities, BuildProducer tryLoad(String name, ClassLoader tccl) { + try { + return tccl.loadClass(name); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Unable to load type: " + name, e); + } + } } diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/EventBusCodecTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/EventBusCodecTest.java index dcd5f0ebd15cd0..ba6a9cc3576da7 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/EventBusCodecTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/EventBusCodecTest.java @@ -1,6 +1,7 @@ package io.quarkus.vertx; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -8,6 +9,8 @@ import java.lang.annotation.Target; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.function.Function; +import java.util.function.Supplier; import jakarta.inject.Inject; @@ -60,10 +63,10 @@ public void testWithUserCodec() { @Test public void testWithUserCodecNonLocal() { - Greeting hello = vertx.eventBus(). request("nl-pet", new Pet("neo", "rabbit")) + String hello = vertx.eventBus(). request("nl-pet", new Pet("neo", "rabbit")) .onItem().transform(Message::body) .await().indefinitely(); - assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO"); + assertEquals("Non Local Hello NEO", hello); } @Test @@ -79,6 +82,20 @@ public void testWithSubclass() { assertThat(hello.getMessage()).isEqualTo("Hello my-subclass-event"); } + @Test + public void testWithInterfaceCodecTarget() { + Supplier supplier = vertx.eventBus() + .> request("hello-supplier", new Function() { + @Override + public String apply(String value) { + return value.toLowerCase(); + } + }) + .onItem().transform(Message::body) + .await().indefinitely(); + assertEquals("foo", supplier.get()); + } + static class Greeting { private final String message; @@ -118,12 +135,23 @@ void messageTypeWithTypeAnnotation(@NonNull Person person) { public CompletionStage hello(Event event) { return CompletableFuture.completedFuture(new Greeting("Hello " + event.getProperty())); } + + @ConsumeEvent("hello-supplier") + public Supplier helloSupplier(Function fun) { + return new Supplier() { + + @Override + public String get() { + return fun.apply("FOO"); + } + }; + } } static class MyNonLocalBean { @ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false) - public CompletionStage hello(Pet p) { - return CompletableFuture.completedFuture(new Greeting("Non Local Hello " + p.getName())); + public CompletionStage hello(Pet p) { + return CompletableFuture.completedFuture("Non Local Hello " + p.getName()); } } diff --git a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/MutinyCodecTest.java b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/MutinyCodecTest.java index 9ca11164405f3c..5a7732111c7dd1 100644 --- a/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/MutinyCodecTest.java +++ b/extensions/vertx/deployment/src/test/java/io/quarkus/vertx/MutinyCodecTest.java @@ -1,6 +1,7 @@ package io.quarkus.vertx; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import jakarta.inject.Inject; @@ -54,10 +55,10 @@ public void testWithUserCodec() { @Test public void testWithUserCodecNonLocal() { - Greeting hello = vertx.eventBus(). request("nl-pet", new Pet("neo", "rabbit")) + String hello = vertx.eventBus(). request("nl-pet", new Pet("neo", "rabbit")) .onItem().transform(Message::body) .await().indefinitely(); - assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO"); + assertEquals("Non Local Hello NEO", hello); } static class Greeting { @@ -90,9 +91,9 @@ public Uni hello(Pet p) { static class MyNonLocalBean { @ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false) - public Uni hello(Pet p) { + public Uni hello(Pet p) { return Uni.createFrom().item( - () -> new Greeting("Non Local Hello " + p.getName())) + () -> "Non Local Hello " + p.getName()) .emitOn(Infrastructure.getDefaultExecutor()); } } diff --git a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java index 3bdad1e36f8f7a..7a3edda5247662 100644 --- a/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java +++ b/extensions/vertx/runtime/src/main/java/io/quarkus/vertx/runtime/VertxEventBusConsumerRecorder.java @@ -11,6 +11,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.function.Function; import java.util.function.Supplier; import org.jboss.logging.Logger; @@ -22,6 +23,7 @@ import io.quarkus.runtime.annotations.Recorder; import io.quarkus.runtime.configuration.ProfileManager; import io.quarkus.vertx.ConsumeEvent; +import io.quarkus.vertx.LocalEventBusCodec; import io.quarkus.virtual.threads.VirtualThreadsRecorder; import io.smallrye.common.vertx.VertxContext; import io.vertx.core.AsyncResult; @@ -44,12 +46,13 @@ public class VertxEventBusConsumerRecorder { static volatile List> messageConsumers; public void configureVertx(Supplier vertx, Map messageConsumerConfigurations, - LaunchMode launchMode, ShutdownContext shutdown, Map, Class> codecByClass) { + LaunchMode launchMode, ShutdownContext shutdown, Map, Class> codecByClass, + List> selectorTypes) { VertxEventBusConsumerRecorder.vertx = vertx.get(); VertxEventBusConsumerRecorder.messageConsumers = new CopyOnWriteArrayList<>(); registerMessageConsumers(messageConsumerConfigurations); - registerCodecs(codecByClass); + registerCodecs(codecByClass, selectorTypes); if (launchMode == LaunchMode.DEVELOPMENT) { shutdown.addShutdownTask(new Runnable() { @@ -244,7 +247,7 @@ private EventConsumerInvoker createInvoker(String invokerClassName) { } @SuppressWarnings("unchecked") - private void registerCodecs(Map, Class> codecByClass) { + private void registerCodecs(Map, Class> codecByClass, List> selectorTypes) { EventBus eventBus = vertx.eventBus(); boolean isDevMode = ProfileManager.getLaunchMode() == LaunchMode.DEVELOPMENT; for (Map.Entry, Class> codecEntry : codecByClass.entrySet()) { @@ -252,6 +255,7 @@ private void registerCodecs(Map, Class> codecByClass) { Class codec = codecEntry.getValue(); try { if (MessageCodec.class.isAssignableFrom(codec)) { + @SuppressWarnings("rawtypes") MessageCodec messageCodec = (MessageCodec) codec.getDeclaredConstructor().newInstance(); if (isDevMode) { // we need to unregister the codecs because in dev mode vert.x is not reloaded @@ -267,6 +271,23 @@ private void registerCodecs(Map, Class> codecByClass) { LOGGER.error("Cannot instantiate the MessageCodec " + target.toString(), e); } } + + String localCodecName = "quarkus_default_local_codec"; + if (isDevMode) { + eventBus.unregisterCodec(localCodecName); + } + eventBus.registerCodec(new LocalEventBusCodec<>(localCodecName)); + eventBus.codecSelector(new Function() { + @Override + public String apply(Object messageBody) { + for (Class selectorType : selectorTypes) { + if (selectorType.isAssignableFrom(messageBody.getClass())) { + return localCodecName; + } + } + return null; + } + }); } public RuntimeValue forceStart(Supplier vertx) {