diff --git a/bom/application/pom.xml b/bom/application/pom.xml index f2e0e016d66142..e49c28d5a34e9a 100644 --- a/bom/application/pom.xml +++ b/bom/application/pom.xml @@ -63,7 +63,7 @@ 1.0.13 3.0.1 3.6.0 - 4.9.0 + 4.10.1 2.3.1 2.1.2 2.1.1 @@ -341,6 +341,15 @@ pom + + + io.smallrye.reactive + smallrye-reactive-messaging-bom + ${smallrye-reactive-messaging.version} + import + pom + + io.micrometer @@ -5343,31 +5352,6 @@ ${quartz.version} - - io.smallrye.reactive - smallrye-reactive-messaging-provider - ${smallrye-reactive-messaging.version} - - - io.smallrye.reactive - smallrye-reactive-messaging-api - ${smallrye-reactive-messaging.version} - - - io.smallrye.reactive - smallrye-reactive-messaging-health - ${smallrye-reactive-messaging.version} - - - io.smallrye.reactive - smallrye-reactive-messaging-mqtt - ${smallrye-reactive-messaging.version} - - - io.smallrye.reactive - smallrye-reactive-messaging-in-memory - ${smallrye-reactive-messaging.version} - io.smallrye.reactive smallrye-reactive-messaging-kafka @@ -5399,11 +5383,6 @@ - - io.smallrye.reactive - smallrye-reactive-messaging-pulsar - ${smallrye-reactive-messaging.version} - org.apache.pulsar pulsar-client-original @@ -5443,11 +5422,6 @@ - - io.smallrye.reactive - smallrye-reactive-messaging-kafka-api - ${smallrye-reactive-messaging.version} - io.smallrye.reactive smallrye-reactive-messaging-kafka-test-companion @@ -5474,16 +5448,6 @@ - - io.smallrye.reactive - smallrye-reactive-messaging-rabbitmq - ${smallrye-reactive-messaging.version} - - - io.smallrye.reactive - smallrye-connector-attribute-processor - ${smallrye-reactive-messaging.version} - org.infinispan diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc index 25e142198b0218..a76c98778f8a5c 100644 --- a/docs/src/main/asciidoc/kafka.adoc +++ b/docs/src/main/asciidoc/kafka.adoc @@ -1245,6 +1245,66 @@ Reciprocally, multiple producers on the same channel can be merged by setting `m On the `@Incoming` methods, you can control how multiple channels are merged using the `@Merge` annotation. ==== +Repeating the `@Outgoing` annotation on outbound or processing methods allows another way of dispatching messages to multiple outgoing channels: + +[source, java] +---- +import java.util.Random; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +@ApplicationScoped +public class MultipleProducers { + + private final Random random = new Random(); + + @Outgoing("generated") + @Outgoing("generated-2") + double priceBroadcast() { + return random.nextDouble(); + } + +} +---- + +In the previous example generated price will be broadcast to both outbound channels. +The following example selectively sends messages to multiple outgoing channels using the `Targeted` container object, +containing key as channel name and value as message payload. + +[source, java] +---- +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.reactive.messaging.Targeted; + +@ApplicationScoped +public class TargetedProducers { + + @Incoming("in") + @Outgoing("out1") + @Outgoing("out2") + @Outgoing("out3") + public Targeted process(double price) { + Targeted targeted = Targeted.of("out1", "Price: " + price, + "out2", "Quote: " + price); + if (price > 90.0) { + return targeted.with("out3", price); + } + return targeted; + } + +} +---- + +Note that <> doesn't work for signatures using the `Targeted`. + +For more details on using multiple outgoings, please refer to the http://smallrye.io/smallrye-reactive-messaging/4.10.0/concepts/outgoings/[SmallRye Reactive Messaging documentation]. + === Kafka Transactions Kafka transactions enable atomic writes to multiple Kafka topics and partitions. @@ -2261,7 +2321,7 @@ See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture do == Channel Decorators -SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/3.19.1/concepts/decorators/[SmallRye Reactive Messaging documentation]. +SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/latest/concepts/decorators/[SmallRye Reactive Messaging documentation]. [[kafka-configuration]] == Configuration Reference diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java index 66366a7095eb72..0a065e4bcedf19 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java @@ -28,6 +28,7 @@ final class DotNames { static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName()); static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName()); static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName()); + static final DotName MULTI_SPLITTER = DotName.createSimple(io.smallrye.mutiny.operators.multi.split.MultiSplitter.class.getName()); static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated"); static final DotName AVRO_GENERIC_RECORD = DotName.createSimple("org.apache.avro.generic.GenericRecord"); diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 1981a3ad43ff83..825c9550e3ff4f 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -181,9 +181,11 @@ public void defaultChannelConfiguration( if (launchMode.getLaunchMode().isDevOrTest()) { if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) { List incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING); + List outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING); List channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL); List annotations = new ArrayList<>(); annotations.addAll(incomings); + annotations.addAll(outgoings); annotations.addAll(channels); for (AnnotationInstance annotation : annotations) { String channelName = annotation.value().asString(); @@ -221,7 +223,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery, alreadyGeneratedDeserializers); } - for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.OUTGOING)) { + for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) { String channelName = annotation.value().asString(); if (!discovery.isKafkaConnector(channelsManagedByConnectors, false, channelName)) { continue; @@ -428,6 +430,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) { if ((isPublisher(returnType) && parametersCount == 0) || (isPublisherBuilder(returnType) && parametersCount == 0) || (isMulti(returnType) && parametersCount == 0) + || (isMultiSplitter(returnType) && parametersCount == 0) || (isCompletionStage(returnType) && parametersCount == 0) || (isUni(returnType) && parametersCount == 0)) { outgoingType = returnType.asParameterizedType().arguments().get(0); @@ -443,7 +446,8 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) { || (isUni(returnType) && parametersCount == 1) || (isPublisher(returnType) && parametersCount == 1) || (isPublisherBuilder(returnType) && parametersCount == 1) - || (isMulti(returnType) && parametersCount == 1)) { + || (isMulti(returnType) && parametersCount == 1) + || (isMultiSplitter(returnType) && parametersCount == 1)) { outgoingType = returnType.asParameterizedType().arguments().get(0); } else if ((isProcessor(returnType) && parametersCount == 0) || (isProcessorBuilder(returnType) && parametersCount == 0)) { @@ -556,6 +560,13 @@ private static boolean isMulti(Type type) { && type.asParameterizedType().arguments().size() == 1; } + private static boolean isMultiSplitter(Type type) { + // raw type MultiSplitter is wrong, must be MultiSplitter + return DotNames.MULTI_SPLITTER.equals(type.name()) + && type.kind() == Type.Kind.PARAMETERIZED_TYPE + && type.asParameterizedType().arguments().size() == 2; + } + private static boolean isSubscriber(Type type) { // raw type Subscriber is wrong, must be Subscriber return DotNames.SUBSCRIBER.equals(type.name()) diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java index e7b2e6c8567599..53834d1195f709 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/QuarkusMediatorConfigurationUtil.java @@ -9,6 +9,7 @@ import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING; +import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOINGS; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING; import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL; @@ -116,10 +117,12 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean incomingValues.addAll(getIncomingValues(methodInfo)); configuration.setIncomings(incomingValues); - String outgoingValue = getValue(methodInfo, OUTGOING); - configuration.setOutgoing(outgoingValue); + // We need to extract the value of @Outgoing and @Outgoings (which contains an array of @Outgoing) + List outgoingValues = new ArrayList<>(getValues(methodInfo, OUTGOING)); + outgoingValues.addAll(getOutgoingValues(methodInfo)); + configuration.setOutgoings(outgoingValues); - Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValue); + Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValues); configuration.setShape(shape); Acknowledgment.Strategy acknowledgment = mediatorConfigurationSupport .processSuppliedAcknowledgement(incomingValues, @@ -161,7 +164,7 @@ public Merge.Mode get() { } })); - configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValue, + configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValues, new Supplier() { @Override public Integer get() { @@ -176,6 +179,7 @@ public Integer get() { return null; } })); + configuration.setHasTargetedOutput(mediatorConfigurationSupport.processTargetedOutput()); AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING); AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING); @@ -328,6 +332,13 @@ private static List getIncomingValues(MethodInfo methodInfo) { .collect(Collectors.toList()); } + private static List getOutgoingValues(MethodInfo methodInfo) { + return methodInfo.annotations().stream().filter(ai -> ai.name().equals(OUTGOINGS)) + .flatMap(outgoings -> Arrays.stream(outgoings.value().asNestedArray())) + .map(outgoing -> outgoing.value().asString()) + .collect(Collectors.toList()); + } + private static String fullMethodName(MethodInfo methodInfo) { return methodInfo.declaringClass() + "#" + methodInfo.name(); } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java index 384dcd3aee6ba7..e78713e715f9bf 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java @@ -23,6 +23,7 @@ import io.smallrye.reactive.messaging.annotations.Incomings; import io.smallrye.reactive.messaging.annotations.Merge; import io.smallrye.reactive.messaging.annotations.OnOverflow; +import io.smallrye.reactive.messaging.annotations.Outgoings; import io.smallrye.reactive.messaging.connector.InboundConnector; import io.smallrye.reactive.messaging.connector.OutboundConnector; import io.smallrye.reactive.messaging.keyed.KeyValueExtractor; @@ -38,6 +39,7 @@ public final class ReactiveMessagingDotNames { static final DotName INCOMING = DotName.createSimple(Incoming.class.getName()); static final DotName INCOMINGS = DotName.createSimple(Incomings.class.getName()); static final DotName OUTGOING = DotName.createSimple(Outgoing.class.getName()); + static final DotName OUTGOINGS = DotName.createSimple(Outgoings.class.getName()); public static final DotName CONNECTOR = DotName.createSimple(Connector.class.getName()); static final DotName CONNECTOR_ATTRIBUTES = DotName.createSimple(ConnectorAttributes.class.getName()); diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index d2ebac1e6fa28d..aa82b90d8ac8c3 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -159,6 +159,9 @@ public List removalExclusions() { new UnremovableBeanBuildItem( new BeanClassAnnotationExclusion( ReactiveMessagingDotNames.OUTGOING)), + new UnremovableBeanBuildItem( + new BeanClassAnnotationExclusion( + ReactiveMessagingDotNames.OUTGOINGS)), new UnremovableBeanBuildItem( new BeanClassAnnotationExclusion( ReactiveMessagingDotNames.MESSAGE_CONVERTER)), diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java index 61a31a4f8f5e21..41fbd927a99a62 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java @@ -107,15 +107,19 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished, ReactiveMessagingDotNames.INCOMINGS); AnnotationInstance outgoing = transformedAnnotations.getAnnotation(method, ReactiveMessagingDotNames.OUTGOING); + AnnotationInstance outgoings = transformedAnnotations.getAnnotation(method, + ReactiveMessagingDotNames.OUTGOINGS); AnnotationInstance blocking = transformedAnnotations.getAnnotation(method, BLOCKING); - if (incoming != null || incomings != null || outgoing != null) { + if (incoming != null || incomings != null || outgoing != null || outgoings != null) { handleMethodAnnotatedWithIncoming(appChannels, validationErrors, configDescriptionBuildItemBuildProducer, method, incoming); handleMethodAnnotationWithIncomings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer, method, incomings); handleMethodAnnotationWithOutgoing(appChannels, validationErrors, configDescriptionBuildItemBuildProducer, method, outgoing); + handleMethodAnnotationWithOutgoings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer, + method, outgoings); if (WiringHelper.isSynthetic(method)) { continue; @@ -218,6 +222,24 @@ private void handleMethodAnnotationWithOutgoing(BuildProducer } } + private void handleMethodAnnotationWithOutgoings(BuildProducer appChannels, + BuildProducer validationErrors, + BuildProducer configDescriptionBuildItemBuildProducer, + MethodInfo method, AnnotationInstance outgoings) { + if (outgoings != null) { + for (AnnotationInstance instance : outgoings.value().asNestedArray()) { + if (instance.value().asString().isEmpty()) { + validationErrors.produce(new ValidationPhaseBuildItem.ValidationErrorBuildItem( + new DeploymentException("Empty @Outgoing annotation on method " + method))); + } + configDescriptionBuildItemBuildProducer.produce(new ConfigDescriptionBuildItem( + "mp.messaging.outgoing." + instance.value().asString() + ".connector", null, + "The connector to use", null, null, ConfigPhase.BUILD_TIME)); + produceOutgoingChannel(appChannels, instance.value().asString()); + } + } + } + private void handleMethodAnnotationWithIncomings(BuildProducer appChannels, BuildProducer validationErrors, BuildProducer configDescriptionBuildItemBuildProducer, diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java index 605cede36f44ed..f1fa6442420e3c 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusMediatorConfiguration.java @@ -31,7 +31,7 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration { private List incomings = new ArrayList<>(); - private String outgoing; + private List outgoings = new ArrayList<>(); private Acknowledgment.Strategy acknowledgment; @@ -63,6 +63,8 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration { private Type valueType; private Class keyed; + private boolean hasTargetedOutput = false; + public String getBeanId() { return beanId; } @@ -117,11 +119,16 @@ public void setIncomings(List incomings) { @Override public String getOutgoing() { - return outgoing; + return outgoings.get(0); + } + + @Override + public List getOutgoings() { + return outgoings; } - public void setOutgoing(String outgoing) { - this.outgoing = outgoing; + public void setOutgoings(List outgoings) { + this.outgoings = outgoings; } @Override @@ -310,4 +317,17 @@ public void setValueType(Type valueType) { public void setKeyed(Class keyed) { this.keyed = keyed; } + + @Override + public boolean hasTargetedOutput() { + return hasTargetedOutput; + } + + public boolean isHasTargetedOutput() { + return hasTargetedOutput; + } + + public void setHasTargetedOutput(boolean hasTargetedOutput) { + this.hasTargetedOutput = hasTargetedOutput; + } } diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/Fruit.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/Fruit.java index afe51f1d0551fa..2eca3e8ac02f17 100644 --- a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/Fruit.java +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/Fruit.java @@ -3,12 +3,31 @@ public class Fruit { public String name; + public Fruits type; - public Fruit(String name) { + public Fruit(String name, Fruits type) { this.name = name; + this.type = type; } public Fruit() { // Jackson will uses this constructor } + + public enum Fruits { + BERRY, + CITRUS, + POME, + STONE, + TROPICAL; + + @Override + public String toString() { + return "fruit-" + name().toLowerCase(); + } + + Fruit create(String name) { + return new Fruit(name, this); + } + } } diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/FruitProducer.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/FruitProducer.java index 251f6217334940..8b9e5af30bd430 100644 --- a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/FruitProducer.java +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/FruitProducer.java @@ -11,9 +11,10 @@ public class FruitProducer { @Outgoing("fruits-out") public Multi generateFruits() { return Multi.createFrom().items( - new Fruit("apple"), - new Fruit("banana"), - new Fruit("peach"), - new Fruit("orange")); + Fruit.Fruits.BERRY.create("strawberry"), + Fruit.Fruits.POME.create("apple"), + Fruit.Fruits.TROPICAL.create("banana"), + Fruit.Fruits.STONE.create("peach"), + Fruit.Fruits.CITRUS.create("orange")); } } diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/FruitSplitProducer.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/FruitSplitProducer.java new file mode 100644 index 00000000000000..e51f3089c680df --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/FruitSplitProducer.java @@ -0,0 +1,23 @@ +package io.quarkus.it.kafka; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.operators.multi.split.MultiSplitter; + +@ApplicationScoped +public class FruitSplitProducer { + + @Incoming("fruits-out") + @Outgoing("fruit-berry") + @Outgoing("fruit-citrus") + @Outgoing("fruit-pome") + @Outgoing("fruit-stone") + @Outgoing("fruit-tropical") + MultiSplitter produce(Multi fruits) { + return fruits.split(Fruit.Fruits.class, s -> s.type); + } +} diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesProducer.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesProducer.java deleted file mode 100644 index cd47f37c2a1583..00000000000000 --- a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesProducer.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.quarkus.it.kafka; - -import jakarta.enterprise.context.ApplicationScoped; - -import org.eclipse.microprofile.reactive.messaging.Outgoing; - -import io.smallrye.mutiny.Multi; - -@ApplicationScoped -public class PricesProducer { - - @Outgoing("prices-out") - public Multi generatePrices() { - return Multi.createFrom().items(1.2, 2.2, 3.4); - } - - @Outgoing("prices-out2") - public Multi generatePrices2() { - return Multi.createFrom().items(4.5, 5.6, 6.7); - } -} diff --git a/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesRepeatableProducers.java b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesRepeatableProducers.java new file mode 100644 index 00000000000000..da21d8a40a01c5 --- /dev/null +++ b/integration-tests/reactive-messaging-kafka/src/main/java/io/quarkus/it/kafka/PricesRepeatableProducers.java @@ -0,0 +1,22 @@ +package io.quarkus.it.kafka; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; + +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.Targeted; + +@ApplicationScoped +public class PricesRepeatableProducers { + + @Outgoing("prices-out") + @Outgoing("prices-out2") + public Multi produce() { + return Multi.createFrom().items( + Targeted.of("prices-out", 1.2, "prices-out2", 4.5), + Targeted.of("prices-out", 2.2, "prices-out2", 5.6), + Targeted.of("prices-out", 3.4, "prices-out2", 6.7)); + } + +} diff --git a/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties b/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties index b80f4bdaff62ab..89117fa6769b03 100644 --- a/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties +++ b/integration-tests/reactive-messaging-kafka/src/main/resources/application.properties @@ -18,8 +18,8 @@ mp.messaging.incoming.people-in.checkpoint.state-type=io.quarkus.it.kafka.KafkaR mp.messaging.incoming.people-in.checkpoint.quarkus-redis.client-name=my-redis mp.messaging.incoming.people-in.group.id=people-checkpoint -mp.messaging.outgoing.fruits-out.topic=fruits -mp.messaging.incoming.fruits-in.topic=fruits +mp.messaging.incoming.fruits-in.topic=fruit-.* +mp.messaging.incoming.fruits-in.pattern=true mp.messaging.outgoing.pets-out.topic=pets mp.messaging.incoming.pets-in.topic=pets @@ -27,7 +27,9 @@ mp.messaging.incoming.pets-in.topic=pets quarkus.redis.my-redis.hosts=${quarkus.redis.hosts} mp.messaging.outgoing.prices-out.topic=prices +mp.messaging.outgoing.prices-out.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer mp.messaging.outgoing.prices-out2.topic=prices2 +mp.messaging.outgoing.prices-out2.value.serializer=org.apache.kafka.common.serialization.DoubleSerializer mp.messaging.incoming.prices-in.topic=prices mp.messaging.incoming.prices-in2.topic=prices2 diff --git a/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java index 3527c4a9ecf111..82bd9d316defb0 100644 --- a/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java +++ b/integration-tests/reactive-messaging-kafka/src/test/java/io/quarkus/it/kafka/KafkaConnectorTest.java @@ -24,6 +24,8 @@ public class KafkaConnectorTest { protected static final TypeRef> TYPE_REF = new TypeRef>() { }; + protected static final TypeRef> FRUIT_TYPE_REF = new TypeRef>() { + }; protected static final TypeRef> PEOPLE_STATE_REF = new TypeRef>() { }; @@ -48,7 +50,7 @@ public void testPets() { @Test public void testFruits() { - await().untilAsserted(() -> Assertions.assertEquals(get("/kafka/fruits").as(TYPE_REF).size(), 4)); + await().untilAsserted(() -> Assertions.assertEquals(get("/kafka/fruits").as(FRUIT_TYPE_REF).size(), 5)); } @Test