diff --git a/common/src/main/java/org/astraea/common/metrics/stats/AvgRateByTime.java b/common/src/main/java/org/astraea/common/metrics/stats/AvgRateByTime.java new file mode 100644 index 0000000000..85200a6453 --- /dev/null +++ b/common/src/main/java/org/astraea/common/metrics/stats/AvgRateByTime.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics.stats; + +import java.time.Duration; + +public class AvgRateByTime implements Stat { + private Double accumulate = 0.0; + + private long count = 0; + + private final Debounce debounce; + + public AvgRateByTime(Duration period) { + this.debounce = Debounce.of(period); + } + + @Override + public synchronized void record(Double value) { + long current = System.currentTimeMillis(); + debounce + .record(value, current) + .ifPresent( + debouncedValue -> { + accumulate += debouncedValue; + ++count; + }); + } + + @Override + public synchronized Double measure() { + return accumulate / count; + } +} diff --git a/common/src/main/java/org/astraea/common/metrics/stats/Debounce.java b/common/src/main/java/org/astraea/common/metrics/stats/Debounce.java new file mode 100644 index 0000000000..c23a5c72bf --- /dev/null +++ b/common/src/main/java/org/astraea/common/metrics/stats/Debounce.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics.stats; + +import java.time.Duration; +import java.util.Optional; + +/** + * Not all data should be used in statistic. Gauge like "topic size" is a time related data, many + * records recorded at the same time should be considered as one record. + */ +public interface Debounce { + Optional record(V value, long timestamp); + + static Debounce of(Duration duration) { + return new Debounce<>() { + private long lastTimestamp = -1; + + @Override + public Optional record(V value, long timestamp) { + if (lastTimestamp != -1 && timestamp < lastTimestamp + duration.toMillis()) { + return Optional.empty(); + } + lastTimestamp = timestamp; + return Optional.of(value); + } + }; + } +} diff --git a/common/src/main/java/org/astraea/common/metrics/stats/RateByTime.java b/common/src/main/java/org/astraea/common/metrics/stats/RateByTime.java new file mode 100644 index 0000000000..e23f69c7ef --- /dev/null +++ b/common/src/main/java/org/astraea/common/metrics/stats/RateByTime.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics.stats; + +import java.time.Duration; + +/** Calculate the difference of the latest two ranged data. */ +public class RateByTime implements Stat { + + private final Double[] oldValue = new Double[2]; + + private final Debounce debounce; + + public RateByTime(Duration period) { + this.debounce = Debounce.of(period); + } + + @Override + public synchronized void record(Double value) { + long current = System.currentTimeMillis(); + // Update when a new record occurred + debounce + .record(value, current) + .ifPresent( + debouncedValue -> { + oldValue[0] = oldValue[1]; + oldValue[1] = debouncedValue; + }); + } + + @Override + public synchronized Double measure() { + return oldValue[1] - oldValue[0]; + } +} diff --git a/common/src/test/java/org/astraea/common/metrics/stats/DebounceTest.java b/common/src/test/java/org/astraea/common/metrics/stats/DebounceTest.java new file mode 100644 index 0000000000..09a2c7cbe2 --- /dev/null +++ b/common/src/test/java/org/astraea/common/metrics/stats/DebounceTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics.stats; + +import java.time.Duration; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DebounceTest { + @Test + void testRecord() { + Debounce debounce = Debounce.of(Duration.ofMillis(500)); + + Assertions.assertEquals(Optional.of(20.0), debounce.record(20.0, 100)); + Assertions.assertEquals(Optional.empty(), debounce.record(21.0, 110)); + Assertions.assertEquals(Optional.of(60.0), debounce.record(60.0, 601)); + } +} diff --git a/common/src/test/java/org/astraea/common/metrics/stats/RateByTimeTest.java b/common/src/test/java/org/astraea/common/metrics/stats/RateByTimeTest.java new file mode 100644 index 0000000000..13eaa8e1da --- /dev/null +++ b/common/src/test/java/org/astraea/common/metrics/stats/RateByTimeTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics.stats; + +import java.time.Duration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class RateByTimeTest { + @Test + void testMeasure() throws InterruptedException { + var rateByTime = new RateByTime(Duration.ofSeconds(1)); + rateByTime.record(10.0); + rateByTime.record(10.0); + Thread.sleep(1000); + rateByTime.record(50.0); + + Assertions.assertEquals(40.0, rateByTime.measure()); + + rateByTime.record(50.0); + + Assertions.assertEquals(40.0, rateByTime.measure()); + } +}