Skip to content

Commit

Permalink
Merge pull request #37899 from ozangunalp/smallrye_rm_4.13.0
Browse files Browse the repository at this point in the history
Bump Smallrye Reactive Messaging version from 4.12.0 to 4.13.0
  • Loading branch information
ozangunalp authored Jan 2, 2024
2 parents 0314b10 + cc89874 commit 8892272
Show file tree
Hide file tree
Showing 18 changed files with 634 additions and 22 deletions.
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>3.0.1</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>3.7.2</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>4.12.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>4.13.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>2.4.0</smallrye-stork.version>
<jakarta.activation.version>2.1.2</jakarta.activation.version>
<jakarta.annotation-api.version>2.1.1</jakarta.annotation-api.version>
Expand Down
135 changes: 132 additions & 3 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,19 @@ image::kafka-one-app-one-consumer.png[alt=Architecture, width=60%, align=center]

. *Multiple consumer threads inside a consumer group*
+
For a given application instance, the number of consumers inside the consumer group can be configured using `mp.messaging.incoming.$channel.partitions` property.
For a given application instance, the number of consumers inside the consumer group can be configured using `mp.messaging.incoming.$channel.concurrency` property.
The partitions of the subscribed topic will be divided among the consumer threads.
Note that if the `partitions` value exceed the number of partitions of the topic, some consumer threads won't be assigned any partitions.
Note that if the `concurrency` value exceed the number of partitions of the topic, some consumer threads won't be assigned any partitions.
+
image::kafka-one-app-two-consumers.png[alt=Architecture, width=60%, align=center]
+
[NOTE]
.Deprecation
====
The https://smallrye.io/smallrye-reactive-messaging/latest/concepts/incoming-concurrency/[concurrency attribute]
provides a connector agnostic way for non-blocking concurrent channels and replaces the Kafka connector specific `partitions` attribute.
The `partitions` attribute is therefore deprecated and will be removed in future releases.
====

. *Multiple consumer applications inside a consumer group*
+
Expand Down Expand Up @@ -658,6 +666,36 @@ mp.messaging.incoming.your-channel.group.id=${quarkus.uuid}

IMPORTANT: If the `group.id` attribute is not set, it defaults the `quarkus.application.name` configuration property.

==== Manual topic-partition assignment

The `assign-seek` channel attribute allows manually assigning topic-partitions to a Kafka incoming channel,
and optionally seek to a specified offset in the partition to start consuming records.
If `assign-seek` is used, the consumer will not be dynamically subscribed to topics,
but instead will statically assign the described partitions.
In manual topic-partition rebalancing doesn't happen and therefore rebalance listeners are never called.

The attribute takes a list of triplets separated by commas: `<topic>:<partition>:<offset>`.

For example, the configuration

[source, properties]
----
mp.messaging.incoming.data.assign-seek=topic1:0:10, topic2:1:20
----

assigns the consumer to:

- Partition 0 of topic 'topic1', setting the initial position at offset 10.
- Partition 1 of topic 'topic2', setting the initial position at offset 20.

The topic, partition, and offset in each triplet can have the following variations:

- If the topic is omitted, the configured topic will be used.
- If the offset is omitted, partitions are assigned to the consumer but won't be sought to offset.
- If offset is 0, it seeks to the beginning of the topic-partition.
- If offset is -1, it seeks to the end of the topic-partition.


=== Receiving Kafka Records in Batches

By default, incoming methods receive each Kafka record individually.
Expand Down Expand Up @@ -1112,7 +1150,7 @@ The new `Emitter.send` method returns a `CompletionStage` completed when the pro
====

[NOTE]
.Depreciation
.Deprecation
====
`MutinyEmitter#send(Message msg)` method is deprecated in favor of following methods receiving `Message` for emitting:
Expand Down Expand Up @@ -1376,6 +1414,55 @@ If you'd like to consume records only written and committed inside a Kafka trans
mp.messaging.incoming.prices-in.isolation.level=read_committed
----

== Kafka Request-Reply

The Kafka Request-Reply pattern allows to publish a request record to a Kafka topic and then await for a reply record that responds to the initial request.
The Kafka connector provides the `KafkaRequestReply` custom emitter that implements the requestor (or the client) of the request-reply pattern for Kafka outbound channels:

It can be injected as a regular emitter `@Channel`:

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply;
@ApplicationScoped
@Path("/kafka")
public class KafkaRequestReplyEmitter {
@Channel("request-reply")
KafkaRequestReply<Integer, String> requestReply;
@POST
@Path("/req-rep")
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> post(Integer request) {
return requestReply.request(request);
}
}
----

The request method publishes the record to the configured target topic of the outgoing channel,
and polls a reply topic (by default, the target topic with `-replies` suffix) for a reply record.
When the reply is received the returned `Uni` is completed with the record value.
The request send operation generates a **correlation id** and sets a header (by default `REPLY_CORRELATION_ID`),
which it expects to be sent back in the reply record.

The replier can be implemented using a Reactive Messaging processor (see <<processing-messages>>).

For more information on Kafka Request Reply feature and advanced configuration options,
see the https://smallrye.io/smallrye-reactive-messaging/latest/kafka/request-reply/[Smallrye Reactive Messaging Documentation].

[[processing-messages]]
== Processing Messages

Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic.
Expand Down Expand Up @@ -1944,6 +2031,34 @@ and for an outgoing channel checks that the topic used by the producer exist in
Note that to achieve this, an _admin connection_ is required.
You can adjust the timeout for topic verification calls to the broker using the `health-topic-verification-timeout` configuration.

== Observability

If the xref:opentelemetry.adoc[OpenTelemetry extension] is present,
then the Kafka connector channels work out-of-the-box with the OpenTelemetry Tracing.
Messages written to Kafka topics propagate the current tracing span.
On incoming channels, if a consumed Kafka record contains tracing information the message processing inherits the message span as parent.

Tracing can be disabled explicitly per channel:

[source, properties]
----
mp.messaging.incoming.data.tracing-enabled=false
----

If the xref:telemetry-micrometer.adoc[Micrometer extension] is present,
then Kafka producer and consumer clients metrics are exposed as Micrometer meters.

Per channel metrics are also exposed as Micrometer meters.
The number of messages produced or received per channel, acknowledgments and duration of processing are exposed.

The messaging meters can be disabled:

[source, properties]
----
quarkus.micrometer.binder.messaging.enabled=false
----


== Kafka Streams

This is described in a dedicated guide: xref:kafka-streams.adoc[Using Apache Kafka Streams].
Expand Down Expand Up @@ -2204,6 +2319,20 @@ With in-memory channels we were able to test application code processing message
Note that different in-memory channels are independent, and switching channel connector to in-memory does not simulate message delivery between channels configured to the same Kafka topic.
====

==== Context propagation with InMemoryConnector

By default, in-memory channels dispatch messages on the caller thread, which would be the main thread in unit tests.

The `quarkus-test-vertx` dependency provides the `@io.quarkus.test.vertx.RunOnVertxContext` annotation,
which when used on a test method, executes the test on a Vert.x context.

However, most of the other connectors handle context propagation dispatching messages on separate duplicated Vert.x contexts.

If your tests are dependent on context propagation,
you can configure the in-memory connector channels with the `run-on-vertx-context` attribute to dispatch events,
including messages and acknowledgements, on a Vert.x context.
Alternatively you can switch this behaviour using the `InMemorySource#runOnVertxContext` method.

=== Testing using a Kafka broker

If you are using <<kafka-dev-services>>, a Kafka broker will be started and available throughout the tests, unless it is disabled in `%test` profile.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.quarkus.micrometer.deployment.binder;

import java.util.function.BooleanSupplier;

import io.quarkus.arc.deployment.AdditionalBeanBuildItem;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.micrometer.runtime.MicrometerRecorder;
import io.quarkus.micrometer.runtime.config.MicrometerConfig;

public class ReactiveMessagingProcessor {

static final String MESSAGE_OBSERVATION_COLLECTOR = "io.smallrye.reactive.messaging.observation.MessageObservationCollector";
static final String METRICS_BEAN_CLASS = "io.quarkus.micrometer.runtime.binder.reactivemessaging.MicrometerObservationCollector";
static final Class<?> MESSAGE_OBSERVATION_COLLECTOR_CLASS = MicrometerRecorder
.getClassForName(MESSAGE_OBSERVATION_COLLECTOR);

static class ReactiveMessagingSupportEnabled implements BooleanSupplier {
MicrometerConfig mConfig;

public boolean getAsBoolean() {
return MESSAGE_OBSERVATION_COLLECTOR_CLASS != null &&
mConfig.checkBinderEnabledWithDefault(mConfig.binder.messaging);
}
}

@BuildStep(onlyIf = ReactiveMessagingSupportEnabled.class)
AdditionalBeanBuildItem createCDIEventConsumer() {
return AdditionalBeanBuildItem.builder()
.addBeanClass(METRICS_BEAN_CLASS)
.setUnremovable()
.build();
}
}
6 changes: 6 additions & 0 deletions extensions/micrometer/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.quarkus.micrometer.runtime.binder.reactivemessaging;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Message;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.smallrye.reactive.messaging.observation.DefaultMessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservation;
import io.smallrye.reactive.messaging.observation.MessageObservationCollector;
import io.smallrye.reactive.messaging.observation.ObservationContext;

@ApplicationScoped
public class MicrometerObservationCollector
implements MessageObservationCollector<MicrometerObservationCollector.MicrometerContext> {

@Inject
@ConfigProperty(name = "quarkus.messaging.observation.micrometer.enabled", defaultValue = "true")
boolean enabled;

@Override
public MicrometerContext initObservation(String channel, boolean incoming, boolean emitter) {
if (enabled) {
return new MicrometerContext(channel);
}
return null;
}

@Override
public MessageObservation onNewMessage(String channel, Message<?> message, MicrometerContext ctx) {
ctx.msgCount.increment();
return new DefaultMessageObservation(channel);
}

public static class MicrometerContext implements ObservationContext {
final Counter msgCount;
final Timer duration;
final Counter acks;
final Counter nacks;

public MicrometerContext(String channel) {
Tags tags = Tags.of(Tag.of("channel", channel));
this.msgCount = Counter.builder("quarkus.messaging.message.count")
.description("The number of messages observed")
.tags(tags)
.register(Metrics.globalRegistry);
this.duration = Timer.builder("quarkus.messaging.message.duration")
.description("The duration of the message processing")
.tags(tags)
.register(Metrics.globalRegistry);
this.acks = Counter.builder("quarkus.messaging.message.acks")
.description("The number of messages processed successfully")
.tags(tags)
.register(Metrics.globalRegistry);
this.nacks = Counter.builder("quarkus.messaging.message.failures")
.description("The number of messages processed with failures")
.tags(tags)
.register(Metrics.globalRegistry);
}

@Override
public void complete(MessageObservation observation) {
if (observation.getReason() == null) {
acks.increment();
} else {
nacks.increment();
}
duration.record(observation.getCompletionDuration());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public static class BinderConfig {

public GrpcClientConfigGroup grpcClient;

public ReactiveMessagingConfigGroup messaging;

public MPMetricsConfigGroup mpMetrics;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.micrometer.runtime.config;

import java.util.Optional;

import io.quarkus.runtime.annotations.ConfigGroup;
import io.quarkus.runtime.annotations.ConfigItem;

/**
* Build / static runtime config for Reactive Messaging Binders
*/
@ConfigGroup
public class ReactiveMessagingConfigGroup implements MicrometerConfig.CapabilityEnabled {
/**
* Kafka metrics support.
* <p>
* Support for Reactive Messaging metrics will be enabled if Micrometer support is enabled,
* MessageObservationCollector interface is on the classpath
* and either this value is true, or this value is unset and
* {@code quarkus.micrometer.binder-enabled-default} is true.
*/
@ConfigItem
public Optional<Boolean> enabled;

@Override
public Optional<Boolean> getEnabled() {
return enabled;
}

@Override
public String toString() {
return this.getClass().getSimpleName()
+ "{enabled=" + enabled
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ final class DotNames {

static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
static final DotName KAFKA_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
static final DotName KAFKA_TRANSACTIONS_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
static final DotName KAFKA_REQUEST_REPLY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.reply.KafkaRequestReply.class.getName());

static final DotName TARGETED = DotName.createSimple(io.smallrye.reactive.messaging.Targeted.class.getName());
static final DotName TARGETED_MESSAGES = DotName.createSimple(io.smallrye.reactive.messaging.TargetedMessages.class.getName());
Expand Down
Loading

0 comments on commit 8892272

Please sign in to comment.