From 177ae6fe917fda7ad5821b58646492a99ccb14d7 Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Wed, 11 Jan 2023 02:57:35 +0800 Subject: [PATCH] [DISPATCHER] reduce the overhead of collecting leader info (#1413) --- .../app/performance/ProducerThread.java | 32 +++++++++++- .../org/astraea/app/performance/Report.java | 26 ++++++++++ .../app/performance/TrackerThread.java | 6 +++ .../org/astraea/common/admin/ClusterInfo.java | 34 +++++++++++++ .../org/astraea/common/metrics/stats/Avg.java | 2 +- .../org/astraea/common/metrics/stats/Max.java | 7 ++- .../common/partitioner/Dispatcher.java | 17 +++---- .../astraea/common/metrics/stats/AvgTest.java | 4 +- .../astraea/common/metrics/stats/MaxTest.java | 4 +- .../common/partitioner/DispatcherTest.java | 50 ------------------- 10 files changed, 115 insertions(+), 67 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/ProducerThread.java b/app/src/main/java/org/astraea/app/performance/ProducerThread.java index 38ab987f31..45551ac5dc 100644 --- a/app/src/main/java/org/astraea/app/performance/ProducerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ProducerThread.java @@ -27,12 +27,23 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.astraea.common.Utils; +import org.astraea.common.metrics.MBeanRegister; +import org.astraea.common.metrics.Sensor; +import org.astraea.common.metrics.stats.Avg; import org.astraea.common.partitioner.Dispatcher; import org.astraea.common.producer.Producer; import org.astraea.common.producer.Record; public interface ProducerThread extends AbstractThread { + String DOMAIN_NAME = "org.astraea"; + String TYPE_PROPERTY = "type"; + + String TYPE_VALUE = "producer"; + + String AVG_PROPERTY = "avg"; + String ID_PROPERTY = "client-id"; + static List create( List>>> queues, int producers, @@ -61,6 +72,14 @@ static List create( var closed = new AtomicBoolean(false); var producer = producerSupplier.get(); var queue = queues.get(index); + var sensor = Sensor.builder().addStat(AVG_PROPERTY, Avg.of()).build(); + // export the custom jmx for report thread + MBeanRegister.local() + .domainName(DOMAIN_NAME) + .property(TYPE_PROPERTY, TYPE_VALUE) + .property(ID_PROPERTY, producer.clientId()) + .attribute(AVG_PROPERTY, Double.class, () -> sensor.measure(AVG_PROPERTY)) + .register(); executors.execute( () -> { try { @@ -75,7 +94,18 @@ static List create( Dispatcher.beginInterdependent(producer); interdependentCounter += data.size(); } - if (data != null) producer.send(data); + var now = System.currentTimeMillis(); + if (data != null) + producer + .send(data) + .forEach( + f -> + f.whenComplete( + (r, e) -> { + if (e == null) + sensor.record( + (double) (System.currentTimeMillis() - now)); + })); // End interdependent if (interdependent > 1 && interdependentCounter >= interdependent) { diff --git a/app/src/main/java/org/astraea/app/performance/Report.java b/app/src/main/java/org/astraea/app/performance/Report.java index ba466a0868..2f81a20a7f 100644 --- a/app/src/main/java/org/astraea/app/performance/Report.java +++ b/app/src/main/java/org/astraea/app/performance/Report.java @@ -17,7 +17,9 @@ package org.astraea.app.performance; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import org.astraea.common.metrics.BeanQuery; import org.astraea.common.metrics.MBeanClient; import org.astraea.common.metrics.client.consumer.ConsumerMetrics; import org.astraea.common.metrics.client.consumer.HasConsumerFetchMetrics; @@ -92,6 +94,22 @@ public double avgLatency() { return m.requestLatencyAvg(); } + @Override + public Optional e2eLatency() { + return Optional.ofNullable( + MBeanClient.local() + .queryBean( + BeanQuery.builder() + .domainName(ProducerThread.DOMAIN_NAME) + .property( + ProducerThread.TYPE_PROPERTY, ProducerThread.TYPE_VALUE) + .property(ProducerThread.ID_PROPERTY, m.clientId()) + .build()) + .attributes() + .get(ProducerThread.AVG_PROPERTY)) + .map(v -> (double) v); + } + @Override public long totalBytes() { return (long) m.outgoingByteTotal(); @@ -124,6 +142,14 @@ public String clientId() { */ double avgLatency(); + /** + * @return the full path of client-to-server latency. Currently, only producer thread offers this + * metrics + */ + default Optional e2eLatency() { + return Optional.empty(); + } + /** * @return total send/received bytes */ diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index a85f8c8a72..a0b3aea530 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -79,6 +79,12 @@ boolean tryToPrint(Duration duration) { .mapToDouble(Report::avgLatency) .average() .ifPresent(i -> System.out.printf(" publish average latency: %.3f ms%n", i)); + reports.stream() + .flatMap(r -> r.e2eLatency().stream()) + .mapToDouble(v -> v) + .filter(v -> !Double.isNaN(v)) + .average() + .ifPresent(i -> System.out.printf(" publish e2e-average latency: %.3f ms%n", i)); for (int i = 0; i < reports.size(); ++i) { System.out.printf( " producer[%d] average throughput: %s%n", diff --git a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java index afbb8266c2..edc7e847cd 100644 --- a/common/src/main/java/org/astraea/common/admin/ClusterInfo.java +++ b/common/src/main/java/org/astraea/common/admin/ClusterInfo.java @@ -414,8 +414,12 @@ class Optimized implements ClusterInfo { private final List all; private final Lazy>> byBrokerTopic; + + private final Lazy>> byBrokerTopicForLeader; private final Lazy>> byBroker; private final Lazy>> byTopic; + + private final Lazy>> byTopicForLeader; private final Lazy>> byPartition; private final Lazy>> byReplica; @@ -431,6 +435,17 @@ protected Optimized(String clusterId, List nodeInfos, List re Collectors.groupingBy( r -> BrokerTopic.of(r.nodeInfo().id(), r.topic()), Collectors.toUnmodifiableList()))); + this.byBrokerTopicForLeader = + Lazy.of( + () -> + all.stream() + .filter(Replica::isOnline) + .filter(Replica::isLeader) + .collect( + Collectors.groupingBy( + r -> BrokerTopic.of(r.nodeInfo().id(), r.topic()), + Collectors.toUnmodifiableList()))); + this.byBroker = Lazy.of( () -> @@ -446,6 +461,15 @@ protected Optimized(String clusterId, List nodeInfos, List re .collect( Collectors.groupingBy(Replica::topic, Collectors.toUnmodifiableList()))); + this.byTopicForLeader = + Lazy.of( + () -> + all.stream() + .filter(Replica::isOnline) + .filter(Replica::isLeader) + .collect( + Collectors.groupingBy(Replica::topic, Collectors.toUnmodifiableList()))); + this.byPartition = Lazy.of( () -> @@ -517,5 +541,15 @@ public List nodes() { public Stream replicaStream() { return all.stream(); } + + @Override + public List replicaLeaders(String topic) { + return byTopicForLeader.get().getOrDefault(topic, List.of()); + } + + @Override + public List replicaLeaders(BrokerTopic brokerTopic) { + return byBrokerTopicForLeader.get().getOrDefault(brokerTopic, List.of()); + } } } diff --git a/common/src/main/java/org/astraea/common/metrics/stats/Avg.java b/common/src/main/java/org/astraea/common/metrics/stats/Avg.java index 11c4656846..adb5a00a38 100644 --- a/common/src/main/java/org/astraea/common/metrics/stats/Avg.java +++ b/common/src/main/java/org/astraea/common/metrics/stats/Avg.java @@ -33,7 +33,7 @@ public synchronized void record(Double value) { @Override public synchronized Double measure() { - if (counter == 0) throw new RuntimeException("Nothing to measure"); + if (counter == 0) return Double.NaN; return accumulator / counter; } }; diff --git a/common/src/main/java/org/astraea/common/metrics/stats/Max.java b/common/src/main/java/org/astraea/common/metrics/stats/Max.java index b1100965ff..a95cd300d7 100644 --- a/common/src/main/java/org/astraea/common/metrics/stats/Max.java +++ b/common/src/main/java/org/astraea/common/metrics/stats/Max.java @@ -17,9 +17,14 @@ package org.astraea.common.metrics.stats; public class Max> implements Stat { + + public static > Max of() { + return new Max<>(); + } + private V max; - public Max() { + private Max() { max = null; } diff --git a/common/src/main/java/org/astraea/common/partitioner/Dispatcher.java b/common/src/main/java/org/astraea/common/partitioner/Dispatcher.java index 4c085b5c1c..a855dd9193 100644 --- a/common/src/main/java/org/astraea/common/partitioner/Dispatcher.java +++ b/common/src/main/java/org/astraea/common/partitioner/Dispatcher.java @@ -146,7 +146,7 @@ public final void configure(Map configs) { .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString()))); config.string(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG).ifPresent(s -> admin = Admin.of(s)); configure(config); - tryToUpdate(null); + tryToUpdate(); } @Override @@ -155,7 +155,7 @@ public final int partition( var interdependent = THREAD_LOCAL.get(); if (interdependent.isInterdependent && interdependent.targetPartitions >= 0) return interdependent.targetPartitions; - tryToUpdate(topic); + tryToUpdate(); final int target; if (!clusterInfo.topics().contains(topic)) { // the cached cluster info is not updated, so we just return a random partition @@ -166,16 +166,13 @@ public final int partition( return target; } - boolean tryToUpdate(String topic) { + boolean tryToUpdate() { if (admin == null) return false; - var now = System.currentTimeMillis(); - // need to refresh cluster info if - // 1) the topic is not included by ClusterInfo - // 2) lease expires + var now = System.nanoTime(); + // need to refresh cluster info if lease expires if (lastUpdated.updateAndGet( last -> { - if (topic != null && !clusterInfo.topics().contains(topic)) return now; - if (now - last >= CLUSTER_INFO_LEASE.toMillis()) return now; + if (now - last >= CLUSTER_INFO_LEASE.toNanos()) return now; return last; }) == now) { @@ -186,7 +183,7 @@ boolean tryToUpdate(String topic) { (c, e) -> { if (c != null) { this.clusterInfo = c; - lastUpdated.set(System.currentTimeMillis()); + lastUpdated.set(System.nanoTime()); } }); return true; diff --git a/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java b/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java index 44dac775a5..ba9ef4c486 100644 --- a/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java +++ b/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java @@ -32,9 +32,9 @@ void testAvg() { } @Test - void testException() { + void testNan() { var stat = Avg.of(); - Assertions.assertThrows(RuntimeException.class, stat::measure); + Assertions.assertEquals(Double.NaN, stat.measure()); } @Test diff --git a/common/src/test/java/org/astraea/common/metrics/stats/MaxTest.java b/common/src/test/java/org/astraea/common/metrics/stats/MaxTest.java index ac5a3f2ce3..4bb3932b22 100644 --- a/common/src/test/java/org/astraea/common/metrics/stats/MaxTest.java +++ b/common/src/test/java/org/astraea/common/metrics/stats/MaxTest.java @@ -22,7 +22,7 @@ public class MaxTest { @Test void testMax() { - var stat = new Max(); + var stat = Max.of(); stat.record(39); stat.record(20); stat.record(103); @@ -32,7 +32,7 @@ void testMax() { @Test void testException() { - var stat = new Max(); + var stat = Max.of(); Assertions.assertThrows(RuntimeException.class, stat::measure); } } diff --git a/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java b/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java index 0186e6b70e..810c40200b 100644 --- a/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java +++ b/common/src/test/java/org/astraea/common/partitioner/DispatcherTest.java @@ -52,56 +52,6 @@ public class DispatcherTest extends RequireSingleBrokerCluster { - @Test - void testNoTopicInCachedClusterInfo() { - var topicName = Utils.randomString(); - var count = new AtomicInteger(0); - try (var dispatcher = - new Dispatcher() { - @Override - public int partition(String topic, byte[] key, byte[] value, ClusterInfo clusterInfo) { - Assertions.assertNotNull(clusterInfo); - return 0; - } - - @Override - boolean tryToUpdate(String topic) { - if (topic != null) Assertions.assertEquals(topicName, topic); - var rval = super.tryToUpdate(topic); - if (rval) count.incrementAndGet(); - return rval; - } - }) { - - dispatcher.configure(Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())); - - // the topic is nonexistent in cluster, so it always request to update cluster info - Assertions.assertEquals( - 0, - dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty())); - Assertions.assertEquals(2, count.get()); - Assertions.assertEquals( - 0, - dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty())); - Assertions.assertEquals(3, count.get()); - - // create the topic - dispatcher.admin.creator().topic(topicName).run().toCompletableFuture().join(); - Utils.sleep(Duration.ofSeconds(2)); - Assertions.assertEquals( - 0, - dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty())); - Assertions.assertEquals(4, count.get()); - - // ok, the topic exists now, and it should not request to update - Utils.sleep(Duration.ofSeconds(2)); - Assertions.assertEquals( - 0, - dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty())); - Assertions.assertEquals(4, count.get()); - } - } - @Test void testUpdateClusterInfo() {