Skip to content

Commit

Permalink
Bump Smallrye RM from 4.9.0 to 4.10.1
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Sep 28, 2023
1 parent a06ccd7 commit 2f4eeb8
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 87 deletions.
56 changes: 10 additions & 46 deletions bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.6.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.9.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.10.1</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.3.1</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down Expand Up @@ -341,6 +341,15 @@
<type>pom</type>
</dependency>

<!-- Smallrye Reactive Messaging dependencies, imported as a BOM -->
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-bom</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
<scope>import</scope>
<type>pom</type>
</dependency>

<!-- Micrometer Core and Registries, imported as BOM -->
<dependency>
<groupId>io.micrometer</groupId>
Expand Down Expand Up @@ -5343,31 +5352,6 @@
<version>${quartz.version}</version>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-provider</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-health</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-mqtt</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-in-memory</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka</artifactId>
Expand Down Expand Up @@ -5399,11 +5383,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-pulsar</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-original</artifactId>
Expand Down Expand Up @@ -5443,11 +5422,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka-api</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-kafka-test-companion</artifactId>
Expand All @@ -5474,16 +5448,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-rabbitmq</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-connector-attribute-processor</artifactId>
<version>${smallrye-reactive-messaging.version}</version>
</dependency>

<dependency>
<groupId>org.infinispan</groupId>
Expand Down
62 changes: 61 additions & 1 deletion docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,66 @@ Reciprocally, multiple producers on the same channel can be merged by setting `m
On the `@Incoming` methods, you can control how multiple channels are merged using the `@Merge` annotation.
====

Repeating the `@Outgoing` annotation on outbound or processing methods allows another way of dispatching messages to multiple outgoing channels:

[source, java]
----
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MultipleProducers {
private final Random random = new Random();
@Outgoing("generated")
@Outgoing("generated-2")
double priceBroadcast() {
return random.nextDouble();
}
}
----

In the previous example generated price will be broadcast to both outbound channels.
The following example selectively sends messages to multiple outgoing channels using the `Targeted` container object,
containing key as channel name and value as message payload.

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.Targeted;
@ApplicationScoped
public class TargetedProducers {
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
Targeted targeted = Targeted.of("out1", "Price: " + price,
"out2", "Quote: " + price);
if (price > 90.0) {
return targeted.with("out3", price);
}
return targeted;
}
}
----

Note that <<serialization-autodetection,the auto-detection for Kafka serializers>> doesn't work for signatures using the `Targeted`.

For more details on using multiple outgoings, please refer to the http://smallrye.io/smallrye-reactive-messaging/4.10.0/concepts/outgoings/[SmallRye Reactive Messaging documentation].

=== Kafka Transactions

Kafka transactions enable atomic writes to multiple Kafka topics and partitions.
Expand Down Expand Up @@ -2261,7 +2321,7 @@ See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture do

== Channel Decorators

SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/3.19.1/concepts/decorators/[SmallRye Reactive Messaging documentation].
SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/latest/concepts/decorators/[SmallRye Reactive Messaging documentation].

[[kafka-configuration]]
== Configuration Reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ final class DotNames {
static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName());
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName());
static final DotName MULTI_SPLITTER = DotName.createSimple(io.smallrye.mutiny.operators.multi.split.MultiSplitter.class.getName());

static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated");
static final DotName AVRO_GENERIC_RECORD = DotName.createSimple("org.apache.avro.generic.GenericRecord");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,11 @@ public void defaultChannelConfiguration(
if (launchMode.getLaunchMode().isDevOrTest()) {
if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) {
List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING);
List<AnnotationInstance> outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING);
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
List<AnnotationInstance> annotations = new ArrayList<>();
annotations.addAll(incomings);
annotations.addAll(outgoings);
annotations.addAll(channels);
for (AnnotationInstance annotation : annotations) {
String channelName = annotation.value().asString();
Expand Down Expand Up @@ -221,7 +223,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
alreadyGeneratedDeserializers);
}

for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.OUTGOING)) {
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) {
String channelName = annotation.value().asString();
if (!discovery.isKafkaConnector(channelsManagedByConnectors, false, channelName)) {
continue;
Expand Down Expand Up @@ -428,6 +430,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
if ((isPublisher(returnType) && parametersCount == 0)
|| (isPublisherBuilder(returnType) && parametersCount == 0)
|| (isMulti(returnType) && parametersCount == 0)
|| (isMultiSplitter(returnType) && parametersCount == 0)
|| (isCompletionStage(returnType) && parametersCount == 0)
|| (isUni(returnType) && parametersCount == 0)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
Expand All @@ -443,7 +446,8 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
|| (isUni(returnType) && parametersCount == 1)
|| (isPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isMulti(returnType) && parametersCount == 1)) {
|| (isMulti(returnType) && parametersCount == 1)
|| (isMultiSplitter(returnType) && parametersCount == 1)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isProcessor(returnType) && parametersCount == 0)
|| (isProcessorBuilder(returnType) && parametersCount == 0)) {
Expand Down Expand Up @@ -556,6 +560,13 @@ private static boolean isMulti(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isMultiSplitter(Type type) {
// raw type MultiSplitter is wrong, must be MultiSplitter<Something, KeyEnum>
return DotNames.MULTI_SPLITTER.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 2;
}

private static boolean isSubscriber(Type type) {
// raw type Subscriber is wrong, must be Subscriber<Something>
return DotNames.SUBSCRIBER.equals(type.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
Expand Down Expand Up @@ -116,10 +117,12 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean
incomingValues.addAll(getIncomingValues(methodInfo));
configuration.setIncomings(incomingValues);

String outgoingValue = getValue(methodInfo, OUTGOING);
configuration.setOutgoing(outgoingValue);
// We need to extract the value of @Outgoing and @Outgoings (which contains an array of @Outgoing)
List<String> outgoingValues = new ArrayList<>(getValues(methodInfo, OUTGOING));
outgoingValues.addAll(getOutgoingValues(methodInfo));
configuration.setOutgoings(outgoingValues);

Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValue);
Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValues);
configuration.setShape(shape);
Acknowledgment.Strategy acknowledgment = mediatorConfigurationSupport
.processSuppliedAcknowledgement(incomingValues,
Expand Down Expand Up @@ -161,7 +164,7 @@ public Merge.Mode get() {
}
}));

configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValue,
configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValues,
new Supplier<Integer>() {
@Override
public Integer get() {
Expand All @@ -176,6 +179,7 @@ public Integer get() {
return null;
}
}));
configuration.setHasTargetedOutput(mediatorConfigurationSupport.processTargetedOutput());

AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
Expand Down Expand Up @@ -328,6 +332,13 @@ private static List<String> getIncomingValues(MethodInfo methodInfo) {
.collect(Collectors.toList());
}

private static List<String> getOutgoingValues(MethodInfo methodInfo) {
return methodInfo.annotations().stream().filter(ai -> ai.name().equals(OUTGOINGS))
.flatMap(outgoings -> Arrays.stream(outgoings.value().asNestedArray()))
.map(outgoing -> outgoing.value().asString())
.collect(Collectors.toList());
}

private static String fullMethodName(MethodInfo methodInfo) {
return methodInfo.declaringClass() + "#" + methodInfo.name();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
import io.smallrye.reactive.messaging.annotations.Outgoings;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
Expand All @@ -38,6 +39,7 @@ public final class ReactiveMessagingDotNames {
static final DotName INCOMING = DotName.createSimple(Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(Outgoing.class.getName());
static final DotName OUTGOINGS = DotName.createSimple(Outgoings.class.getName());

public static final DotName CONNECTOR = DotName.createSimple(Connector.class.getName());
static final DotName CONNECTOR_ATTRIBUTES = DotName.createSimple(ConnectorAttributes.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public List<UnremovableBeanBuildItem> removalExclusions() {
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.OUTGOING)),
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.OUTGOINGS)),
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.MESSAGE_CONVERTER)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,19 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished,
ReactiveMessagingDotNames.INCOMINGS);
AnnotationInstance outgoing = transformedAnnotations.getAnnotation(method,
ReactiveMessagingDotNames.OUTGOING);
AnnotationInstance outgoings = transformedAnnotations.getAnnotation(method,
ReactiveMessagingDotNames.OUTGOINGS);
AnnotationInstance blocking = transformedAnnotations.getAnnotation(method,
BLOCKING);
if (incoming != null || incomings != null || outgoing != null) {
if (incoming != null || incomings != null || outgoing != null || outgoings != null) {
handleMethodAnnotatedWithIncoming(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, incoming);
handleMethodAnnotationWithIncomings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, incomings);
handleMethodAnnotationWithOutgoing(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, outgoing);
handleMethodAnnotationWithOutgoings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, outgoings);

if (WiringHelper.isSynthetic(method)) {
continue;
Expand Down Expand Up @@ -218,6 +222,24 @@ private void handleMethodAnnotationWithOutgoing(BuildProducer<ChannelBuildItem>
}
}

private void handleMethodAnnotationWithOutgoings(BuildProducer<ChannelBuildItem> appChannels,
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
BuildProducer<ConfigDescriptionBuildItem> configDescriptionBuildItemBuildProducer,
MethodInfo method, AnnotationInstance outgoings) {
if (outgoings != null) {
for (AnnotationInstance instance : outgoings.value().asNestedArray()) {
if (instance.value().asString().isEmpty()) {
validationErrors.produce(new ValidationPhaseBuildItem.ValidationErrorBuildItem(
new DeploymentException("Empty @Outgoing annotation on method " + method)));
}
configDescriptionBuildItemBuildProducer.produce(new ConfigDescriptionBuildItem(
"mp.messaging.outgoing." + instance.value().asString() + ".connector", null,
"The connector to use", null, null, ConfigPhase.BUILD_TIME));
produceOutgoingChannel(appChannels, instance.value().asString());
}
}
}

private void handleMethodAnnotationWithIncomings(BuildProducer<ChannelBuildItem> appChannels,
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
BuildProducer<ConfigDescriptionBuildItem> configDescriptionBuildItemBuildProducer,
Expand Down
Loading

0 comments on commit 2f4eeb8

Please sign in to comment.