From ecddacde38f3d1a9ed37eb08ec55bced01e91d47 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Thu, 16 Feb 2023 16:18:50 +0800 Subject: [PATCH 1/3] broker with no replica was ignored from the bandwidth result --- .../org/astraea/common/cost/NetworkCost.java | 37 +++++++++++++++---- .../astraea/common/cost/NetworkCostTest.java | 22 +++++++++++ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/common/src/main/java/org/astraea/common/cost/NetworkCost.java b/common/src/main/java/org/astraea/common/cost/NetworkCost.java index 40dd6fe490..fb849d797e 100644 --- a/common/src/main/java/org/astraea/common/cost/NetworkCost.java +++ b/common/src/main/java/org/astraea/common/cost/NetworkCost.java @@ -31,6 +31,7 @@ import org.astraea.common.admin.BrokerTopic; import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; +import org.astraea.common.admin.NodeInfo; import org.astraea.common.admin.Replica; import org.astraea.common.admin.TopicPartition; import org.astraea.common.metrics.HasBeanObject; @@ -139,6 +140,11 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) } }, Collectors.summingDouble(x -> x)))); + // the above logic doesn't consider broker with no replica, the following statement put those + // broker into the map + clusterInfo.nodes().stream() + .filter(node -> !brokerRate.containsKey(node)) + .forEach(node -> brokerRate.put(node, 0.0)); var summary = brokerRate.values().stream().mapToDouble(x -> x).summaryStatistics(); if (summary.getMax() < 0) @@ -151,13 +157,8 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) return ClusterCost.of( 0, () -> "network load zero"); // edge case to avoid divided by zero error double score = (summary.getMax() - summary.getMin()) / (summary.getMax()); - return ClusterCost.of( - score, - () -> - brokerRate.values().stream() - .map(x -> DataRate.Byte.of(x.longValue()).perSecond()) - .map(DataRate::toString) - .collect(Collectors.joining(", ", "{", "}"))); + + return new NetworkClusterCost(score, brokerRate); } @Override @@ -258,4 +259,26 @@ public String toString() { return alias(); } } + + public static class NetworkClusterCost implements ClusterCost { + final double score; + final Map brokerRate; + + public NetworkClusterCost(double score, Map brokerRate) { + this.score = score; + this.brokerRate = brokerRate; + } + + public double value() { + return score; + } + + @Override + public String toString() { + return brokerRate.values().stream() + .map(x -> DataRate.Byte.of(x.longValue()).perSecond()) + .map(DataRate::toString) + .collect(Collectors.joining(", ", "{", "}")); + } + } } diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index ae25361c45..f5aec8ddc4 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -421,6 +421,28 @@ void testSingleStepImprovement(int seed) { System.out.println("False test: " + counting.get(false)); } + @Test + void testZeroReplicaBroker() { + var testcase = new LargeTestCase(1, 1, 0); + var beans = testcase.clusterBean(); + var cluster = testcase.clusterInfo(); + var scaledCluster = + ClusterInfoBuilder.builder(cluster) + .addNode(Set.of(4321)) + .addFolders(Map.of(4321, Set.of("/folder"))) + .build(); + var node = scaledCluster.node(4321); + + var costI = + (NetworkCost.NetworkClusterCost) new NetworkIngressCost().clusterCost(scaledCluster, beans); + Assertions.assertEquals(2, costI.brokerRate.size()); + Assertions.assertEquals(0.0, costI.brokerRate.get(node)); + var costE = + (NetworkCost.NetworkClusterCost) new NetworkEgressCost().clusterCost(scaledCluster, beans); + Assertions.assertEquals(2, costE.brokerRate.size()); + Assertions.assertEquals(0.0, costE.brokerRate.get(node)); + } + interface TestCase { ClusterInfo clusterInfo(); From 2cba73117296eafa0978edf38110253d3491eafe Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Thu, 16 Feb 2023 16:49:00 +0800 Subject: [PATCH 2/3] Fix empty broker cause no metric exception --- .../org/astraea/common/cost/NetworkCost.java | 2 ++ .../astraea/common/cost/NetworkCostTest.java | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/common/src/main/java/org/astraea/common/cost/NetworkCost.java b/common/src/main/java/org/astraea/common/cost/NetworkCost.java index fb849d797e..0489290a32 100644 --- a/common/src/main/java/org/astraea/common/cost/NetworkCost.java +++ b/common/src/main/java/org/astraea/common/cost/NetworkCost.java @@ -39,6 +39,7 @@ import org.astraea.common.metrics.broker.LogMetrics; import org.astraea.common.metrics.broker.ServerMetrics; import org.astraea.common.metrics.collector.MetricSensor; +import org.astraea.common.metrics.platform.HostMetrics; /** * This cost function calculate the load balance score in terms of network ingress or network @@ -169,6 +170,7 @@ public Optional metricSensor() { return Optional.of( (client, clusterBean) -> Stream.of( + List.of(HostMetrics.jvmMemory(client)), ServerMetrics.Topic.BYTES_IN_PER_SEC.fetch(client), ServerMetrics.Topic.BYTES_OUT_PER_SEC.fetch(client), LogMetrics.Log.SIZE.fetch(client)) diff --git a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java index f5aec8ddc4..bde30c8627 100644 --- a/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java +++ b/common/src/test/java/org/astraea/common/cost/NetworkCostTest.java @@ -33,6 +33,7 @@ import java.util.stream.Stream; import org.astraea.common.Configuration; import org.astraea.common.DataRate; +import org.astraea.common.Utils; import org.astraea.common.admin.BrokerTopic; import org.astraea.common.admin.ClusterBean; import org.astraea.common.admin.ClusterInfo; @@ -46,6 +47,9 @@ import org.astraea.common.metrics.MetricSeriesBuilder; import org.astraea.common.metrics.broker.LogMetrics; import org.astraea.common.metrics.broker.ServerMetrics; +import org.astraea.common.metrics.collector.MetricCollector; +import org.astraea.it.Service; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; @@ -57,6 +61,13 @@ class NetworkCostTest { + private static final Service SERVICE = Service.builder().numberOfBrokers(3).build(); + + @AfterAll + static void close() { + SERVICE.close(); + } + private static NetworkIngressCost ingressCost() { return new NetworkIngressCost() { @Override @@ -443,6 +454,25 @@ void testZeroReplicaBroker() { Assertions.assertEquals(0.0, costE.brokerRate.get(node)); } + @Test + void testNoMetricCheck() { + try (var collector = MetricCollector.builder().interval(Duration.ofMillis(100)).build()) { + var ingressCost = new NetworkIngressCost(); + + // setup sampling + SERVICE.dataFolders().keySet().forEach(collector::registerLocalJmx); + ingressCost.metricSensor().ifPresent(collector::addMetricSensor); + + // sample metrics for a while. + Utils.sleep(Duration.ofMillis(500)); + + var emptyCluster = ClusterInfoBuilder.builder().addNode(Set.of(1, 2, 3)).build(); + Assertions.assertDoesNotThrow( + () -> ingressCost.clusterCost(emptyCluster, collector.clusterBean()), + "Should not raise an exception"); + } + } + interface TestCase { ClusterInfo clusterInfo(); From 24dca7f4ca758e5f8f5866f092de68af35cd4a82 Mon Sep 17 00:00:00 2001 From: Zheng-Xian Li Date: Fri, 17 Feb 2023 23:21:38 +0800 Subject: [PATCH 3/3] address comments --- .../src/main/java/org/astraea/common/cost/NetworkCost.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/common/src/main/java/org/astraea/common/cost/NetworkCost.java b/common/src/main/java/org/astraea/common/cost/NetworkCost.java index 0489290a32..6690fbdf0b 100644 --- a/common/src/main/java/org/astraea/common/cost/NetworkCost.java +++ b/common/src/main/java/org/astraea/common/cost/NetworkCost.java @@ -141,8 +141,7 @@ public ClusterCost clusterCost(ClusterInfo clusterInfo, ClusterBean clusterBean) } }, Collectors.summingDouble(x -> x)))); - // the above logic doesn't consider broker with no replica, the following statement put those - // broker into the map + // add the brokers having no replicas into map clusterInfo.nodes().stream() .filter(node -> !brokerRate.containsKey(node)) .forEach(node -> brokerRate.put(node, 0.0)); @@ -262,11 +261,11 @@ public String toString() { } } - public static class NetworkClusterCost implements ClusterCost { + static class NetworkClusterCost implements ClusterCost { final double score; final Map brokerRate; - public NetworkClusterCost(double score, Map brokerRate) { + NetworkClusterCost(double score, Map brokerRate) { this.score = score; this.brokerRate = brokerRate; }