diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index d45335a2ca..30ce90c719 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -15,6 +15,10 @@ import io.smallrye.reactive.messaging.mqtt.internal.MqttTopicHelper; import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS; +import io.smallrye.reactive.messaging.providers.helpers.VertxContext; +import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig; +import io.vertx.core.impl.VertxInternal; +import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.Vertx; public class MqttSource { @@ -50,7 +54,9 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, } else { pattern = null; } - + final Context root = ConcurrencyConnectorConfig.getConcurrency(config.config).filter(i -> i > 1) + .map(__ -> Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext())) + .orElse(null); holder = Clients.getHolder(vertx, options); holder.start().onSuccess(ignore -> started.set(true)); holder.getClient() @@ -63,6 +69,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, this.source = holder.stream() .select().where(m -> MqttTopicHelper.matches(topic, pattern, m)) + .plug(m -> (root != null) ? m.emitOn(c -> VertxContext.runOnContext(root.getDelegate(), c)) : m) .onItem().transform(m -> new ReceivingMqttMessage(m, onNack)) .stage(multi -> { if (broadcast) diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/ReceivingMqttMessageMetadata.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/ReceivingMqttMessageMetadata.java index ea25743600..b1de162ba8 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/ReceivingMqttMessageMetadata.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/ReceivingMqttMessageMetadata.java @@ -14,6 +14,13 @@ public ReceivingMqttMessageMetadata(MqttPublishMessage message) { this.message = message; } + /** + * @return the MQTT message + */ + public MqttPublishMessage getMessage() { + return message; + } + /** * @return the message id of the MQTT message */ diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/converter/MqttMessageConverter.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/converter/MqttMessageConverter.java new file mode 100644 index 0000000000..3b0891e5db --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/converter/MqttMessageConverter.java @@ -0,0 +1,28 @@ +package io.smallrye.reactive.messaging.mqtt.converter; + +import java.lang.reflect.Type; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; + +import io.smallrye.reactive.messaging.MessageConverter; +import io.smallrye.reactive.messaging.mqtt.ReceivingMqttMessageMetadata; +import io.smallrye.reactive.messaging.providers.helpers.TypeUtils; +import io.vertx.mqtt.messages.MqttPublishMessage; + +@ApplicationScoped +public class MqttMessageConverter implements MessageConverter { + @Override + public boolean canConvert(Message in, Type target) { + return in.getMetadata(ReceivingMqttMessageMetadata.class).isPresent() + && TypeUtils.isAssignable(target, MqttPublishMessage.class); + } + + @Override + public Message convert(Message in, Type target) { + ReceivingMqttMessageMetadata metadata = in.getMetadata(ReceivingMqttMessageMetadata.class) + .orElseThrow(() -> new IllegalStateException("No MQTT metadata")); + return in.withPayload(metadata.getMessage().getDelegate()); + } +} diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/ConcurrentProcessorTest.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/ConcurrentProcessorTest.java new file mode 100644 index 0000000000..0d0f8f918f --- /dev/null +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/ConcurrentProcessorTest.java @@ -0,0 +1,284 @@ +package io.smallrye.reactive.messaging.mqtt; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorLiteral; +import org.jboss.weld.environment.se.Weld; +import org.jboss.weld.environment.se.WeldContainer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; +import io.vertx.mqtt.messages.MqttPublishMessage; + +public class ConcurrentProcessorTest extends MqttTestBase { + + private WeldContainer container; + + private String topic; + private String clientId; + + private MapBasedConfig dataconfig() { + return new MapBasedConfig() + .with("mp.messaging.incoming.data.connector", MqttConnector.CONNECTOR_NAME) + .with("mp.messaging.incoming.data.host", address) + .with("mp.messaging.incoming.data.port", port) + .with("mp.messaging.incoming.data.client-id", clientId) + .with("mp.messaging.incoming.data.qos", 1) + .with("mp.messaging.incoming.data.concurrency", 3) + .with("mp.messaging.incoming.data$1.topic", topic + "-1") + .with("mp.messaging.incoming.data$2.topic", topic + "-2") + .with("mp.messaging.incoming.data$3.topic", topic + "-3"); + } + + private void produceMessages() { + CountDownLatch latch = new CountDownLatch(3); + AtomicInteger counter = new AtomicInteger(0); + usage.produceIntegers(topic + "-1", 4, latch::countDown, counter::getAndIncrement); + usage.produceIntegers(topic + "-2", 3, latch::countDown, counter::getAndIncrement); + usage.produceIntegers(topic + "-3", 3, latch::countDown, counter::getAndIncrement); + try { + latch.await(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private T runApplication(MapBasedConfig config, Class beanClass) { + Weld weld = baseWeld(config); + weld.addBeanClass(beanClass); + container = weld.initialize(); + + waitUntilReady(container); + return container.getBeanManager().createInstance().select(beanClass).get(); + } + + public void waitUntilReady(WeldContainer container) { + MqttConnector connector = container.select(MqttConnector.class, + ConnectorLiteral.of(MqttConnector.CONNECTOR_NAME)).get(); + await().until(() -> connector.getReadiness().isOk()); + } + + @BeforeEach + public void setupTopicName() { + topic = UUID.randomUUID().toString(); + clientId = UUID.randomUUID().toString(); + } + + @AfterEach + public void cleanup() { + if (container != null) { + container.close(); + } + Clients.clear(); + } + + @Test + public void testConcurrentConsumer() { + MyConsumerBean bean = runApplication(dataconfig(), MyConsumerBean.class); + waitUntilReady(container); + + List list = bean.getResults(); + assertThat(list).isEmpty(); + + produceMessages(); + await().untilAsserted(() -> { + assertThat(bean.getResults()).hasSize(10); + assertThat(bean.getPerThread().keySet()).hasSize(3); + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void testConcurrentProcessor() { + MyProcessorBean bean = runApplication(dataconfig(), MyProcessorBean.class); + waitUntilReady(container); + + List list = bean.getResults(); + assertThat(list).isEmpty(); + + produceMessages(); + await().untilAsserted(() -> { + assertThat(bean.getResults()).hasSize(10); + assertThat(bean.getPerThread().keySet()).hasSize(3); + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void testConcurrentStreamTransformer() { + MyStreamTransformerBean bean = runApplication(dataconfig(), MyStreamTransformerBean.class); + waitUntilReady(container); + + List list = bean.getResults(); + assertThat(list).isEmpty(); + + produceMessages(); + await().untilAsserted(() -> { + assertThat(bean.getResults()).hasSize(10); + assertThat(bean.getPerThread().keySet()).hasSize(3); + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void testConcurrentStreamInjectingBean() { + MyChannelInjectingBean bean = runApplication(dataconfig(), MyChannelInjectingBean.class); + bean.process(); + waitUntilReady(container); + + List list = bean.getResults(); + assertThat(list).isEmpty(); + + produceMessages(); + await().untilAsserted(() -> { + assertThat(bean.getResults()).hasSize(10); + assertThat(bean.getPerThread().keySet()).hasSize(3); + }); + assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @ApplicationScoped + public static class MyConsumerBean { + + private final List list = new CopyOnWriteArrayList<>(); + private final Map> perThread = new ConcurrentHashMap<>(); + + @Incoming("data") + public Uni process(MqttPublishMessage input) { + int value = Integer.parseInt(input.payload().toString()); + int next = value + 1; + perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next); + list.add(next); + return Uni.createFrom().voidItem().onItem().delayIt().by(Duration.ofMillis(100)); + } + + public List getResults() { + return list; + } + + public Map> getPerThread() { + return perThread; + } + } + + @ApplicationScoped + public static class MyProcessorBean { + + private final List list = new CopyOnWriteArrayList<>(); + private final Map> perThread = new ConcurrentHashMap<>(); + + @Incoming("data") + @Outgoing("sink") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + public Uni> process(Message input) { + int value = Integer.parseInt(input.getPayload()); + int next = value + 1; + perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next); + return Uni.createFrom().item(Message.of(next, input::ack)) + .onItem().delayIt().by(Duration.ofMillis(100)); + } + + @Incoming("sink") + public void sink(int val) { + list.add(val); + } + + public List getResults() { + return list; + } + + public Map> getPerThread() { + return perThread; + } + } + + @ApplicationScoped + public static class MyStreamTransformerBean { + + private final List list = new CopyOnWriteArrayList<>(); + private final Map> perThread = new ConcurrentHashMap<>(); + + @Incoming("data") + @Outgoing("sink") + public Multi> process(Multi> multi) { + return multi.onItem() + .transformToUniAndConcatenate(input -> { + int value = Integer.parseInt(input.getPayload()); + int next = value + 1; + perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next); + return Uni.createFrom().item(Message.of(next, input::ack)) + .onItem().delayIt().by(Duration.ofMillis(100)); + }); + } + + @Incoming("sink") + public void sink(int val) { + list.add(val); + } + + public List getResults() { + return list; + } + + public Map> getPerThread() { + return perThread; + } + } + + @ApplicationScoped + public static class MyChannelInjectingBean { + + private final List list = new CopyOnWriteArrayList<>(); + private final Map> perThread = new ConcurrentHashMap<>(); + + @Channel("data") + @Inject + Multi> multi; + + public void process() { + multi.onItem() + .transformToUniAndConcatenate(input -> { + int value = Integer.parseInt(input.getPayload()); + int next = value + 1; + list.add(next); + perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next); + return Uni.createFrom().completionStage(input::ack) + .onItem().delayIt().by(Duration.ofMillis(100)); + }) + .subscribe().with(__ -> { + }); + } + + public List getResults() { + return list; + } + + public Map> getPerThread() { + return perThread; + } + } + +} diff --git a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java index f65d6f7b24..a4bdf0e162 100644 --- a/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java +++ b/smallrye-reactive-messaging-mqtt/src/test/java/io/smallrye/reactive/messaging/mqtt/MqttTestBase.java @@ -19,6 +19,7 @@ import io.smallrye.config.SmallRyeConfigProviderResolver; import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.mqtt.converter.JsonObjectMessageConverter; +import io.smallrye.reactive.messaging.mqtt.converter.MqttMessageConverter; import io.smallrye.reactive.messaging.mqtt.converter.StringMessageConverter; import io.smallrye.reactive.messaging.providers.MediatorFactory; import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; @@ -157,6 +158,7 @@ static Weld baseWeld(MapBasedConfig config) { weld.addBeanClass(HealthCenter.class); weld.addBeanClass(JsonObjectMessageConverter.class); weld.addBeanClass(StringMessageConverter.class); + weld.addBeanClass(MqttMessageConverter.class); // Add SmallRye Config weld.addExtension(new io.smallrye.config.inject.ConfigExtension());