Skip to content

Commit

Permalink
Add support for @Incomings.
Browse files Browse the repository at this point in the history
The reactive messaging extension has its own customized configuration layer to configure reactive messaging to use the Quarkus built time mechanism. This layer didn't support @Incomings.
This commit adds support for it.

Fix quarkusio#19562
  • Loading branch information
cescoffier committed Aug 22, 2021
1 parent 2be9adf commit c9ef124
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BROADCAST;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.COMPLETION_STAGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.VOID_CLASS;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
Expand Down Expand Up @@ -84,8 +87,11 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean
}
configuration.setParameterTypes(parameterTypes);

List<String> incomingValues = getValues(methodInfo, INCOMING);
// We need to extract the value of @Incoming and @Incomings (which contains an array of @Incoming)
List<String> incomingValues = new ArrayList<>(getValues(methodInfo, INCOMING));
incomingValues.addAll(getIncomingValues(methodInfo));
configuration.setIncomings(incomingValues);

String outgoingValue = getValue(methodInfo, OUTGOING);
configuration.setOutgoing(outgoingValue);

Expand Down Expand Up @@ -247,6 +253,13 @@ private static List<String> getValues(MethodInfo methodInfo, DotName dotName) {
.collect(Collectors.toList());
}

private static List<String> getIncomingValues(MethodInfo methodInfo) {
return methodInfo.annotations().stream().filter(ai -> ai.name().equals(INCOMINGS))
.flatMap(incomings -> Arrays.stream(incomings.value().asNestedArray()))
.map(incoming -> incoming.value().asString())
.collect(Collectors.toList());
}

private static String fullMethodName(MethodInfo methodInfo) {
return methodInfo.declaringClass() + "#" + methodInfo.name();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.quarkus.smallrye.reactivemessaging.signatures;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.reactivestreams.Publisher;

import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.mutiny.Multi;

public class IncomingsTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
.addClasses(ProducerOnA.class, ProducerOnB.class, MyBeanUsingMultipleIncomings.class));

@Inject
MyBeanUsingMultipleIncomings bean;

@Test
public void testIncomingsWithTwoSources() {
await().until(() -> bean.list().size() == 6);
assertThat(bean.list()).containsExactlyInAnyOrder("a", "b", "c", "d", "e", "f");
}

@ApplicationScoped
public static class ProducerOnA {

@Outgoing("a")
public Publisher<String> produce() {
return Multi.createFrom().items("a", "b", "c");
}

}

@ApplicationScoped
public static class ProducerOnB {

@Outgoing("b")
public Publisher<String> produce() {
return Multi.createFrom().items("d", "e", "f");
}

}

@ApplicationScoped
public static class MyBeanUsingMultipleIncomings {

private final List<String> list = new CopyOnWriteArrayList<>();

@Incoming("a")
@Incoming("b")
public void consume(String s) {
list.add(s);
}

public List<String> list() {
return list;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void setInvokerClass(Class<? extends Invoker> invokerClass) {

@Override
public String methodAsString() {
if (Arc.container() != null) {
if (Arc.container() != null && getBean() != null) {
return getBean().getBeanClass().getName() + "#" + getMethodName();
} else {
return getMethodName();
Expand Down

0 comments on commit c9ef124

Please sign in to comment.