Skip to content

Commit

Permalink
Merge pull request quarkusio#36206 from ozangunalp/bump_rm_4.10.0
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Oct 1, 2023
2 parents 5aa5ca2 + cadd5a4 commit 1c87e6f
Show file tree
Hide file tree
Showing 17 changed files with 261 additions and 42 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.6.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.9.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.10.1</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.3.1</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down
62 changes: 61 additions & 1 deletion docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,66 @@ Reciprocally, multiple producers on the same channel can be merged by setting `m
On the `@Incoming` methods, you can control how multiple channels are merged using the `@Merge` annotation.
====

Repeating the `@Outgoing` annotation on outbound or processing methods allows another way of dispatching messages to multiple outgoing channels:

[source, java]
----
import java.util.Random;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
@ApplicationScoped
public class MultipleProducers {
private final Random random = new Random();
@Outgoing("generated")
@Outgoing("generated-2")
double priceBroadcast() {
return random.nextDouble();
}
}
----

In the previous example generated price will be broadcast to both outbound channels.
The following example selectively sends messages to multiple outgoing channels using the `Targeted` container object,
containing key as channel name and value as message payload.

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import io.smallrye.reactive.messaging.Targeted;
@ApplicationScoped
public class TargetedProducers {
@Incoming("in")
@Outgoing("out1")
@Outgoing("out2")
@Outgoing("out3")
public Targeted process(double price) {
Targeted targeted = Targeted.of("out1", "Price: " + price,
"out2", "Quote: " + price);
if (price > 90.0) {
return targeted.with("out3", price);
}
return targeted;
}
}
----

Note that <<serialization-autodetection,the auto-detection for Kafka serializers>> doesn't work for signatures using the `Targeted`.

For more details on using multiple outgoings, please refer to the http://smallrye.io/smallrye-reactive-messaging/4.10.0/concepts/outgoings/[SmallRye Reactive Messaging documentation].

=== Kafka Transactions

Kafka transactions enable atomic writes to multiple Kafka topics and partitions.
Expand Down Expand Up @@ -2261,7 +2321,7 @@ See the xref:quarkus-reactive-architecture.adoc[Quarkus Reactive Architecture do

== Channel Decorators

SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/3.19.1/concepts/decorators/[SmallRye Reactive Messaging documentation].
SmallRye Reactive Messaging supports decorating incoming and outgoing channels for implementing cross-cutting concerns such as monitoring, tracing or message interception. For more information on implementing decorators and message interceptors see the http://smallrye.io/smallrye-reactive-messaging/latest/concepts/decorators/[SmallRye Reactive Messaging documentation].

[[kafka-configuration]]
== Configuration Reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ final class DotNames {
static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName());
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName());
static final DotName MULTI_SPLITTER = DotName.createSimple(io.smallrye.mutiny.operators.multi.split.MultiSplitter.class.getName());

static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated");
static final DotName AVRO_GENERIC_RECORD = DotName.createSimple("org.apache.avro.generic.GenericRecord");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,11 @@ public void defaultChannelConfiguration(
if (launchMode.getLaunchMode().isDevOrTest()) {
if (!runtimeConfig.enableGracefulShutdownInDevAndTestMode) {
List<AnnotationInstance> incomings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.INCOMING);
List<AnnotationInstance> outgoings = discoveryState.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING);
List<AnnotationInstance> channels = discoveryState.findAnnotationsOnInjectionPoints(DotNames.CHANNEL);
List<AnnotationInstance> annotations = new ArrayList<>();
annotations.addAll(incomings);
annotations.addAll(outgoings);
annotations.addAll(channels);
for (AnnotationInstance annotation : annotations) {
String channelName = annotation.value().asString();
Expand Down Expand Up @@ -221,7 +223,7 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
alreadyGeneratedDeserializers);
}

for (AnnotationInstance annotation : discovery.findAnnotationsOnMethods(DotNames.OUTGOING)) {
for (AnnotationInstance annotation : discovery.findRepeatableAnnotationsOnMethods(DotNames.OUTGOING)) {
String channelName = annotation.value().asString();
if (!discovery.isKafkaConnector(channelsManagedByConnectors, false, channelName)) {
continue;
Expand Down Expand Up @@ -428,6 +430,7 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
if ((isPublisher(returnType) && parametersCount == 0)
|| (isPublisherBuilder(returnType) && parametersCount == 0)
|| (isMulti(returnType) && parametersCount == 0)
|| (isMultiSplitter(returnType) && parametersCount == 0)
|| (isCompletionStage(returnType) && parametersCount == 0)
|| (isUni(returnType) && parametersCount == 0)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
Expand All @@ -443,7 +446,8 @@ private Type getOutgoingTypeFromMethod(MethodInfo method) {
|| (isUni(returnType) && parametersCount == 1)
|| (isPublisher(returnType) && parametersCount == 1)
|| (isPublisherBuilder(returnType) && parametersCount == 1)
|| (isMulti(returnType) && parametersCount == 1)) {
|| (isMulti(returnType) && parametersCount == 1)
|| (isMultiSplitter(returnType) && parametersCount == 1)) {
outgoingType = returnType.asParameterizedType().arguments().get(0);
} else if ((isProcessor(returnType) && parametersCount == 0)
|| (isProcessorBuilder(returnType) && parametersCount == 0)) {
Expand Down Expand Up @@ -556,6 +560,13 @@ private static boolean isMulti(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isMultiSplitter(Type type) {
// raw type MultiSplitter is wrong, must be MultiSplitter<Something, KeyEnum>
return DotNames.MULTI_SPLITTER.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 2;
}

private static boolean isSubscriber(Type type) {
// raw type Subscriber is wrong, must be Subscriber<Something>
return DotNames.SUBSCRIBER.equals(type.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.smallrye.config.common.MapBackedConfigSource;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.multi.split.MultiSplitter;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
Expand All @@ -66,6 +67,7 @@ private static void doTest(Config customConfig, Tuple[] expectations, Class<?>..

List<Class<?>> classes = new ArrayList<>(Arrays.asList(classesToIndex));
classes.add(Incoming.class);
classes.add(Outgoing.class);
DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classes)) {
@Override
Config getConfig() {
Expand Down Expand Up @@ -2727,6 +2729,45 @@ void method2(JsonObject msg) {

}

@Test
void repeatableOutgoings() {
Tuple[] expectations = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
tuple("mp.messaging.outgoing.channel2.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
tuple("mp.messaging.outgoing.channel3.value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"),
tuple("mp.messaging.outgoing.channel4.value.serializer", "io.quarkus.kafka.client.serialization.JsonObjectSerializer"),
tuple("mp.messaging.outgoing.channel5.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
tuple("mp.messaging.outgoing.channel6.value.serializer", "org.apache.kafka.common.serialization.LongSerializer"),
};
doTest(expectations, RepeatableOutgoingsChannels.class);
}

private static class RepeatableOutgoingsChannels {

@Outgoing("channel1")
@Outgoing("channel2")
String method1() {
return null;
}

@Outgoing("channel3")
@Outgoing("channel4")
JsonObject method2() {
return null;
}

enum T {

}

@Outgoing("channel5")
@Outgoing("channel6")
MultiSplitter<Long, T> method3() {
return null;
}

}

@Test
void channelNameContainingDot() {
Tuple[] expectations = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;
Expand Down Expand Up @@ -116,10 +117,12 @@ public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean
incomingValues.addAll(getIncomingValues(methodInfo));
configuration.setIncomings(incomingValues);

String outgoingValue = getValue(methodInfo, OUTGOING);
configuration.setOutgoing(outgoingValue);
// We need to extract the value of @Outgoing and @Outgoings (which contains an array of @Outgoing)
List<String> outgoingValues = new ArrayList<>(getValues(methodInfo, OUTGOING));
outgoingValues.addAll(getOutgoingValues(methodInfo));
configuration.setOutgoings(outgoingValues);

Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValue);
Shape shape = mediatorConfigurationSupport.determineShape(incomingValues, outgoingValues);
configuration.setShape(shape);
Acknowledgment.Strategy acknowledgment = mediatorConfigurationSupport
.processSuppliedAcknowledgement(incomingValues,
Expand Down Expand Up @@ -161,7 +164,7 @@ public Merge.Mode get() {
}
}));

configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValue,
configuration.setBroadcastValue(mediatorConfigurationSupport.processBroadcast(outgoingValues,
new Supplier<Integer>() {
@Override
public Integer get() {
Expand All @@ -176,6 +179,7 @@ public Integer get() {
return null;
}
}));
configuration.setHasTargetedOutput(mediatorConfigurationSupport.processTargetedOutput());

AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
Expand Down Expand Up @@ -328,6 +332,13 @@ private static List<String> getIncomingValues(MethodInfo methodInfo) {
.collect(Collectors.toList());
}

private static List<String> getOutgoingValues(MethodInfo methodInfo) {
return methodInfo.annotations().stream().filter(ai -> ai.name().equals(OUTGOINGS))
.flatMap(outgoings -> Arrays.stream(outgoings.value().asNestedArray()))
.map(outgoing -> outgoing.value().asString())
.collect(Collectors.toList());
}

private static String fullMethodName(MethodInfo methodInfo) {
return methodInfo.declaringClass() + "#" + methodInfo.name();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
import io.smallrye.reactive.messaging.annotations.Outgoings;
import io.smallrye.reactive.messaging.connector.InboundConnector;
import io.smallrye.reactive.messaging.connector.OutboundConnector;
import io.smallrye.reactive.messaging.keyed.KeyValueExtractor;
Expand All @@ -38,6 +39,7 @@ public final class ReactiveMessagingDotNames {
static final DotName INCOMING = DotName.createSimple(Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(Outgoing.class.getName());
static final DotName OUTGOINGS = DotName.createSimple(Outgoings.class.getName());

public static final DotName CONNECTOR = DotName.createSimple(Connector.class.getName());
static final DotName CONNECTOR_ATTRIBUTES = DotName.createSimple(ConnectorAttributes.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public List<UnremovableBeanBuildItem> removalExclusions() {
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.OUTGOING)),
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.OUTGOINGS)),
new UnremovableBeanBuildItem(
new BeanClassAnnotationExclusion(
ReactiveMessagingDotNames.MESSAGE_CONVERTER)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,19 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished,
ReactiveMessagingDotNames.INCOMINGS);
AnnotationInstance outgoing = transformedAnnotations.getAnnotation(method,
ReactiveMessagingDotNames.OUTGOING);
AnnotationInstance outgoings = transformedAnnotations.getAnnotation(method,
ReactiveMessagingDotNames.OUTGOINGS);
AnnotationInstance blocking = transformedAnnotations.getAnnotation(method,
BLOCKING);
if (incoming != null || incomings != null || outgoing != null) {
if (incoming != null || incomings != null || outgoing != null || outgoings != null) {
handleMethodAnnotatedWithIncoming(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, incoming);
handleMethodAnnotationWithIncomings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, incomings);
handleMethodAnnotationWithOutgoing(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, outgoing);
handleMethodAnnotationWithOutgoings(appChannels, validationErrors, configDescriptionBuildItemBuildProducer,
method, outgoings);

if (WiringHelper.isSynthetic(method)) {
continue;
Expand Down Expand Up @@ -218,6 +222,24 @@ private void handleMethodAnnotationWithOutgoing(BuildProducer<ChannelBuildItem>
}
}

private void handleMethodAnnotationWithOutgoings(BuildProducer<ChannelBuildItem> appChannels,
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
BuildProducer<ConfigDescriptionBuildItem> configDescriptionBuildItemBuildProducer,
MethodInfo method, AnnotationInstance outgoings) {
if (outgoings != null) {
for (AnnotationInstance instance : outgoings.value().asNestedArray()) {
if (instance.value().asString().isEmpty()) {
validationErrors.produce(new ValidationPhaseBuildItem.ValidationErrorBuildItem(
new DeploymentException("Empty @Outgoing annotation on method " + method)));
}
configDescriptionBuildItemBuildProducer.produce(new ConfigDescriptionBuildItem(
"mp.messaging.outgoing." + instance.value().asString() + ".connector", null,
"The connector to use", null, null, ConfigPhase.BUILD_TIME));
produceOutgoingChannel(appChannels, instance.value().asString());
}
}
}

private void handleMethodAnnotationWithIncomings(BuildProducer<ChannelBuildItem> appChannels,
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
BuildProducer<ConfigDescriptionBuildItem> configDescriptionBuildItemBuildProducer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration {

private List<String> incomings = new ArrayList<>();

private String outgoing;
private List<String> outgoings = new ArrayList<>();

private Acknowledgment.Strategy acknowledgment;

Expand Down Expand Up @@ -63,6 +63,8 @@ public class QuarkusMediatorConfiguration implements MediatorConfiguration {
private Type valueType;
private Class<? extends KeyValueExtractor> keyed;

private boolean hasTargetedOutput = false;

public String getBeanId() {
return beanId;
}
Expand Down Expand Up @@ -117,11 +119,16 @@ public void setIncomings(List<String> incomings) {

@Override
public String getOutgoing() {
return outgoing;
return outgoings.get(0);
}

@Override
public List<String> getOutgoings() {
return outgoings;
}

public void setOutgoing(String outgoing) {
this.outgoing = outgoing;
public void setOutgoings(List<String> outgoings) {
this.outgoings = outgoings;
}

@Override
Expand Down Expand Up @@ -310,4 +317,17 @@ public void setValueType(Type valueType) {
public void setKeyed(Class<? extends KeyValueExtractor> keyed) {
this.keyed = keyed;
}

@Override
public boolean hasTargetedOutput() {
return hasTargetedOutput;
}

public boolean isHasTargetedOutput() {
return hasTargetedOutput;
}

public void setHasTargetedOutput(boolean hasTargetedOutput) {
this.hasTargetedOutput = hasTargetedOutput;
}
}
Loading

0 comments on commit 1c87e6f

Please sign in to comment.