From 967c298642747cb8309d44bd1877ca0ac2903b5a Mon Sep 17 00:00:00 2001 From: Alexandra Date: Tue, 20 Nov 2018 18:26:35 -0500 Subject: [PATCH] replaced streams used for computations with for loops; updated back to stream for sorting the maps --- .../org/aion/zero/impl/sync/SyncStats.java | 96 +++++++++---------- .../aion/zero/impl/sync/TaskShowStatus.java | 6 +- .../aion/zero/impl/sync/SyncStatsTest.java | 7 +- 3 files changed, 51 insertions(+), 58 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 8ce9a30413..0eb8706558 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -29,7 +29,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -72,7 +71,7 @@ public final class SyncStats { /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ private final Map> statusResponseTimeByPeers = new HashMap<>(); /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ - private long overallAvgPeerResponseTime; + private double overallAvgPeerResponseTime; private final Lock responsesLock = new ReentrantLock(); @@ -153,24 +152,21 @@ Map getPercentageOfRequestsToPeers() { requestsLock.lock(); try { - Map percentageReq = new HashMap<>(); - Long totalReq = - requestsToPeers - .values() - .parallelStream() - .mapToLong(entry -> entry.getTotal()) - .sum(); - requestsToPeers - .entrySet() - .parallelStream() - .forEach( - entry -> - percentageReq.put( - entry.getKey(), - entry.getValue().getTotal() / totalReq.floatValue())); + Map percentageReq = new LinkedHashMap<>(); + + float totalReq = 0f; + + for (RequestCounter rc : requestsToPeers.values()) { + totalReq += rc.getTotal(); + } + + for (Map.Entry entry : requestsToPeers.entrySet()) { + percentageReq.put(entry.getKey(), entry.getValue().getTotal() / totalReq); + } + return percentageReq .entrySet() - .parallelStream() + .stream() .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) .collect( Collectors.toMap( @@ -211,7 +207,7 @@ Map getTotalBlocksByPeer() { try { return blocksByPeer .entrySet() - .parallelStream() + .stream() .filter(entry -> entry.getValue() > 0) .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) .collect( @@ -253,7 +249,7 @@ Map getTotalBlockRequestsByPeer() { try { return blockRequestsByPeer .entrySet() - .parallelStream() + .stream() .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) .collect( Collectors.toMap( @@ -314,43 +310,37 @@ public void addPeerResponseTime(String _nodeId, long _responseTime) { Map getAverageResponseTimeByPeers() { responsesLock.lock(); try { - Map avgResponseTimeByPeers = - statusRequestTimeByPeers - .entrySet() - .parallelStream() - .collect( - Collectors.toMap( - // collect a map of average response times by peer - Map.Entry::getKey, // node display Id - entry -> { // calculate the average response time - String _nodeId = entry.getKey(); - final List requests = entry.getValue(); - final List responses = - statusResponseTimeByPeers.getOrDefault( - _nodeId, new LinkedList<>()); - return calculateAverage(requests, responses); - })); + double average; + String nodeId; + List requests, responses; + + Map avgResponseTimeByPeers = new HashMap<>(); + overallAvgPeerResponseTime = 0d; + + for (Map.Entry> peerData : statusRequestTimeByPeers.entrySet()) { + + nodeId = peerData.getKey(); // node display Id + requests = peerData.getValue(); + responses = statusResponseTimeByPeers.getOrDefault(nodeId, new LinkedList<>()); + + // calculate the average response time + average = calculateAverage(requests, responses); + + if (average >= 0) { + // collect a map of average response times by peer + avgResponseTimeByPeers.put(nodeId, average); + overallAvgPeerResponseTime += average; + } + } overallAvgPeerResponseTime = - statusRequestTimeByPeers.isEmpty() - ? overallAvgPeerResponseTime - : Double.valueOf( - Math.ceil( - avgResponseTimeByPeers - .entrySet() - .parallelStream() - // ignore peers without meaningful data - .filter(entry -> entry.getValue() >= 0) - .mapToDouble(Entry::getValue) - .average() - .getAsDouble())) - .longValue(); + avgResponseTimeByPeers.isEmpty() + ? 0d + : overallAvgPeerResponseTime / avgResponseTimeByPeers.size(); return avgResponseTimeByPeers .entrySet() - .parallelStream() - // ignore peers without meaningful data - .filter(entry -> entry.getValue() >= 0) + .stream() .sorted(Map.Entry.comparingByValue()) .collect( Collectors.toMap( @@ -401,7 +391,7 @@ private static double calculateAverage(List requestTimes, List respo * * @return overall average response time */ - long getOverallAveragePeerResponseTime() { + double getOverallAveragePeerResponseTime() { responsesLock.lock(); try { return overallAvgPeerResponseTime; diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java index 318edd6264..39fcda7bef 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java @@ -285,13 +285,15 @@ private String dumpResponseInfo() { if (!avgResponseTimeByPeers.isEmpty()) { // value in milliseconds - Long overallAvgResponse = this.stats.getOverallAveragePeerResponseTime() / 1_000_000; + double overallAvgResponse = this.stats.getOverallAveragePeerResponseTime() / 1_000_000; sb.append("\n====== sync-responses-by-peer ======\n"); sb.append(String.format(" %9s %20s\n", "peer", "avg. response")); sb.append("------------------------------------\n"); - sb.append(String.format(" «overall» %17s ms\n", overallAvgResponse)); + sb.append( + String.format( + " «overall» %17s ms\n", String.format("%.0f", overallAvgResponse))); avgResponseTimeByPeers.forEach( (nodeId, avgResponse) -> diff --git a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java index 5a15e863c8..a2ad97b539 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java @@ -249,9 +249,9 @@ public void testAverageResponseTimeByPeersStats() { Map emptyAvgResponseTimeByPeers = stats.getAverageResponseTimeByPeers(); // request time is logged but no response is received stats.addPeerRequestTime("dummy", System.nanoTime()); - Long overallAveragePeerResponseTime = stats.getOverallAveragePeerResponseTime(); + double overallAveragePeerResponseTime = stats.getOverallAveragePeerResponseTime(); assertThat(emptyAvgResponseTimeByPeers.size() == 0).isTrue(); - assertThat(overallAveragePeerResponseTime.compareTo(0L) == 0).isTrue(); + assertThat(overallAveragePeerResponseTime).isEqualTo(0d); stats = new SyncStats(chain.getBestBlock().getNumber()); @@ -280,7 +280,8 @@ public void testAverageResponseTimeByPeersStats() { if (i++ == 0) { // First record correspond to the overall average response time by all peers assertThat( - ((Long) avgResponseTimeByPeers.get(nodeId).longValue()) + avgResponseTimeByPeers + .get(nodeId) .compareTo(stats.getOverallAveragePeerResponseTime())); } else { assertThat(avgResponseTimeByPeers.get(nodeId) > lastAvgResponseTime).isTrue();