diff --git a/app/src/main/java/org/astraea/performance/Metrics.java b/app/src/main/java/org/astraea/performance/Metrics.java index 74e76f20ed..20690fd4ac 100644 --- a/app/src/main/java/org/astraea/performance/Metrics.java +++ b/app/src/main/java/org/astraea/performance/Metrics.java @@ -59,6 +59,7 @@ public synchronized double avgBytes() { var value = new BigDecimal(bytes); var scale = new BigDecimal(1024 * 1024); var time = BigDecimal.valueOf((double) (System.currentTimeMillis() - startTime) / 1000); + if (time.doubleValue() == 0.0) return 0.0; return value.divide(scale, 3, RoundingMode.UP).divide(time, 3, RoundingMode.UP).doubleValue(); } diff --git a/app/src/main/java/org/astraea/performance/Tracker.java b/app/src/main/java/org/astraea/performance/Tracker.java index 8d98263de7..6591637ca0 100644 --- a/app/src/main/java/org/astraea/performance/Tracker.java +++ b/app/src/main/java/org/astraea/performance/Tracker.java @@ -43,6 +43,10 @@ public State execute() throws InterruptedException { System.out.printf( " producer[%d]的發送average latency: %.3fms%n", i, producerData.get(i).avgLatency()); } + if (consumerData.isEmpty()) { + if (completed >= records) return State.DONE; + else return State.RUNNING; + } /* consumer */ completed = 0; bytes = 0L; diff --git a/app/src/test/java/org/astraea/performance/TrackerTest.java b/app/src/test/java/org/astraea/performance/TrackerTest.java new file mode 100644 index 0000000000..fcc3aa145a --- /dev/null +++ b/app/src/test/java/org/astraea/performance/TrackerTest.java @@ -0,0 +1,31 @@ +package org.astraea.performance; + +import java.util.List; +import org.astraea.concurrent.ThreadPool; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TrackerTest { + @Test + public void testTerminate() throws InterruptedException { + var producerData = List.of(new Metrics()); + var consumerData = List.of(new Metrics()); + List empty = List.of(); + int records = 1; + + try (Tracker tracker = new Tracker(producerData, consumerData, records)) { + Assertions.assertEquals(ThreadPool.Executor.State.RUNNING, tracker.execute()); + producerData.get(0).put(1, 1); + consumerData.get(0).put(1, 1); + Assertions.assertEquals(ThreadPool.Executor.State.DONE, tracker.execute()); + } + + // Zero consumer + producerData = List.of(new Metrics()); + try (Tracker tracker = new Tracker(producerData, empty, records)) { + Assertions.assertEquals(ThreadPool.Executor.State.RUNNING, tracker.execute()); + producerData.get(0).put(1, 1); + Assertions.assertEquals(ThreadPool.Executor.State.DONE, tracker.execute()); + } + } +}