diff --git a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java index 32546bbdeb..3ff27ef6fd 100644 --- a/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java +++ b/smallrye-reactive-messaging-amqp/src/main/java/io/smallrye/reactive/messaging/amqp/AmqpConnector.java @@ -53,7 +53,6 @@ import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; import io.smallrye.reactive.messaging.providers.helpers.MultiUtils; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; -import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig; import io.vertx.amqp.AmqpClientOptions; import io.vertx.amqp.AmqpReceiverOptions; import io.vertx.amqp.AmqpSenderOptions; @@ -224,10 +223,7 @@ public Flow.Publisher> getPublisher(Config config) { AmqpClient client = AmqpClientHelper.createClient(this, ic, clientOptions, clientSslContexts); - Context root = null; - if (ConcurrencyConnectorConfig.getConcurrency(config).filter(i -> i > 1).isPresent()) { - root = Context.newInstance(((VertxInternal) getVertx().getDelegate()).createEventLoopContext()); - } + Context root = Context.newInstance(((VertxInternal) getVertx().getDelegate()).createEventLoopContext()); ConnectionHolder holder = new ConnectionHolder(client, ic, getVertx(), root); holders.put(ic.getChannel(), holder); diff --git a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java index ff16c02736..542f0fc970 100644 --- a/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java +++ b/smallrye-reactive-messaging-jms/src/test/java/io/smallrye/reactive/messaging/jms/JmsSinkTest.java @@ -276,14 +276,21 @@ public void testWithDisconnection() { .extracting(m -> m.getBody(String.class)) .containsExactly("1")); + // close client + close(); + // send just before stopping bean.send("2"); stopArtemis(); + + assertThat(received).hasSize(1); + startArtemis(); // send after restart bean.send("3"); + // init the client init(); consumer = jms.createConsumer(q); consumer.setMessageListener(received::add); diff --git a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java index 30ce90c719..0f5eb4ee84 100644 --- a/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java +++ b/smallrye-reactive-messaging-mqtt/src/main/java/io/smallrye/reactive/messaging/mqtt/MqttSource.java @@ -16,7 +16,6 @@ import io.smallrye.reactive.messaging.mqtt.session.MqttClientSessionOptions; import io.smallrye.reactive.messaging.mqtt.session.RequestedQoS; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; -import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig; import io.vertx.core.impl.VertxInternal; import io.vertx.mutiny.core.Context; import io.vertx.mutiny.core.Vertx; @@ -54,9 +53,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, } else { pattern = null; } - final Context root = ConcurrencyConnectorConfig.getConcurrency(config.config).filter(i -> i > 1) - .map(__ -> Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext())) - .orElse(null); + final Context root = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); holder = Clients.getHolder(vertx, options); holder.start().onSuccess(ignore -> started.set(true)); holder.getClient() @@ -69,7 +66,7 @@ public MqttSource(Vertx vertx, MqttConnectorIncomingConfiguration config, this.source = holder.stream() .select().where(m -> MqttTopicHelper.matches(topic, pattern, m)) - .plug(m -> (root != null) ? m.emitOn(c -> VertxContext.runOnContext(root.getDelegate(), c)) : m) + .emitOn(c -> VertxContext.runOnContext(root.getDelegate(), c)) .onItem().transform(m -> new ReceivingMqttMessage(m, onNack)) .stage(multi -> { if (broadcast) diff --git a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java index 4fd2bd8966..2987a1ffd2 100644 --- a/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java +++ b/smallrye-reactive-messaging-rabbitmq/src/main/java/io/smallrye/reactive/messaging/rabbitmq/internals/IncomingRabbitMQChannel.java @@ -22,7 +22,6 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.providers.helpers.CDIUtils; import io.smallrye.reactive.messaging.providers.helpers.VertxContext; -import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig; import io.smallrye.reactive.messaging.rabbitmq.ClientHolder; import io.smallrye.reactive.messaging.rabbitmq.IncomingRabbitMQMessage; import io.smallrye.reactive.messaging.rabbitmq.RabbitMQConnector; @@ -131,10 +130,7 @@ private Uni> createConsumer(RabbitMQConne .subscribe().with(ignored -> promise.complete(), promise::fail); }); - Context root = null; - if (ConcurrencyConnectorConfig.getConcurrency(ic.config()).filter(i -> i > 1).isPresent()) { - root = Context.newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); - } + Context root = Context.newInstance(((VertxInternal) connector.vertx().getDelegate()).createEventLoopContext()); final ClientHolder holder = new ClientHolder(client, ic, connector.vertx(), root); return holder.getOrEstablishConnection() .invoke(() -> log.connectionEstablished(ic.getChannel()))