Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance performance tool #710

Closed
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
5ca0c50
add pattern subscribe
harryteng9527 Sep 13, 2022
4b3daa5
tracker support printing sticky partitions
harryteng9527 Sep 14, 2022
cc7d9c3
add sticky partitions to file writer
harryteng9527 Sep 14, 2022
9df8d5f
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 14, 2022
66f0d4b
spotless
harryteng9527 Sep 14, 2022
ae0db49
Merge 'main' into enhance-per-tool
harryteng9527 Sep 16, 2022
fe48a0f
fix conflit
harryteng9527 Sep 22, 2022
5ea22f2
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 23, 2022
c3bfc23
delete recording sticky partition of each consumer
harryteng9527 Sep 23, 2022
4f1164d
support printing sticky partition
harryteng9527 Sep 27, 2022
7402d26
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 27, 2022
a649f2a
fix TrackerTest#testConsumerPrinter
harryteng9527 Sep 27, 2022
6791cef
spotless
harryteng9527 Sep 27, 2022
1b56589
add recordListener to pass client ID & listener
harryteng9527 Sep 28, 2022
edc830b
Merge branch 'main' into enhance-per-tool
harryteng9527 Sep 28, 2022
06df7a9
Merge branch and fix conflict
harryteng9527 Oct 9, 2022
e415c24
delete RecordListener and record partition in consumer
harryteng9527 Oct 11, 2022
ccdabd5
Merge branch 'main' into enhance-per-tool
harryteng9527 Oct 11, 2022
c84cecb
Merge branch 'main' into enhance-per-tool
harryteng9527 Oct 12, 2022
f9d7f08
fix some style and add comments
harryteng9527 Oct 12, 2022
30a84ad
Merge branch 'main' into enhance-per-tool
harryteng9527 Oct 14, 2022
8647028
delete detecting client id if it's null
harryteng9527 Oct 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.errors.WakeupException;
Expand All @@ -32,10 +33,10 @@
import org.astraea.common.consumer.SubscribedConsumer;

public interface ConsumerThread extends AbstractThread {

static List<ConsumerThread> create(
int consumers,
Function<ConsumerRebalanceListener, SubscribedConsumer<byte[], byte[]>> consumerSupplier) {
Function<ConsumerRebalanceListener, SubscribedConsumer<byte[], byte[]>> consumerSupplier,
Supplier<Performance.RecordListener> recordListener) {
if (consumers == 0) return List.of();
var closeLatches =
IntStream.range(0, consumers)
Expand All @@ -56,10 +57,13 @@ static List<ConsumerThread> create(
.mapToObj(
index -> {
@SuppressWarnings("resource")
var consumer = consumerSupplier.apply(ps -> {});
var listener = recordListener.get();
var consumer = consumerSupplier.apply(listener);
var closed = new AtomicBoolean(false);
var closeLatch = closeLatches.get(index);
var subscribed = new AtomicBoolean(true);
Performance.RecordListener.stickyNumbers.putIfAbsent(consumer.clientId(), 0);
listener.clientId(consumer.clientId());
executors.execute(
() -> {
try {
Expand All @@ -68,6 +72,8 @@ static List<ConsumerThread> create(
else {
consumer.unsubscribe();
Utils.sleep(Duration.ofSeconds(1));
Performance.RecordListener.stickyNumbers.put(consumer.clientId(), 0);
listener.flushPrevPartitions();
continue;
}
consumer.poll(Duration.ofSeconds(1));
Expand Down
77 changes: 66 additions & 11 deletions app/src/main/java/org/astraea/app/performance/Performance.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.astraea.common.DataRate;
import org.astraea.common.DataSize;
Expand All @@ -43,12 +46,14 @@
import org.astraea.common.argument.NonEmptyStringField;
import org.astraea.common.argument.NonNegativeShortField;
import org.astraea.common.argument.PathField;
import org.astraea.common.argument.PatternField;
import org.astraea.common.argument.PositiveIntegerListField;
import org.astraea.common.argument.PositiveLongField;
import org.astraea.common.argument.PositiveShortField;
import org.astraea.common.argument.StringListField;
import org.astraea.common.argument.TopicPartitionField;
import org.astraea.common.consumer.Consumer;
import org.astraea.common.consumer.ConsumerRebalanceListener;
import org.astraea.common.consumer.Isolation;
import org.astraea.common.producer.Acks;
import org.astraea.common.producer.Producer;
Expand All @@ -73,6 +78,7 @@ private static DataSupplier dataSupplier(Performance.Argument argument) {

public static List<String> execute(final Argument param)
throws InterruptedException, IOException {
Supplier<RecordListener> recordListener = () -> new RecordListener();
// always try to init topic even though it may be existent already.
System.out.println("checking topics: " + String.join(",", param.topics));
param.checkTopics();
Expand All @@ -89,17 +95,31 @@ public static List<String> execute(final Argument param)
param.producers,
param::createProducer);
var consumerThreads =
ConsumerThread.create(
param.consumers,
listener ->
Consumer.forTopics(new HashSet<>(param.topics))
.bootstrapServers(param.bootstrapServers())
.groupId(param.groupId)
.configs(param.configs())
.isolation(param.isolation())
.seek(latestOffsets)
.consumerRebalanceListener(listener)
.build());
param.pattern == null
? ConsumerThread.create(
param.consumers,
listener ->
Consumer.forTopics(new HashSet<>(param.topics))
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
.bootstrapServers(param.bootstrapServers())
.groupId(param.groupId)
.configs(param.configs())
.isolation(param.isolation())
.seek(latestOffsets)
.consumerRebalanceListener(listener)
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
.build(),
recordListener)
: ConsumerThread.create(
param.consumers,
listener ->
Consumer.forTopics(param.pattern)
.bootstrapServers(param.bootstrapServers())
.groupId(param.groupId)
.configs(param.configs())
.isolation(param.isolation())
.seek(latestOffsets)
.consumerRebalanceListener(listener)
.build(),
recordListener);

System.out.println("creating tracker");
var tracker =
Expand All @@ -125,6 +145,7 @@ public static List<String> execute(final Argument param)
? CompletableFuture.completedFuture(null)
: CompletableFuture.runAsync(
() -> {
Utils.sleep(param.chaosDuration);
while (!consumerThreads.stream().allMatch(AbstractThread::closed)) {
var thread =
consumerThreads.get((int) (Math.random() * consumerThreads.size()));
Expand Down Expand Up @@ -160,6 +181,12 @@ public static List<String> execute(final Argument param)
}

public static class Argument extends org.astraea.common.argument.Argument {
@Parameter(
names = {"--pattern"},
description = "Pattern: topic pattern which you subscribed",
converter = PatternField.class)
Pattern pattern = null;

@Parameter(
names = {"--topics"},
description = "List<String>: topic names which you subscribed",
Expand Down Expand Up @@ -406,4 +433,32 @@ else if (specifiedByBroker) {
converter = DurationField.class)
Duration readIdle = Duration.ofSeconds(2);
}

static class RecordListener implements ConsumerRebalanceListener {
public static ConcurrentMap<String, Integer> stickyNumbers = new ConcurrentHashMap<>();
private Set<TopicPartition> prevPartitions = new HashSet<>();
private Set<TopicPartition> nowPartitions = new HashSet<>();
private String clientId = "temp";

public void clientId(String clientId) {
this.clientId = clientId;
}

public void flushPrevPartitions() {
prevPartitions.clear();
}

@Override
public void onPartitionAssigned(Set<TopicPartition> partitions) {
var diffPartitions = new HashSet<>(partitions);
nowPartitions = partitions;
diffPartitions.retainAll(prevPartitions);
stickyNumbers.put(clientId, diffPartitions.size());
}

@Override
public void onPartitionsRevoked(Set<TopicPartition> partitions) {
prevPartitions = partitions;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,12 @@ boolean tryToPrint(Duration duration) {
for (var i = 0; i < reports.size(); ++i) {
var report = reports.get(i);
var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst();
var clientId = report.clientId() == null ? "temp" : report.clientId();
var stickyNumber = Performance.RecordListener.stickyNumbers.get(clientId);
if (ms.isPresent()) {
System.out.printf(
" consumer[%d] has %d partitions%n", i, (int) ms.get().assignedPartitions());
" consumer[%d] has %d partitions and %d sticky partitions%n",
i, (int) ms.get().assignedPartitions(), stickyNumber);
}
System.out.printf(
" consumed[%d] average throughput: %s%n",
Expand Down
26 changes: 26 additions & 0 deletions common/src/main/java/org/astraea/common/argument/PatternField.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.argument;

import java.util.regex.Pattern;

public class PatternField extends Field<Pattern> {
@Override
public Pattern convert(String value) {
return Pattern.compile(value);
}
}
18 changes: 15 additions & 3 deletions common/src/main/java/org/astraea/common/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;
import org.astraea.common.admin.TopicPartition;

/** An interface for polling records. */
Expand Down Expand Up @@ -64,11 +66,21 @@ default Collection<Record<Key, Value>> poll(Duration timeout) {
/**
* Create a consumer builder by setting specific topics
*
* @param topics set of topic names
* @param setTopics set of topic names
* @return consumer builder for topics
*/
static TopicsBuilder<byte[], byte[]> forTopics(Set<String> topics) {
return new TopicsBuilder<>(topics);
static TopicsBuilder<byte[], byte[]> forTopics(Set<String> setTopics) {
return new TopicsBuilder<>(setTopics);
}

/**
* Create a consumer builder by setting specific pattern
*
* @param patternTopics patterns of topic name
* @return consumer builder for topics
*/
static TopicsBuilder<byte[], byte[]> forTopics(Pattern patternTopics) {
return new TopicsBuilder<>(Objects.requireNonNull(patternTopics));
}

/**
Expand Down
52 changes: 41 additions & 11 deletions common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,26 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.astraea.common.Utils;
import org.astraea.common.admin.TopicPartition;

public class TopicsBuilder<Key, Value> extends Builder<Key, Value> {
private final Set<String> topics;
private final Set<String> setTopics;
private final Pattern topicPattern;
private ConsumerRebalanceListener listener = ignore -> {};

TopicsBuilder(Set<String> topics) {
this.topics = requireNonNull(topics);
TopicsBuilder(Set<String> setTopics) {
this.topicPattern = null;
this.setTopics = requireNonNull(setTopics);
}

TopicsBuilder(Pattern patternTopics) {
this.setTopics = null;
this.topicPattern = requireNonNull(patternTopics);
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
}

public TopicsBuilder<Key, Value> groupId(String groupId) {
Expand Down Expand Up @@ -145,8 +153,8 @@ public SubscribedConsumer<Key, Value> build() {
if (seekStrategy != SeekStrategy.NONE) {
// make sure this consumer is assigned before seeking
var latch = new CountDownLatch(1);
kafkaConsumer.subscribe(
topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
subscribe(kafkaConsumer, latch);

while (latch.getCount() != 0) {
// the offset will be reset, so it is fine to poll data
// TODO: should we disable auto-commit here?
Expand All @@ -155,25 +163,45 @@ public SubscribedConsumer<Key, Value> build() {
}
} else {
// nothing to seek so we just subscribe topics
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
subscribe(kafkaConsumer, null);
}

seekStrategy.apply(kafkaConsumer, seekValue);

return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener);
return new SubscribedConsumerImpl<>(kafkaConsumer, setTopics, topicPattern, listener);
}

private void subscribe(KafkaConsumer<Key, Value> consumer, CountDownLatch latch) {
harryteng9527 marked this conversation as resolved.
Show resolved Hide resolved
if (latch == null) {
if (setTopics == null)
consumer.subscribe(topicPattern, ConsumerRebalanceListener.of(List.of(listener)));
else consumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener)));
} else {
if (setTopics == null)
consumer.subscribe(
topicPattern,
ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
else
consumer.subscribe(
setTopics,
ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown())));
}
}

private static class SubscribedConsumerImpl<Key, Value> extends Builder.BaseConsumer<Key, Value>
implements SubscribedConsumer<Key, Value> {
private final Set<String> topics;
private final Set<String> setTopics;
private final ConsumerRebalanceListener listener;
private final Pattern patternTopics;

public SubscribedConsumerImpl(
org.apache.kafka.clients.consumer.Consumer<Key, Value> kafkaConsumer,
Set<String> topics,
Set<String> setTopics,
Pattern patternTopics,
ConsumerRebalanceListener listener) {
super(kafkaConsumer);
this.topics = topics;
this.setTopics = setTopics;
this.patternTopics = patternTopics;
this.listener = listener;
}

Expand All @@ -198,7 +226,9 @@ public Optional<String> groupInstanceId() {

@Override
protected void doResubscribe() {
kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener)));
if (patternTopics == null)
kafkaConsumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener)));
else kafkaConsumer.subscribe(patternTopics, ConsumerRebalanceListener.of(List.of(listener)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.astraea.common.argument;

import com.beust.jcommander.Parameter;
import java.util.regex.Pattern;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class PatternFieldTest {
private static class FakeParameter {
@Parameter(
names = {"--field"},
converter = PatternField.class)
Pattern value;
}

@Test
public void testConvert() {
var param = Argument.parse(new FakeParameter(), new String[] {"--field", "test.*"});

Assertions.assertTrue(param.value.matcher("test").matches());
Assertions.assertFalse(param.value.matcher("tes").matches());
}
}