diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java index ead5ccea51b2..f5af796b575d 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java @@ -30,6 +30,9 @@ import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.chaos.policies.Policy; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReservoirSample; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -37,6 +40,7 @@ * Chaos monkey that given multiple policies will run actions against the cluster. */ public class PolicyBasedChaosMonkey extends ChaosMonkey { + private static final Logger LOG = LoggerFactory.getLogger(PolicyBasedChaosMonkey.class); private static final long ONE_SEC = 1000; private static final long ONE_MIN = 60 * ONE_SEC; @@ -116,13 +120,14 @@ public static T selectWeightedRandomItem(List> items) { /** Selects and returns ceil(ratio * items.length) random items from the given array */ public static List selectRandomItems(T[] items, float ratio) { - int selectedNumber = (int) Math.ceil(items.length * ratio); - - List originalItems = Arrays.asList(items); - Collections.shuffle(originalItems); - - int startIndex = ThreadLocalRandom.current().nextInt(items.length - selectedNumber); - return originalItems.subList(startIndex, startIndex + selectedNumber); + // clamp ratio to [0.0,1.0] + ratio = Math.max(Math.min(ratio, 1.0f), 0.0f); + final int selectedNumber = (int) Math.ceil(items.length * ratio); + final ReservoirSample sample = new ReservoirSample<>(selectedNumber); + sample.add(Arrays.stream(items)); + final List shuffledItems = sample.getSamplingResult(); + Collections.shuffle(shuffledItems); + return shuffledItems; } @Override @@ -151,7 +156,10 @@ public boolean isStopped() { @Override public void waitForStop() throws InterruptedException { - monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES); + if (!monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES)) { + LOG.warn("Some pool threads failed to terminate. Forcing. {}", monkeyThreadPool); + monkeyThreadPool.shutdownNow(); + } } @Override