Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ravindra Dingankar committed Mar 16, 2023
1 parent 056a473 commit efc6016
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public synchronized MutableQuantiles newQuantiles(String name, String desc,
public synchronized MutableQuantiles newQuantiles(String name, String desc, String sampleName,
String valueName, int interval, boolean inverseQuantiles) {
MutableQuantiles ret = newQuantiles(name, desc, sampleName, valueName, interval);
ret.inverseQuantiles = inverseQuantiles;
ret.setInverseQuantiles(inverseQuantiles);
return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.util.InverseQuantiles;
import org.apache.hadoop.metrics2.util.Quantile;
import org.apache.hadoop.metrics2.util.QuantileEstimator;
import org.apache.hadoop.metrics2.util.SampleQuantiles;
Expand All @@ -52,7 +53,7 @@ public class MutableQuantiles extends MutableMetric {
new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };

protected boolean inverseQuantiles = false;
private boolean inverseQuantiles = false;
private final MetricsInfo numInfo;
private final MetricsInfo[] quantileInfos;
private final int interval;
Expand Down Expand Up @@ -105,13 +106,16 @@ public MutableQuantiles(String name, String description, String sampleName,
String.format(descTemplate, percentile));
}

estimator = new SampleQuantiles(quantiles, inverseQuantiles);

estimator = inverseQuantiles ? new InverseQuantiles(quantiles) : new SampleQuantiles(quantiles);
this.interval = interval;
scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
interval, interval, TimeUnit.SECONDS);
}

void setInverseQuantiles(final boolean inverseQuantiles) {
this.inverseQuantiles = inverseQuantiles;
}

@Override
public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
if (all || changed()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.hadoop.metrics2.util;

import org.apache.hadoop.util.Preconditions;
import java.util.ListIterator;

public class InverseQuantiles extends SampleQuantiles{

public InverseQuantiles(Quantile[] quantiles) {
super(quantiles);
}


/**
* Get the estimated value at the inverse of the specified quantile.
* Eg: return the value at (1 - 0.99)*count position for quantile 0.99.
* When count is 100, quantile 0.99 is desired to return the value at the 1st position
*
* @param quantile Queried quantile, e.g. 0.50 or 0.99.
* @return Estimated value at the inverse position of that quantile.
*/
long query(double quantile) {
Preconditions.checkState(!samples.isEmpty(), "no data in estimator");

int rankMin = 0;
int desired = (int) ((1 - quantile) * count);

ListIterator<SampleItem> it = samples.listIterator();
SampleItem prev;
SampleItem cur = it.next();
for (int i = 1; i < samples.size(); i++) {
prev = cur;
cur = it.next();

rankMin += prev.g;

if (rankMin + cur.g + cur.delta > desired + (allowableError(i) / 2)) {
return prev.value;
}
}

// edge case of wanting max value
return samples.get(samples.size() - 1).value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ public class SampleQuantiles implements QuantileEstimator {
/**
* Total number of items in stream
*/
private long count = 0;
long count = 0;

/**
* Current list of sampled items, maintained in sorted order with error bounds
*/
private LinkedList<SampleItem> samples;
LinkedList<SampleItem> samples;

/**
* Buffers incoming items to be inserted in batch. Items are inserted into
Expand All @@ -73,9 +73,8 @@ public class SampleQuantiles implements QuantileEstimator {
private final Quantile quantiles[];
private boolean inverseQuantiles;

public SampleQuantiles(Quantile[] quantiles, boolean inverseQuantiles) {
public SampleQuantiles(Quantile[] quantiles) {
this.quantiles = quantiles;
this. inverseQuantiles = inverseQuantiles;
this.samples = new LinkedList<SampleItem>();
}

Expand All @@ -89,7 +88,7 @@ public SampleQuantiles(Quantile[] quantiles, boolean inverseQuantiles) {
* @param rank
* the index in the list of samples
*/
private double allowableError(int rank) {
double allowableError(int rank) {
int size = samples.size();
double minError = size + 1;
for (Quantile q : quantiles) {
Expand Down Expand Up @@ -202,10 +201,10 @@ private void compress() {
/**
* Get the estimated value at the specified quantile.
*
* @param quantile Queried quantile, e.g. 0.01, 0.50 or 0.99.
* @param quantile Queried quantile, e.g. 0.50 or 0.99.
* @return Estimated value at that quantile.
*/
private long query(double quantile) {
long query(double quantile) {
Preconditions.checkState(!samples.isEmpty(), "no data in estimator");

int rankMin = 0;
Expand Down Expand Up @@ -245,12 +244,7 @@ synchronized public Map<Quantile, Long> snapshot() {

Map<Quantile, Long> values = new TreeMap<Quantile, Long>();
for (int i = 0; i < quantiles.length; i++) {
/* eg : effectiveQuantile for 0.99 with inverseQuantiles will be 0.01.
For inverse quantiles higher numeric value is better and hence we want
to query from the opposite end of the sorted sample
*/
double effectiveQuantile = inverseQuantiles ? 1 - quantiles[i].quantile : quantiles[i].quantile;
values.put(quantiles[i], query(effectiveQuantile));
values.put(quantiles[i], query(quantiles[i].quantile));
}

return values;
Expand Down Expand Up @@ -298,7 +292,7 @@ synchronized public String toString() {
* Describes a measured value passed to the estimator, tracking additional
* metadata required by the CKMS algorithm.
*/
private static class SampleItem {
static class SampleItem {

/**
* Value of the sampled item (e.g. a measured latency value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class TestSampleQuantiles {

@Before
public void init() {
estimator = new SampleQuantiles(quantiles, false);
estimator = new SampleQuantiles(quantiles);
}

/**
Expand Down Expand Up @@ -121,7 +121,7 @@ public void testQuantileError() throws IOException {

@Test
public void testInverseQuantiles() {
SampleQuantiles estimatorWithInverseQuantiles = new SampleQuantiles(quantiles, true);
SampleQuantiles estimatorWithInverseQuantiles = new InverseQuantiles(quantiles);
final int count = 100000;
Random r = new Random(0xDEADDEAD);
Long[] values = new Long[count];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class TestMultiThreadedHflush {
new Quantile[] {
new Quantile(0.50, 0.050),
new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }, false);
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) });

/*
* creates a file but does not close it
Expand Down

0 comments on commit efc6016

Please sign in to comment.