Skip to content

Commit

Permalink
Incoming concurrency rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 23, 2023
1 parent 7a49159 commit dbb227f
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public class ConnectionHolder {

public ConnectionHolder(RabbitMQClient client,
RabbitMQConnectorCommonConfiguration configuration,
Vertx vertx) {
Vertx vertx,
Context root) {
this.client = client;
this.vertx = vertx;
this.connector = Uni.createFrom().voidItem()
Expand All @@ -34,7 +35,8 @@ public ConnectionHolder(RabbitMQClient client,
return client.start()
.onSubscription().invoke(() -> log.connectionEstablished(configuration.getChannel()))
.onItem().transform(ignored -> {
connectionHolder.set(new CurrentConnection(client, Vertx.currentContext()));
connectionHolder
.set(new CurrentConnection(client, root == null ? Vertx.currentContext() : root));

// handle the case we are already disconnected.
if (!client.isConnected() || connectionHolder.get() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,17 @@
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.VertxContext;
import io.smallrye.reactive.messaging.providers.impl.ConcurrencyConnectorConfig;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAck;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAckHandler;
import io.smallrye.reactive.messaging.rabbitmq.ack.RabbitMQAutoAck;
import io.smallrye.reactive.messaging.rabbitmq.fault.RabbitMQFailureHandler;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQOpenTelemetryInstrumenter;
import io.smallrye.reactive.messaging.rabbitmq.tracing.RabbitMQTrace;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.json.JsonObject;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.rabbitmq.RabbitMQClient;
import io.vertx.mutiny.rabbitmq.RabbitMQConsumer;
Expand Down Expand Up @@ -220,12 +224,14 @@ private Multi<? extends Message<?>> getStreamOfMessages(

if (isTracingEnabled) {
return receiver.toMulti()
.emitOn(c -> VertxContext.runOnContext(holder.getContext().getDelegate(), c))
.map(m -> new IncomingRabbitMQMessage<>(m, holder, onNack, onAck, contentTypeOverride))
.map(msg -> instrumenter.traceIncoming(msg,
RabbitMQTrace.traceQueue(queueName, msg.message.envelope().getRoutingKey(),
msg.getHeaders())));
} else {
return receiver.toMulti()
.emitOn(c -> VertxContext.runOnContext(holder.getContext().getDelegate(), c))
.map(m -> new IncomingRabbitMQMessage<>(m, holder, onNack, onAck, contentTypeOverride));
}
}
Expand Down Expand Up @@ -276,7 +282,11 @@ public Flow.Publisher<? extends Message<?>> getPublisher(final Config config) {
.onItem().call(() -> establishDLQ(client, ic))
.subscribe().with(ignored -> promise.complete(), promise::fail);
});
final ConnectionHolder holder = new ConnectionHolder(client, ic, getVertx());
Context root = null;
if (ConcurrencyConnectorConfig.getConcurrency(config).filter(i -> i > 1).isPresent()) {
root = Context.newInstance(((VertxInternal) getVertx().getDelegate()).createEventLoopContext());
}
final ConnectionHolder holder = new ConnectionHolder(client, ic, getVertx(), root);
return holder.getOrEstablishConnection()
.invoke(() -> log.connectionEstablished(connectionIdx, ic.getChannel()))
.flatMap(connection -> createConsumer(ic, connection).map(consumer -> Tuple2.of(holder, consumer)));
Expand Down Expand Up @@ -385,7 +395,7 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber(final Config config)
establishExchange(client, oc).subscribe().with((ignored) -> promise.complete(), promise::fail);
});

final ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx());
final ConnectionHolder holder = new ConnectionHolder(client, oc, getVertx(), null);
final Uni<RabbitMQPublisher> getSender = holder.getOrEstablishConnection()
.onItem().transformToUni(connection -> Uni.createFrom().item(RabbitMQPublisher.create(getVertx(), connection,
new RabbitMQPublisherOptions()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package io.smallrye.reactive.messaging.rabbitmq;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Acknowledgment;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;

import com.rabbitmq.client.AMQP;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class ConcurrentProcessorTest extends WeldTestBase {

private MapBasedConfig dataconfig() {
return commonConfig()
.with("mp.messaging.incoming.data.connector", RabbitMQConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.queue.durable", true)
.with("mp.messaging.incoming.data.queue.name", "(server.auto)")
.with("mp.messaging.incoming.data.exchange.name", exchange)
.with("mp.messaging.incoming.data.exchange.type", "direct")
.with("mp.messaging.incoming.data.max-outstanding-messages", 1)
.with("mp.messaging.incoming.data.concurrency", 3)
.with("mp.messaging.incoming.data$1.routing-keys", "foo")
.with("mp.messaging.incoming.data$2.routing-keys", "bar")
.with("mp.messaging.incoming.data$3.routing-keys", "qux");
}

private void produceMessages() {
AtomicInteger counter = new AtomicInteger(0);
usage.produce(exchange, null, "foo", 4, counter::getAndIncrement,
new AMQP.BasicProperties.Builder().contentType("text/plain").headers(Map.of("key", "foo")).build());
usage.produce(exchange, null, "bar", 3, counter::getAndIncrement,
new AMQP.BasicProperties.Builder().contentType("text/plain").headers(Map.of("key", "bar")).build());
usage.produce(exchange, null, "qux", 3, counter::getAndIncrement,
new AMQP.BasicProperties.Builder().contentType("text/plain").headers(Map.of("key", "qux")).build());
}

@Test
public void testConcurrentConsumer() {
MyConsumerBean bean = runApplication(dataconfig(), MyConsumerBean.class);

List<Integer> list = bean.getResults();
assertThat(list).isEmpty();

produceMessages();
await().untilAsserted(() -> {
assertThat(bean.getResults()).hasSize(10);
assertThat(bean.getPerThread().keySet()).hasSize(3);
});
assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@Test
public void testConcurrentProcessor() {
MyProcessorBean bean = runApplication(dataconfig(), MyProcessorBean.class);

List<Integer> list = bean.getResults();
assertThat(list).isEmpty();

produceMessages();
await().untilAsserted(() -> {
assertThat(bean.getResults()).hasSize(10);
assertThat(bean.getPerThread().keySet()).hasSize(3);
});
assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@Test
public void testConcurrentStreamTransformer() {
MyStreamTransformerBean bean = runApplication(dataconfig(), MyStreamTransformerBean.class);

List<Integer> list = bean.getResults();
assertThat(list).isEmpty();

produceMessages();
await().untilAsserted(() -> {
assertThat(bean.getResults()).hasSize(10);
assertThat(bean.getPerThread().keySet()).hasSize(3);
});
assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@Test
public void testConcurrentStreamInjectingBean() {
weld.addBeanClass(MyChannelInjectingBean.class);
dataconfig().write();
container = weld.initialize();
MyChannelInjectingBean bean = get(MyChannelInjectingBean.class);
bean.process();
await().until(() -> isRabbitMQConnectorAlive(container));
await().until(() -> isRabbitMQConnectorReady(container));

List<Integer> list = bean.getResults();
assertThat(list).isEmpty();

produceMessages();
await().untilAsserted(() -> {
assertThat(bean.getResults()).hasSize(10);
assertThat(bean.getPerThread().keySet()).hasSize(3);
});
assertThat(bean.getResults()).containsExactlyInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@ApplicationScoped
public static class MyConsumerBean {

private final List<Integer> list = new CopyOnWriteArrayList<>();
private final Map<Thread, List<Integer>> perThread = new ConcurrentHashMap<>();

@Incoming("data")
public Uni<Void> process(String input) {
int value = Integer.parseInt(input);
int next = value + 1;
perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next);
list.add(next);
return Uni.createFrom().voidItem().onItem().delayIt().by(Duration.ofMillis(100));
}

public List<Integer> getResults() {
return list;
}

public Map<Thread, List<Integer>> getPerThread() {
return perThread;
}
}

@ApplicationScoped
public static class MyProcessorBean {

private final List<Integer> list = new CopyOnWriteArrayList<>();
private final Map<Thread, List<Integer>> perThread = new ConcurrentHashMap<>();

@Incoming("data")
@Outgoing("sink")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public Uni<Message<Integer>> process(IncomingRabbitMQMessage<String> input) {
int value = Integer.parseInt(input.getPayload());
int next = value + 1;
perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next);
return Uni.createFrom().item(Message.of(next, input::ack))
.onItem().delayIt().by(Duration.ofMillis(100));
}

@Incoming("sink")
public void sink(int val) {
list.add(val);
}

public List<Integer> getResults() {
return list;
}

public Map<Thread, List<Integer>> getPerThread() {
return perThread;
}
}

@ApplicationScoped
public static class MyStreamTransformerBean {

private final List<Integer> list = new CopyOnWriteArrayList<>();
private final Map<Thread, List<Integer>> perThread = new ConcurrentHashMap<>();

@Incoming("data")
@Outgoing("sink")
public Multi<Message<Integer>> process(Multi<IncomingRabbitMQMessage<String>> multi) {
return multi.onItem()
.transformToUniAndConcatenate(input -> {
int value = Integer.parseInt(input.getPayload());
int next = value + 1;
perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next);
return Uni.createFrom().item(Message.of(next, input::ack))
.onItem().delayIt().by(Duration.ofMillis(100));
});
}

@Incoming("sink")
public void sink(int val) {
list.add(val);
}

public List<Integer> getResults() {
return list;
}

public Map<Thread, List<Integer>> getPerThread() {
return perThread;
}
}

@ApplicationScoped
public static class MyChannelInjectingBean {

private final List<Integer> list = new CopyOnWriteArrayList<>();
private final Map<Thread, List<Integer>> perThread = new ConcurrentHashMap<>();

@Inject
@Channel("data")
Multi<Message<String>> multi;

public void process() {
multi.onItem()
.transformToUniAndConcatenate(input -> {
int value = Integer.parseInt(input.getPayload());
int next = value + 1;
list.add(next);
perThread.computeIfAbsent(Thread.currentThread(), t -> new CopyOnWriteArrayList<>()).add(next);
return Uni.createFrom().completionStage(input::ack)
.onItem().delayIt().by(Duration.ofMillis(100));
})
.subscribe().with(__ -> {
});
}

public List<Integer> getResults() {
return list;
}

public Map<Thread, List<Integer>> getPerThread() {
return perThread;
}
}

}

0 comments on commit dbb227f

Please sign in to comment.