Skip to content

Commit

Permalink
Vert.x EventBus: use codec selector for non-concrete types
Browse files Browse the repository at this point in the history
  • Loading branch information
mkouba authored and holly-cummins committed Feb 8, 2024
1 parent b09fe47 commit ae9184f
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 39 deletions.
6 changes: 4 additions & 2 deletions docs/src/main/asciidoc/vertx-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,11 @@ Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

=== Use codecs

The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses codecs to _serialize_ and _deserialize_ objects.
The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/#_message_codecs[codecs] to _serialize_ and _deserialize_ message objects.
Quarkus provides a default codec for local delivery.
So you can exchange objects as follows:
This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with `@ConsumeEvent` whete `ConsumeEvent#local() == true` (which is the default).

So that you can exchange the message objects as follows:

[source, java]
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import static io.quarkus.vertx.deployment.VertxConstants.LOCAL_EVENT_BUS_CODEC;
import static io.quarkus.vertx.deployment.VertxConstants.UNI;

import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -17,11 +19,13 @@
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
import org.jboss.jandex.AnnotationValue;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.ParameterizedType;
import org.jboss.jandex.Type;
import org.jboss.jandex.Type.Kind;
import org.jboss.logging.Logger;

import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem;
Expand All @@ -46,51 +50,71 @@ public void registerCodecs(
BeanArchiveIndexBuildItem beanArchiveIndexBuildItem,
CombinedIndexBuildItem combinedIndex,
BuildProducer<MessageCodecBuildItem> messageCodecs,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<LocalCodecSelectorTypesBuildItem> localCodecSelectorTypes) {

final IndexView index = beanArchiveIndexBuildItem.getIndex();
Collection<AnnotationInstance> consumeEventAnnotationInstances = index.getAnnotations(CONSUME_EVENT);
Map<DotName, DotName> codecByTypes = new HashMap<>();
Set<DotName> selectorTypes = new HashSet<>();

for (AnnotationInstance consumeEventAnnotationInstance : consumeEventAnnotationInstances) {
AnnotationTarget typeTarget = consumeEventAnnotationInstance.target();
if (typeTarget.kind() != AnnotationTarget.Kind.METHOD) {
throw new UnsupportedOperationException("@ConsumeEvent annotation must target a method");
throw new IllegalStateException("@ConsumeEvent annotation must target a method");
}

AnnotationValue local = consumeEventAnnotationInstance.value("local");
boolean isLocal = local == null || local.asBoolean();
MethodInfo method = typeTarget.asMethod();
Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method);
Type codecTargetFromParameter = extractPayloadTypeFromParameter(method);

Type codecTargetFromParameter = extractPayloadTypeFromParameter(method);
// If the @ConsumeEvent set the codec, use this codec. It applies to the parameter
AnnotationValue codec = consumeEventAnnotationInstance.value("codec");
if (codec != null && codec.asClass().kind() == Type.Kind.CLASS) {
if (codecTargetFromParameter == null) {
throw new IllegalStateException("Invalid `codec` argument in @ConsumeEvent - no parameter");
}
codecByTypes.put(codecTargetFromParameter.name(), codec.asClass().asClassType().name());
} else if (codecTargetFromParameter != null) {
// Codec is not set, check if we have a built-in codec
if (!hasBuiltInCodec(codecTargetFromParameter)) {
// Ensure local delivery.
AnnotationValue local = consumeEventAnnotationInstance.value("local");
if (local != null && !local.asBoolean()) {
throw new UnsupportedOperationException(
"The generic message codec can only be used for local delivery,"
+ ", implement your own event bus codec for " + codecTargetFromParameter.name()
.toString());
} else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) {
} else if (codecTargetFromParameter != null && !hasBuiltInCodec(codecTargetFromParameter)) {
// Codec is not set and built-in codecs cannot be used
if (!isLocal) {
throw new IllegalStateException(
"The Local Message Codec can only be used for local delivery,"
+ " you will need to implement a message codec for " + codecTargetFromParameter.name()
.toString()
+ " and make use of @ConsumeEvent#codec()");
} else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) {
if (isConcreteClass(codecTargetFromParameter, index)) {
// The default codec makes only sense for concrete classes
LOGGER.debugf("Local Message Codec registered for type %s",
codecTargetFromParameter);
codecByTypes.put(codecTargetFromParameter.name(), LOCAL_EVENT_BUS_CODEC);
} else {
LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromParameter);
selectorTypes.add(codecTargetFromParameter.name());
}
}
}

if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType)
&& !codecByTypes.containsKey(codecTargetFromReturnType.name())) {

LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType);
codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC);
Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method);
if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType)) {
if (!isLocal) {
throw new IllegalStateException(
"The Local Message Codec can only be used for local delivery,"
+ " you will need to modify the method to consume io.vertx.core.eventbus.Message, implement a message codec for "
+ codecTargetFromReturnType.name()
.toString()
+ " and make use of io.vertx.core.eventbus.DeliveryOptions");
} else if (!codecByTypes.containsKey(codecTargetFromReturnType.name())) {
if (isConcreteClass(codecTargetFromReturnType, index)) {
// The default codec makes only sense for concrete classes
LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType);
codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC);
} else {
LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromReturnType);
selectorTypes.add(codecTargetFromReturnType.name());
}
}
}
}

Expand Down Expand Up @@ -133,6 +157,9 @@ public void accept(String name) {
reflectiveClass.produce(ReflectiveClassBuildItem.builder(name).methods().build());
}
});

localCodecSelectorTypes.produce(new LocalCodecSelectorTypesBuildItem(
selectorTypes.stream().map(Object::toString).collect(Collectors.toSet())));
}

private static final List<String> BUILT_IN_CODECS = Arrays.asList(
Expand Down Expand Up @@ -220,4 +247,14 @@ private static boolean hasBuiltInCodec(Type type) {
private static boolean isMessageClass(ParameterizedType type) {
return VertxConstants.isMessage(type.name());
}

private static boolean isConcreteClass(Type type, IndexView index) {
if (type != null && type.kind() == Kind.CLASS) {
ClassInfo clazz = index.getClassByName(type.name());
if (clazz != null) {
return !clazz.isInterface() && !Modifier.isAbstract(clazz.flags());
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.vertx.deployment;

import java.util.Set;

import io.quarkus.builder.item.SimpleBuildItem;

/**
* Carries all types for which the {@link io.quarkus.vertx.LocalEventBusCodec} should be selected automatically.
*/
public final class LocalCodecSelectorTypesBuildItem extends SimpleBuildItem {

private final Set<String> types;

LocalCodecSelectorTypesBuildItem(Set<String> types) {
this.types = types;
}

public Set<String> getTypes() {
return types;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.quarkus.vertx.deployment.VertxConstants.isMessage;
import static io.quarkus.vertx.deployment.VertxConstants.isMessageHeaders;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -45,7 +46,6 @@
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
import io.quarkus.deployment.recording.RecorderContext;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
Expand Down Expand Up @@ -74,7 +74,7 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec
BuildProducer<GeneratedClassBuildItem> generatedClass,
AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown,
BuildProducer<ServiceStartBuildItem> serviceStart, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
List<MessageCodecBuildItem> codecs, RecorderContext recorderContext) {
List<MessageCodecBuildItem> codecs, LocalCodecSelectorTypesBuildItem localCodecSelectorTypes) {
Map<String, ConsumeEvent> messageConsumerConfigurations = new HashMap<>();
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);
for (EventConsumerBusinessMethodItem businessMethod : messageConsumerBusinessMethods) {
Expand All @@ -87,15 +87,20 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec
reflectiveClass.produce(ReflectiveClassBuildItem.builder(invokerClass).build());
}

ClassLoader tccl = Thread.currentThread().getContextClassLoader();
Map<Class<?>, Class<?>> codecByClass = new HashMap<>();
for (MessageCodecBuildItem messageCodecItem : codecs) {
codecByClass.put(recorderContext.classProxy(messageCodecItem.getType()),
recorderContext.classProxy(messageCodecItem.getCodec()));
codecByClass.put(tryLoad(messageCodecItem.getType(), tccl), tryLoad(messageCodecItem.getCodec(), tccl));
}

List<Class<?>> selectorTypes = new ArrayList<>();
for (String name : localCodecSelectorTypes.getTypes()) {
selectorTypes.add(tryLoad(name, tccl));
}

recorder.configureVertx(vertx.getVertx(), messageConsumerConfigurations,
launchMode.getLaunchMode(),
shutdown, codecByClass);
shutdown, codecByClass, selectorTypes);
serviceStart.produce(new ServiceStartBuildItem("vertx"));
return new VertxBuildItem(recorder.forceStart(vertx.getVertx()));
}
Expand Down Expand Up @@ -190,4 +195,12 @@ void faultToleranceIntegration(Capabilities capabilities, BuildProducer<ServiceP
"io.smallrye.faulttolerance.vertx.VertxEventLoop"));
}
}

private Class<?> tryLoad(String name, ClassLoader tccl) {
try {
return tccl.loadClass(name);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Unable to load type: " + name, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.quarkus.vertx;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

import jakarta.inject.Inject;

Expand Down Expand Up @@ -60,10 +63,10 @@ public void testWithUserCodec() {

@Test
public void testWithUserCodecNonLocal() {
Greeting hello = vertx.eventBus().<Greeting> request("nl-pet", new Pet("neo", "rabbit"))
String hello = vertx.eventBus().<String> request("nl-pet", new Pet("neo", "rabbit"))
.onItem().transform(Message::body)
.await().indefinitely();
assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO");
assertEquals("Non Local Hello NEO", hello);
}

@Test
Expand All @@ -79,6 +82,20 @@ public void testWithSubclass() {
assertThat(hello.getMessage()).isEqualTo("Hello my-subclass-event");
}

@Test
public void testWithInterfaceCodecTarget() {
Supplier<String> supplier = vertx.eventBus()
.<Supplier<String>> request("hello-supplier", new Function<String, String>() {
@Override
public String apply(String value) {
return value.toLowerCase();
}
})
.onItem().transform(Message::body)
.await().indefinitely();
assertEquals("foo", supplier.get());
}

static class Greeting {
private final String message;

Expand Down Expand Up @@ -118,12 +135,23 @@ void messageTypeWithTypeAnnotation(@NonNull Person person) {
public CompletionStage<Greeting> hello(Event event) {
return CompletableFuture.completedFuture(new Greeting("Hello " + event.getProperty()));
}

@ConsumeEvent("hello-supplier")
public Supplier<String> helloSupplier(Function<String, String> fun) {
return new Supplier<String>() {

@Override
public String get() {
return fun.apply("FOO");
}
};
}
}

static class MyNonLocalBean {
@ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false)
public CompletionStage<Greeting> hello(Pet p) {
return CompletableFuture.completedFuture(new Greeting("Non Local Hello " + p.getName()));
public CompletionStage<String> hello(Pet p) {
return CompletableFuture.completedFuture("Non Local Hello " + p.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.vertx;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import jakarta.inject.Inject;

Expand Down Expand Up @@ -54,10 +55,10 @@ public void testWithUserCodec() {

@Test
public void testWithUserCodecNonLocal() {
Greeting hello = vertx.eventBus().<Greeting> request("nl-pet", new Pet("neo", "rabbit"))
String hello = vertx.eventBus().<String> request("nl-pet", new Pet("neo", "rabbit"))
.onItem().transform(Message::body)
.await().indefinitely();
assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO");
assertEquals("Non Local Hello NEO", hello);
}

static class Greeting {
Expand Down Expand Up @@ -90,9 +91,9 @@ public Uni<Greeting> hello(Pet p) {

static class MyNonLocalBean {
@ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false)
public Uni<Greeting> hello(Pet p) {
public Uni<String> hello(Pet p) {
return Uni.createFrom().item(
() -> new Greeting("Non Local Hello " + p.getName()))
() -> "Non Local Hello " + p.getName())
.emitOn(Infrastructure.getDefaultExecutor());
}
}
Expand Down
Loading

0 comments on commit ae9184f

Please sign in to comment.