Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changing rate statistics #1064

Merged
merged 27 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
8e5ec62
Stat type of record and measure may be different
chinghongfang Nov 1, 2022
c64897a
Merge branch 'main' into setDifference
chinghongfang Nov 1, 2022
a5fa434
Set difference return size only
chinghongfang Nov 2, 2022
1892801
Merge branch 'main' into setDifference
chinghongfang Nov 2, 2022
7d28e5d
Fix conflict
chinghongfang Nov 2, 2022
a5f6029
Return the size of observered target only
chinghongfang Nov 7, 2022
0220566
Record the number of difference
chinghongfang Nov 13, 2022
10d402f
Merge remote-tracking branch 'upstream/main' into setDifference
chinghongfang Nov 13, 2022
72e11cd
Merge branch 'main' into setDifference
chinghongfang Nov 28, 2022
b91a61f
Stat read/write the same type
chinghongfang Nov 28, 2022
ac405b3
Update documents
chinghongfang Nov 28, 2022
92b67c0
Add more description
chinghongfang Nov 29, 2022
e7c5153
Add synchronized
chinghongfang Nov 29, 2022
9b890eb
Revert type changing
chinghongfang Nov 29, 2022
d1317f5
Fix document
chinghongfang Dec 1, 2022
d502cac
Restore demo picture
chinghongfang Dec 8, 2022
6bdc3f7
Record partition changes by difference
chinghongfang Dec 9, 2022
da24528
Restore and move "non-sticky computation"
chinghongfang Dec 12, 2022
12f994d
Reduce code and fix method calling
chinghongfang Dec 13, 2022
d7c3935
Average within a time range
chinghongfang Dec 13, 2022
f1398a2
Merge branch 'main' into setDifference
chinghongfang Dec 13, 2022
70c7a07
Print 15-minute avg of # of partitions in tracker
chinghongfang Dec 13, 2022
e3c2979
Private an inner class
chinghongfang Dec 19, 2022
79815d5
Merge branch 'main' into setDifference
chinghongfang Dec 19, 2022
6e9d58c
Merge branch 'main' into setDifference
chinghongfang Jan 3, 2023
fb1483e
Use exponential weighted average
chinghongfang Jan 3, 2023
2d31ac7
Delete unused
chinghongfang Jan 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,22 @@
import org.astraea.common.consumer.SubscribedConsumer;

public interface ConsumerThread extends AbstractThread {

ConcurrentMap<String, Set<TopicPartition>> CLIENT_ID_ASSIGNED_PARTITIONS =
new ConcurrentHashMap<>();
ConcurrentMap<String, Set<TopicPartition>> CLIENT_ID_REVOKED_PARTITIONS =
new ConcurrentHashMap<>();

static long nonStickyPartitionBetweenRebalance(String clientId) {
return CLIENT_ID_ASSIGNED_PARTITIONS.getOrDefault(clientId, Set.of()).stream()
.filter(tp -> !CLIENT_ID_REVOKED_PARTITIONS.getOrDefault(clientId, Set.of()).contains(tp))
.count();
}

static long differenceBetweenRebalance(String clientId) {
return CLIENT_ID_ASSIGNED_PARTITIONS.getOrDefault(clientId, Set.of()).size()
- CLIENT_ID_REVOKED_PARTITIONS.getOrDefault(clientId, Set.of()).size();
}

static List<ConsumerThread> create(
int consumers,
BiFunction<String, ConsumerRebalanceListener, SubscribedConsumer<byte[], byte[]>>
Expand Down
11 changes: 3 additions & 8 deletions app/src/main/java/org/astraea/app/performance/ReportFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import org.astraea.common.EnumInfo;
Expand Down Expand Up @@ -212,13 +211,9 @@ private static List<CSVContentElement> latencyAndIO() {
CSVContentElement.create(
"Consumer[" + i + "] partition difference",
() ->
Integer.toString(
(ConsumerThread.CLIENT_ID_ASSIGNED_PARTITIONS
.getOrDefault(consumerReports.get(i).clientId(), Set.of())
.size()
- ConsumerThread.CLIENT_ID_REVOKED_PARTITIONS
.getOrDefault(consumerReports.get(i).clientId(), Set.of())
.size()))));
Long.toString(
ConsumerThread.differenceBetweenRebalance(
consumerReports.get(i).clientId()))));
});
return elements;
}
Expand Down
65 changes: 50 additions & 15 deletions app/src/main/java/org/astraea/app/performance/TrackerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,22 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.function.ToDoubleFunction;
import java.util.stream.Collectors;
import org.astraea.common.DataSize;
import org.astraea.common.Utils;
import org.astraea.common.metrics.HasBeanObject;
import org.astraea.common.metrics.MBeanClient;
import org.astraea.common.metrics.Sensor;
import org.astraea.common.metrics.client.consumer.ConsumerMetrics;
import org.astraea.common.metrics.client.consumer.HasConsumerCoordinatorMetrics;
import org.astraea.common.metrics.client.producer.HasProducerTopicMetrics;
import org.astraea.common.metrics.client.producer.ProducerMetrics;
import org.astraea.common.metrics.stats.Avg;
import org.astraea.common.metrics.stats.Latest;

/** Print out the given metrics. */
public interface TrackerThread extends AbstractThread {
Expand Down Expand Up @@ -97,6 +98,31 @@ class ConsumerPrinter {
private final Supplier<List<Report>> reportSupplier;
private long lastRecords = 0;

private static final String _15_MINUTE_AVG = "15-minute-avg";
private static final String _5_MINUTE_AVG = "5-minute-avg";
private static final String _1_MINUTE_AVG = "1-minute-avg";
private final Sensor<Double> numOfPartitionSensor =
Sensor.builder()
.addStat(_15_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(15)))
.addStat(_5_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(5)))
.addStat(_1_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(1)))
.addStat("latest", new Latest<Double>())
.build();
private final Sensor<Double> nonStickyPartitionSensor =
Sensor.builder()
.addStat(_15_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(15)))
.addStat(_5_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(5)))
.addStat(_1_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(1)))
.addStat("latest", new Latest<Double>())
.build();
private final Sensor<Double> partitionDifferenceSensor =
Sensor.builder()
.addStat(_15_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(15)))
.addStat(_5_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(5)))
.addStat(_1_MINUTE_AVG, Avg.ByTime(Duration.ofMinutes(1)))
.addStat("latest", new Latest<Double>())
.build();

ConsumerPrinter() {
this(Report::consumers);
}
Expand Down Expand Up @@ -143,22 +169,31 @@ boolean tryToPrint(Duration duration) {
for (var i = 0; i < reports.size(); ++i) {
var report = reports.get(i);
var clientId = report.clientId();
var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst();
var assignedPartitions =
ConsumerThread.CLIENT_ID_ASSIGNED_PARTITIONS.getOrDefault(clientId, Set.of());
var revokedPartitions =
ConsumerThread.CLIENT_ID_REVOKED_PARTITIONS.getOrDefault(clientId, Set.of());
var nonStickyPartitions =
assignedPartitions.stream()
.filter(tp -> !revokedPartitions.contains(tp))
.collect(Collectors.toSet());
var ms = metrics.stream().filter(m -> m.clientId().equals(clientId)).findFirst();

if (ms.isPresent()) {
numOfPartitionSensor.record(ms.get().assignedPartitions());
nonStickyPartitionSensor.record(
(double) ConsumerThread.nonStickyPartitionBetweenRebalance(clientId));
partitionDifferenceSensor.record(
(double) ConsumerThread.differenceBetweenRebalance(clientId));

System.out.printf(
" consumer[%d] has %d partitions. Among them, there are %d non-sticky partitions and was assigned %d more partitions than before re-balancing%n",
" consumer[%d] has %.1f partitions. "
+ "%.1f non-sticky partitions, "
+ "assigned %.1f more partitions than before re-balancing%n",
i,
(int) ms.get().assignedPartitions(),
nonStickyPartitions.size(),
(assignedPartitions.size() - revokedPartitions.size()));
numOfPartitionSensor.measure("latest"),
nonStickyPartitionSensor.measure("latest"),
partitionDifferenceSensor.measure("latest"));

System.out.printf(
" %.2f partitions in 15 minute average, "
+ "%.2f non-sticky partitions in 15 minute average, "
+ "assigned %.2f more partitions than before re-balancing in 15 minute average%n",
numOfPartitionSensor.measure(_15_MINUTE_AVG),
nonStickyPartitionSensor.measure(_15_MINUTE_AVG),
partitionDifferenceSensor.measure(_15_MINUTE_AVG));
}
System.out.printf(
" consumed[%d] average throughput: %s%n",
Expand Down
43 changes: 42 additions & 1 deletion common/src/main/java/org/astraea/common/metrics/stats/Avg.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.astraea.common.metrics.stats;

import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Avg {
public static final String AVG_KEY = "avg";
Expand Down Expand Up @@ -55,7 +56,6 @@ public static Stat<Double> rateByTime(Duration period) {
private long count = 0;

private final Debounce<Double> debounce = Debounce.of(period);
;

@Override
public synchronized void record(Double value) {
Expand Down Expand Up @@ -112,4 +112,45 @@ public synchronized Double measure() {
}
};
}

/** Compute the average of value recorded in the given time period. */
public static Stat<Double> ByTime(Duration period) {
if (period.toMillis() <= 0) {
throw new IllegalArgumentException(
"Stat, Average by time, needs period longer than 1 millisecond.");
}
return new Stat<>() {
private final ConcurrentLinkedDeque<ValueAndTime<Double>> past =
new ConcurrentLinkedDeque<>();

@Override
public void record(Double value) {
past.add(new ValueAndTime<>(value, System.currentTimeMillis()));
popOutdated();
}

@Override
public Double measure() {
popOutdated();
return past.stream().mapToDouble(e -> e.value).average().orElse(Double.NaN);
}

private void popOutdated() {
var outdated = System.currentTimeMillis() - period.toMillis();
while (!past.isEmpty() && past.peekFirst().timestamp < outdated) {
past.poll();
}
}
};
}

public static class ValueAndTime<V> {
public final V value;
public final long timestamp;

public ValueAndTime(V value, long timestamp) {
this.value = value;
this.timestamp = timestamp;
}
}
}
32 changes: 32 additions & 0 deletions common/src/main/java/org/astraea/common/metrics/stats/Latest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.metrics.stats;

/** Return the latest recorded value */
public class Latest<V> implements Stat<V> {
private V latest;

@Override
public void record(V value) {
latest = value;
}

@Override
public V measure() {
return latest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface Stat<V> {

/** Make a readonly copy of this object. */
default Stat<V> snapshot() {
var value = measure();
var statistics = measure();
return new Stat<>() {
@Override
public void record(V ignore) {
Expand All @@ -33,7 +33,7 @@ public void record(V ignore) {

@Override
public V measure() {
return value;
return statistics;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,18 @@ void testMeasure() {
Mockito.when(stat1.measure()).thenReturn(1.0);
Mockito.when(stat2.measure()).thenReturn(2.0);

Assertions.assertEquals(1.0, sensor.measure("t1"));
Assertions.assertEquals(2.0, sensor.measure("t2"));
Assertions.assertEquals(1.0, (Double) sensor.measure("t1"));
Assertions.assertEquals(2.0, (Double) sensor.measure("t2"));
}

@Test
void testMetrics() {
var sensor = Sensor.builder().addStat("average", Avg.of()).build();
sensor.record(1.0);
var metrics = sensor.metrics();
Assertions.assertEquals(1.0, metrics.get("average").measure());
Assertions.assertEquals(1.0, (Double) metrics.get("average").measure());
sensor.record(2.0);
Assertions.assertEquals(1.0, metrics.get("average").measure());
Assertions.assertEquals(1.0, (Double) metrics.get("average").measure());
}

private Stat<Double> countRecord(AtomicInteger counter) {
Expand Down
23 changes: 23 additions & 0 deletions common/src/test/java/org/astraea/common/metrics/stats/AvgTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,27 @@ void testExpWeightByTime() throws InterruptedException {

Assertions.assertEquals(10 * 0.5 * 0.5 + 50 * 0.5, rateByTime.measure());
}

@Test
void testByTime() throws InterruptedException {
var byTime = Avg.ByTime(Duration.ofMillis(150));

byTime.record(10.0);
Assertions.assertEquals(10.0, byTime.measure());
byTime.record(14.0);
Assertions.assertEquals(12.0, byTime.measure());

Thread.sleep(20);

byTime.record(18.0);
Assertions.assertEquals(14.0, byTime.measure());

Thread.sleep(140);

Assertions.assertEquals(18.0, byTime.measure());

Thread.sleep(20);

Assertions.assertEquals(Double.NaN, byTime.measure());
}
}
17 changes: 10 additions & 7 deletions docs/performance_benchmark.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
2. End-to-End latency : 一筆record從producer端到consumer端的時間
3. Consume rate : consumer拉取資料的速率(MB/s)
4. Produce rate : producer送資料的速率(MB/s)
5. Produce record error rate : 發送資料時錯誤的頻率
6. rebalance latency : consumer group 平衡所花的時間
7. consumer assigned partitions : consumer 被 assigned partition 數量、數量差

#### Performance Benchmark Configurations

Expand Down Expand Up @@ -52,39 +55,39 @@
使用`docker`執行`performance benchmark`

```bash
docker/start_app.sh performance --bootstrap.servers localhost:9092
docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic
```

(localhost, 9092 替換成自己Kafka server 的 ip 和 port)

![performance_tool_demo](pictures/performance_tool_demo.jpg)
![performance_tool_demo](pictures/performance_tool_demo.png)

`performance benchmark`可以指定各種參數如資料大小、分佈、執行時間... 等等。全部參數可以參考上述表格。

以下僅列出一些使用範例:

```bash
# 開啟 1 個 producer 打 25 分鐘資料, 1 個 consumer 消費資料
docker/start_app.sh performance --bootstrap.servers localhost:9092 --run.until 25m
docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --run.until 25m
```

```bash
# 開啟 1 個 producer ,打 10000 筆資料 且 沒有consumer
docker/start_app.sh performance --bootstrap.servers localhost:9092 --run.until 10000records --consumers 0
docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --run.until 10000records --consumers 0
```

```bash
# 打50秒資料、每筆大小10KiB、固定大小、使用4個producer threads、10個consumer threads,指定topic名稱,producer送資料前使用 lz4 壓縮演算法
docker/start_app.sh performance --bootstrap.servers localhost:9092 --value.size 10KiB --value.distribution fixed --run.until 50s --producers 4 --consumers 10 --topic partition60Replica1 --compression lz4
docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --value.size 10KiB --value.distribution fixed --run.until 50s --producers 4 --consumers 10 --topics MyTopic --configs compression.type=lz4
```

```bash
# 使用astraea的 partitioner ,傳入config檔案路徑,裡面可以放 partitioner 所需的參數,如jmx port等
docker/start_app.sh performance --bootstrap.servers localhost:9092 --partitioner org.astraea.app.partitioner.smooth.SmoothWeightRoundRobinDispatcher --prop.file ./config
docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --partitioner org.astraea.common.partitioner.StrictCostDispatcher --prop.file ./config
```

```bash
# 使用 partitioner 框架,指定參考 Broker Input 做效能指標,把紀錄輸出到指定路徑。
docker/start_app.sh performance --bootstrap.servers localhost:9092 --partitioner org.astraea.common.partitioner.StrictCostDispatcher --configs org.astraea.common.cost.BrokerInputCost=1 --prop.file ./config --report.path ~/report
docker/start_app.sh performance --bootstrap.servers localhost:9092 --topics MyTopic --partitioner org.astraea.common.partitioner.StrictCostDispatcher --configs org.astraea.common.cost.BrokerInputCost=1 --prop.file ./config --report.path ~/report
```