Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance performance tool #710

Closed
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5ca0c50
add pattern subscribe
harryteng9527 Sep 13, 2022
4b3daa5
tracker support printing sticky partitions
harryteng9527 Sep 14, 2022
cc7d9c3
add sticky partitions to file writer
harryteng9527 Sep 14, 2022
9df8d5f
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 14, 2022
66f0d4b
spotless
harryteng9527 Sep 14, 2022
ae0db49
Merge 'main' into enhance-per-tool
harryteng9527 Sep 16, 2022
fe48a0f
fix conflit
harryteng9527 Sep 22, 2022
5ea22f2
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 23, 2022
c3bfc23
delete recording sticky partition of each consumer
harryteng9527 Sep 23, 2022
4f1164d
support printing sticky partition
harryteng9527 Sep 27, 2022
7402d26
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 27, 2022
a649f2a
fix TrackerTest#testConsumerPrinter
harryteng9527 Sep 27, 2022
6791cef
spotless
harryteng9527 Sep 27, 2022
1b56589
add recordListener to pass client ID & listener
harryteng9527 Sep 28, 2022
edc830b
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 28, 2022
06df7a9
Merge branch and fix conflict
harryteng9527 Oct 9, 2022
e415c24
delete RecordListener and record partition in consumer
harryteng9527 Oct 11, 2022
ccdabd5
Merge branch 'main' into enhance-per-tool
harryteng9527 Oct 11, 2022
c84cecb
Merge branch 'main' into enhance-per-tool
harryteng9527 Oct 12, 2022
f9d7f08
fix some style and add comments
harryteng9527 Oct 12, 2022
30a84ad
Merge branch 'main' into enhance-per-tool
harryteng9527 Oct 14, 2022
8647028
delete detecting client id if it's null
harryteng9527 Oct 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions app/src/main/java/org/astraea/app/performance/ConsumerThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.astraea.app.performance;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -38,6 +39,8 @@
public interface ConsumerThread extends AbstractThread {

ConcurrentMap<String, Set<TopicPartition>> CLIENT_ID_PARTITIONS = new ConcurrentHashMap<>();
ConcurrentMap<String, Set<TopicPartition>> CLIENT_ID_STICKY_PARTITIONS =
new ConcurrentHashMap<>();

static List<ConsumerThread> create(
int consumers,
Expand Down Expand Up @@ -72,13 +75,28 @@ static List<ConsumerThread> create(
executors.execute(
() -> {
try {
var assignments = consumer.assignments();
var generationId = consumer.generationId();
while (!closed.get()) {
if (subscribed.get()) consumer.resubscribe();
else {
consumer.unsubscribe();
assignments = Set.of();
CLIENT_ID_PARTITIONS.put(clientId, assignments);
CLIENT_ID_STICKY_PARTITIONS.put(clientId, assignments);
Utils.sleep(Duration.ofSeconds(1));
continue;
}
if (!assignments.containsAll(CLIENT_ID_PARTITIONS.get(clientId))
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
|| generationId < consumer.generationId()) {
// check if re-balance or not.
var nowPartitions = CLIENT_ID_PARTITIONS.get(clientId);
var stickyPartitions = new HashSet<>(nowPartitions);
stickyPartitions.retainAll(assignments);
CLIENT_ID_STICKY_PARTITIONS.put(clientId, stickyPartitions);
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
assignments = nowPartitions;
generationId = consumer.generationId();
}
consumer.poll(Duration.ofSeconds(1));
}
} catch (WakeupException ignore) {
Expand Down
58 changes: 42 additions & 16 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.astraea.common.DataRate;
import org.astraea.common.DataSize;
Expand All @@ -42,6 +43,7 @@
import org.astraea.common.argument.NonEmptyStringField;
import org.astraea.common.argument.NonNegativeShortField;
import org.astraea.common.argument.PathField;
import org.astraea.common.argument.PatternField;
import org.astraea.common.argument.PositiveIntegerField;
import org.astraea.common.argument.PositiveIntegerListField;
import org.astraea.common.argument.PositiveLongField;
Expand Down Expand Up @@ -91,22 +93,39 @@ public static List<String> execute(final Argument param)
param::createProducer,
param.interdependent);
var consumerThreads =
ConsumerThread.create(
param.consumers,
(clientId, listener) ->
Consumer.forTopics(new HashSet<>(param.topics))
.configs(param.configs())
.config(
ConsumerConfigs.ISOLATION_LEVEL_CONFIG,
param.transactionSize > 1
? ConsumerConfigs.ISOLATION_LEVEL_COMMITTED
: ConsumerConfigs.ISOLATION_LEVEL_UNCOMMITTED)
.bootstrapServers(param.bootstrapServers())
.config(ConsumerConfigs.GROUP_ID_CONFIG, param.groupId)
.seek(latestOffsets)
.consumerRebalanceListener(listener)
.config(ConsumerConfigs.CLIENT_ID_CONFIG, clientId)
.build());
param.pattern == null
? ConsumerThread.create(
param.consumers,
(clientId, listener) ->
Consumer.forTopics(new HashSet<>(param.topics))
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
.configs(param.configs())
.config(
ConsumerConfigs.ISOLATION_LEVEL_CONFIG,
param.transactionSize > 1
? ConsumerConfigs.ISOLATION_LEVEL_COMMITTED
: ConsumerConfigs.ISOLATION_LEVEL_UNCOMMITTED)
.bootstrapServers(param.bootstrapServers())
.config(ConsumerConfigs.GROUP_ID_CONFIG, param.groupId)
.seek(latestOffsets)
.consumerRebalanceListener(listener)
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
.config(ConsumerConfigs.CLIENT_ID_CONFIG, clientId)
.build())
: ConsumerThread.create(
param.consumers,
(clientId, listener) ->
Consumer.forTopics(param.pattern)
.configs(param.configs())
.config(
ConsumerConfigs.ISOLATION_LEVEL_CONFIG,
param.transactionSize > 1
? ConsumerConfigs.ISOLATION_LEVEL_COMMITTED
: ConsumerConfigs.ISOLATION_LEVEL_UNCOMMITTED)
.bootstrapServers(param.bootstrapServers())
.config(ConsumerConfigs.GROUP_ID_CONFIG, param.groupId)
.seek(latestOffsets)
.consumerRebalanceListener(listener)
.config(ConsumerConfigs.CLIENT_ID_CONFIG, clientId)
.build());

System.out.println("creating tracker");
var tracker =
Expand All @@ -132,6 +151,7 @@ public static List<String> execute(final Argument param)
? CompletableFuture.completedFuture(null)
: CompletableFuture.runAsync(
() -> {
Utils.sleep(param.chaosDuration);
while (!consumerThreads.stream().allMatch(AbstractThread::closed)) {
var thread =
consumerThreads.get((int) (Math.random() * consumerThreads.size()));
Expand Down Expand Up @@ -168,6 +188,12 @@ public static List<String> execute(final Argument param)

public static class Argument extends org.astraea.common.argument.Argument {

@Parameter(
names = {"--pattern"},
description = "Pattern: topic pattern which you subscribed",
converter = PatternField.class)
Pattern pattern = null;

@Parameter(
names = {"--topics"},
description = "List<String>: topic names which you subscribed",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -141,9 +142,13 @@ boolean tryToPrint(Duration duration) {
for (var i = 0; i < reports.size(); ++i) {
var report = reports.get(i);
var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst();
var clientId = report.clientId() == null ? "forTest" : report.clientId();
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
var stickyNumber =
ConsumerThread.CLIENT_ID_STICKY_PARTITIONS.getOrDefault(clientId, Set.of()).size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

所以這邊不是追蹤有哪些partitions正在消費中?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

這邊是用 client id 來找 sticky 的 partition 數量。

這邊不是追蹤有哪些partitions正在消費中?

追蹤每個 consumer 消費哪些 partitions 的是另外一個 Map : CLIENT_ID_PARTITIONS

if (ms.isPresent()) {
System.out.printf(
" consumer[%d] has %d partitions%n", i, (int) ms.get().assignedPartitions());
" consumer[%d] has %d partitions and %d sticky partitions%n",
i, (int) ms.get().assignedPartitions(), stickyNumber);
}
System.out.printf(
" consumed[%d] average throughput: %s%n",
Expand Down
26 changes: 26 additions & 0 deletions common/src/main/java/org/astraea/common/argument/PatternField.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.argument;

import java.util.regex.Pattern;

public class PatternField extends Field<Pattern> {
@Override
public Pattern convert(String value) {
return Pattern.compile(value);
}
}
18 changes: 15 additions & 3 deletions common/src/main/java/org/astraea/common/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.astraea.common.admin.TopicPartition;

/** An interface for polling records. */
Expand Down Expand Up @@ -64,11 +66,21 @@ default Collection<Record<Key, Value>> poll(Duration timeout) {
/**
* Create a consumer builder by setting specific topics
*
* @param topics set of topic names
* @param setTopics set of topic names
* @return consumer builder for topics
*/
static TopicsBuilder<byte[], byte[]> forTopics(Set<String> topics) {
return new TopicsBuilder<>(topics);
static TopicsBuilder<byte[], byte[]> forTopics(Set<String> setTopics) {
return new TopicsBuilder<>(setTopics);
}

/**
* Create a consumer builder by setting specific pattern
*
* @param patternTopics patterns of topic name
* @return consumer builder for topics
*/
static TopicsBuilder<byte[], byte[]> forTopics(Pattern patternTopics) {
return new TopicsBuilder<>(Objects.requireNonNull(patternTopics));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,6 @@ public interface SubscribedConsumer<Key, Value> extends Consumer<Key, Value> {

/** @return group instance id (static member) */
Optional<String> groupInstanceId();

int generationId();
}
57 changes: 46 additions & 11 deletions common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,25 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.astraea.common.Utils;
import org.astraea.common.admin.TopicPartition;

public class TopicsBuilder<Key, Value> extends Builder<Key, Value> {
private final Set<String> topics;
private final Set<String> setTopics;
private final Pattern topicPattern;
private ConsumerRebalanceListener listener = ignore -> {};

TopicsBuilder(Set<String> topics) {
this.topics = requireNonNull(topics);
TopicsBuilder(Set<String> setTopics) {
this.topicPattern = null;
this.setTopics = requireNonNull(setTopics);
}

TopicsBuilder(Pattern patternTopics) {
this.setTopics = null;
this.topicPattern = requireNonNull(patternTopics);
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
}

public TopicsBuilder<Key, Value> consumerRebalanceListener(ConsumerRebalanceListener listener) {
Expand Down Expand Up @@ -98,8 +106,8 @@ public SubscribedConsumer<Key, Value> build() {
if (seekStrategy != SeekStrategy.NONE) {
// make sure this consumer is assigned before seeking
var latch = new CountDownLatch(1);
kafkaConsumer.subscribe(
topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
subscribe(kafkaConsumer, latch);

while (latch.getCount() != 0) {
// the offset will be reset, so it is fine to poll data
// TODO: should we disable auto-commit here?
Expand All @@ -108,25 +116,45 @@ public SubscribedConsumer<Key, Value> build() {
}
} else {
// nothing to seek so we just subscribe topics
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
subscribe(kafkaConsumer, null);
}

seekStrategy.apply(kafkaConsumer, seekValue);

return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener);
return new SubscribedConsumerImpl<>(kafkaConsumer, setTopics, topicPattern, listener);
}

private void subscribe(KafkaConsumer<Key, Value> consumer, CountDownLatch latch) {
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
if (latch == null) {
if (setTopics == null)
consumer.subscribe(topicPattern, ConsumerRebalanceListener.of(List.of(listener)));
else consumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener)));
} else {
if (setTopics == null)
consumer.subscribe(
topicPattern,
ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
else
consumer.subscribe(
setTopics,
ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
}
}

private static class SubscribedConsumerImpl<Key, Value> extends Builder.BaseConsumer<Key, Value>
implements SubscribedConsumer<Key, Value> {
private final Set<String> topics;
private final Set<String> setTopics;
private final ConsumerRebalanceListener listener;
private final Pattern patternTopics;

public SubscribedConsumerImpl(
org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer,
Set<String> topics,
Set<String> setTopics,
Pattern patternTopics,
ConsumerRebalanceListener listener) {
super(kafkaConsumer);
this.topics = topics;
this.setTopics = setTopics;
this.patternTopics = patternTopics;
this.listener = listener;
}

Expand All @@ -149,9 +177,16 @@ public Optional<String> groupInstanceId() {
return kafkaConsumer.groupMetadata().groupInstanceId();
}

@Override
public int generationId() {
return kafkaConsumer.groupMetadata().generationId();
}

@Override
protected void doResubscribe() {
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
if (patternTopics == null)
kafkaConsumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener)));
else kafkaConsumer.subscribe(patternTopics, ConsumerRebalanceListener.of(List.of(listener)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.argument;

import com.beust.jcommander.Parameter;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class PatternFieldTest {
private static class FakeParameter {
@Parameter(
names = {"--field"},
converter = PatternField.class)
Pattern value;
}

@Test
public void testConvert() {
var param = Argument.parse(new FakeParameter(), new String[] {"--field", "test.*"});

Assertions.assertTrue(param.value.matcher("test").matches());
Assertions.assertFalse(param.value.matcher("tes").matches());
}
}