From cf7401d2d0e6d639e55d6b6590e6c4065a4f21f9 Mon Sep 17 00:00:00 2001 From: Tim Meehan Date: Thu, 18 Feb 2021 09:54:33 -0800 Subject: [PATCH] Fix decay for quantile digests which are more frequently read than modified --- .../airlift/stats/QuantileDigest.java | 32 +++++++++++++++++++ .../airlift/stats/TestQuantileDigest.java | 25 +++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/stats/src/main/java/com/facebook/airlift/stats/QuantileDigest.java b/stats/src/main/java/com/facebook/airlift/stats/QuantileDigest.java index a26b838dd2..3c6c9fc352 100644 --- a/stats/src/main/java/com/facebook/airlift/stats/QuantileDigest.java +++ b/stats/src/main/java/com/facebook/airlift/stats/QuantileDigest.java @@ -328,6 +328,14 @@ public void scale(double scaleFactor) */ public List getQuantilesLowerBound(List quantiles) { + if (alpha > 0.0) { + long nowInSeconds = TimeUnit.NANOSECONDS.toSeconds(ticker.read()); + if (nowInSeconds - landmarkInSeconds >= RESCALE_THRESHOLD_SECONDS) { + rescale(nowInSeconds); + compress(); // rescale affects weights globally, so force compression + } + } + checkArgument(Ordering.natural().isOrdered(quantiles), "quantiles must be sorted in increasing order"); for (double quantile : quantiles) { checkArgument(quantile >= 0 && quantile <= 1, "quantile must be between [0,1]"); @@ -379,6 +387,14 @@ public boolean process(int node) */ public List getQuantilesUpperBound(List quantiles) { + if (alpha > 0.0) { + long nowInSeconds = TimeUnit.NANOSECONDS.toSeconds(ticker.read()); + if (nowInSeconds - landmarkInSeconds >= RESCALE_THRESHOLD_SECONDS) { + rescale(nowInSeconds); + compress(); // rescale affects weights globally, so force compression + } + } + checkArgument(Ordering.natural().isOrdered(quantiles), "quantiles must be sorted in increasing order"); for (double quantile : quantiles) { checkArgument(quantile >= 0 && quantile <= 1, "quantile must be between [0,1]"); @@ -624,7 +640,14 @@ void compress() { double bound = Math.floor(weightedCount / calculateCompressionFactor()); + AtomicLong max = new AtomicLong(Integer.MIN_VALUE); + AtomicLong min = new AtomicLong(Integer.MAX_VALUE); postOrderTraversal(root, node -> { + // If decay is enabled and this node has a non-zero weight, determine if the value is the new max or min + if (counts[node] >= ZERO_WEIGHT_THRESHOLD && alpha > 0.0) { + max.accumulateAndGet(upperBound(node), Math::max); + min.accumulateAndGet(lowerBound(node), Math::min); + } // if children's weights are 0 remove them and shift the weight to their parent int left = lefts[node]; int right = rights[node]; @@ -655,6 +678,15 @@ void compress() // root's count may have decayed to ~0 if (root != -1 && counts[root] < ZERO_WEIGHT_THRESHOLD) { root = tryRemove(root); + if (root < 0) { + this.min = Long.MAX_VALUE; + this.max = Long.MIN_VALUE; + } + // If decay is being used, the min and max values may have decayed as well + else if (alpha > 0) { + this.min = min.get(); + this.max = max.get(); + } } } diff --git a/stats/src/test/java/com/facebook/airlift/stats/TestQuantileDigest.java b/stats/src/test/java/com/facebook/airlift/stats/TestQuantileDigest.java index 3834d72c90..5d01718f0c 100644 --- a/stats/src/test/java/com/facebook/airlift/stats/TestQuantileDigest.java +++ b/stats/src/test/java/com/facebook/airlift/stats/TestQuantileDigest.java @@ -433,6 +433,31 @@ public void testDecayedCountsWithClockIncrementSmallerThanRescaleThreshold() assertEquals(digest.getCount(), 15.0); } + @Test + public void testDecayedQuantilesWithNoMergeOrAdd() + throws Exception + { + TestingTicker ticker = new TestingTicker(); + QuantileDigest digest = new QuantileDigest(0.01, ExponentialDecay.computeAlpha(0.5, 60), ticker); + + addAll(digest, asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)); + + assertEquals(digest.getConfidenceFactor(), 0.0); + assertEquals(digest.getQuantile(0.5), 5); + + // Decay the digest + ticker.increment(60, TimeUnit.SECONDS); + assertEquals(digest.getQuantile(0.5), 5); + + // Allow further decay + ticker.increment(6, TimeUnit.MINUTES); + assertEquals(digest.getQuantile(0.5), 5); + + // Values have decayed to 0 + ticker.increment(60, TimeUnit.MINUTES); + assertEquals(digest.getQuantile(0.5), Long.MIN_VALUE); // Default value for empty digests + } + @Test public void testMinMax() throws Exception