Skip to content

Commit

Permalink
[DISPATCHER] reduce the overhead of collecting leader info (#1413)
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 authored Jan 10, 2023
1 parent 81324cb commit 177ae6f
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,23 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.astraea.common.Utils;
import org.astraea.common.metrics.MBeanRegister;
import org.astraea.common.metrics.Sensor;
import org.astraea.common.metrics.stats.Avg;
import org.astraea.common.partitioner.Dispatcher;
import org.astraea.common.producer.Producer;
import org.astraea.common.producer.Record;

public interface ProducerThread extends AbstractThread {

String DOMAIN_NAME = "org.astraea";
String TYPE_PROPERTY = "type";

String TYPE_VALUE = "producer";

String AVG_PROPERTY = "avg";
String ID_PROPERTY = "client-id";

static List<ProducerThread> create(
List<ArrayBlockingQueue<List<Record<byte[], byte[]>>>> queues,
int producers,
Expand Down Expand Up @@ -61,6 +72,14 @@ static List<ProducerThread> create(
var closed = new AtomicBoolean(false);
var producer = producerSupplier.get();
var queue = queues.get(index);
var sensor = Sensor.builder().addStat(AVG_PROPERTY, Avg.of()).build();
// export the custom jmx for report thread
MBeanRegister.local()
.domainName(DOMAIN_NAME)
.property(TYPE_PROPERTY, TYPE_VALUE)
.property(ID_PROPERTY, producer.clientId())
.attribute(AVG_PROPERTY, Double.class, () -> sensor.measure(AVG_PROPERTY))
.register();
executors.execute(
() -> {
try {
Expand All @@ -75,7 +94,18 @@ static List<ProducerThread> create(
Dispatcher.beginInterdependent(producer);
interdependentCounter += data.size();
}
if (data != null) producer.send(data);
var now = System.currentTimeMillis();
if (data != null)
producer
.send(data)
.forEach(
f ->
f.whenComplete(
(r, e) -> {
if (e == null)
sensor.record(
(double) (System.currentTimeMillis() - now));
}));

// End interdependent
if (interdependent > 1 && interdependentCounter >= interdependent) {
Expand Down
26 changes: 26 additions & 0 deletions app/src/main/java/org/astraea/app/performance/Report.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package org.astraea.app.performance;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.astraea.common.metrics.BeanQuery;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.client.consumer.ConsumerMetrics;
import org.astraea.common.metrics.client.consumer.HasConsumerFetchMetrics;
Expand Down Expand Up @@ -92,6 +94,22 @@ public double avgLatency() {
return m.requestLatencyAvg();
}

/**
 * Looks up the latency average that the producer thread exports through the local MBean server.
 * The bean is selected by the ProducerThread domain name, its type property and this report's
 * client id.
 *
 * @return the value of the {@code ProducerThread.AVG_PROPERTY} attribute, or empty when the
 *     attribute lookup yields null
 */
@Override
public Optional<Double> e2eLatency() {
  var localClient = MBeanClient.local();
  var producerBeanQuery =
      BeanQuery.builder()
          .domainName(ProducerThread.DOMAIN_NAME)
          .property(ProducerThread.TYPE_PROPERTY, ProducerThread.TYPE_VALUE)
          .property(ProducerThread.ID_PROPERTY, m.clientId())
          .build();
  var average =
      localClient.queryBean(producerBeanQuery).attributes().get(ProducerThread.AVG_PROPERTY);
  return Optional.ofNullable(average).map(value -> (double) value);
}

@Override
public long totalBytes() {
return (long) m.outgoingByteTotal();
Expand Down Expand Up @@ -124,6 +142,14 @@ public String clientId() {
*/
double avgLatency();

/**
 * @return the latency of the full client-to-server path, if this report can provide it.
 *     Currently, only the producer thread offers this metric; this default implementation
 *     always returns an empty Optional.
 */
default Optional<Double> e2eLatency() {
return Optional.empty();
}

/**
* @return total send/received bytes
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ boolean tryToPrint(Duration duration) {
.mapToDouble(Report::avgLatency)
.average()
.ifPresent(i -> System.out.printf(" publish average latency: %.3f ms%n", i));
reports.stream()
.flatMap(r -> r.e2eLatency().stream())
.mapToDouble(v -> v)
.filter(v -> !Double.isNaN(v))
.average()
.ifPresent(i -> System.out.printf(" publish e2e-average latency: %.3f ms%n", i));
for (int i = 0; i < reports.size(); ++i) {
System.out.printf(
" producer[%d] average throughput: %s%n",
Expand Down
34 changes: 34 additions & 0 deletions common/src/main/java/org/astraea/common/admin/ClusterInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,12 @@ class Optimized implements ClusterInfo {
private final List<Replica> all;

private final Lazy<Map<BrokerTopic, List<Replica>>> byBrokerTopic;

private final Lazy<Map<BrokerTopic, List<Replica>>> byBrokerTopicForLeader;
private final Lazy<Map<Integer, List<Replica>>> byBroker;
private final Lazy<Map<String, List<Replica>>> byTopic;

private final Lazy<Map<String, List<Replica>>> byTopicForLeader;
private final Lazy<Map<TopicPartition, List<Replica>>> byPartition;
private final Lazy<Map<TopicPartitionReplica, List<Replica>>> byReplica;

Expand All @@ -431,6 +435,17 @@ protected Optimized(String clusterId, List<NodeInfo> nodeInfos, List<Replica> re
Collectors.groupingBy(
r -> BrokerTopic.of(r.nodeInfo().id(), r.topic()),
Collectors.toUnmodifiableList())));
this.byBrokerTopicForLeader =
Lazy.of(
() ->
all.stream()
.filter(Replica::isOnline)
.filter(Replica::isLeader)
.collect(
Collectors.groupingBy(
r -> BrokerTopic.of(r.nodeInfo().id(), r.topic()),
Collectors.toUnmodifiableList())));

this.byBroker =
Lazy.of(
() ->
Expand All @@ -446,6 +461,15 @@ protected Optimized(String clusterId, List<NodeInfo> nodeInfos, List<Replica> re
.collect(
Collectors.groupingBy(Replica::topic, Collectors.toUnmodifiableList())));

this.byTopicForLeader =
Lazy.of(
() ->
all.stream()
.filter(Replica::isOnline)
.filter(Replica::isLeader)
.collect(
Collectors.groupingBy(Replica::topic, Collectors.toUnmodifiableList())));

this.byPartition =
Lazy.of(
() ->
Expand Down Expand Up @@ -517,5 +541,15 @@ public List<NodeInfo> nodes() {
public Stream<Replica> replicaStream() {
return all.stream();
}

/**
 * Returns the online leader replicas of the given topic, served from the lazily built
 * per-topic leader index.
 *
 * @param topic the topic name to look up
 * @return the matching replicas, or an empty list when the index has no entry for the topic
 */
@Override
public List<Replica> replicaLeaders(String topic) {
  var leadersByTopic = byTopicForLeader.get();
  return leadersByTopic.getOrDefault(topic, List.of());
}

/**
 * Returns the online leader replicas hosted by the given broker for the given topic, served
 * from the lazily built per-broker-topic leader index.
 *
 * @param brokerTopic the (broker id, topic) pair to look up
 * @return the matching replicas, or an empty list when the index has no entry for the pair
 */
@Override
public List<Replica> replicaLeaders(BrokerTopic brokerTopic) {
  var leadersByBrokerTopic = byBrokerTopicForLeader.get();
  return leadersByBrokerTopic.getOrDefault(brokerTopic, List.of());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public synchronized void record(Double value) {

@Override
public synchronized Double measure() {
if (counter == 0) throw new RuntimeException("Nothing to measure");
if (counter == 0) return Double.NaN;
return accumulator / counter;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@
package org.astraea.common.metrics.stats;

public class Max<V extends Comparable<V>> implements Stat<V> {

public static <V extends Comparable<V>> Max<V> of() {
return new Max<>();
}

private V max;

public Max() {
private Max() {
max = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public final void configure(Map<String, ?> configs) {
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())));
config.string(ProducerConfigs.BOOTSTRAP_SERVERS_CONFIG).ifPresent(s -> admin = Admin.of(s));
configure(config);
tryToUpdate(null);
tryToUpdate();
}

@Override
Expand All @@ -155,7 +155,7 @@ public final int partition(
var interdependent = THREAD_LOCAL.get();
if (interdependent.isInterdependent && interdependent.targetPartitions >= 0)
return interdependent.targetPartitions;
tryToUpdate(topic);
tryToUpdate();
final int target;
if (!clusterInfo.topics().contains(topic)) {
// the cached cluster info is not updated, so we just return a random partition
Expand All @@ -166,16 +166,13 @@ public final int partition(
return target;
}

boolean tryToUpdate(String topic) {
boolean tryToUpdate() {
if (admin == null) return false;
var now = System.currentTimeMillis();
// need to refresh cluster info if
// 1) the topic is not included by ClusterInfo
// 2) lease expires
var now = System.nanoTime();
// need to refresh cluster info if lease expires
if (lastUpdated.updateAndGet(
last -> {
if (topic != null && !clusterInfo.topics().contains(topic)) return now;
if (now - last >= CLUSTER_INFO_LEASE.toMillis()) return now;
if (now - last >= CLUSTER_INFO_LEASE.toNanos()) return now;
return last;
})
== now) {
Expand All @@ -186,7 +183,7 @@ boolean tryToUpdate(String topic) {
(c, e) -> {
if (c != null) {
this.clusterInfo = c;
lastUpdated.set(System.currentTimeMillis());
lastUpdated.set(System.nanoTime());
}
});
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ void testAvg() {
}

@Test
void testException() {
void testNan() {
var stat = Avg.of();
Assertions.assertThrows(RuntimeException.class, stat::measure);
Assertions.assertEquals(Double.NaN, stat.measure());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public class MaxTest {
@Test
void testMax() {
var stat = new Max<Integer>();
var stat = Max.<Integer>of();
stat.record(39);
stat.record(20);
stat.record(103);
Expand All @@ -32,7 +32,7 @@ void testMax() {

@Test
void testException() {
var stat = new Max<Integer>();
var stat = Max.<Integer>of();
Assertions.assertThrows(RuntimeException.class, stat::measure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,56 +52,6 @@

public class DispatcherTest extends RequireSingleBrokerCluster {

// Checks how many times the dispatcher reports a cluster-info update (tryToUpdate returning
// true) while the requested topic is missing from the cluster, and after the topic is created.
@Test
void testNoTopicInCachedClusterInfo() {
var topicName = Utils.randomString();
// counts the calls to tryToUpdate that returned true
var count = new AtomicInteger(0);
try (var dispatcher =
new Dispatcher() {
@Override
public int partition(String topic, byte[] key, byte[] value, ClusterInfo clusterInfo) {
Assertions.assertNotNull(clusterInfo);
return 0;
}

// delegates to the real implementation and records every call that returned true
@Override
boolean tryToUpdate(String topic) {
// a non-null topic must be the one this test asks for
if (topic != null) Assertions.assertEquals(topicName, topic);
var rval = super.tryToUpdate(topic);
if (rval) count.incrementAndGet();
return rval;
}
}) {

dispatcher.configure(Map.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()));

// the topic does not exist in the cluster, so every call requests a cluster info update
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(2, count.get());
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(3, count.get());

// create the topic
dispatcher.admin.creator().topic(topicName).run().toCompletableFuture().join();
// NOTE(review): presumably waits for the new topic to become visible — confirm
Utils.sleep(Duration.ofSeconds(2));
// one more update is expected on this call
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(4, count.get());

// ok, the topic exists now, and it should not request to update
Utils.sleep(Duration.ofSeconds(2));
Assertions.assertEquals(
0,
dispatcher.partition(topicName, "xx", new byte[0], "xx", new byte[0], Cluster.empty()));
Assertions.assertEquals(4, count.get());
}
}

@Test
void testUpdateClusterInfo() {

Expand Down

0 comments on commit 177ae6f

Please sign in to comment.