Skip to content

Commit

Permalink
Statistic on gauge (#1024)
Browse files Browse the repository at this point in the history
  • Loading branch information
chinghongfang authored Nov 2, 2022
1 parent 6dfc864 commit e7c6679
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.stats;

import java.time.Duration;

public class AvgRateByTime implements Stat<Double> {
private Double accumulate = 0.0;

private long count = 0;

private final Debounce<Double> debounce;

public AvgRateByTime(Duration period) {
this.debounce = Debounce.of(period);
}

@Override
public synchronized void record(Double value) {
long current = System.currentTimeMillis();
debounce
.record(value, current)
.ifPresent(
debouncedValue -> {
accumulate += debouncedValue;
++count;
});
}

@Override
public synchronized Double measure() {
return accumulate / count;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.stats;

import java.time.Duration;
import java.util.Optional;

/**
* Not all data should be used in statistic. Gauge like "topic size" is a time related data, many
* records recorded at the same time should be considered as one record.
*/
public interface Debounce<V> {
Optional<V> record(V value, long timestamp);

static <V> Debounce<V> of(Duration duration) {
return new Debounce<>() {
private long lastTimestamp = -1;

@Override
public Optional<V> record(V value, long timestamp) {
if (lastTimestamp != -1 && timestamp < lastTimestamp + duration.toMillis()) {
return Optional.empty();
}
lastTimestamp = timestamp;
return Optional.of(value);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.stats;

import java.time.Duration;

/** Calculate the difference of the latest two ranged data. */
public class RateByTime implements Stat<Double> {

private final Double[] oldValue = new Double[2];

private final Debounce<Double> debounce;

public RateByTime(Duration period) {
this.debounce = Debounce.of(period);
}

@Override
public synchronized void record(Double value) {
long current = System.currentTimeMillis();
// Update when a new record occurred
debounce
.record(value, current)
.ifPresent(
debouncedValue -> {
oldValue[0] = oldValue[1];
oldValue[1] = debouncedValue;
});
}

@Override
public synchronized Double measure() {
return oldValue[1] - oldValue[0];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.stats;

import java.time.Duration;
import java.util.Optional;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class DebounceTest {
@Test
void testRecord() {
Debounce<Double> debounce = Debounce.of(Duration.ofMillis(500));

Assertions.assertEquals(Optional.of(20.0), debounce.record(20.0, 100));
Assertions.assertEquals(Optional.empty(), debounce.record(21.0, 110));
Assertions.assertEquals(Optional.of(60.0), debounce.record(60.0, 601));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.stats;

import java.time.Duration;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class RateByTimeTest {
@Test
void testMeasure() throws InterruptedException {
var rateByTime = new RateByTime(Duration.ofSeconds(1));
rateByTime.record(10.0);
rateByTime.record(10.0);
Thread.sleep(1000);
rateByTime.record(50.0);

Assertions.assertEquals(40.0, rateByTime.measure());

rateByTime.record(50.0);

Assertions.assertEquals(40.0, rateByTime.measure());
}
}

0 comments on commit e7c6679

Please sign in to comment.