Skip to content

Commit

Permalink
Use concurrent hash map to handle publisher confirms
Browse files Browse the repository at this point in the history
Same algorithm as in the White Rabbit lib.
  • Loading branch information
acogoluegnes committed Mar 8, 2019
1 parent b10e831 commit 9404592
Showing 1 changed file with 39 additions and 14 deletions.
53 changes: 39 additions & 14 deletions src/main/java/reactor/rabbitmq/Sender.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
Expand Down Expand Up @@ -526,7 +524,10 @@ private static class PublishConfirmSubscriber implements

private final AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();

private final ConcurrentNavigableMap<Long, OutboundMessage> unconfirmed = new ConcurrentSkipListMap<>();
// private final ConcurrentNavigableMap<Long, OutboundMessage> unconfirmed = new ConcurrentSkipListMap<>();

private final Map<Long, OutboundMessage> unconfirmed = new ConcurrentHashMap<>();
private final AtomicLong lowerBoundOfMultiple = new AtomicLong(1);

private final Channel channel;

Expand Down Expand Up @@ -555,24 +556,48 @@ public void handleNack(long deliveryTag, boolean multiple) {
}

private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) {
// if (multiple) {
// try {
// ConcurrentNavigableMap<Long, OutboundMessage> unconfirmedToSend = unconfirmed.headMap(deliveryTag, true);
// Iterator<Map.Entry<Long, OutboundMessage>> iterator = unconfirmedToSend.entrySet().iterator();
// while (iterator.hasNext()) {
// subscriber.onNext(new OutboundMessageResult(iterator.next().getValue(), ack));
// iterator.remove();
// }
// } catch (Exception e) {
// handleError(e, null);
// }
// } else {
// OutboundMessage outboundMessage = unconfirmed.get(deliveryTag);
// try {
// unconfirmed.remove(deliveryTag);
// subscriber.onNext(new OutboundMessageResult(outboundMessage, ack));
// } catch (Exception e) {
// handleError(e, new OutboundMessageResult(outboundMessage, ack));
// }
// }
Long lowerBound = lowerBoundOfMultiple.get();
if (multiple) {
try {
ConcurrentNavigableMap<Long, OutboundMessage> unconfirmedToSend = unconfirmed.headMap(deliveryTag, true);
Iterator<Map.Entry<Long, OutboundMessage>> iterator = unconfirmedToSend.entrySet().iterator();
while (iterator.hasNext()) {
subscriber.onNext(new OutboundMessageResult(iterator.next().getValue(), ack));
iterator.remove();
for (long i = lowerBound; i <= deliveryTag; i++) {
OutboundMessage message = unconfirmed.remove(i);
if (message != null) {
subscriber.onNext(new OutboundMessageResult(message, ack));
}
}
} catch (Exception e) {
handleError(e, null);
}
lowerBoundOfMultiple.compareAndSet(lowerBound, deliveryTag);
} else {
OutboundMessage outboundMessage = unconfirmed.get(deliveryTag);
OutboundMessage message = unconfirmed.remove(deliveryTag);
try {
unconfirmed.remove(deliveryTag);
subscriber.onNext(new OutboundMessageResult(outboundMessage, ack));
subscriber.onNext(new OutboundMessageResult(message, ack));
} catch (Exception e) {
handleError(e, new OutboundMessageResult(outboundMessage, ack));
handleError(e, new OutboundMessageResult(message, ack));
}
if (deliveryTag == lowerBound + 1) {
lowerBoundOfMultiple.compareAndSet(lowerBound, deliveryTag);
}
}
if (unconfirmed.size() == 0) {
Expand Down

0 comments on commit 9404592

Please sign in to comment.