diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index cd2bc99d02..7a78440654 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -34,14 +34,33 @@ import org.astraea.common.admin.TopicPartition; import org.astraea.common.consumer.ConsumerRebalanceListener; import org.astraea.common.consumer.SubscribedConsumer; +import org.astraea.common.metrics.Sensor; +import org.astraea.common.metrics.stats.Avg; public interface ConsumerThread extends AbstractThread { - ConcurrentMap> CLIENT_ID_ASSIGNED_PARTITIONS = new ConcurrentHashMap<>(); ConcurrentMap> CLIENT_ID_REVOKED_PARTITIONS = new ConcurrentHashMap<>(); + // consumer id and sensor. The sensor is for recording number of non-sticky partitions + ConcurrentMap> NON_STICKY_SENSOR = new ConcurrentHashMap<>(); + + // consumer id and sensor. The sensor is for recording number of partition difference between + // consumer group rebalance + ConcurrentMap> DIFFERENCE_SENSOR = new ConcurrentHashMap<>(); + + static long nonStickyPartitionBetweenRebalance(String clientId) { + return CLIENT_ID_ASSIGNED_PARTITIONS.getOrDefault(clientId, Set.of()).stream() + .filter(tp -> !CLIENT_ID_REVOKED_PARTITIONS.getOrDefault(clientId, Set.of()).contains(tp)) + .count(); + } + + static long differenceBetweenRebalance(String clientId) { + return CLIENT_ID_ASSIGNED_PARTITIONS.getOrDefault(clientId, Set.of()).size() + - CLIENT_ID_REVOKED_PARTITIONS.getOrDefault(clientId, Set.of()).size(); + } + static List create( int consumers, BiFunction> @@ -90,6 +109,8 @@ static List create( closed.set(true); CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId); CLIENT_ID_REVOKED_PARTITIONS.remove(clientId); + NON_STICKY_SENSOR.remove(clientId); + DIFFERENCE_SENSOR.remove(clientId); } }); return new ConsumerThread() { @@ -138,6 +159,18 @@ class PartitionRatioListener 
implements ConsumerRebalanceListener { @Override public void onPartitionAssigned(Set partitions) { CLIENT_ID_ASSIGNED_PARTITIONS.put(clientId, partitions); + NON_STICKY_SENSOR.putIfAbsent( + this.clientId, + Sensor.builder().addStat("exp-avg", Avg.expWeightByTime(Duration.ofSeconds(1))).build()); + DIFFERENCE_SENSOR.putIfAbsent( + this.clientId, + Sensor.builder().addStat("exp-avg", Avg.expWeightByTime(Duration.ofSeconds(1))).build()); + NON_STICKY_SENSOR + .get(this.clientId) + .record((double) nonStickyPartitionBetweenRebalance(this.clientId)); + DIFFERENCE_SENSOR + .get(this.clientId) + .record((double) differenceBetweenRebalance(this.clientId)); } @Override diff --git a/app/src/main/java/org/astraea/app/performance/ReportFormat.java b/app/src/main/java/org/astraea/app/performance/ReportFormat.java index 154b04d45b..5a9483ad5a 100644 --- a/app/src/main/java/org/astraea/app/performance/ReportFormat.java +++ b/app/src/main/java/org/astraea/app/performance/ReportFormat.java @@ -30,7 +30,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Set; import java.util.function.Supplier; import java.util.stream.IntStream; import org.astraea.common.EnumInfo; @@ -212,13 +211,9 @@ private static List latencyAndIO() { CSVContentElement.create( "Consumer[" + i + "] partition difference", () -> - Integer.toString( - (ConsumerThread.CLIENT_ID_ASSIGNED_PARTITIONS - .getOrDefault(consumerReports.get(i).clientId(), Set.of()) - .size() - - ConsumerThread.CLIENT_ID_REVOKED_PARTITIONS - .getOrDefault(consumerReports.get(i).clientId(), Set.of()) - .size())))); + Long.toString( + ConsumerThread.differenceBetweenRebalance( + consumerReports.get(i).clientId())))); }); return elements; } diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 51d9253f9d..a85f8c8a72 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ 
b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -19,13 +19,11 @@ import java.time.Duration; import java.util.Collection; import java.util.List; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.function.ToDoubleFunction; -import java.util.stream.Collectors; import org.astraea.common.DataSize; import org.astraea.common.Utils; import org.astraea.common.metrics.HasBeanObject; @@ -143,22 +141,18 @@ boolean tryToPrint(Duration duration) { for (var i = 0; i < reports.size(); ++i) { var report = reports.get(i); var clientId = report.clientId(); - var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); - var assignedPartitions = - ConsumerThread.CLIENT_ID_ASSIGNED_PARTITIONS.getOrDefault(clientId, Set.of()); - var revokedPartitions = - ConsumerThread.CLIENT_ID_REVOKED_PARTITIONS.getOrDefault(clientId, Set.of()); - var nonStickyPartitions = - assignedPartitions.stream() - .filter(tp -> !revokedPartitions.contains(tp)) - .collect(Collectors.toSet()); + var ms = metrics.stream().filter(m -> m.clientId().equals(clientId)).findFirst(); + if (ms.isPresent()) { + System.out.printf( - " consumer[%d] has %d partitions. 
Among them, there are %d non-sticky partitions and was assigned %d more partitions than before re-balancing%n", + " consumer[%d] has %d partitions.%n" + + " %.1f non-sticky partitions in average,%n" + + " assigned %.1f more partitions than before re-balancing in average%n", i, (int) ms.get().assignedPartitions(), - nonStickyPartitions.size(), - (assignedPartitions.size() - revokedPartitions.size())); + ConsumerThread.NON_STICKY_SENSOR.get(clientId).measure("exp-avg"), + ConsumerThread.DIFFERENCE_SENSOR.get(clientId).measure("exp-avg")); } System.out.printf( " consumed[%d] average throughput: %s%n", diff --git a/common/src/main/java/org/astraea/common/metrics/stats/Avg.java b/common/src/main/java/org/astraea/common/metrics/stats/Avg.java index 832eb13dab..11c4656846 100644 --- a/common/src/main/java/org/astraea/common/metrics/stats/Avg.java +++ b/common/src/main/java/org/astraea/common/metrics/stats/Avg.java @@ -17,6 +17,7 @@ package org.astraea.common.metrics.stats; import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedDeque; public class Avg { public static Stat of() { @@ -74,4 +75,45 @@ public synchronized Double measure() { } }; } + + /** Compute the average of value recorded in the given time period. 
*/ + public static Stat byTime(Duration period) { + if (period.toMillis() <= 0) { + throw new IllegalArgumentException( + "Stat, Average by time, needs period longer than 1 millisecond."); + } + return new Stat<>() { + private final ConcurrentLinkedDeque> past = + new ConcurrentLinkedDeque<>(); + + @Override + public void record(Double value) { + past.add(new ValueAndTime<>(value, System.currentTimeMillis())); + popOutdated(); + } + + @Override + public Double measure() { + popOutdated(); + return past.stream().mapToDouble(e -> e.value).average().orElse(Double.NaN); + } + + private void popOutdated() { + var outdated = System.currentTimeMillis() - period.toMillis(); + while (!past.isEmpty() && past.peekFirst().timestamp < outdated) { + past.poll(); + } + } + }; + } + + private static class ValueAndTime { + public final V value; + public final long timestamp; + + public ValueAndTime(V value, long timestamp) { + this.value = value; + this.timestamp = timestamp; + } + } } diff --git a/common/src/main/java/org/astraea/common/metrics/stats/Stat.java b/common/src/main/java/org/astraea/common/metrics/stats/Stat.java index 85d52f312b..7619d6ab35 100644 --- a/common/src/main/java/org/astraea/common/metrics/stats/Stat.java +++ b/common/src/main/java/org/astraea/common/metrics/stats/Stat.java @@ -24,7 +24,7 @@ public interface Stat { /** Make a readonly copy of this object. 
*/ default Stat snapshot() { - var value = measure(); + var statistics = measure(); return new Stat<>() { @Override public void record(V ignore) { @@ -33,7 +33,7 @@ public void record(V ignore) { @Override public V measure() { - return value; + return statistics; } }; } diff --git a/common/src/test/java/org/astraea/common/metrics/SensorTest.java b/common/src/test/java/org/astraea/common/metrics/SensorTest.java index 78039feab5..a49d4d726f 100644 --- a/common/src/test/java/org/astraea/common/metrics/SensorTest.java +++ b/common/src/test/java/org/astraea/common/metrics/SensorTest.java @@ -52,8 +52,8 @@ void testMeasure() { Mockito.when(stat1.measure()).thenReturn(1.0); Mockito.when(stat2.measure()).thenReturn(2.0); - Assertions.assertEquals(1.0, sensor.measure("t1")); - Assertions.assertEquals(2.0, sensor.measure("t2")); + Assertions.assertEquals(1.0, (Double) sensor.measure("t1")); + Assertions.assertEquals(2.0, (Double) sensor.measure("t2")); } @Test @@ -61,9 +61,9 @@ void testMetrics() { var sensor = Sensor.builder().addStat("average", Avg.of()).build(); sensor.record(1.0); var metrics = sensor.metrics(); - Assertions.assertEquals(1.0, metrics.get("average").measure()); + Assertions.assertEquals(1.0, (Double) metrics.get("average").measure()); sensor.record(2.0); - Assertions.assertEquals(1.0, metrics.get("average").measure()); + Assertions.assertEquals(1.0, (Double) metrics.get("average").measure()); } private Stat countRecord(AtomicInteger counter) { diff --git a/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java b/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java index f85dd09614..44dac775a5 100644 --- a/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java +++ b/common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java @@ -51,4 +51,27 @@ void testExpWeightByTime() throws InterruptedException { Assertions.assertEquals(10 * 0.5 * 0.5 + 50 * 0.5, rateByTime.measure()); } + + @Test + void testByTime() throws 
InterruptedException { + var byTime = Avg.byTime(Duration.ofMillis(150)); + + byTime.record(10.0); + Assertions.assertEquals(10.0, byTime.measure()); + byTime.record(14.0); + Assertions.assertEquals(12.0, byTime.measure()); + + Thread.sleep(20); + + byTime.record(18.0); + Assertions.assertEquals(14.0, byTime.measure()); + + Thread.sleep(140); + + Assertions.assertEquals(18.0, byTime.measure()); + + Thread.sleep(20); + + Assertions.assertEquals(Double.NaN, byTime.measure()); + } } diff --git a/docs/performance_benchmark.md b/docs/performance_benchmark.md index 52b958dd79..324f8ab7bb 100644 --- a/docs/performance_benchmark.md +++ b/docs/performance_benchmark.md @@ -13,6 +13,9 @@ 2. End-to-End latency : 一筆record從producer端到consumer端的時間 3. Consume rate : consumer拉取資料的速率(MB/s) 4. Produce rate : producer送資料的速率(MB/s) +5. Produce record error rate : 發送資料時錯誤的頻率 +6. Rebalance latency : consumer group 平衡所花的時間 +7. Consumer assigned partitions : consumer 被指派的 partition 數量,以及 rebalance 前後的數量差 #### Performance Benchmark Configurations @@ -53,12 +56,12 @@ 使用`docker`執行`performance benchmark` ```bash -docker/start_app.sh performance --bootstrap.servers localhost:9092 +docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic ``` (localhost, 9092 替換成自己Kafka server 的 ip 和 port) -![performance_tool_demo](pictures/performance_tool_demo.jpg) +![performance_tool_demo](pictures/performance_tool_demo.png) `performance benchmark`可以指定各種參數如資料大小、分佈、執行時間... 
等等。全部參數可以參考上述表格。 @@ -66,27 +69,27 @@ docker/start_app.sh performance --bootstrap.servers localhost:9092 ```bash # 開啟 1 個 producer 打 25 分鐘資料, 1 個 consumer 消費資料 -docker/start_app.sh performance --bootstrap.servers localhost:9092 --run.until 25m +docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --run.until 25m ``` ```bash # 開啟 1 個 producer ,打 10000 筆資料 且 沒有consumer -docker/start_app.sh performance --bootstrap.servers localhost:9092 --run.until 10000records --consumers 0 +docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --run.until 10000records --consumers 0 ``` ```bash # 打50秒資料、每筆大小10KiB、固定大小、使用4個producer threads、10個consumer threads,指定topic名稱,producer送資料前使用 lz4 壓縮演算法 -docker/start_app.sh performance --bootstrap.servers localhost:9092 --value.size 10KiB --value.distribution fixed --run.until 50s --producers 4 --consumers 10 --topic partition60Replica1 --compression lz4 +docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --value.size 10KiB --value.distribution fixed --run.until 50s --producers 4 --consumers 10 --configs compression.type=lz4 ``` ```bash # 使用astraea的 partitioner ,傳入config檔案路徑,裡面可以放 partitioner 所需的參數,如jmx port等 -docker/start_app.sh performance --bootstrap.servers localhost:9092 --partitioner org.astraea.app.partitioner.smooth.SmoothWeightRoundRobinDispatcher --prop.file ./config +docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --partitioner org.astraea.common.partitioner.StrictCostDispatcher --prop.file ./config ``` ```bash # 使用 partitioner 框架,指定參考 Broker Input 做效能指標,把紀錄輸出到指定路徑。 -docker/start_app.sh performance --bootstrap.servers localhost:9092 --partitioner org.astraea.common.partitioner.StrictCostDispatcher --configs org.astraea.common.cost.BrokerInputCost=1 --prop.file ./config --report.path ~/report +docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --partitioner 
org.astraea.common.partitioner.StrictCostDispatcher --configs org.astraea.common.cost.BrokerInputCost=1 --prop.file ./config --report.path ~/report ``` ```bash