Skip to content

Commit

Permalink
[improve][broker] Use shrink map for message redelivery. (apache#15342)
Browse files Browse the repository at this point in the history
(cherry picked from commit 615f05a)
(cherry picked from commit b4762d1)
  • Loading branch information
Technoboy- authored and nicoloboschi committed May 4, 2022
1 parent b300020 commit 8e05428
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;

Expand All @@ -37,7 +37,10 @@ public class MessageRedeliveryController {

public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
this.hashesToBeBlocked = allowOutOfOrderDelivery
? null
: ConcurrentLongLongPairHashMap
.newBuilder().concurrencyLevel(2).expectedItems(128).autoShrink(true).build();
}

public boolean add(long ledgerId, long entryId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.Set;
import java.util.TreeSet;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down

0 comments on commit 8e05428

Please sign in to comment.