diff --git a/src/main/java/reactor/rabbitmq/Sender.java b/src/main/java/reactor/rabbitmq/Sender.java index 5af3394..b3ce4af 100644 --- a/src/main/java/reactor/rabbitmq/Sender.java +++ b/src/main/java/reactor/rabbitmq/Sender.java @@ -35,11 +35,9 @@ import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Function; @@ -526,7 +524,10 @@ private static class PublishConfirmSubscriber implements private final AtomicReference firstException = new AtomicReference(); - private final ConcurrentNavigableMap unconfirmed = new ConcurrentSkipListMap<>(); +// private final ConcurrentNavigableMap unconfirmed = new ConcurrentSkipListMap<>(); + + private final Map unconfirmed = new ConcurrentHashMap<>(); + private final AtomicLong lowerBoundOfMultiple = new AtomicLong(1); private final Channel channel; @@ -555,24 +556,48 @@ public void handleNack(long deliveryTag, boolean multiple) { } private void handleAckNack(long deliveryTag, boolean multiple, boolean ack) { +// if (multiple) { +// try { +// ConcurrentNavigableMap unconfirmedToSend = unconfirmed.headMap(deliveryTag, true); +// Iterator> iterator = unconfirmedToSend.entrySet().iterator(); +// while (iterator.hasNext()) { +// subscriber.onNext(new OutboundMessageResult(iterator.next().getValue(), ack)); +// iterator.remove(); +// } +// } catch (Exception e) { +// handleError(e, null); +// } +// } else { +// OutboundMessage outboundMessage = unconfirmed.get(deliveryTag); +// try { +// unconfirmed.remove(deliveryTag); +// subscriber.onNext(new OutboundMessageResult(outboundMessage, ack)); +// } catch (Exception e) { +// handleError(e, new OutboundMessageResult(outboundMessage, ack)); +// } +// } + Long lowerBound = lowerBoundOfMultiple.get(); if (multiple) { try { - ConcurrentNavigableMap unconfirmedToSend = unconfirmed.headMap(deliveryTag, true); - Iterator> iterator = unconfirmedToSend.entrySet().iterator(); - while (iterator.hasNext()) { - subscriber.onNext(new OutboundMessageResult(iterator.next().getValue(), ack)); - iterator.remove(); + for (long i = lowerBound; i <= deliveryTag; i++) { + OutboundMessage message = unconfirmed.remove(i); + if (message != null) { + subscriber.onNext(new OutboundMessageResult(message, ack)); + } } } catch (Exception e) { handleError(e, null); } + lowerBoundOfMultiple.compareAndSet(lowerBound, deliveryTag); } else { - OutboundMessage outboundMessage = unconfirmed.get(deliveryTag); + OutboundMessage message = unconfirmed.remove(deliveryTag); try { - unconfirmed.remove(deliveryTag); - subscriber.onNext(new OutboundMessageResult(outboundMessage, ack)); + subscriber.onNext(new OutboundMessageResult(message, ack)); } catch (Exception e) { - handleError(e, new OutboundMessageResult(outboundMessage, ack)); + handleError(e, new OutboundMessageResult(message, ack)); + } + if (deliveryTag == lowerBound + 1) { + lowerBoundOfMultiple.compareAndSet(lowerBound, deliveryTag); } } if (unconfirmed.size() == 0) {