From 5ca0c50a60221828f5b316913479c3c0599a941e Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Tue, 13 Sep 2022 18:10:56 +0800 Subject: [PATCH 01/12] add pattern subscribe --- .../astraea/app/performance/Performance.java | 42 +++++++++++---- .../astraea/common/argument/PatternField.java | 26 ++++++++++ .../org/astraea/common/consumer/Consumer.java | 17 ++++-- .../common/consumer/TopicsBuilder.java | 52 +++++++++++++++---- .../common/argument/PatternFieldTest.java | 39 ++++++++++++++ 5 files changed, 151 insertions(+), 25 deletions(-) create mode 100644 common/src/main/java/org/astraea/common/argument/PatternField.java create mode 100644 common/src/test/java/org/astraea/common/argument/PatternFieldTest.java diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 4352133279..11e3e988da 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.astraea.common.DataRate; import org.astraea.common.DataSize; @@ -45,6 +46,7 @@ import org.astraea.common.argument.NonEmptyStringField; import org.astraea.common.argument.NonNegativeShortField; import org.astraea.common.argument.PathField; +import org.astraea.common.argument.PatternField; import org.astraea.common.argument.PositiveIntegerListField; import org.astraea.common.argument.PositiveLongField; import org.astraea.common.argument.PositiveShortField; @@ -94,17 +96,29 @@ public static List execute(final Argument param) param.producers, param::createProducer); var consumerThreads = - ConsumerThread.create( - param.consumers, - listener -> - Consumer.forTopics(topicSet) - .bootstrapServers(param.bootstrapServers()) - .groupId(param.groupId) - .configs(param.configs()) - .isolation(param.isolation()) - .seek(latestOffsets) - .consumerRebalanceListener(listener) - .build()); + param.pattern == null + ? ConsumerThread.create( + param.consumers, + listener -> + Consumer.forTopics(topicSet) + .bootstrapServers(param.bootstrapServers()) + .groupId(param.groupId) + .configs(param.configs()) + .isolation(param.isolation()) + .seek(latestOffsets) + .consumerRebalanceListener(listener) + .build()) + : ConsumerThread.create( + param.consumers, + listener -> + Consumer.forTopics(param.pattern) + .bootstrapServers(param.bootstrapServers()) + .groupId(param.groupId) + .configs(param.configs()) + .isolation(param.isolation()) + .seek(latestOffsets) + .consumerRebalanceListener(listener) + .build()); Supplier> producerReporter = () -> @@ -179,6 +193,12 @@ public static class Argument extends org.astraea.common.argument.Argument { private final List defaultTopics = List.of("testPerformance-" + System.currentTimeMillis()); + @Parameter( + names = {"--pattern"}, + description = "Pattern: topic pattern which you subscribed", + converter = PatternField.class) + Pattern pattern = null; + @Parameter( names = {"--topics"}, description = "List: topic names which you subscribed", diff --git a/common/src/main/java/org/astraea/common/argument/PatternField.java b/common/src/main/java/org/astraea/common/argument/PatternField.java new file mode 100644 index 0000000000..8c6e82ed15 --- /dev/null +++ b/common/src/main/java/org/astraea/common/argument/PatternField.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.argument; + +import java.util.regex.Pattern; + +public class PatternField extends Field { + @Override + public Pattern convert(String value) { + return Pattern.compile(value); + } +} diff --git a/common/src/main/java/org/astraea/common/consumer/Consumer.java b/common/src/main/java/org/astraea/common/consumer/Consumer.java index 09b466f034..bedfa384a6 100644 --- a/common/src/main/java/org/astraea/common/consumer/Consumer.java +++ b/common/src/main/java/org/astraea/common/consumer/Consumer.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Collection; import java.util.Set; +import java.util.regex.Pattern; import org.astraea.common.admin.TopicPartition; /** An interface for polling records. */ @@ -64,11 +65,21 @@ default Collection> poll(Duration timeout) { /** * Create a consumer builder by setting specific topics * - * @param topics set of topic names + * @param setTopics set of topic names * @return consumer builder for topics */ - static TopicsBuilder forTopics(Set topics) { - return new TopicsBuilder<>(topics); + static TopicsBuilder forTopics(Set setTopics) { + return new TopicsBuilder<>(setTopics); + } + + /** + * Create a consumer builder by setting specific pattern + * + * @param patternTopics patterns of topic name + * @return consumer builder for topics + */ + static TopicsBuilder forTopics(Pattern patternTopics) { + return new TopicsBuilder<>(patternTopics); } /** diff --git a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java index 30006b4b9c..d6bda9bef2 100644 --- a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java +++ b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java @@ -25,6 +25,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -32,11 +33,18 @@ import org.astraea.common.admin.TopicPartition; public class TopicsBuilder extends Builder { - private final Set topics; + private final Set setTopics; + private final Pattern topicPattern; private ConsumerRebalanceListener listener = ignore -> {}; - TopicsBuilder(Set topics) { - this.topics = requireNonNull(topics); + TopicsBuilder(Set setTopics) { + this.topicPattern = null; + this.setTopics = requireNonNull(setTopics); + } + + TopicsBuilder(Pattern patternTopics) { + this.setTopics = null; + this.topicPattern = requireNonNull(patternTopics); } public TopicsBuilder groupId(String groupId) { @@ -145,8 +153,8 @@ public SubscribedConsumer build() { if (seekStrategy != SeekStrategy.NONE) { // make sure this consumer is assigned before seeking var latch = new CountDownLatch(1); - kafkaConsumer.subscribe( - topics, ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); + subscribe(kafkaConsumer, latch); + while (latch.getCount() != 0) { // the offset will be reset, so it is fine to poll data // TODO: should we disable auto-commit here? @@ -155,25 +163,45 @@ public SubscribedConsumer build() { } } else { // nothing to seek so we just subscribe topics - kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener))); + subscribe(kafkaConsumer, null); } seekStrategy.apply(kafkaConsumer, seekValue); - return new SubscribedConsumerImpl<>(kafkaConsumer, topics, listener); + return new SubscribedConsumerImpl<>(kafkaConsumer, setTopics, topicPattern, listener); + } + + private void subscribe(KafkaConsumer consumer, CountDownLatch latch) { + if (latch == null) { + if (setTopics == null) + consumer.subscribe(topicPattern, ConsumerRebalanceListener.of(List.of(listener))); + else consumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener))); + } else { + if (setTopics == null) + consumer.subscribe( + topicPattern, + ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); + else + consumer.subscribe( + setTopics, + ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); + } } private static class SubscribedConsumerImpl extends Builder.BaseConsumer implements SubscribedConsumer { - private final Set topics; + private final Set setTopics; private final ConsumerRebalanceListener listener; + private final Pattern patternTopics; public SubscribedConsumerImpl( org.apache.kafka.clients.consumer.Consumer kafkaConsumer, - Set topics, + Set setTopics, + Pattern patternTopics, ConsumerRebalanceListener listener) { super(kafkaConsumer); - this.topics = topics; + this.setTopics = setTopics; + this.patternTopics = patternTopics; this.listener = listener; } @@ -198,7 +226,9 @@ public Optional groupInstanceId() { @Override protected void doResubscribe() { - kafkaConsumer.subscribe(topics, ConsumerRebalanceListener.of(List.of(listener))); + if (patternTopics == null) + kafkaConsumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener))); + else kafkaConsumer.subscribe(patternTopics, ConsumerRebalanceListener.of(List.of(listener))); } @Override diff --git a/common/src/test/java/org/astraea/common/argument/PatternFieldTest.java b/common/src/test/java/org/astraea/common/argument/PatternFieldTest.java new file mode 100644 index 0000000000..a31cb31b53 --- /dev/null +++ b/common/src/test/java/org/astraea/common/argument/PatternFieldTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.astraea.common.argument; + +import com.beust.jcommander.Parameter; +import java.util.regex.Pattern; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PatternFieldTest { + private static class FakeParameter { + @Parameter( + names = {"--field"}, + converter = PatternField.class) + Pattern value; + } + + @Test + public void testConvert() { + var param = Argument.parse(new FakeParameter(), new String[] {"--field", "test.*"}); + + Assertions.assertTrue(param.value.matcher("test").matches()); + Assertions.assertFalse(param.value.matcher("tes").matches()); + } +} From 4b3daa539fd49d3631ebb7351f33d9a6bfe20839 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Wed, 14 Sep 2022 13:50:58 +0800 Subject: [PATCH 02/12] tracker support printing sticky partitions --- .../app/performance/ConsumerThread.java | 12 +++++++++++ .../astraea/app/performance/Performance.java | 1 + .../org/astraea/app/performance/Report.java | 21 ++++++++++++++++++- .../app/performance/TrackerThread.java | 3 ++- .../common/consumer/SubscribedConsumer.java | 3 +++ .../common/consumer/TopicsBuilder.java | 5 +++++ 6 files changed, 43 insertions(+), 2 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index 3974f9c95e..dedc4c8431 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -17,6 +17,7 @@ package org.astraea.app.performance; import java.time.Duration; +import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -64,13 +65,24 @@ static List create( executors.execute( () -> { try { + var generationId = consumer.generationId(); + var assignments = consumer.assignments(); while (!closed.get()) { if (subscribed.get()) consumer.resubscribe(); else { consumer.unsubscribe(); + report.recordSticky(assignments, new HashSet<>()); + assignments = new HashSet<>(); Utils.sleep(Duration.ofSeconds(1)); continue; } + if (consumer.generationId() != generationId) { + generationId = consumer.generationId(); + var preAssignments = new HashSet<>(assignments); + assignments = consumer.assignments(); + var nowAssignments = new HashSet<>(assignments); + report.recordSticky(preAssignments, nowAssignments); + } consumer .poll(Duration.ofSeconds(1)) .forEach( diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 11e3e988da..0ec68899e6 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -154,6 +154,7 @@ public static List execute(final Argument param) ? CompletableFuture.completedFuture(null) : CompletableFuture.runAsync( () -> { + Utils.sleep(param.chaosDuration); while (!consumerThreads.stream().allMatch(AbstractThread::closed)) { var thread = consumerThreads.get((int) (Math.random() * consumerThreads.size())); diff --git a/app/src/main/java/org/astraea/app/performance/Report.java b/app/src/main/java/org/astraea/app/performance/Report.java index ffb9572cdf..29e87178ef 100644 --- a/app/src/main/java/org/astraea/app/performance/Report.java +++ b/app/src/main/java/org/astraea/app/performance/Report.java @@ -16,7 +16,9 @@ */ package org.astraea.app.performance; +import java.util.Set; import java.util.function.Supplier; +import org.astraea.common.admin.TopicPartition; import org.astraea.common.metrics.MBeanClient; import org.astraea.common.metrics.client.consumer.ConsumerMetrics; import org.astraea.common.metrics.client.consumer.HasConsumerFetchMetrics; @@ -49,9 +51,13 @@ static long recordsConsumedTotal() { void record(long latency, int bytes); + void recordSticky(Set preAssignments, Set nowAssignments); + + int stickyPartitions(); + static Report of(String clientId, Supplier isClosed) { return new Report() { - + private Set diffAssignments; private double avgLatency = 0; private long records = 0; private long max = 0; @@ -104,6 +110,19 @@ public boolean isClosed() { public String clientId() { return clientId; } + + @Override + public int stickyPartitions() { + if (diffAssignments == null) return 0; + return diffAssignments.size(); + } + + @Override + public void recordSticky( + Set preAssignments, Set nowAssignments) { + if (nowAssignments != null) nowAssignments.retainAll(preAssignments); + this.diffAssignments = nowAssignments; + } }; } } diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 4629ffc958..6f9b53d6d5 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -138,7 +138,8 @@ boolean tryToPrint(Duration duration) { var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); if (ms.isPresent()) { System.out.printf( - " consumer[%d] has %d partitions%n", i, (int) ms.get().assignedPartitions()); + " consumer[%d] has %d partitions and %d partitions sticky%n", + i, (int) ms.get().assignedPartitions(), report.stickyPartitions()); } System.out.printf( " consumer[%d] average throughput: %.3f MB%n", diff --git a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java index cdd75af145..0992226f4d 100644 --- a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java +++ b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java @@ -42,4 +42,7 @@ public interface SubscribedConsumer extends Consumer { /** @return group instance id (static member) */ Optional groupInstanceId(); + + /** @return consumer group's generation */ + int generationId(); } diff --git a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java index d6bda9bef2..69f4512ef4 100644 --- a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java +++ b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java @@ -237,5 +237,10 @@ public Set assignments() { .map(TopicPartition::from) .collect(Collectors.toUnmodifiableSet()); } + + @Override + public int generationId() { + return kafkaConsumer.groupMetadata().generationId(); + } } } From cc7d9c3b0f49105ebe1e927711160b1f0fa6e661 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Wed, 14 Sep 2022 14:32:08 +0800 Subject: [PATCH 03/12] add sticky partitions to file writer --- .../main/java/org/astraea/app/performance/ReportFormat.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/app/src/main/java/org/astraea/app/performance/ReportFormat.java b/app/src/main/java/org/astraea/app/performance/ReportFormat.java index 4ae5c74e38..366162cf9c 100644 --- a/app/src/main/java/org/astraea/app/performance/ReportFormat.java +++ b/app/src/main/java/org/astraea/app/performance/ReportFormat.java @@ -34,6 +34,7 @@ import java.util.stream.IntStream; import org.astraea.common.EnumInfo; import org.astraea.common.Utils; +import org.astraea.common.metrics.client.consumer.HasConsumerCoordinatorMetrics; public enum ReportFormat implements EnumInfo { CSV("csv"), @@ -215,6 +216,10 @@ private static List latencyAndIO( CSVContentElement.create( "Consumer[" + i + "] average publish latency (ms)", () -> Double.toString(consumerReports.get(i).avgLatency()))); + elements.add( + CSVContentElement.create( + "Consumer[" + i + "] sticky partitions", + () -> Integer.toString(consumerReports.get(i).stickyPartitions()))); }); return elements; } From 66f0d4bf41c627660522dd2e6a5c5d485200f4f5 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Wed, 14 Sep 2022 14:43:39 +0800 Subject: [PATCH 04/12] spotless --- app/src/main/java/org/astraea/app/performance/ReportFormat.java | 1 - 1 file changed, 1 deletion(-) diff --git a/app/src/main/java/org/astraea/app/performance/ReportFormat.java b/app/src/main/java/org/astraea/app/performance/ReportFormat.java index 366162cf9c..8940b78a2f 100644 --- a/app/src/main/java/org/astraea/app/performance/ReportFormat.java +++ b/app/src/main/java/org/astraea/app/performance/ReportFormat.java @@ -34,7 +34,6 @@ import java.util.stream.IntStream; import org.astraea.common.EnumInfo; import org.astraea.common.Utils; -import org.astraea.common.metrics.client.consumer.HasConsumerCoordinatorMetrics; public enum ReportFormat implements EnumInfo { CSV("csv"), From c3bfc237d8a5ccc9684d735c68a1cbb554b918e5 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Fri, 23 Sep 2022 17:11:29 +0800 Subject: [PATCH 05/12] delete recording sticky partition of each consumer --- .../app/performance/ConsumerThread.java | 3 --- .../app/performance/TrackerThread.java | 4 +-- .../org/astraea/common/consumer/Consumer.java | 3 ++- .../common/consumer/SubscribedConsumer.java | 3 --- .../common/consumer/TopicsBuilder.java | 25 ++----------------- 5 files changed, 6 insertions(+), 32 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index e7182124e1..fd5e742a43 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -69,9 +69,6 @@ static List create( consumer.unsubscribe(); Utils.sleep(Duration.ofSeconds(1)); continue; - } - if(consumer.checkRebalance()) { - } consumer.poll(Duration.ofSeconds(1)); } diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 202fd96f2e..5a64587cbe 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -143,8 +143,8 @@ boolean tryToPrint(Duration duration) { var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); if (ms.isPresent()) { System.out.printf( - " consumer[%d] has %d partitions and %d partitions sticky%n", - i, (int) ms.get().assignedPartitions(),1); //TODO + " consumer[%d] has %d partitions%n", + i, (int) ms.get().assignedPartitions()); } System.out.printf( " consumed[%d] average throughput: %s%n", diff --git a/common/src/main/java/org/astraea/common/consumer/Consumer.java b/common/src/main/java/org/astraea/common/consumer/Consumer.java index bedfa384a6..3d7016c2f5 100644 --- a/common/src/main/java/org/astraea/common/consumer/Consumer.java +++ b/common/src/main/java/org/astraea/common/consumer/Consumer.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.Collection; +import java.util.Objects; import java.util.Set; import java.util.regex.Pattern; import org.astraea.common.admin.TopicPartition; @@ -79,7 +80,7 @@ static TopicsBuilder forTopics(Set setTopics) { * @return consumer builder for topics */ static TopicsBuilder forTopics(Pattern patternTopics) { - return new TopicsBuilder<>(patternTopics); + return new TopicsBuilder<>(Objects.requireNonNull(patternTopics)); } /** diff --git a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java index b9bfcc25bc..40b1480728 100644 --- a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java +++ b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java @@ -46,7 +46,4 @@ public interface SubscribedConsumer extends Consumer { /** @return group instance id (static member) */ Optional groupInstanceId(); - Set stickyPartitions(); - - boolean checkRebalance(); } diff --git a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java index 2c35eeaad3..f11a87c90e 100644 --- a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java +++ b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java @@ -19,7 +19,6 @@ import static java.util.Objects.requireNonNull; import java.time.Duration; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -194,9 +193,8 @@ private static class SubscribedConsumerImpl extends Builder.BaseCons private final Set setTopics; private final ConsumerRebalanceListener listener; private final Pattern patternTopics; - private int generationId; - private Set prevAssignments; - private Set nowAssignments; + + public SubscribedConsumerImpl( org.apache.kafka.clients.consumer.Consumer kafkaConsumer, Set setTopics, @@ -206,8 +204,6 @@ public SubscribedConsumerImpl( this.setTopics = setTopics; this.patternTopics = patternTopics; this.listener = listener; - prevAssignments=Set.of(); - nowAssignments = Set.of(); } @Override @@ -243,22 +239,5 @@ public Set assignments() { .collect(Collectors.toUnmodifiableSet()); } - @Override - public Set stickyPartitions() { - var diffAssignments = new HashSet<>(nowAssignments); - diffAssignments.retainAll(prevAssignments); - return diffAssignments; - } - @Override - public boolean checkRebalance() { - int nowGeneration = kafkaConsumer.groupMetadata().generationId(); - if(generationId != nowGeneration) { - generationId = nowGeneration; - prevAssignments = nowAssignments; - nowAssignments = assignments(); - return true; - } - return false; - } } } From 4f1164d9686904b8bf11f89dd838fa2a90e3e06f Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Tue, 27 Sep 2022 15:57:51 +0800 Subject: [PATCH 06/12] support printing sticky partition --- .../app/performance/ConsumerThread.java | 12 ++++-- .../astraea/app/performance/Performance.java | 40 ++++++++++++++++++- .../org/astraea/app/performance/Report.java | 6 --- .../app/performance/TrackerThread.java | 5 ++- .../common/consumer/SubscribedConsumer.java | 4 -- .../common/consumer/TopicsBuilder.java | 2 - .../client/consumer/ConsumerMetrics.java | 1 - 7 files changed, 50 insertions(+), 20 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index fd5e742a43..fd37d30c31 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.common.errors.WakeupException; @@ -32,10 +33,10 @@ import org.astraea.common.consumer.SubscribedConsumer; public interface ConsumerThread extends AbstractThread { - static List create( int consumers, - Function> consumerSupplier) { + Function> consumerSupplier, + Supplier recordListener) { if (consumers == 0) return List.of(); var closeLatches = IntStream.range(0, consumers) @@ -56,10 +57,13 @@ static List create( .mapToObj( index -> { @SuppressWarnings("resource") - var consumer = consumerSupplier.apply(ps -> {}); + var listener = recordListener.get(); + var consumer = consumerSupplier.apply(listener); var closed = new AtomicBoolean(false); var closeLatch = closeLatches.get(index); var subscribed = new AtomicBoolean(true); + Performance.RecordListener.stickyNumbers.putIfAbsent(consumer.clientId(), 0); + listener.clientId(consumer.clientId()); executors.execute( () -> { try { @@ -68,6 +72,8 @@ static List create( else { consumer.unsubscribe(); Utils.sleep(Duration.ofSeconds(1)); + Performance.RecordListener.stickyNumbers.put(consumer.clientId(), 0); + listener.flushPrevPartitions(); continue; } consumer.poll(Duration.ofSeconds(1)); diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index ad0243d422..b0a42370d1 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -27,6 +27,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -50,6 +52,7 @@ import org.astraea.common.argument.StringListField; import org.astraea.common.argument.TopicPartitionField; import org.astraea.common.consumer.Consumer; +import org.astraea.common.consumer.ConsumerRebalanceListener; import org.astraea.common.consumer.Isolation; import org.astraea.common.producer.Acks; import org.astraea.common.producer.Producer; @@ -74,6 +77,7 @@ private static DataSupplier dataSupplier(Performance.Argument argument) { public static List execute(final Argument param) throws InterruptedException, IOException { + Supplier recordListener = () -> new RecordListener(); // always try to init topic even though it may be existent already. System.out.println("checking topics: " + String.join(",", param.topics)); param.checkTopics(); @@ -101,7 +105,8 @@ public static List execute(final Argument param) .isolation(param.isolation()) .seek(latestOffsets) .consumerRebalanceListener(listener) - .build()) + .build(), + recordListener) : ConsumerThread.create( param.consumers, listener -> @@ -112,7 +117,8 @@ public static List execute(final Argument param) .isolation(param.isolation()) .seek(latestOffsets) .consumerRebalanceListener(listener) - .build()); + .build(), + recordListener); System.out.println("creating tracker"); var tracker = @@ -426,4 +432,34 @@ else if (specifiedByBroker) { converter = DurationField.class) Duration readIdle = Duration.ofSeconds(2); } + + static class RecordListener implements ConsumerRebalanceListener { + public static ConcurrentMap stickyNumbers = new ConcurrentHashMap<>(); + private Set prevPartitions = new HashSet<>(); + private Set nowPartitions = new HashSet<>(); + private String clientId = "temp"; + + public void clientId(String clientId) { + this.clientId = clientId; + } + + public void flushPrevPartitions() { + prevPartitions.clear(); + } + + @Override + public void onPartitionAssigned(Set partitions) { + var diffPartitions = new HashSet<>(partitions); + nowPartitions = partitions; + diffPartitions.retainAll(prevPartitions); + stickyNumbers.put(clientId, diffPartitions.size()); + System.out.println(clientId + " now assignment = " + nowPartitions); + System.out.println(clientId + " previous assignment = " + prevPartitions); + } + + @Override + public void onPartitionsRevoked(Set partitions) { + prevPartitions = partitions; + } + } } diff --git a/app/src/main/java/org/astraea/app/performance/Report.java b/app/src/main/java/org/astraea/app/performance/Report.java index 6ebc921d3d..22ba51962f 100644 --- a/app/src/main/java/org/astraea/app/performance/Report.java +++ b/app/src/main/java/org/astraea/app/performance/Report.java @@ -16,8 +16,6 @@ */ package org.astraea.app.performance; -import java.util.Set; -import org.astraea.common.admin.TopicPartition; import java.util.List; import java.util.stream.Collectors; import org.astraea.common.metrics.MBeanClient; @@ -41,7 +39,6 @@ static List consumers() { .map( m -> new Report() { - private Set diffAssignments; @Override public long records() { return (long) m.recordsConsumedTotal(); @@ -109,8 +106,6 @@ public double avgThroughput() { public String clientId() { return m.clientId(); } - - }) .collect(Collectors.toList()); } @@ -130,5 +125,4 @@ public String clientId() { String clientId(); - } diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 5a64587cbe..49d427d4a1 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -141,10 +141,11 @@ boolean tryToPrint(Duration duration) { for (var i = 0; i < reports.size(); ++i) { var report = reports.get(i); var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); + var stickyNumber = Performance.RecordListener.stickyNumbers.get(report.clientId()); if (ms.isPresent()) { System.out.printf( - " consumer[%d] has %d partitions%n", - i, (int) ms.get().assignedPartitions()); + " consumer[%d] has %d partitions and %d sticky partitions%n", + i, (int) ms.get().assignedPartitions(), stickyNumber); } System.out.printf( " consumed[%d] average throughput: %s%n", diff --git a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java index 40b1480728..cdd75af145 100644 --- a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java +++ b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java @@ -16,11 +16,8 @@ */ package org.astraea.common.consumer; -import org.astraea.common.admin.TopicPartition; - import java.time.Duration; import java.util.Optional; -import java.util.Set; /** * This inherited consumer offers function related to consumer group. @@ -45,5 +42,4 @@ public interface SubscribedConsumer extends Consumer { /** @return group instance id (static member) */ Optional groupInstanceId(); - } diff --git a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java index f11a87c90e..d6bda9bef2 100644 --- a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java +++ b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java @@ -194,7 +194,6 @@ private static class SubscribedConsumerImpl extends Builder.BaseCons private final ConsumerRebalanceListener listener; private final Pattern patternTopics; - public SubscribedConsumerImpl( org.apache.kafka.clients.consumer.Consumer kafkaConsumer, Set setTopics, @@ -238,6 +237,5 @@ public Set assignments() { .map(TopicPartition::from) .collect(Collectors.toUnmodifiableSet()); } - } } diff --git a/common/src/main/java/org/astraea/common/metrics/client/consumer/ConsumerMetrics.java b/common/src/main/java/org/astraea/common/metrics/client/consumer/ConsumerMetrics.java index 4d2e4447c7..2d443ca337 100644 --- a/common/src/main/java/org/astraea/common/metrics/client/consumer/ConsumerMetrics.java +++ b/common/src/main/java/org/astraea/common/metrics/client/consumer/ConsumerMetrics.java @@ -84,5 +84,4 @@ public static Collection of(MBeanClient mBeanClient) { .map(b -> (HasConsumerMetrics) () -> b) .collect(Collectors.toUnmodifiableList()); } - } From a649f2ae18376a5b34616d21c3b783648c8b1bc8 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Tue, 27 Sep 2022 17:40:59 +0800 Subject: [PATCH 07/12] fix TrackerTest#testConsumerPrinter --- app/src/main/java/org/astraea/app/performance/Performance.java | 2 -- .../main/java/org/astraea/app/performance/TrackerThread.java | 3 ++- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 8480213970..96f3eca540 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -454,8 +454,6 @@ public void onPartitionAssigned(Set partitions) { nowPartitions = partitions; diffPartitions.retainAll(prevPartitions); stickyNumbers.put(clientId, diffPartitions.size()); - System.out.println(clientId + " now assignment = " + nowPartitions); - System.out.println(clientId + " previous assignment = " + prevPartitions); } @Override diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 49d427d4a1..f6bd232d09 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -141,7 +141,8 @@ boolean tryToPrint(Duration duration) { for (var i = 0; i < reports.size(); ++i) { var report = reports.get(i); var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); - var stickyNumber = Performance.RecordListener.stickyNumbers.get(report.clientId()); + var clientId = report.clientId() == null ? "temp" : report.clientId(); + var stickyNumber = Performance.RecordListener.stickyNumbers.get(clientId); if (ms.isPresent()) { System.out.printf( " consumer[%d] has %d partitions and %d sticky partitions%n", From 6791cef9ada330b32f039c04c80ca00935a4e874 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Tue, 27 Sep 2022 18:08:28 +0800 Subject: [PATCH 08/12] spotless --- app/src/main/java/org/astraea/app/performance/Report.java | 1 - 1 file changed, 1 deletion(-) diff --git a/app/src/main/java/org/astraea/app/performance/Report.java b/app/src/main/java/org/astraea/app/performance/Report.java index 22ba51962f..438684ab23 100644 --- a/app/src/main/java/org/astraea/app/performance/Report.java +++ b/app/src/main/java/org/astraea/app/performance/Report.java @@ -124,5 +124,4 @@ public String clientId() { double avgThroughput(); String clientId(); - } From 1b565897f630983304e4afc770e2a8b912dbb2a0 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Wed, 28 Sep 2022 20:20:29 +0800 Subject: [PATCH 09/12] add recordListener to pass client ID & listener --- .../app/performance/ConsumerThread.java | 11 ++----- .../astraea/app/performance/Performance.java | 30 ++++++++----------- .../app/performance/TrackerThread.java | 2 +- .../org/astraea/common/consumer/Builder.java | 16 +++++++++- .../common/consumer/TopicsBuilder.java | 12 +++++++- 5 files changed, 41 insertions(+), 30 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index fd37d30c31..3847773c87 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.kafka.common.errors.WakeupException; @@ -35,8 +34,7 @@ public interface ConsumerThread extends AbstractThread { static List create( int consumers, - Function> consumerSupplier, - Supplier recordListener) { + Function> consumerSupplier) { if (consumers == 0) return List.of(); var closeLatches = IntStream.range(0, consumers) @@ -57,13 +55,10 @@ static List create( .mapToObj( index -> { @SuppressWarnings("resource") - var listener = recordListener.get(); - var consumer = consumerSupplier.apply(listener); + var consumer = consumerSupplier.apply(ps -> {}); var closed = new AtomicBoolean(false); var closeLatch = closeLatches.get(index); var subscribed = new AtomicBoolean(true); - Performance.RecordListener.stickyNumbers.putIfAbsent(consumer.clientId(), 0); - listener.clientId(consumer.clientId()); executors.execute( () -> { try { @@ -72,8 +67,6 @@ static List create( else { consumer.unsubscribe(); Utils.sleep(Duration.ofSeconds(1)); - Performance.RecordListener.stickyNumbers.put(consumer.clientId(), 0); - listener.flushPrevPartitions(); continue; } consumer.poll(Duration.ofSeconds(1)); diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 96f3eca540..d6ec75854d 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -78,7 +79,8 @@ private static DataSupplier dataSupplier(Performance.Argument argument) { public static List execute(final Argument param) throws InterruptedException, IOException { - Supplier recordListener = () -> new RecordListener(); + Function> clientAndListener = + client -> Map.of(client, new RecordListener(client)); // always try to init topic even though it may be existent already. System.out.println("checking topics: " + String.join(",", param.topics)); param.checkTopics(); @@ -105,9 +107,8 @@ public static List execute(final Argument param) .configs(param.configs()) .isolation(param.isolation()) .seek(latestOffsets) - .consumerRebalanceListener(listener) - .build(), - recordListener) + .recordListener(clientAndListener.apply(Utils.randomString(10))) + .build()) : ConsumerThread.create( param.consumers, listener -> @@ -117,9 +118,8 @@ public static List execute(final Argument param) .configs(param.configs()) .isolation(param.isolation()) .seek(latestOffsets) - .consumerRebalanceListener(listener) - .build(), - recordListener); + .recordListener(clientAndListener.apply(Utils.randomString(10))) + .build()); System.out.println("creating tracker"); var tracker = @@ -437,23 +437,17 @@ else if (specifiedByBroker) { static class RecordListener implements ConsumerRebalanceListener { public static ConcurrentMap stickyNumbers = new ConcurrentHashMap<>(); private Set prevPartitions = new HashSet<>(); - private Set nowPartitions = new HashSet<>(); - private String clientId = "temp"; + private final String clientId; - public void clientId(String clientId) { + RecordListener(String clientId) { this.clientId = clientId; } - public void flushPrevPartitions() { - prevPartitions.clear(); - } - @Override public void onPartitionAssigned(Set partitions) { - var diffPartitions = new HashSet<>(partitions); - nowPartitions = partitions; - diffPartitions.retainAll(prevPartitions); - stickyNumbers.put(clientId, diffPartitions.size()); + var stickyPartitions = new HashSet<>(partitions); + stickyPartitions.retainAll(prevPartitions); + stickyNumbers.put(clientId, stickyPartitions.size()); } @Override diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index f6bd232d09..36b94350ed 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -141,7 +141,7 @@ boolean tryToPrint(Duration duration) { for (var i = 0; i < reports.size(); ++i) { var report = reports.get(i); var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); - var clientId = report.clientId() == null ? "temp" : report.clientId(); + var clientId = report.clientId() == null ? "forTest" : report.clientId(); var stickyNumber = Performance.RecordListener.stickyNumbers.get(clientId); if (ms.isPresent()) { System.out.printf( diff --git a/common/src/main/java/org/astraea/common/consumer/Builder.java b/common/src/main/java/org/astraea/common/consumer/Builder.java index 307ab2c17c..d7280b2a53 100644 --- a/common/src/main/java/org/astraea/common/consumer/Builder.java +++ b/common/src/main/java/org/astraea/common/consumer/Builder.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -128,6 +129,7 @@ protected abstract static class BaseConsumer implements Consumer {}; public BaseConsumer(org.apache.kafka.clients.consumer.Consumer kafkaConsumer) { this.kafkaConsumer = kafkaConsumer; @@ -135,6 +137,15 @@ public BaseConsumer(org.apache.kafka.clients.consumer.Consumer kafka this.clientId = (String) Utils.member(kafkaConsumer, "clientId"); } + public BaseConsumer( + org.apache.kafka.clients.consumer.Consumer kafkaConsumer, + ConsumerRebalanceListener listener) { + this.kafkaConsumer = kafkaConsumer; + // KafkaConsumer does not expose client-id + this.clientId = (String) Utils.member(kafkaConsumer, "clientId"); + this.listener = listener; + } + @Override public Collection> poll(int recordCount, Duration timeout) { var end = System.currentTimeMillis() + timeout.toMillis(); @@ -164,7 +175,10 @@ public void resubscribe() { @Override public void unsubscribe() { - if (subscribed.compareAndSet(true, false)) kafkaConsumer.unsubscribe(); + if (subscribed.compareAndSet(true, false)) { + kafkaConsumer.unsubscribe(); + listener.onPartitionAssigned(Set.of()); // To record sticky partitions + } } @Override diff --git a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java index d6bda9bef2..11fb53f896 100644 --- a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java +++ b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java @@ -116,6 +116,16 @@ public TopicsBuilder configs(Map configs) { return this; } + public TopicsBuilder recordListener( + Map clientAndListener) { + clientAndListener.forEach( + (clientId, listener) -> { + super.clientId(clientId); + this.listener = listener; + }); + return this; + } + @Override public TopicsBuilder bootstrapServers(String bootstrapServers) { super.bootstrapServers(bootstrapServers); @@ -199,7 +209,7 @@ public SubscribedConsumerImpl( Set setTopics, Pattern patternTopics, ConsumerRebalanceListener listener) { - super(kafkaConsumer); + super(kafkaConsumer, listener); this.setTopics = setTopics; this.patternTopics = patternTopics; this.listener = listener; From e415c240da5cbd8cd8afdd21ce1ab4d8fd9f060a Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Tue, 11 Oct 2022 17:47:54 +0800 Subject: [PATCH 10/12] delete RecordListener and record partition in consumer --- .../app/performance/ConsumerThread.java | 18 ++++++++++++ .../astraea/app/performance/Performance.java | 28 ------------------- .../app/performance/TrackerThread.java | 4 ++- .../org/astraea/common/consumer/Builder.java | 16 +---------- .../common/consumer/SubscribedConsumer.java | 2 ++ .../common/consumer/TopicsBuilder.java | 17 ++++------- 6 files changed, 30 insertions(+), 55 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index abb60f6142..cdfc9c55bb 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -17,6 +17,7 @@ package org.astraea.app.performance; import java.time.Duration; +import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -38,6 +39,8 @@ public interface ConsumerThread extends AbstractThread { ConcurrentMap> CLIENT_ID_PARTITIONS = new ConcurrentHashMap<>(); + ConcurrentMap> CLIENT_ID_STICKY_PARTITIONS = + new ConcurrentHashMap<>(); static List create( int consumers, @@ -72,13 +75,28 @@ static List create( executors.execute( () -> { try { + var assignments = consumer.assignments(); + var generationId = consumer.generationId(); while (!closed.get()) { if (subscribed.get()) consumer.resubscribe(); else { consumer.unsubscribe(); + assignments = Set.of(); + CLIENT_ID_PARTITIONS.put(clientId, assignments); + CLIENT_ID_STICKY_PARTITIONS.put(clientId, assignments); Utils.sleep(Duration.ofSeconds(1)); continue; } + if (!assignments.containsAll(CLIENT_ID_PARTITIONS.get(clientId)) + || generationId < consumer.generationId()) { + // check if re-balance or not. + var nowPartitions = CLIENT_ID_PARTITIONS.get(clientId); + var stickyPartitions = new HashSet<>(nowPartitions); + stickyPartitions.retainAll(assignments); + CLIENT_ID_STICKY_PARTITIONS.put(clientId, stickyPartitions); + assignments = nowPartitions; + generationId = consumer.generationId(); + } consumer.poll(Duration.ofSeconds(1)); } } catch (WakeupException ignore) { diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index c88b9abc1f..939906cc4e 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -27,10 +27,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -55,7 +52,6 @@ import org.astraea.common.argument.StringListField; import org.astraea.common.argument.TopicPartitionField; import org.astraea.common.consumer.Consumer; -import org.astraea.common.consumer.ConsumerRebalanceListener; import org.astraea.common.consumer.Isolation; import org.astraea.common.partitioner.Dispatcher; import org.astraea.common.producer.Acks; @@ -81,8 +77,6 @@ private static DataSupplier dataSupplier(Performance.Argument argument) { public static List execute(final Argument param) throws InterruptedException, IOException { - Function> clientAndListener = - client -> Map.of(client, new RecordListener(client)); // always try to init topic even though it may be existent already. System.out.println("checking topics: " + String.join(",", param.topics)); param.checkTopics(); @@ -474,26 +468,4 @@ else if (specifiedByBroker) { validateWith = PositiveIntegerField.class) int interdependent = 1; } - - static class RecordListener implements ConsumerRebalanceListener { - public static ConcurrentMap stickyNumbers = new ConcurrentHashMap<>(); - private Set prevPartitions = new HashSet<>(); - private final String clientId; - - RecordListener(String clientId) { - this.clientId = clientId; - } - - @Override - public void onPartitionAssigned(Set partitions) { - var stickyPartitions = new HashSet<>(partitions); - stickyPartitions.retainAll(prevPartitions); - stickyNumbers.put(clientId, stickyPartitions.size()); - } - - @Override - public void onPartitionsRevoked(Set partitions) { - prevPartitions = partitions; - } - } } diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 36b94350ed..36411b8d7f 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -142,7 +143,8 @@ boolean tryToPrint(Duration duration) { var report = reports.get(i); var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); var clientId = report.clientId() == null ? "forTest" : report.clientId(); - var stickyNumber = Performance.RecordListener.stickyNumbers.get(clientId); + var stickyNumber = + ConsumerThread.CLIENT_ID_STICKY_PARTITIONS.getOrDefault(clientId, Set.of()).size(); if (ms.isPresent()) { System.out.printf( " consumer[%d] has %d partitions and %d sticky partitions%n", diff --git a/common/src/main/java/org/astraea/common/consumer/Builder.java b/common/src/main/java/org/astraea/common/consumer/Builder.java index 139169f042..8761b932cd 100644 --- a/common/src/main/java/org/astraea/common/consumer/Builder.java +++ b/common/src/main/java/org/astraea/common/consumer/Builder.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import org.astraea.common.EnumInfo; @@ -128,7 +127,6 @@ protected abstract static class BaseConsumer implements Consumer {}; public BaseConsumer(org.apache.kafka.clients.consumer.Consumer kafkaConsumer) { this.kafkaConsumer = kafkaConsumer; @@ -136,15 +134,6 @@ public BaseConsumer(org.apache.kafka.clients.consumer.Consumer kafka this.clientId = (String) Utils.member(kafkaConsumer, "clientId"); } - public BaseConsumer( - org.apache.kafka.clients.consumer.Consumer kafkaConsumer, - ConsumerRebalanceListener listener) { - this.kafkaConsumer = kafkaConsumer; - // KafkaConsumer does not expose client-id - this.clientId = (String) Utils.member(kafkaConsumer, "clientId"); - this.listener = listener; - } - @Override public Collection> poll(int recordCount, Duration timeout) { var end = System.currentTimeMillis() + timeout.toMillis(); @@ -174,10 +163,7 @@ public void resubscribe() { @Override public void unsubscribe() { - if (subscribed.compareAndSet(true, false)) { - kafkaConsumer.unsubscribe(); - listener.onPartitionAssigned(Set.of()); // To record sticky partitions - } + if (subscribed.compareAndSet(true, false)) kafkaConsumer.unsubscribe(); } @Override diff --git a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java index cdd75af145..c6d25c654e 100644 --- a/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java +++ b/common/src/main/java/org/astraea/common/consumer/SubscribedConsumer.java @@ -42,4 +42,6 @@ public interface SubscribedConsumer extends Consumer { /** @return group instance id (static member) */ Optional groupInstanceId(); + + int generationId(); } diff --git a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java index 11914c18ed..2e1831d6cf 100644 --- a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java +++ b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java @@ -115,16 +115,6 @@ public TopicsBuilder configs(Map configs) { return this; } - public TopicsBuilder recordListener( - Map clientAndListener) { - clientAndListener.forEach( - (clientId, listener) -> { - super.clientId(clientId); - this.listener = listener; - }); - return this; - } - @Override public TopicsBuilder bootstrapServers(String bootstrapServers) { super.bootstrapServers(bootstrapServers); @@ -208,7 +198,7 @@ public SubscribedConsumerImpl( Set setTopics, Pattern patternTopics, ConsumerRebalanceListener listener) { - super(kafkaConsumer, listener); + super(kafkaConsumer); this.setTopics = setTopics; this.patternTopics = patternTopics; this.listener = listener; @@ -233,6 +223,11 @@ public Optional groupInstanceId() { return kafkaConsumer.groupMetadata().groupInstanceId(); } + @Override + public int generationId() { + return kafkaConsumer.groupMetadata().generationId(); + } + @Override protected void doResubscribe() { if (patternTopics == null) From f9d7f08df7bd0a3329bbc37f756bd04536bf960f Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Thu, 13 Oct 2022 01:28:13 +0800 Subject: [PATCH 11/12] fix some style and add comments --- .../app/performance/ConsumerThread.java | 6 ++- .../astraea/app/performance/Performance.java | 51 +++++++------------ .../common/consumer/TopicsBuilder.java | 38 ++++++-------- 3 files changed, 38 insertions(+), 57 deletions(-) diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index cdfc9c55bb..b112db0242 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -89,7 +89,11 @@ static List create( } if (!assignments.containsAll(CLIENT_ID_PARTITIONS.get(clientId)) || generationId < consumer.generationId()) { - // check if re-balance or not. + // check whether re-balance has occurred or not. + // 1. compare the assignments : if they are different, re-balance has + // occurred + // 2. double check re-balance via generation id : because the assignment + // may be the same between re-balance var nowPartitions = CLIENT_ID_PARTITIONS.get(clientId); var stickyPartitions = new HashSet<>(nowPartitions); stickyPartitions.retainAll(assignments); diff --git a/app/src/main/java/org/astraea/app/performance/Performance.java b/app/src/main/java/org/astraea/app/performance/Performance.java index 0e1021985e..9513ab4cab 100644 --- a/app/src/main/java/org/astraea/app/performance/Performance.java +++ b/app/src/main/java/org/astraea/app/performance/Performance.java @@ -93,39 +93,24 @@ public static List execute(final Argument param) param::createProducer, param.interdependent); var consumerThreads = - param.pattern == null - ? ConsumerThread.create( - param.consumers, - (clientId, listener) -> - Consumer.forTopics(new HashSet<>(param.topics)) - .configs(param.configs()) - .config( - ConsumerConfigs.ISOLATION_LEVEL_CONFIG, - param.transactionSize > 1 - ? ConsumerConfigs.ISOLATION_LEVEL_COMMITTED - : ConsumerConfigs.ISOLATION_LEVEL_UNCOMMITTED) - .bootstrapServers(param.bootstrapServers()) - .config(ConsumerConfigs.GROUP_ID_CONFIG, param.groupId) - .seek(latestOffsets) - .consumerRebalanceListener(listener) - .config(ConsumerConfigs.CLIENT_ID_CONFIG, clientId) - .build()) - : ConsumerThread.create( - param.consumers, - (clientId, listener) -> - Consumer.forTopics(param.pattern) - .configs(param.configs()) - .config( - ConsumerConfigs.ISOLATION_LEVEL_CONFIG, - param.transactionSize > 1 - ? ConsumerConfigs.ISOLATION_LEVEL_COMMITTED - : ConsumerConfigs.ISOLATION_LEVEL_UNCOMMITTED) - .bootstrapServers(param.bootstrapServers()) - .config(ConsumerConfigs.GROUP_ID_CONFIG, param.groupId) - .seek(latestOffsets) - .consumerRebalanceListener(listener) - .config(ConsumerConfigs.CLIENT_ID_CONFIG, clientId) - .build()); + ConsumerThread.create( + param.consumers, + (clientId, listener) -> + (param.pattern == null + ? Consumer.forTopics(new HashSet<>(param.topics)) + : Consumer.forTopics(param.pattern)) + .configs(param.configs()) + .config( + ConsumerConfigs.ISOLATION_LEVEL_CONFIG, + param.transactionSize > 1 + ? ConsumerConfigs.ISOLATION_LEVEL_COMMITTED + : ConsumerConfigs.ISOLATION_LEVEL_UNCOMMITTED) + .bootstrapServers(param.bootstrapServers()) + .config(ConsumerConfigs.GROUP_ID_CONFIG, param.groupId) + .seek(latestOffsets) + .consumerRebalanceListener(listener) + .config(ConsumerConfigs.CLIENT_ID_CONFIG, clientId) + .build()); System.out.println("creating tracker"); var tracker = diff --git a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java index 0279ec007f..79aa5055db 100644 --- a/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java +++ b/common/src/main/java/org/astraea/common/consumer/TopicsBuilder.java @@ -33,17 +33,17 @@ public class TopicsBuilder extends Builder { private final Set setTopics; - private final Pattern topicPattern; + private final Pattern patternTopics; private ConsumerRebalanceListener listener = ignore -> {}; TopicsBuilder(Set setTopics) { - this.topicPattern = null; + this.patternTopics = null; this.setTopics = requireNonNull(setTopics); } TopicsBuilder(Pattern patternTopics) { this.setTopics = null; - this.topicPattern = requireNonNull(patternTopics); + this.patternTopics = requireNonNull(patternTopics); } public TopicsBuilder consumerRebalanceListener(ConsumerRebalanceListener listener) { @@ -106,7 +106,14 @@ public SubscribedConsumer build() { if (seekStrategy != SeekStrategy.NONE) { // make sure this consumer is assigned before seeking var latch = new CountDownLatch(1); - subscribe(kafkaConsumer, latch); + if (patternTopics == null) + kafkaConsumer.subscribe( + setTopics, + ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); + else + kafkaConsumer.subscribe( + patternTopics, + ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); while (latch.getCount() != 0) { // the offset will be reset, so it is fine to poll data @@ -116,29 +123,14 @@ public SubscribedConsumer build() { } } else { // nothing to seek so we just subscribe topics - subscribe(kafkaConsumer, null); + if (patternTopics == null) + kafkaConsumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener))); + else kafkaConsumer.subscribe(patternTopics, ConsumerRebalanceListener.of(List.of(listener))); } seekStrategy.apply(kafkaConsumer, seekValue); - return new SubscribedConsumerImpl<>(kafkaConsumer, setTopics, topicPattern, listener); - } - - private void subscribe(KafkaConsumer consumer, CountDownLatch latch) { - if (latch == null) { - if (setTopics == null) - consumer.subscribe(topicPattern, ConsumerRebalanceListener.of(List.of(listener))); - else consumer.subscribe(setTopics, ConsumerRebalanceListener.of(List.of(listener))); - } else { - if (setTopics == null) - consumer.subscribe( - topicPattern, - ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); - else - consumer.subscribe( - setTopics, - ConsumerRebalanceListener.of(List.of(listener, ignored -> latch.countDown()))); - } + return new SubscribedConsumerImpl<>(kafkaConsumer, setTopics, patternTopics, listener); } private static class SubscribedConsumerImpl extends Builder.BaseConsumer From 86470286c658065f8fb155cb91109b1407179af4 Mon Sep 17 00:00:00 2001 From: harryteng9527 Date: Fri, 14 Oct 2022 20:36:13 +0800 Subject: [PATCH 12/12] delete detecting client id if it's null --- .../main/java/org/astraea/app/performance/TrackerThread.java | 2 +- app/src/test/java/org/astraea/app/performance/TrackerTest.java | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 36411b8d7f..515da414a3 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -142,7 +142,7 @@ boolean tryToPrint(Duration duration) { for (var i = 0; i < reports.size(); ++i) { var report = reports.get(i); var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); - var clientId = report.clientId() == null ? "forTest" : report.clientId(); + var clientId = report.clientId(); var stickyNumber = ConsumerThread.CLIENT_ID_STICKY_PARTITIONS.getOrDefault(clientId, Set.of()).size(); if (ms.isPresent()) { diff --git a/app/src/test/java/org/astraea/app/performance/TrackerTest.java b/app/src/test/java/org/astraea/app/performance/TrackerTest.java index 83e6758cb8..3930e60dc7 100644 --- a/app/src/test/java/org/astraea/app/performance/TrackerTest.java +++ b/app/src/test/java/org/astraea/app/performance/TrackerTest.java @@ -100,6 +100,7 @@ void testProducerPrinter() { void testConsumerPrinter() { var report = Mockito.mock(Report.class); var records = new AtomicLong(0); + Mockito.when(report.clientId()).thenReturn("forTest"); Mockito.when(report.records()).thenAnswer(a -> records.get()); var printer = new TrackerThread.ConsumerPrinter(() -> List.of(report)); Assertions.assertFalse(printer.tryToPrint(Duration.ofSeconds(1)));