From 168a201024bf6c230b60303871ef5b23f4c6d527 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Tue, 10 Jan 2023 21:14:33 +0800 Subject: [PATCH 1/6] Upload draft --- .../common/metrics/MetricSeriesBuilder.java | 111 ++++++++++++++++++ .../astraea/common/cost/NetworkCostTest.java | 71 ++++++----- 2 files changed, 145 insertions(+), 37 deletions(-) create mode 100644 common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java diff --git a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java new file mode 100644 index 0000000000..be83ed20c6 --- /dev/null +++ b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Stream; +import org.astraea.common.admin.ClusterBean; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.Partition; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartition; +import org.astraea.common.metrics.broker.LogMetrics; +import org.astraea.common.metrics.broker.ServerMetrics; + +/** + * A utility for generating a series of metric objects, where the measured metric value might be + * highly correlated to specific variables. For example broker id, calendar time, or unknown noise. + * This class offers a way to construct large-scale metric sources of a fake cluster, which will be + * useful for testing and experiment purposes. + */ +public interface MetricSeriesBuilder { + + static MetricSeriesBuilder of() { + throw new UnsupportedOperationException(); + } + + MetricSeriesBuilder sourceCluster(ClusterInfo clusterInfo); + + MetricSeriesBuilder timeRange(LocalDateTime firstMetricTime, Duration duration); + + MetricSeriesBuilder sampleInterval(Duration interval); + + MetricSeriesBuilder series( + BiFunction> seriesGenerator); + + ClusterBean build(); + + final class MetricGenerator { + + private final ClusterInfo clusterInfo; + private final int broker; + private final LocalDateTime time; + + public MetricGenerator(ClusterInfo clusterInfo, int broker, LocalDateTime time) { + this.clusterInfo = clusterInfo; + this.broker = broker; + this.time = time; + } + + public LocalDateTime now() { + return time; + } + + public Stream perOnlinePartitionLeader( + Function mapper) { + throw new UnsupportedOperationException(); + } + + public Stream perTopic(Function mapper) { + throw new UnsupportedOperationException(); + } + + public Stream perPartition(Function mapper) { + throw new UnsupportedOperationException(); + } + + public Stream perReplica(Function mapper) { + throw new UnsupportedOperationException(); + } + + public ServerMetrics.Topic.Meter topic( + ServerMetrics.Topic metric, String topic, Map attributes) { + var domainName = ServerMetrics.DOMAIN_NAME; + var properties = + Map.of("type", "BrokerTopicMetric", "topic", topic, "name", metric.metricName()); + return new ServerMetrics.Topic.Meter( + new BeanObject(domainName, properties, attributes, time.toEpochSecond(ZoneOffset.UTC))); + } + + public LogMetrics.Log.Gauge logSize(TopicPartition topicPartition, long size) { + var domainName = LogMetrics.DOMAIN_NAME; + var properties = + Map.of( + "type", "BrokerTopicMetric", + "topic", topicPartition.topic(), + "partition", String.valueOf(topicPartition.partition()), + "name", LogMetrics.Log.SIZE.metricName()); + var attributes = Map.of("value", size); + return new LogMetrics.Log.Gauge(new BeanObject(domainName, properties, attributes)); + } + } +} diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index f3cd301371..079a9398c4 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -17,6 +17,7 @@ package org.astraea.common.cost; import java.time.Duration; +import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.Random; @@ -39,6 +40,7 @@ import org.astraea.common.balancer.algorithms.AlgorithmConfig; import org.astraea.common.balancer.tweakers.ShuffleTweaker; import org.astraea.common.metrics.BeanObject; +import org.astraea.common.metrics.MetricSeriesBuilder; import org.astraea.common.metrics.broker.LogMetrics; import org.astraea.common.metrics.broker.ServerMetrics; import org.junit.jupiter.api.Assertions; @@ -602,44 +604,39 @@ public LargeTestCase(int brokers, int partitions, int seed) { Collectors.toUnmodifiableMap( p -> TopicPartition.of("Pipeline", p), p -> random.nextInt(10))); this.clusterBean = - ClusterBean.of( - IntStream.range(0, brokers) - .boxed() - .collect( - Collectors.toUnmodifiableMap( - id -> id, - id -> - List.of( - noise(random.nextInt()), - noise(random.nextInt()), - noise(random.nextInt()), - noise(random.nextInt()), - bandwidth( + MetricSeriesBuilder.of() + .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) + .sampleInterval(Duration.ofSeconds(2)) + .series( + (Gen, broker) -> + Gen.perOnlinePartitionLeader( + r -> + Gen.topic( + ServerMetrics.Topic.BYTES_IN_PER_SEC, + r.topic(), + Map.of("FifteenMinuteRate", rate.get(r.topicPartition()))))) + .series( + (Gen, broker) -> + Gen.perOnlinePartitionLeader( + r -> + Gen.topic( + ServerMetrics.Topic.BYTES_OUT_PER_SEC, + r.topic(), + Map.of( + "FifteenMinuteRate", + rate.get(r.topicPartition()) + * consumerFanout.get(r.topicPartition()))))) + .series( + (Gen, broker) -> + IntStream.range(0, 10) + .mapToObj( + i -> + Gen.topic( ServerMetrics.Topic.BYTES_IN_PER_SEC, - "Pipeline", - clusterInfo - .replicaStream() - .filter(r -> r.nodeInfo().id() == id) - .filter(Replica::isLeader) - .filter(Replica::isOnline) - .mapToLong(r -> rate.get(r.topicPartition())) - .sum()), - noise(random.nextInt()), - noise(random.nextInt()), - bandwidth( - ServerMetrics.Topic.BYTES_OUT_PER_SEC, - "Pipeline", - clusterInfo - .replicaStream() - .filter(r -> r.nodeInfo().id() == id) - .filter(Replica::isLeader) - .filter(Replica::isOnline) - .mapToLong( - r -> - rate.get(r.topicPartition()) - * consumerFanout.get(r.topicPartition())) - .sum()), - noise(random.nextInt()))))); + "Noise_" + i, + Map.of()))) + .series((Gen, broker) -> Gen.perReplica(r -> Gen.logSize(r.topicPartition(), 0))) + .build(); } @Override From c892e5f1ad3838fdcedf04be2b5b1f220fb7bb5d Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Wed, 11 Jan 2023 23:14:34 +0800 Subject: [PATCH 2/6] Fill in implementation --- .../common/metrics/MetricSeriesBuilder.java | 119 +++++++++++-- .../astraea/common/cost/NetworkCostTest.java | 47 +++-- .../metrics/MetricSeriesBuilderTest.java | 164 ++++++++++++++++++ 3 files changed, 295 insertions(+), 35 deletions(-) create mode 100644 common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java diff --git a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java index be83ed20c6..95720f09f2 100644 --- a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java +++ b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java @@ -19,13 +19,19 @@ import java.time.Duration; import java.time.LocalDateTime; import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; -import org.astraea.common.admin.Partition; +import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; import org.astraea.common.metrics.broker.LogMetrics; @@ -39,11 +45,11 @@ */ public interface MetricSeriesBuilder { - static MetricSeriesBuilder of() { - throw new UnsupportedOperationException(); + static MetricSeriesBuilder builder() { + return new MetricSeriesBuilderImpl(); } - MetricSeriesBuilder sourceCluster(ClusterInfo clusterInfo); + MetricSeriesBuilder cluster(ClusterInfo clusterInfo); MetricSeriesBuilder timeRange(LocalDateTime firstMetricTime, Duration duration); @@ -54,15 +60,85 @@ MetricSeriesBuilder series( ClusterBean build(); + final class MetricSeriesBuilderImpl implements MetricSeriesBuilder { + + private final List>>> series = + new ArrayList<>(); + + private ClusterInfo clusterInfo; + private LocalDateTime timeStart; + private Duration timeRange; + private Duration sampleInterval = Duration.ofSeconds(1); + + @Override + public MetricSeriesBuilder cluster(ClusterInfo clusterInfo) { + this.clusterInfo = Objects.requireNonNull(clusterInfo); + return this; + } + + @Override + public MetricSeriesBuilder timeRange(LocalDateTime firstMetricTime, Duration duration) { + this.timeStart = firstMetricTime; + this.timeRange = duration; + return this; + } + + @Override + public MetricSeriesBuilder sampleInterval(Duration interval) { + if (interval.isNegative() || interval.isZero()) + throw new IllegalArgumentException("The sample interval must be positive"); + this.sampleInterval = interval; + return this; + } + + @Override + public MetricSeriesBuilder series( + BiFunction> seriesGenerator) { + // the series state is decided at the call time, instead of build time. + final var cluster = clusterInfo; + final var start = timeStart; + final var end = timeStart.plus(timeRange); + final var interval = sampleInterval; + this.series.add( + () -> + Stream.iterate( + start, (t) -> t.isBefore(end) || t.isEqual(end), (t) -> t.plus(interval)) + .flatMap( + time -> + cluster.nodes().stream() + .map(node -> new MetricGenerator(cluster, node, time))) + .collect( + Collectors.toUnmodifiableMap( + gen -> gen.node().id(), + gen -> seriesGenerator.apply(gen, gen.node().id()), + Stream::concat))); + return this; + } + + @Override + public ClusterBean build() { + Map> allMetrics = + this.series.stream() + .map(Supplier::get) + .flatMap(metrics -> metrics.entrySet().stream()) + .collect( + Collectors.groupingBy( + Map.Entry::getKey, + Collectors.flatMapping( + Map.Entry::getValue, Collectors.toCollection(ArrayList::new)))); + return ClusterBean.of(allMetrics); + } + } + final class MetricGenerator { private final ClusterInfo clusterInfo; - private final int broker; + private final NodeInfo node; private final LocalDateTime time; - public MetricGenerator(ClusterInfo clusterInfo, int broker, LocalDateTime time) { + MetricGenerator(ClusterInfo clusterInfo, NodeInfo node, LocalDateTime time) { this.clusterInfo = clusterInfo; - this.broker = broker; + this.node = node; this.time = time; } @@ -70,21 +146,25 @@ public LocalDateTime now() { return time; } - public Stream perOnlinePartitionLeader( - Function mapper) { - throw new UnsupportedOperationException(); + public ClusterInfo cluster() { + return clusterInfo; + } + + public NodeInfo node() { + return node; } - public Stream perTopic(Function mapper) { - throw new UnsupportedOperationException(); + public Stream perBrokerTopic(Function mapper) { + return clusterInfo.replicaStream(node.id()).map(Replica::topic).distinct().map(mapper); } - public Stream perPartition(Function mapper) { - throw new UnsupportedOperationException(); + public Stream perBrokerPartition( + Function mapper) { + return clusterInfo.replicaStream(node.id()).map(Replica::topicPartition).map(mapper); } - public Stream perReplica(Function mapper) { - throw new UnsupportedOperationException(); + public Stream perBrokerReplica(Function mapper) { + return clusterInfo.replicaStream(node.id()).map(mapper); } public ServerMetrics.Topic.Meter topic( @@ -100,12 +180,13 @@ public LogMetrics.Log.Gauge logSize(TopicPartition topicPartition, long size) { var domainName = LogMetrics.DOMAIN_NAME; var properties = Map.of( - "type", "BrokerTopicMetric", + "type", LogMetrics.LOG_TYPE, "topic", topicPartition.topic(), "partition", String.valueOf(topicPartition.partition()), "name", LogMetrics.Log.SIZE.metricName()); - var attributes = Map.of("value", size); - return new LogMetrics.Log.Gauge(new BeanObject(domainName, properties, attributes)); + var attributes = Map.of("Value", size); + return new LogMetrics.Log.Gauge( + new BeanObject(domainName, properties, attributes, time.toEpochSecond(ZoneOffset.UTC))); } } } diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 079a9398c4..8d941e24a3 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -31,6 +31,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.astraea.common.DataRate; +import org.astraea.common.admin.BrokerTopic; import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.ClusterInfoBuilder; @@ -604,38 +605,52 @@ public LargeTestCase(int brokers, int partitions, int seed) { Collectors.toUnmodifiableMap( p -> TopicPartition.of("Pipeline", p), p -> random.nextInt(10))); this.clusterBean = - MetricSeriesBuilder.of() - .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) - .sampleInterval(Duration.ofSeconds(2)) + MetricSeriesBuilder.builder() + .cluster(clusterInfo) + .timeRange(LocalDateTime.now(), Duration.ZERO) + .sampleInterval(Duration.ofSeconds(1)) .series( (Gen, broker) -> - Gen.perOnlinePartitionLeader( - r -> + Gen.perBrokerTopic( + topic -> Gen.topic( ServerMetrics.Topic.BYTES_IN_PER_SEC, - r.topic(), - Map.of("FifteenMinuteRate", rate.get(r.topicPartition()))))) + topic, + Map.of( + "FifteenMinuteRate", + clusterInfo + .replicaStream(BrokerTopic.of(broker, topic)) + .filter(Replica::isLeader) + .filter(Replica::isOnline) + .mapToDouble(r -> rate.get(r.topicPartition())) + .sum())))) .series( (Gen, broker) -> - Gen.perOnlinePartitionLeader( - r -> + Gen.perBrokerTopic( + topic -> Gen.topic( ServerMetrics.Topic.BYTES_OUT_PER_SEC, - r.topic(), + topic, Map.of( "FifteenMinuteRate", - rate.get(r.topicPartition()) - * consumerFanout.get(r.topicPartition()))))) + clusterInfo + .replicaStream(BrokerTopic.of(broker, topic)) + .filter(Replica::isLeader) + .filter(Replica::isOnline) + .mapToDouble( + r -> + rate.get(r.topicPartition()) + * consumerFanout.get(r.topicPartition())) + .sum())))) .series( (Gen, broker) -> IntStream.range(0, 10) .mapToObj( i -> Gen.topic( - ServerMetrics.Topic.BYTES_IN_PER_SEC, + ServerMetrics.Topic.TOTAL_FETCH_REQUESTS_PER_SEC, "Noise_" + i, Map.of()))) - .series((Gen, broker) -> Gen.perReplica(r -> Gen.logSize(r.topicPartition(), 0))) .build(); } @@ -674,11 +689,11 @@ static LogMetrics.Log.Gauge logSize(TopicPartition topicPartition, long size) { var domainName = LogMetrics.DOMAIN_NAME; var properties = Map.of( - "type", "BrokerTopicMetric", + "type", "Log", "topic", topicPartition.topic(), "partition", String.valueOf(topicPartition.partition()), "name", LogMetrics.Log.SIZE.metricName()); - var attributes = Map.of("value", size); + var attributes = Map.of("Value", size); return new LogMetrics.Log.Gauge(new BeanObject(domainName, properties, attributes)); } } diff --git a/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java b/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java new file mode 100644 index 0000000000..b23727c3b5 --- /dev/null +++ b/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.metrics; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Stream; +import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.ClusterInfoBuilder; +import org.astraea.common.admin.Replica; +import org.astraea.common.admin.TopicPartition; +import org.astraea.common.metrics.broker.LogMetrics; +import org.astraea.common.metrics.broker.ServerMetrics; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class MetricSeriesBuilderTest { + + private static final ClusterInfo cluster = + ClusterInfoBuilder.builder() + .addNode(Set.of(1, 2, 3)) + .addFolders( + Map.ofEntries( + Map.entry(1, Set.of("/folder0", "/folder1", "/folder2")), + Map.entry(2, Set.of("/folder0", "/folder1", "/folder2")), + Map.entry(3, Set.of("/folder0", "/folder1", "/folder2")))) + .addTopic("topicA", 100, (short) 2) + .build(); + + @Test + void example() { + { + // sample 10 second, 2 second interval + var beans = + MetricSeriesBuilder.builder() + .cluster(cluster) + .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) + .sampleInterval(Duration.ofSeconds(2)) + .series((Gen, broker) -> Stream.of(() -> null)) + .build(); + Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); + Assertions.assertEquals(6, beans.all().get(1).size()); + Assertions.assertEquals(6, beans.all().get(2).size()); + Assertions.assertEquals(6, beans.all().get(3).size()); + } + { + // sample 10 second, 4 second interval + var beans = + MetricSeriesBuilder.builder() + .cluster(cluster) + .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) + .sampleInterval(Duration.ofSeconds(4)) + .series((Gen, broker) -> Stream.of(() -> null)) + .build(); + Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); + Assertions.assertEquals(3, beans.all().get(1).size()); + Assertions.assertEquals(3, beans.all().get(2).size()); + Assertions.assertEquals(3, beans.all().get(3).size()); + } + { + // zero duration, sample just once + var beans = + MetricSeriesBuilder.builder() + .cluster(cluster) + .timeRange(LocalDateTime.now(), Duration.ZERO) + .series((Gen, broker) -> Stream.of(() -> null)) + .build(); + Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); + Assertions.assertEquals(1, beans.all().get(1).size()); + Assertions.assertEquals(1, beans.all().get(2).size()); + Assertions.assertEquals(1, beans.all().get(3).size()); + } + } + + @Test + @DisplayName( + "By change the setting between series calls, we can have difference sample rate for each series") + void testFlexibility() { + var beans = + MetricSeriesBuilder.builder() + .cluster(cluster) + .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) + .sampleInterval(Duration.ofSeconds(1)) + .series((Gen, broker) -> broker == 1 ? Stream.of(() -> null) : Stream.of()) + .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) + .sampleInterval(Duration.ofSeconds(2)) + .series((Gen, broker) -> broker == 2 ? Stream.of(() -> null) : Stream.of()) + .timeRange(LocalDateTime.now(), Duration.ofSeconds(15)) + .sampleInterval(Duration.ofSeconds(5)) + .series((Gen, broker) -> broker == 3 ? Stream.of(() -> null) : Stream.of()) + .build(); + Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); + Assertions.assertEquals(11, beans.all().get(1).size()); + Assertions.assertEquals(6, beans.all().get(2).size()); + Assertions.assertEquals(4, beans.all().get(3).size()); + } + + @Test + void testInterval() { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> MetricSeriesBuilder.builder().sampleInterval(Duration.ZERO)); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> MetricSeriesBuilder.builder().sampleInterval(Duration.ofSeconds(-1))); + } + + @Test + void testMetricsGenerator() { + var theCluster = cluster; + var theNode = theCluster.node(1); + var unixTime = ThreadLocalRandom.current().nextInt(0, 10000); + var time = LocalDateTime.ofEpochSecond(unixTime, 0, ZoneOffset.UTC); + var gen = new MetricSeriesBuilder.MetricGenerator(theCluster, theNode, time); + + Assertions.assertEquals(theCluster, gen.cluster()); + Assertions.assertEquals(theNode, gen.node()); + Assertions.assertEquals(time, gen.now()); + Assertions.assertEquals( + theCluster.replicaStream(1).map(Replica::topic).distinct().count(), + gen.perBrokerTopic(i -> () -> null).count()); + Assertions.assertEquals( + theCluster.replicaStream(1).map(Replica::topicPartition).distinct().count(), + gen.perBrokerPartition(i -> () -> null).count()); + Assertions.assertEquals( + theCluster.replicaStream(1).distinct().count(), + gen.perBrokerReplica(i -> () -> null).count()); + + var topic = gen.topic(ServerMetrics.Topic.BYTES_IN_PER_SEC, "Example", Map.of("A", "B")); + Assertions.assertEquals("Example", topic.topic()); + Assertions.assertEquals(ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(), topic.metricsName()); + Assertions.assertEquals(ServerMetrics.DOMAIN_NAME, topic.beanObject().domainName()); + Assertions.assertEquals(unixTime, topic.beanObject().createdTimestamp()); + Assertions.assertEquals("BrokerTopicMetric", topic.beanObject().properties().get("type")); + Assertions.assertEquals(Map.of("A", "B"), topic.beanObject().attributes()); + + var logSize = gen.logSize(TopicPartition.of("Example", 10), 1024); + Assertions.assertEquals(LogMetrics.DOMAIN_NAME, logSize.beanObject().domainName()); + Assertions.assertEquals(1024, logSize.value()); + Assertions.assertEquals("Log", logSize.beanObject().properties().get("type")); + Assertions.assertEquals("Example", logSize.topic()); + Assertions.assertEquals(10, logSize.partition()); + Assertions.assertEquals(unixTime, logSize.beanObject().createdTimestamp()); + } +} From 29184e9bb569cda90515de52e63dcd7a67a0b0c2 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Thu, 12 Jan 2023 00:54:06 +0800 Subject: [PATCH 3/6] revise --- .../src/test/java/org/astraea/common/cost/NetworkCostTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 8d941e24a3..2fbec5c3c7 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -608,7 +608,6 @@ public LargeTestCase(int brokers, int partitions, int seed) { MetricSeriesBuilder.builder() .cluster(clusterInfo) .timeRange(LocalDateTime.now(), Duration.ZERO) - .sampleInterval(Duration.ofSeconds(1)) .series( (Gen, broker) -> Gen.perBrokerTopic( From 35c32dbedf931989885ed623fa1de4c099d8617c Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 16 Jan 2023 01:44:49 +0800 Subject: [PATCH 4/6] Address comments --- .../org/astraea/app/web/ThrottleHandler.java | 6 +- .../org/astraea/common/cost/Dispersion.java | 6 +- .../common/metrics/MetricSeriesBuilder.java | 202 +++++++++++------- .../common/metrics/broker/LogMetrics.java | 43 ++++ .../common/metrics/broker/ServerMetrics.java | 70 ++++++ .../astraea/common/cost/NetworkCostTest.java | 86 ++++---- .../metrics/MetricSeriesBuilderTest.java | 58 +---- 7 files changed, 297 insertions(+), 174 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/ThrottleHandler.java b/app/src/main/java/org/astraea/app/web/ThrottleHandler.java index d47e14a856..c925a97d26 100644 --- a/app/src/main/java/org/astraea/app/web/ThrottleHandler.java +++ b/app/src/main/java/org/astraea/app/web/ThrottleHandler.java @@ -322,9 +322,9 @@ private Set toReplicaSet(String topic, String throttledRe } /** - * Given a series of leader/follower throttle config, this method attempts to reduce its size into - * the simplest form by merging any targets with a common topic/partition/replica scope throttle - * target. + * Given a seriesByBrokerTopic of leader/follower throttle config, this method attempts to reduce + * its size into the simplest form by merging any targets with a common topic/partition/replica + * scope throttle target. */ private Set simplify( Set leaders, Set followers) { diff --git a/common/src/main/java/org/astraea/common/cost/Dispersion.java b/common/src/main/java/org/astraea/common/cost/Dispersion.java index bc4c87e170..f6b6ad4e90 100644 --- a/common/src/main/java/org/astraea/common/cost/Dispersion.java +++ b/common/src/main/java/org/astraea/common/cost/Dispersion.java @@ -22,7 +22,7 @@ @FunctionalInterface public interface Dispersion { /** - * Apply coefficient of variation to a series of values. + * Apply coefficient of variation to a seriesByBrokerTopic of values. * *

This implementation come with some assumption: * @@ -54,7 +54,7 @@ static Dispersion cov() { } /** - * Obtain standard deviation from a series of values. + * Obtain standard deviation from a seriesByBrokerTopic of values. * *

    *
  • If no number was given, then the standard deviation is zero. @@ -77,7 +77,7 @@ static Dispersion standardDeviation() { } /** - * Processing a series of values via a specific statistics method. + * Processing a seriesByBrokerTopic of values via a specific statistics method. * * @param scores origin data * @return aggregated data diff --git a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java index 95720f09f2..fcbb163413 100644 --- a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java +++ b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java @@ -18,14 +18,11 @@ import java.time.Duration; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.BiFunction; -import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -34,14 +31,12 @@ import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; -import org.astraea.common.metrics.broker.LogMetrics; -import org.astraea.common.metrics.broker.ServerMetrics; /** - * A utility for generating a series of metric objects, where the measured metric value might be - * highly correlated to specific variables. For example broker id, calendar time, or unknown noise. - * This class offers a way to construct large-scale metric sources of a fake cluster, which will be - * useful for testing and experiment purposes. + * A utility for generating a seriesByBrokerTopic of metric objects, where the measured metric value + * might be highly correlated to specific variables. For example broker id, calendar time, or + * unknown noise. This class offers a way to construct large-scale metric sources of a fake cluster, + * which will be useful for testing and experiment purposes. */ public interface MetricSeriesBuilder { @@ -55,11 +50,36 @@ static MetricSeriesBuilder builder() { MetricSeriesBuilder sampleInterval(Duration interval); - MetricSeriesBuilder series( - BiFunction> seriesGenerator); + MetricSeriesBuilder seriesByBroker(BrokerSeries seriesGenerator); + + MetricSeriesBuilder seriesByBrokerTopic(BrokerTopicSeries seriesGenerator); + + MetricSeriesBuilder seriesByBrokerPartition(BrokerPartitionSeries seriesGenerator); + + MetricSeriesBuilder seriesByBrokerReplica(BrokerReplicaSeries seriesGenerator); ClusterBean build(); + @FunctionalInterface + interface BrokerSeries { + Stream series(LocalDateTime time, int broker); + } + + @FunctionalInterface + interface BrokerTopicSeries { + HasBeanObject series(LocalDateTime time, int broker, String topic); + } + + @FunctionalInterface + interface BrokerPartitionSeries { + HasBeanObject series(LocalDateTime time, int broker, TopicPartition partition); + } + + @FunctionalInterface + interface BrokerReplicaSeries { + HasBeanObject series(LocalDateTime time, int broker, Replica replica); + } + final class MetricSeriesBuilderImpl implements MetricSeriesBuilder { private final List>>> series = @@ -92,9 +112,7 @@ public MetricSeriesBuilder sampleInterval(Duration interval) { } @Override - public MetricSeriesBuilder series( - BiFunction> seriesGenerator) { - // the series state is decided at the call time, instead of build time. + public MetricSeriesBuilder seriesByBroker(BrokerSeries seriesGenerator) { final var cluster = clusterInfo; final var start = timeStart; final var end = timeStart.plus(timeRange); @@ -106,12 +124,100 @@ public MetricSeriesBuilder series( .flatMap( time -> cluster.nodes().stream() - .map(node -> new MetricGenerator(cluster, node, time))) + .map( + node -> + Map.entry( + node.id(), seriesGenerator.series(time, node.id())))) + .collect( + Collectors.toUnmodifiableMap( + Map.Entry::getKey, Map.Entry::getValue, Stream::concat))); + return this; + } + + @Override + public MetricSeriesBuilder seriesByBrokerTopic(BrokerTopicSeries seriesGenerator) { + final var cluster = clusterInfo; + final var start = timeStart; + final var end = timeStart.plus(timeRange); + final var interval = sampleInterval; + this.series.add( + () -> + cluster.nodes().stream() + .collect( + Collectors.toUnmodifiableMap( + NodeInfo::id, + node -> + Stream.iterate( + start, + (t) -> t.isBefore(end) || t.isEqual(end), + (t) -> t.plus(interval)) + .flatMap( + time -> + cluster + .replicaStream(node.id()) + .map(Replica::topic) + .distinct() + .map( + topic -> + seriesGenerator.series( + time, node.id(), topic)))))); + return this; + } + + @Override + public MetricSeriesBuilder seriesByBrokerPartition(BrokerPartitionSeries seriesGenerator) { + final var cluster = clusterInfo; + final var start = timeStart; + final var end = timeStart.plus(timeRange); + final var interval = sampleInterval; + this.series.add( + () -> + cluster.nodes().stream() .collect( Collectors.toUnmodifiableMap( - gen -> gen.node().id(), - gen -> seriesGenerator.apply(gen, gen.node().id()), - Stream::concat))); + NodeInfo::id, + node -> + Stream.iterate( + start, + (t) -> t.isBefore(end) || t.isEqual(end), + (t) -> t.plus(interval)) + .flatMap( + time -> + cluster + .replicaStream(node.id()) + .map(Replica::topicPartition) + .map( + partition -> + seriesGenerator.series( + time, node.id(), partition)))))); + return this; + } + + @Override + public MetricSeriesBuilder seriesByBrokerReplica(BrokerReplicaSeries seriesGenerator) { + final var cluster = clusterInfo; + final var start = timeStart; + final var end = timeStart.plus(timeRange); + final var interval = sampleInterval; + this.series.add( + () -> + cluster.nodes().stream() + .collect( + Collectors.toUnmodifiableMap( + NodeInfo::id, + node -> + Stream.iterate( + start, + (t) -> t.isBefore(end) || t.isEqual(end), + (t) -> t.plus(interval)) + .flatMap( + time -> + cluster + .replicaStream(node.id()) + .map( + replica -> + seriesGenerator.series( + time, node.id(), replica)))))); return this; } @@ -129,64 +235,4 @@ public ClusterBean build() { return ClusterBean.of(allMetrics); } } - - final class MetricGenerator { - - private final ClusterInfo clusterInfo; - private final NodeInfo node; - private final LocalDateTime time; - - MetricGenerator(ClusterInfo clusterInfo, NodeInfo node, LocalDateTime time) { - this.clusterInfo = clusterInfo; - this.node = node; - this.time = time; - } - - public LocalDateTime now() { - return time; - } - - public ClusterInfo cluster() { - return clusterInfo; - } - - public NodeInfo node() { - return node; - } - - public Stream perBrokerTopic(Function mapper) { - return clusterInfo.replicaStream(node.id()).map(Replica::topic).distinct().map(mapper); - } - - public Stream perBrokerPartition( - Function mapper) { - return clusterInfo.replicaStream(node.id()).map(Replica::topicPartition).map(mapper); - } - - public Stream perBrokerReplica(Function mapper) { - return clusterInfo.replicaStream(node.id()).map(mapper); - } - - public ServerMetrics.Topic.Meter topic( - ServerMetrics.Topic metric, String topic, Map attributes) { - var domainName = ServerMetrics.DOMAIN_NAME; - var properties = - Map.of("type", "BrokerTopicMetric", "topic", topic, "name", metric.metricName()); - return new ServerMetrics.Topic.Meter( - new BeanObject(domainName, properties, attributes, time.toEpochSecond(ZoneOffset.UTC))); - } - - public LogMetrics.Log.Gauge logSize(TopicPartition topicPartition, long size) { - var domainName = LogMetrics.DOMAIN_NAME; - var properties = - Map.of( - "type", LogMetrics.LOG_TYPE, - "topic", topicPartition.topic(), - "partition", String.valueOf(topicPartition.partition()), - "name", LogMetrics.Log.SIZE.metricName()); - var attributes = Map.of("Value", size); - return new LogMetrics.Log.Gauge( - new BeanObject(domainName, properties, attributes, time.toEpochSecond(ZoneOffset.UTC))); - } - } } diff --git a/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java b/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java index 8fd5e4b7b3..001fc1ae4d 100644 --- a/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/broker/LogMetrics.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import org.astraea.common.EnumInfo; import org.astraea.common.metrics.BeanObject; @@ -159,6 +160,48 @@ public List fetch(MBeanClient mBeanClient) { .collect(Collectors.toUnmodifiableList()); } + public Builder builder() { + return new Builder(this); + } + + public static class Builder { + private final LogMetrics.Log metric; + private String topic; + private int partition; + private long value; + + public Builder(Log metric) { + this.metric = metric; + } + + public Builder topic(String topic) { + this.topic = topic; + return this; + } + + public Builder partition(int partition) { + this.partition = partition; + return this; + } + + public Builder logSize(long value) { + this.value = value; + return this; + } + + public Gauge build() { + return new Gauge( + new BeanObject( + LogMetrics.DOMAIN_NAME, + Map.ofEntries( + Map.entry("type", LOG_TYPE), + Map.entry("name", metric.metricName()), + Map.entry("topic", topic), + Map.entry("partition", String.valueOf(partition))), + Map.of("Value", value))); + } + } + public static class Gauge implements HasGauge { private final BeanObject beanObject; diff --git a/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java b/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java index 97299b4b8a..fe17e8f9e9 100644 --- a/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/broker/ServerMetrics.java @@ -17,9 +17,12 @@ package org.astraea.common.metrics.broker; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.astraea.common.EnumInfo; import org.astraea.common.metrics.AppInfo; @@ -395,6 +398,73 @@ public List fetch(MBeanClient mBeanClient) { .collect(Collectors.toList()); } + public Builder builder() { + return new Builder(this); + } + + public static class Builder { + + private final ServerMetrics.Topic metric; + private String topic; + private long time; + private final Map attributes = new HashMap<>(); + + public Builder(Topic metric) { + this.metric = metric; + } + + public Builder topic(String topic) { + this.topic = topic; + return this; + } + + public Builder time(long time) { + this.time = time; + return this; + } + + public Builder meanRate(double value) { + this.attributes.put("MeanRate", value); + return this; + } + + public Builder oneMinuteRate(double value) { + this.attributes.put("OneMinuteRate", value); + return this; + } + + public Builder fiveMinuteRate(double value) { + this.attributes.put("FiveMinuteRate", value); + return this; + } + + public Builder fifteenMinuteRate(double value) { + this.attributes.put("FifteenMinuteRate", value); + return this; + } + + public Builder rateUnit(TimeUnit timeUnit) { + this.attributes.put("RateUnit", timeUnit); + return this; + } + + public Builder count(long count) { + this.attributes.put("Count", count); + return this; + } + + public Meter build() { + return new Meter( + new BeanObject( + ServerMetrics.DOMAIN_NAME, + Map.ofEntries( + Map.entry("type", "BrokerTopicMetrics"), + Map.entry("topic", topic), + Map.entry("name", metric.metricName())), + Map.copyOf(attributes))); + } + } + public static class Meter implements HasMeter { private final BeanObject beanObject; diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index 2fbec5c3c7..3dca6cf1b1 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Map; import java.util.Random; @@ -608,48 +609,55 @@ public LargeTestCase(int brokers, int partitions, int seed) { MetricSeriesBuilder.builder() .cluster(clusterInfo) .timeRange(LocalDateTime.now(), Duration.ZERO) - .series( - (Gen, broker) -> - Gen.perBrokerTopic( - topic -> - Gen.topic( - ServerMetrics.Topic.BYTES_IN_PER_SEC, - topic, - Map.of( - "FifteenMinuteRate", - clusterInfo - .replicaStream(BrokerTopic.of(broker, topic)) - .filter(Replica::isLeader) - .filter(Replica::isOnline) - .mapToDouble(r -> rate.get(r.topicPartition())) - .sum())))) - .series( - (Gen, broker) -> - Gen.perBrokerTopic( - topic -> - Gen.topic( - ServerMetrics.Topic.BYTES_OUT_PER_SEC, - topic, - Map.of( - "FifteenMinuteRate", - clusterInfo - .replicaStream(BrokerTopic.of(broker, topic)) - .filter(Replica::isLeader) - .filter(Replica::isOnline) - .mapToDouble( - r -> - rate.get(r.topicPartition()) - * consumerFanout.get(r.topicPartition())) - .sum())))) - .series( - (Gen, broker) -> + .seriesByBrokerTopic( + (time, broker, topic) -> + ServerMetrics.Topic.BYTES_IN_PER_SEC + .builder() + .topic(topic) + .time(time.toEpochSecond(ZoneOffset.UTC)) + .fifteenMinuteRate( + clusterInfo + .replicaStream(BrokerTopic.of(broker, topic)) + .filter(Replica::isLeader) + .filter(Replica::isOnline) + .mapToDouble(r -> rate.get(r.topicPartition())) + .sum()) + .build()) + .seriesByBrokerTopic( + (time, broker, topic) -> + ServerMetrics.Topic.BYTES_OUT_PER_SEC + .builder() + .topic(topic) + .time(time.toEpochSecond(ZoneOffset.UTC)) + .fifteenMinuteRate( + clusterInfo + .replicaStream(BrokerTopic.of(broker, topic)) + .filter(Replica::isLeader) + .filter(Replica::isOnline) + .mapToDouble( + r -> + rate.get(r.topicPartition()) + * consumerFanout.get(r.topicPartition())) + .sum()) + .build()) + .seriesByBroker( + (time, broker) -> IntStream.range(0, 10) .mapToObj( i -> - Gen.topic( - ServerMetrics.Topic.TOTAL_FETCH_REQUESTS_PER_SEC, - "Noise_" + i, - Map.of()))) + ServerMetrics.Topic.TOTAL_FETCH_REQUESTS_PER_SEC + .builder() + .topic("Noise_" + i) + .time(time.toEpochSecond(ZoneOffset.UTC)) + .build())) + .seriesByBrokerReplica( + (time, broker, replica) -> + LogMetrics.Log.SIZE + .builder() + .topic(replica.topic()) + .partition(replica.partition()) + .logSize(replica.size()) + .build()) .build(); } diff --git a/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java b/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java index b23727c3b5..438e269a4b 100644 --- a/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java +++ b/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java @@ -18,17 +18,11 @@ import java.time.Duration; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.Map; import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Stream; import org.astraea.common.admin.ClusterInfo; import org.astraea.common.admin.ClusterInfoBuilder; -import org.astraea.common.admin.Replica; -import org.astraea.common.admin.TopicPartition; -import org.astraea.common.metrics.broker.LogMetrics; -import org.astraea.common.metrics.broker.ServerMetrics; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -55,7 +49,7 @@ void example() { .cluster(cluster) .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) .sampleInterval(Duration.ofSeconds(2)) - .series((Gen, broker) -> Stream.of(() -> null)) + .seriesByBroker((Gen, broker) -> Stream.of(() -> null)) .build(); Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); Assertions.assertEquals(6, beans.all().get(1).size()); @@ -69,7 +63,7 @@ void example() { .cluster(cluster) .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) .sampleInterval(Duration.ofSeconds(4)) - .series((Gen, broker) -> Stream.of(() -> null)) + .seriesByBroker((Gen, broker) -> Stream.of(() -> null)) .build(); Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); Assertions.assertEquals(3, beans.all().get(1).size()); @@ -82,7 +76,7 @@ void example() { MetricSeriesBuilder.builder() .cluster(cluster) .timeRange(LocalDateTime.now(), Duration.ZERO) - .series((Gen, broker) -> Stream.of(() -> null)) + .seriesByBroker((Gen, broker) -> Stream.of(() -> null)) .build(); Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); Assertions.assertEquals(1, beans.all().get(1).size()); @@ -93,20 +87,20 @@ void example() { @Test @DisplayName( - "By change the setting between series calls, we can have difference sample rate for each series") + "By change the setting between seriesByBrokerTopic calls, we can have difference sample rate for each seriesByBrokerTopic") void testFlexibility() { var beans = MetricSeriesBuilder.builder() .cluster(cluster) .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) .sampleInterval(Duration.ofSeconds(1)) - .series((Gen, broker) -> broker == 1 ? Stream.of(() -> null) : Stream.of()) + .seriesByBroker((Gen, broker) -> broker == 1 ? Stream.of(() -> null) : Stream.of()) .timeRange(LocalDateTime.now(), Duration.ofSeconds(10)) .sampleInterval(Duration.ofSeconds(2)) - .series((Gen, broker) -> broker == 2 ? Stream.of(() -> null) : Stream.of()) + .seriesByBroker((Gen, broker) -> broker == 2 ? Stream.of(() -> null) : Stream.of()) .timeRange(LocalDateTime.now(), Duration.ofSeconds(15)) .sampleInterval(Duration.ofSeconds(5)) - .series((Gen, broker) -> broker == 3 ? Stream.of(() -> null) : Stream.of()) + .seriesByBroker((Gen, broker) -> broker == 3 ? Stream.of(() -> null) : Stream.of()) .build(); Assertions.assertEquals(Set.of(1, 2, 3), beans.all().keySet()); Assertions.assertEquals(11, beans.all().get(1).size()); @@ -123,42 +117,4 @@ void testInterval() { IllegalArgumentException.class, () -> MetricSeriesBuilder.builder().sampleInterval(Duration.ofSeconds(-1))); } - - @Test - void testMetricsGenerator() { - var theCluster = cluster; - var theNode = theCluster.node(1); - var unixTime = ThreadLocalRandom.current().nextInt(0, 10000); - var time = LocalDateTime.ofEpochSecond(unixTime, 0, ZoneOffset.UTC); - var gen = new MetricSeriesBuilder.MetricGenerator(theCluster, theNode, time); - - Assertions.assertEquals(theCluster, gen.cluster()); - Assertions.assertEquals(theNode, gen.node()); - Assertions.assertEquals(time, gen.now()); - Assertions.assertEquals( - theCluster.replicaStream(1).map(Replica::topic).distinct().count(), - gen.perBrokerTopic(i -> () -> null).count()); - Assertions.assertEquals( - theCluster.replicaStream(1).map(Replica::topicPartition).distinct().count(), - gen.perBrokerPartition(i -> () -> null).count()); - Assertions.assertEquals( - theCluster.replicaStream(1).distinct().count(), - gen.perBrokerReplica(i -> () -> null).count()); - - var topic = gen.topic(ServerMetrics.Topic.BYTES_IN_PER_SEC, "Example", Map.of("A", "B")); - Assertions.assertEquals("Example", topic.topic()); - Assertions.assertEquals(ServerMetrics.Topic.BYTES_IN_PER_SEC.metricName(), topic.metricsName()); - Assertions.assertEquals(ServerMetrics.DOMAIN_NAME, topic.beanObject().domainName()); - Assertions.assertEquals(unixTime, topic.beanObject().createdTimestamp()); - Assertions.assertEquals("BrokerTopicMetric", topic.beanObject().properties().get("type")); - Assertions.assertEquals(Map.of("A", "B"), topic.beanObject().attributes()); - - var logSize = gen.logSize(TopicPartition.of("Example", 10), 1024); - Assertions.assertEquals(LogMetrics.DOMAIN_NAME, logSize.beanObject().domainName()); - Assertions.assertEquals(1024, logSize.value()); - Assertions.assertEquals("Log", logSize.beanObject().properties().get("type")); - Assertions.assertEquals("Example", logSize.topic()); - Assertions.assertEquals(10, logSize.partition()); - Assertions.assertEquals(unixTime, logSize.beanObject().createdTimestamp()); - } } From 4c868bb9f3aaec54ba5f915154b7393c6aeb0253 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 16 Jan 2023 09:50:41 +0800 Subject: [PATCH 5/6] address comment --- .../common/metrics/MetricSeriesBuilder.java | 55 ++++++++----------- 1 file changed, 22 insertions(+), 33 deletions(-) diff --git a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java index fcbb163413..8b536b8946 100644 --- a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java +++ b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.function.BiFunction; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -31,6 +32,7 @@ import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; +import org.astraea.common.function.Bi3Function; /** * A utility for generating a seriesByBrokerTopic of metric objects, where the measured metric value @@ -50,36 +52,20 @@ static MetricSeriesBuilder builder() { MetricSeriesBuilder sampleInterval(Duration interval); - MetricSeriesBuilder seriesByBroker(BrokerSeries seriesGenerator); + MetricSeriesBuilder seriesByBroker( + BiFunction> seriesGenerator); - MetricSeriesBuilder seriesByBrokerTopic(BrokerTopicSeries seriesGenerator); + MetricSeriesBuilder seriesByBrokerTopic( + Bi3Function seriesGenerator); - MetricSeriesBuilder seriesByBrokerPartition(BrokerPartitionSeries seriesGenerator); + MetricSeriesBuilder seriesByBrokerPartition( + Bi3Function seriesGenerator); - MetricSeriesBuilder seriesByBrokerReplica(BrokerReplicaSeries seriesGenerator); + MetricSeriesBuilder seriesByBrokerReplica( + Bi3Function seriesGenerator); ClusterBean build(); - @FunctionalInterface - interface BrokerSeries { - Stream series(LocalDateTime time, int broker); - } - - @FunctionalInterface - interface BrokerTopicSeries { - HasBeanObject series(LocalDateTime time, int broker, String topic); - } - - @FunctionalInterface - interface BrokerPartitionSeries { - HasBeanObject series(LocalDateTime time, int broker, TopicPartition partition); - } - - @FunctionalInterface - interface BrokerReplicaSeries { - HasBeanObject series(LocalDateTime time, int broker, Replica replica); - } - final class MetricSeriesBuilderImpl implements MetricSeriesBuilder { private final List>>> series = @@ -112,7 +98,8 @@ public MetricSeriesBuilder sampleInterval(Duration interval) { } @Override - public MetricSeriesBuilder seriesByBroker(BrokerSeries seriesGenerator) { + public MetricSeriesBuilder seriesByBroker( + BiFunction> seriesGenerator) { final var cluster = clusterInfo; final var start = timeStart; final var end = timeStart.plus(timeRange); @@ -126,8 +113,7 @@ public MetricSeriesBuilder seriesByBroker(BrokerSeries seriesGenerator) { cluster.nodes().stream() .map( node -> - Map.entry( - node.id(), seriesGenerator.series(time, node.id())))) + Map.entry(node.id(), seriesGenerator.apply(time, node.id())))) .collect( Collectors.toUnmodifiableMap( Map.Entry::getKey, Map.Entry::getValue, Stream::concat))); @@ -135,7 +121,8 @@ public MetricSeriesBuilder seriesByBroker(BrokerSeries seriesGenerator) { } @Override - public MetricSeriesBuilder seriesByBrokerTopic(BrokerTopicSeries seriesGenerator) { + public MetricSeriesBuilder seriesByBrokerTopic( + Bi3Function seriesGenerator) { final var cluster = clusterInfo; final var start = timeStart; final var end = timeStart.plus(timeRange); @@ -159,13 +146,14 @@ public MetricSeriesBuilder seriesByBrokerTopic(BrokerTopicSeries seriesGenerator .distinct() .map( topic -> - seriesGenerator.series( + seriesGenerator.apply( time, node.id(), topic)))))); return this; } @Override - public MetricSeriesBuilder seriesByBrokerPartition(BrokerPartitionSeries seriesGenerator) { + public MetricSeriesBuilder seriesByBrokerPartition( + Bi3Function seriesGenerator) { final var cluster = clusterInfo; final var start = timeStart; final var end = timeStart.plus(timeRange); @@ -188,13 +176,14 @@ public MetricSeriesBuilder seriesByBrokerPartition(BrokerPartitionSeries seriesG .map(Replica::topicPartition) .map( partition -> - seriesGenerator.series( + seriesGenerator.apply( time, node.id(), partition)))))); return this; } @Override - public MetricSeriesBuilder seriesByBrokerReplica(BrokerReplicaSeries seriesGenerator) { + public MetricSeriesBuilder seriesByBrokerReplica( + Bi3Function seriesGenerator) { final var cluster = clusterInfo; final var start = timeStart; final var end = timeStart.plus(timeRange); @@ -216,7 +205,7 @@ public MetricSeriesBuilder seriesByBrokerReplica(BrokerReplicaSeries seriesGener .replicaStream(node.id()) .map( replica -> - seriesGenerator.series( + seriesGenerator.apply( time, node.id(), replica)))))); return this; } From a4ffdab32fc1ddddd32475d46b8fd80e6e31b3d3 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Mon, 16 Jan 2023 11:01:21 +0800 Subject: [PATCH 6/6] Revert refactor error --- .../main/java/org/astraea/app/web/ThrottleHandler.java | 6 +++--- .../src/main/java/org/astraea/common/cost/Dispersion.java | 6 +++--- .../org/astraea/common/metrics/MetricSeriesBuilder.java | 8 ++++---- .../astraea/common/metrics/MetricSeriesBuilderTest.java | 2 +- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/app/src/main/java/org/astraea/app/web/ThrottleHandler.java b/app/src/main/java/org/astraea/app/web/ThrottleHandler.java index c925a97d26..d47e14a856 100644 --- a/app/src/main/java/org/astraea/app/web/ThrottleHandler.java +++ b/app/src/main/java/org/astraea/app/web/ThrottleHandler.java @@ -322,9 +322,9 @@ private Set toReplicaSet(String topic, String throttledRe } /** - * Given a seriesByBrokerTopic of leader/follower throttle config, this method attempts to reduce - * its size into the simplest form by merging any targets with a common topic/partition/replica - * scope throttle target. + * Given a series of leader/follower throttle config, this method attempts to reduce its size into + * the simplest form by merging any targets with a common topic/partition/replica scope throttle + * target. */ private Set simplify( Set leaders, Set followers) { diff --git a/common/src/main/java/org/astraea/common/cost/Dispersion.java b/common/src/main/java/org/astraea/common/cost/Dispersion.java index f6b6ad4e90..bc4c87e170 100644 --- a/common/src/main/java/org/astraea/common/cost/Dispersion.java +++ b/common/src/main/java/org/astraea/common/cost/Dispersion.java @@ -22,7 +22,7 @@ @FunctionalInterface public interface Dispersion { /** - * Apply coefficient of variation to a seriesByBrokerTopic of values. + * Apply coefficient of variation to a series of values. * *

    This implementation come with some assumption: * @@ -54,7 +54,7 @@ static Dispersion cov() { } /** - * Obtain standard deviation from a seriesByBrokerTopic of values. + * Obtain standard deviation from a series of values. * *

      *
    • If no number was given, then the standard deviation is zero. @@ -77,7 +77,7 @@ static Dispersion standardDeviation() { } /** - * Processing a seriesByBrokerTopic of values via a specific statistics method. + * Processing a series of values via a specific statistics method. * * @param scores origin data * @return aggregated data diff --git a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java index 8b536b8946..c1eb84c8ec 100644 --- a/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java +++ b/common/src/main/java/org/astraea/common/metrics/MetricSeriesBuilder.java @@ -35,10 +35,10 @@ import org.astraea.common.function.Bi3Function; /** - * A utility for generating a seriesByBrokerTopic of metric objects, where the measured metric value - * might be highly correlated to specific variables. For example broker id, calendar time, or - * unknown noise. This class offers a way to construct large-scale metric sources of a fake cluster, - * which will be useful for testing and experiment purposes. + * A utility for generating a series of metric objects, where the measured metric value might be + * highly correlated to specific variables. For example broker id, calendar time, or unknown noise. + * This class offers a way to construct large-scale metric sources of a fake cluster, which will be + * useful for testing and experiment purposes. */ public interface MetricSeriesBuilder { diff --git a/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java b/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java index 438e269a4b..4ad12ab3b3 100644 --- a/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java +++ b/common/src/test/java/org/astraea/common/metrics/MetricSeriesBuilderTest.java @@ -87,7 +87,7 @@ void example() { @Test @DisplayName( - "By change the setting between seriesByBrokerTopic calls, we can have difference sample rate for each seriesByBrokerTopic") + "By change the setting between series calls, we can have difference sample rate for each series") void testFlexibility() { var beans = MetricSeriesBuilder.builder()