diff --git a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java index abb60f6142..848d0da9d5 100644 --- a/app/src/main/java/org/astraea/app/performance/ConsumerThread.java +++ b/app/src/main/java/org/astraea/app/performance/ConsumerThread.java @@ -37,7 +37,10 @@ public interface ConsumerThread extends AbstractThread { - ConcurrentMap> CLIENT_ID_PARTITIONS = new ConcurrentHashMap<>(); + ConcurrentMap> CLIENT_ID_ASSIGNED_PARTITIONS = + new ConcurrentHashMap<>(); + ConcurrentMap> CLIENT_ID_REVOKED_PARTITIONS = + new ConcurrentHashMap<>(); static List create( int consumers, @@ -64,8 +67,7 @@ static List create( index -> { @SuppressWarnings("resource") var clientId = Utils.randomString(); - var consumer = - consumerSupplier.apply(clientId, ps -> CLIENT_ID_PARTITIONS.put(clientId, ps)); + var consumer = consumerSupplier.apply(clientId, new PartitionRatioListener(clientId)); var closed = new AtomicBoolean(false); var closeLatch = closeLatches.get(index); var subscribed = new AtomicBoolean(true); @@ -87,7 +89,8 @@ static List create( Utils.swallowException(consumer::close); closeLatch.countDown(); closed.set(true); - CLIENT_ID_PARTITIONS.remove(clientId); + CLIENT_ID_ASSIGNED_PARTITIONS.remove(clientId); + CLIENT_ID_REVOKED_PARTITIONS.remove(clientId); } }); return new ConsumerThread() { @@ -125,4 +128,22 @@ public void close() { void resubscribe(); void unsubscribe(); + + class PartitionRatioListener implements ConsumerRebalanceListener { + private final String clientId; + + PartitionRatioListener(String clientId) { + this.clientId = clientId; + } + + @Override + public void onPartitionAssigned(Set partitions) { + CLIENT_ID_ASSIGNED_PARTITIONS.put(clientId, partitions); + } + + @Override + public void onPartitionsRevoked(Set partitions) { + CLIENT_ID_REVOKED_PARTITIONS.put(clientId, partitions); + } + } } diff --git a/app/src/main/java/org/astraea/app/performance/ReportFormat.java b/app/src/main/java/org/astraea/app/performance/ReportFormat.java index af429a461d..154b04d45b 100644 --- a/app/src/main/java/org/astraea/app/performance/ReportFormat.java +++ b/app/src/main/java/org/astraea/app/performance/ReportFormat.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; +import java.util.Set; import java.util.function.Supplier; import java.util.stream.IntStream; import org.astraea.common.EnumInfo; @@ -207,6 +208,17 @@ private static List latencyAndIO() { CSVContentElement.create( "Consumer[" + i + "] average publish latency (ms)", () -> Double.toString(consumerReports.get(i).avgLatency()))); + elements.add( + CSVContentElement.create( + "Consumer[" + i + "] partition difference", + () -> + Integer.toString( + (ConsumerThread.CLIENT_ID_ASSIGNED_PARTITIONS + .getOrDefault(consumerReports.get(i).clientId(), Set.of()) + .size() + - ConsumerThread.CLIENT_ID_REVOKED_PARTITIONS + .getOrDefault(consumerReports.get(i).clientId(), Set.of()) + .size())))); }); return elements; } diff --git a/app/src/main/java/org/astraea/app/performance/TrackerThread.java b/app/src/main/java/org/astraea/app/performance/TrackerThread.java index 365eec133d..b6e47e0862 100644 --- a/app/src/main/java/org/astraea/app/performance/TrackerThread.java +++ b/app/src/main/java/org/astraea/app/performance/TrackerThread.java @@ -19,11 +19,13 @@ import java.time.Duration; import java.util.Collection; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import java.util.function.ToDoubleFunction; +import java.util.stream.Collectors; import org.astraea.common.DataSize; import org.astraea.common.Utils; import org.astraea.common.metrics.HasBeanObject; @@ -140,10 +142,23 @@ boolean tryToPrint(Duration duration) { .ifPresent(i -> System.out.printf(" rebalance average latency: %.3f ms%n", i)); for (var i = 0; i < reports.size(); ++i) { var report = reports.get(i); + var clientId = report.clientId(); var ms = metrics.stream().filter(m -> m.clientId().equals(report.clientId())).findFirst(); + var assignedPartitions = + ConsumerThread.CLIENT_ID_ASSIGNED_PARTITIONS.getOrDefault(clientId, Set.of()); + var revokedPartitions = + ConsumerThread.CLIENT_ID_REVOKED_PARTITIONS.getOrDefault(clientId, Set.of()); + var nonStickyPartitions = + assignedPartitions.stream() + .filter(tp -> !revokedPartitions.contains(tp)) + .collect(Collectors.toSet()); if (ms.isPresent()) { System.out.printf( - " consumer[%d] has %d partitions%n", i, (int) ms.get().assignedPartitions()); + " consumer[%d] has %d partitions. Among them, there are %d non-sticky partitions and was assigned %d more partitions than before re-balancing%n", + i, + (int) ms.get().assignedPartitions(), + nonStickyPartitions.size(), + (assignedPartitions.size() - revokedPartitions.size())); } System.out.printf( " consumed[%d] average throughput: %s%n", diff --git a/app/src/test/java/org/astraea/app/performance/TrackerTest.java b/app/src/test/java/org/astraea/app/performance/TrackerTest.java index 83e6758cb8..4d14d6996f 100644 --- a/app/src/test/java/org/astraea/app/performance/TrackerTest.java +++ b/app/src/test/java/org/astraea/app/performance/TrackerTest.java @@ -101,6 +101,7 @@ void testConsumerPrinter() { var report = Mockito.mock(Report.class); var records = new AtomicLong(0); Mockito.when(report.records()).thenAnswer(a -> records.get()); + Mockito.when(report.clientId()).thenReturn("forTest"); var printer = new TrackerThread.ConsumerPrinter(() -> List.of(report)); Assertions.assertFalse(printer.tryToPrint(Duration.ofSeconds(1))); records.set(100);