From 999cac89911595e24d7d8e428fbac687ca095d3d Mon Sep 17 00:00:00 2001 From: Alexandra Roatis Date: Thu, 15 Nov 2018 14:06:37 -0500 Subject: [PATCH 1/9] improved access to resources by using different locks; replaces stream with parallelStream; specified element types for lists --- .../org/aion/zero/impl/sync/SyncStats.java | 370 +++++++++++------- 1 file changed, 234 insertions(+), 136 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 575ee0429b..90f29e12ae 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -29,33 +29,56 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Map.Entry; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.IntStream; /** @author chris */ public final class SyncStats { - private long start; + /** @implNote Access to this resource is managed by the {@link #blockAverageLock}. */ + private final long start; + /** @implNote Access to this resource is managed by the {@link #blockAverageLock}. */ + private final long startBlock; + /** @implNote Access to this resource is managed by the {@link #blockAverageLock}. */ + private double avgBlocksPerSec; - private long startBlock; + private final Lock blockAverageLock = new ReentrantLock(); - private double avgBlocksPerSec; + /** @implNote Access to this resource is managed by the {@link #requestsLock}. */ + private final Map requestsToPeers = new HashMap<>(); - private final ConcurrentHashMap requestsToPeers = new ConcurrentHashMap<>(); + private final Lock requestsLock = new ReentrantLock(); - private final ConcurrentHashMap blocksByPeer = new ConcurrentHashMap<>(); + /** + * Records information on top seeds. + * + * @implNote Access to this resource is managed by the {@link #seedsLock}. + */ + private final Map blocksByPeer = new HashMap<>(); - private final ConcurrentHashMap blockRequestsByPeer = new ConcurrentHashMap<>(); + private final Lock seedsLock = new ReentrantLock(); - private final ConcurrentHashMap statusRequestTimeByPeers = - new ConcurrentHashMap<>(); + /** + * Records information on top leeches. + * + * @implNote Access to this resource is managed by the {@link #leechesLock}. + */ + private final Map blockRequestsByPeer = new HashMap<>(); - private final ConcurrentHashMap statusResponseTimeByPeers = - new ConcurrentHashMap<>(); + private final Lock leechesLock = new ReentrantLock(); + /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ + private final Map> statusRequestTimeByPeers = new HashMap<>(); + /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ + private final Map> statusResponseTimeByPeers = new HashMap<>(); + /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ private Long overallAvgPeerResponseTime; + private final Lock responsesLock = new ReentrantLock(); + SyncStats(long _startBlock) { this.start = System.currentTimeMillis(); this.startBlock = _startBlock; @@ -70,15 +93,27 @@ public final class SyncStats { * @param _totalBlocks total imported blocks in batch * @param _blockNumber best block number */ - synchronized void update(String _nodeId, int _totalBlocks, long _blockNumber) { - avgBlocksPerSec = - (double) (_blockNumber - startBlock) * 1000 / (System.currentTimeMillis() - start); - updateTotalRequestsToPeer(_nodeId); - updatePeerTotalBlocks(_nodeId, _totalBlocks); + void update(String _nodeId, int _totalBlocks, long _blockNumber) { + blockAverageLock.lock(); + try { + avgBlocksPerSec = + (double) (_blockNumber - startBlock) + * 1000 + / (System.currentTimeMillis() - start); + updateTotalRequestsToPeer(_nodeId); + updatePeerTotalBlocks(_nodeId, _totalBlocks); + } finally { + blockAverageLock.unlock(); + } } - synchronized double getAvgBlocksPerSec() { - return this.avgBlocksPerSec; + double getAvgBlocksPerSec() { + blockAverageLock.lock(); + try { + return this.avgBlocksPerSec; + } finally { + blockAverageLock.unlock(); + } } /** @@ -87,8 +122,13 @@ synchronized double getAvgBlocksPerSec() { * @param _nodeId peer node display Id */ private void updateTotalRequestsToPeer(String _nodeId) { - if (requestsToPeers.putIfAbsent(_nodeId, 1L) != null) { - requestsToPeers.computeIfPresent(_nodeId, (key, value) -> value + 1L); + requestsLock.lock(); + try { + if (requestsToPeers.putIfAbsent(_nodeId, 1L) != null) { + requestsToPeers.computeIfPresent(_nodeId, (key, value) -> value + 1L); + } + } finally { + requestsLock.unlock(); } } @@ -97,26 +137,35 @@ private void updateTotalRequestsToPeer(String _nodeId) { * requests made. * * @return a hash map in descending order containing peers with underlying percentage of - * requests made by the node + * requests made by the node */ - synchronized Map getPercentageOfRequestsToPeers() { - Map percentageReq = new HashMap<>(); - Long totalReq = requestsToPeers.values().stream().mapToLong(l -> l).sum(); - requestsToPeers.entrySet().stream().forEach( - entry -> { - percentageReq.put( - entry.getKey(), entry.getValue().floatValue() / totalReq.floatValue()); - }); - return percentageReq - .entrySet() - .stream() - .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) - .collect( - Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue, - (e1, e2) -> e2, - LinkedHashMap::new)); + Map getPercentageOfRequestsToPeers() { + requestsLock.lock(); + + try { + Map percentageReq = new HashMap<>(); + Long totalReq = requestsToPeers.values().parallelStream().mapToLong(l -> l).sum(); + requestsToPeers + .entrySet() + .parallelStream() + .forEach( + entry -> + percentageReq.put( + entry.getKey(), + entry.getValue().floatValue() / totalReq.floatValue())); + return percentageReq + .entrySet() + .parallelStream() + .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e2, + LinkedHashMap::new)); + } finally { + requestsLock.unlock(); + } } /** @@ -126,9 +175,14 @@ synchronized Map getPercentageOfRequestsToPeers() { * @param _totalBlocks total number of blocks received */ private void updatePeerTotalBlocks(String _nodeId, int _totalBlocks) { - long blocks = (long) _totalBlocks; - if (blocksByPeer.putIfAbsent(_nodeId, blocks) != null) { - blocksByPeer.computeIfPresent(_nodeId, (key, value) -> value + blocks); + seedsLock.lock(); + try { + long blocks = (long) _totalBlocks; + if (blocksByPeer.putIfAbsent(_nodeId, blocks) != null) { + blocksByPeer.computeIfPresent(_nodeId, (key, value) -> value + blocks); + } + } finally { + seedsLock.unlock(); } } @@ -137,18 +191,23 @@ private void updatePeerTotalBlocks(String _nodeId, int _totalBlocks) { * * @return map of total imported blocks by peer and sorted in descending order */ - synchronized Map getTotalBlocksByPeer() { - return blocksByPeer - .entrySet() - .stream() - .filter(entry -> entry.getValue() > 0) - .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) - .collect( - Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue, - (e1, e2) -> e2, - LinkedHashMap::new)); + Map getTotalBlocksByPeer() { + seedsLock.lock(); + try { + return blocksByPeer + .entrySet() + .parallelStream() + .filter(entry -> entry.getValue() > 0) + .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e2, + LinkedHashMap::new)); + } finally { + seedsLock.unlock(); + } } /** @@ -156,10 +215,15 @@ synchronized Map getTotalBlocksByPeer() { * * @param _nodeId peer node display Id */ - public synchronized void updateTotalBlockRequestsByPeer(String _nodeId, int _totalBlocks) { - long blocks = (long) _totalBlocks; - if (blockRequestsByPeer.putIfAbsent(_nodeId, blocks) != null) { - blockRequestsByPeer.computeIfPresent(_nodeId, (key, value) -> value + blocks); + public void updateTotalBlockRequestsByPeer(String _nodeId, int _totalBlocks) { + leechesLock.lock(); + try { + long blocks = (long) _totalBlocks; + if (blockRequestsByPeer.putIfAbsent(_nodeId, blocks) != null) { + blockRequestsByPeer.computeIfPresent(_nodeId, (key, value) -> value + blocks); + } + } finally { + leechesLock.unlock(); } } @@ -168,17 +232,22 @@ public synchronized void updateTotalBlockRequestsByPeer(String _nodeId, int _tot * * @return map of total requested blocks by peer and sorted in descending order */ - synchronized Map getTotalBlockRequestsByPeer() { - return blockRequestsByPeer - .entrySet() - .stream() - .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) - .collect( - Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue, - (e1, e2) -> e2, - LinkedHashMap::new)); + Map getTotalBlockRequestsByPeer() { + leechesLock.lock(); + try { + return blockRequestsByPeer + .entrySet() + .parallelStream() + .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e2, + LinkedHashMap::new)); + } finally { + leechesLock.unlock(); + } } /** @@ -186,13 +255,18 @@ synchronized Map getTotalBlockRequestsByPeer() { * * @param _nodeId peer node display Id */ - public synchronized void addPeerRequestTime(String _nodeId, long _requestTime) { - LinkedList requestStartTimes = - statusRequestTimeByPeers.containsKey(_nodeId) - ? statusRequestTimeByPeers.get(_nodeId) - : new LinkedList<>(); - requestStartTimes.add(_requestTime); - statusRequestTimeByPeers.put(_nodeId, requestStartTimes); + public void addPeerRequestTime(String _nodeId, long _requestTime) { + responsesLock.lock(); + try { + LinkedList requestStartTimes = + statusRequestTimeByPeers.containsKey(_nodeId) + ? statusRequestTimeByPeers.get(_nodeId) + : new LinkedList<>(); + requestStartTimes.add(_requestTime); + statusRequestTimeByPeers.put(_nodeId, requestStartTimes); + } finally { + responsesLock.unlock(); + } } /** @@ -200,13 +274,18 @@ public synchronized void addPeerRequestTime(String _nodeId, long _requestTime) { * * @param _nodeId peer node display Id */ - public synchronized void addPeerResponseTime(String _nodeId, long _requestTime) { - LinkedList requestStartTimes = - statusResponseTimeByPeers.containsKey(_nodeId) - ? statusResponseTimeByPeers.get(_nodeId) - : new LinkedList<>(); - requestStartTimes.add(_requestTime); - statusResponseTimeByPeers.put(_nodeId, requestStartTimes); + public void addPeerResponseTime(String _nodeId, long _requestTime) { + responsesLock.lock(); + try { + LinkedList requestStartTimes = + statusResponseTimeByPeers.containsKey(_nodeId) + ? statusResponseTimeByPeers.get(_nodeId) + : new LinkedList<>(); + requestStartTimes.add(_requestTime); + statusResponseTimeByPeers.put(_nodeId, requestStartTimes); + } finally { + responsesLock.unlock(); + } } /** @@ -214,59 +293,73 @@ public synchronized void addPeerResponseTime(String _nodeId, long _requestTime) * * @return map of average response time by peer node */ - synchronized Map getAverageResponseTimeByPeers() { - Map avgResponseTimeByPeers = - statusRequestTimeByPeers - .entrySet() - .stream() - .collect( - Collectors.toMap( // collect a map of average response times by peer - Map.Entry::getKey, // node display Id - entry -> { // calculate the average response time + Map getAverageResponseTimeByPeers() { + responsesLock.lock(); + try { + Map avgResponseTimeByPeers = + statusRequestTimeByPeers + .entrySet() + .parallelStream() + .collect( + Collectors.toMap( + // collect a map of average response times by peer + Map.Entry::getKey, // node display Id + entry -> { // calculate the average response time + String _nodeId = entry.getKey(); + final List requestTimes = entry.getValue(); + final List responseTimes = + statusResponseTimeByPeers.getOrDefault( + _nodeId, new LinkedList<>()); + return Math.ceil( // truncates average value + IntStream.range( + // calculates the status + // response time + 0, + Math.min( + requestTimes.size(), + responseTimes + .size())) + .mapToLong( + // subtract + // (response - request) + // time + i -> + responseTimes.get(i) + - requestTimes + .get( + i)) + // averaged over all + // requests + .average() + .orElse(0)); + })); - String _nodeId = entry.getKey(); - final List requestTimes = entry.getValue(); - final List responseTimes = - statusResponseTimeByPeers.getOrDefault( - _nodeId, new LinkedList()); - Double average = - Math.ceil( // truncates average value - IntStream.range( // calculates the status response time - 0, - Math.min(requestTimes.size(), responseTimes.size())) - .mapToLong( // subtract (response - request) time - i -> - ((Long)responseTimes.get(i)).longValue() - - ((Long)requestTimes.get(i)).longValue() - ) - // averaged over all requests - .average().orElse(0)); - return average; - })); - - overallAvgPeerResponseTime = - statusRequestTimeByPeers.isEmpty() - ? overallAvgPeerResponseTime - : Double.valueOf( - Math.ceil( - avgResponseTimeByPeers - .entrySet() - .stream() - .mapToDouble(entry -> entry.getValue()) - .average() - .getAsDouble())) - .longValue(); + overallAvgPeerResponseTime = + statusRequestTimeByPeers.isEmpty() + ? overallAvgPeerResponseTime + : Double.valueOf( + Math.ceil( + avgResponseTimeByPeers + .entrySet() + .parallelStream() + .mapToDouble(Entry::getValue) + .average() + .getAsDouble())) + .longValue(); - return avgResponseTimeByPeers - .entrySet() - .stream() - .sorted(Map.Entry.comparingByValue()) - .collect( - Collectors.toMap( - Map.Entry::getKey, - Map.Entry::getValue, - (e1, e2) -> e2, - LinkedHashMap::new)); + return avgResponseTimeByPeers + .entrySet() + .parallelStream() + .sorted(Map.Entry.comparingByValue()) + .collect( + Collectors.toMap( + Map.Entry::getKey, + Map.Entry::getValue, + (e1, e2) -> e2, + LinkedHashMap::new)); + } finally { + responsesLock.unlock(); + } } /** @@ -274,7 +367,12 @@ synchronized Map getAverageResponseTimeByPeers() { * * @return overall average response time */ - synchronized Long getOverallAveragePeerResponseTime() { - return overallAvgPeerResponseTime; + Long getOverallAveragePeerResponseTime() { + responsesLock.lock(); + try { + return overallAvgPeerResponseTime; + } finally { + responsesLock.unlock(); + } } } From 4bfe10bcb7875492b12e3b732eca157c22a43f75 Mon Sep 17 00:00:00 2001 From: Alexandra Date: Mon, 19 Nov 2018 16:03:03 -0500 Subject: [PATCH 2/9] calculate average response times without producing incorrect negative value --- .../org/aion/zero/impl/sync/SyncStats.java | 64 +++++++++++-------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 90f29e12ae..9f3bbbfd01 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -33,7 +33,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; -import java.util.stream.IntStream; /** @author chris */ public final class SyncStats { @@ -306,32 +305,11 @@ Map getAverageResponseTimeByPeers() { Map.Entry::getKey, // node display Id entry -> { // calculate the average response time String _nodeId = entry.getKey(); - final List requestTimes = entry.getValue(); - final List responseTimes = + final List requests = entry.getValue(); + final List responses = statusResponseTimeByPeers.getOrDefault( _nodeId, new LinkedList<>()); - return Math.ceil( // truncates average value - IntStream.range( - // calculates the status - // response time - 0, - Math.min( - requestTimes.size(), - responseTimes - .size())) - .mapToLong( - // subtract - // (response - request) - // time - i -> - responseTimes.get(i) - - requestTimes - .get( - i)) - // averaged over all - // requests - .average() - .orElse(0)); + return calculateAverage(requests, responses); })); overallAvgPeerResponseTime = @@ -342,6 +320,8 @@ Map getAverageResponseTimeByPeers() { avgResponseTimeByPeers .entrySet() .parallelStream() + // ignore peers without meaningful data + .filter(entry -> entry.getValue() >= 0) .mapToDouble(Entry::getValue) .average() .getAsDouble())) @@ -350,6 +330,8 @@ Map getAverageResponseTimeByPeers() { return avgResponseTimeByPeers .entrySet() .parallelStream() + // ignore peers without meaningful data + .filter(entry -> entry.getValue() >= 0) .sorted(Map.Entry.comparingByValue()) .collect( Collectors.toMap( @@ -362,6 +344,38 @@ Map getAverageResponseTimeByPeers() { } } + /** + * Computes the average response time for a peer given the lists of gathered requests and + * response times. + * + * @param requestTimes list of times for requests made + * @param responseTimes list of times for responses received + * @return the average response time for the request-response cycle + */ + private static Double calculateAverage(List requestTimes, List responseTimes) { + int entries = 0; + double sum = 0; + + // only consider requests that had responses + for (int i = 0; i < responseTimes.size(); i++) { + long request = requestTimes.get(i); + long response = responseTimes.get(i); + + // ignore data where the requests comes after the response + if (response >= request) { + sum += response - request; + entries++; + } + } + + if (entries == 0) { + // indicates no data + return (double) -1; + } else { + return Math.ceil(sum / entries); + } + } + /** * Obtains the overall average response time from all active peer nodes * From 45a33459f43a53d3963309ae26acb23d335baaea Mon Sep 17 00:00:00 2001 From: Alexandra Date: Mon, 19 Nov 2018 16:28:58 -0500 Subject: [PATCH 3/9] correct formatting and license for stats test --- .../aion/zero/impl/sync/SyncStatsTest.java | 93 ++++++++++++------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java index 772c592192..0cec01423c 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java @@ -1,3 +1,26 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ + package org.aion.zero.impl.sync; import static com.google.common.truth.Truth.assertThat; @@ -19,10 +42,11 @@ public class SyncStatsTest { private static final List accounts = generateAccounts(10); - private final StandaloneBlockchain.Bundle bundle = new StandaloneBlockchain.Builder() - .withValidatorConfiguration("simple") - .withDefaultAccounts(accounts) - .build(); + private final StandaloneBlockchain.Bundle bundle = + new StandaloneBlockchain.Builder() + .withValidatorConfiguration("simple") + .withDefaultAccounts(accounts) + .build(); private static final List peers = new ArrayList<>(); @@ -57,10 +81,10 @@ public void testAvgBlocksPerSecStat() { Thread.sleep(1000); } catch (InterruptedException e) { - } + } } - assertThat(stats.getAvgBlocksPerSec() <= 3.).isTrue(); + assertThat(stats.getAvgBlocksPerSec() <= 3.).isTrue(); } @Test @@ -80,16 +104,16 @@ public void testTotalRequestsToPeersStat() { int peerNo = 0; int processedBlocks = 0; - for(int totalBlocks = peers.size(); totalBlocks > 0; totalBlocks--) { - int blocks = totalBlocks; - processedBlocks += totalBlocks; - while(blocks > 0) { - AionBlock current = generateNewBlock(chain, chain.getBestBlock(), accounts, 10); - assertThat(chain.tryToConnect(current)).isEqualTo(ImportResult.IMPORTED_BEST); - stats.update(peers.get(peerNo), blocks, chain.getBestBlock().getNumber()); - blocks--; - } - peerNo++; + for (int totalBlocks = peers.size(); totalBlocks > 0; totalBlocks--) { + int blocks = totalBlocks; + processedBlocks += totalBlocks; + while (blocks > 0) { + AionBlock current = generateNewBlock(chain, chain.getBestBlock(), accounts, 10); + assertThat(chain.tryToConnect(current)).isEqualTo(ImportResult.IMPORTED_BEST); + stats.update(peers.get(peerNo), blocks, chain.getBestBlock().getNumber()); + blocks--; + } + peerNo++; } Map reqToPeers = stats.getPercentageOfRequestsToPeers(); @@ -105,17 +129,17 @@ public void testTotalRequestsToPeersStat() { long lastTotalBlocks = processedBlocks; - for(String nodeId:reqToPeers.keySet()) { - float percentageReq = reqToPeers.get(nodeId); - // ensures desc order - assertThat(lastPercentage >= percentageReq).isTrue(); - lastPercentage = percentageReq; - assertThat(percentageReq - (1. * blocks/processedBlocks) < diffThreshold).isTrue(); - // ensures desc order - assertThat(lastTotalBlocks >= totalBlockReqByPeer.get(nodeId)).isTrue(); - lastTotalBlocks = totalBlockReqByPeer.get(nodeId); - assertThat(totalBlockReqByPeer.get(nodeId).compareTo(Long.valueOf(blocks)) == 0); - blocks--; + for (String nodeId : reqToPeers.keySet()) { + float percentageReq = reqToPeers.get(nodeId); + // ensures desc order + assertThat(lastPercentage >= percentageReq).isTrue(); + lastPercentage = percentageReq; + assertThat(percentageReq - (1. * blocks / processedBlocks) < diffThreshold).isTrue(); + // ensures desc order + assertThat(lastTotalBlocks >= totalBlockReqByPeer.get(nodeId)).isTrue(); + lastTotalBlocks = totalBlockReqByPeer.get(nodeId); + assertThat(totalBlockReqByPeer.get(nodeId).compareTo(Long.valueOf(blocks)) == 0); + blocks--; } } @@ -132,7 +156,7 @@ public void testTotalBlockRequestsByPeerStats() { assertThat(emptyTotalBlocksByPeer.size() == 0).isTrue(); int blocks = 3; - for(String nodeId:peers) { + for (String nodeId : peers) { int count = 0; while (count < blocks) { stats.updateTotalBlockRequestsByPeer(nodeId, 1); @@ -145,7 +169,7 @@ public void testTotalBlockRequestsByPeerStats() { assertThat(totalBlocksByPeer.size() == peers.size()).isTrue(); Long lastTotalBlocks = (long) peers.size(); - for(String nodeId:totalBlocksByPeer.keySet()) { + for (String nodeId : totalBlocksByPeer.keySet()) { // ensures desc order assertThat(lastTotalBlocks >= totalBlocksByPeer.get(nodeId)).isTrue(); lastTotalBlocks = totalBlocksByPeer.get(nodeId); @@ -171,13 +195,13 @@ public void testAverageResponseTimeByPeersStats() { stats = new SyncStats(chain.getBestBlock().getNumber()); int requests = 3; - for(String nodeId:peers) { + for (String nodeId : peers) { int count = requests; while (count > 0) { stats.addPeerRequestTime(nodeId, System.currentTimeMillis()); try { Thread.sleep(100 * count); - } catch(InterruptedException e) { + } catch (InterruptedException e) { } stats.addPeerResponseTime(nodeId, System.currentTimeMillis()); count--; @@ -192,10 +216,11 @@ public void testAverageResponseTimeByPeersStats() { int i = 0; for (String nodeId : avgResponseTimeByPeers.keySet()) { // ensures asc order - if(i++ == 0) { + if (i++ == 0) { // First record correspond to the overall average response time by all peers - assertThat(((Long)avgResponseTimeByPeers.get(nodeId).longValue()) - .compareTo(stats.getOverallAveragePeerResponseTime())); + assertThat( + ((Long) avgResponseTimeByPeers.get(nodeId).longValue()) + .compareTo(stats.getOverallAveragePeerResponseTime())); } else { assertThat(avgResponseTimeByPeers.get(nodeId) > lastAvgResponseTime).isTrue(); lastAvgResponseTime = avgResponseTimeByPeers.get(nodeId); From 4f2fddb674821dce3d11ebf49307191548eed409 Mon Sep 17 00:00:00 2001 From: Alexandra Date: Mon, 19 Nov 2018 17:09:28 -0500 Subject: [PATCH 4/9] correctly tracking requests to peers; updated unit test --- .../aion/zero/impl/sync/RequestCounter.java | 70 +++++++++++++++ .../org/aion/zero/impl/sync/RequestType.java | 35 ++++++++ .../src/org/aion/zero/impl/sync/SyncMgr.java | 2 + .../org/aion/zero/impl/sync/SyncStats.java | 36 ++++++-- .../aion/zero/impl/sync/TaskGetBodies.java | 6 ++ .../aion/zero/impl/sync/TaskGetHeaders.java | 5 ++ .../aion/zero/impl/sync/TaskGetStatus.java | 1 + .../aion/zero/impl/sync/SyncStatsTest.java | 87 ++++++++++++++++--- 8 files changed, 221 insertions(+), 21 deletions(-) create mode 100644 modAionImpl/src/org/aion/zero/impl/sync/RequestCounter.java create mode 100644 modAionImpl/src/org/aion/zero/impl/sync/RequestType.java diff --git a/modAionImpl/src/org/aion/zero/impl/sync/RequestCounter.java b/modAionImpl/src/org/aion/zero/impl/sync/RequestCounter.java new file mode 100644 index 0000000000..9d9eb76a3e --- /dev/null +++ b/modAionImpl/src/org/aion/zero/impl/sync/RequestCounter.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ + +package org.aion.zero.impl.sync; + +/** + * Used for tracking different types of requests made to peers. + * + * @author Alexandra Roatis + */ +public class RequestCounter { + + private long status = 0; + private long headers = 0; + private long bodies = 0; + private long total = 0; + + public RequestCounter() {} + + public long getStatus() { + return status; + } + + public long getHeaders() { + return headers; + } + + public long getBodies() { + return bodies; + } + + public long getTotal() { + return total; + } + + public void incStatus() { + this.status++; + this.total++; + } + + public void incHeaders() { + this.headers++; + this.total++; + } + + public void incBodies() { + this.bodies++; + this.total++; + } +} diff --git a/modAionImpl/src/org/aion/zero/impl/sync/RequestType.java b/modAionImpl/src/org/aion/zero/impl/sync/RequestType.java new file mode 100644 index 0000000000..1ecc8f04c2 --- /dev/null +++ b/modAionImpl/src/org/aion/zero/impl/sync/RequestType.java @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2017-2018 Aion foundation. + * + * This file is part of the aion network project. + * + * The aion network project is free software: you can redistribute it + * and/or modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation, either version 3 of + * the License, or any later version. + * + * The aion network project is distributed in the hope that it will + * be useful, but WITHOUT ANY WARRANTY; without even the implied + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with the aion network project source files. + * If not, see . + * + * Contributors: + * Aion foundation. + */ + +package org.aion.zero.impl.sync; + +/** + * Used for tracking different types of requests made to peers. + * + * @author Alexandra Roatis + */ +public enum RequestType { + STATUS, + HEADERS, + BODIES +} diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java index d4cf417fe4..86e424703e 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncMgr.java @@ -192,6 +192,7 @@ public void init( downloadedHeaders, headersWithBodiesRequested, peerStates, + stats, log), "sync-gb"); syncGb.start(); @@ -249,6 +250,7 @@ private void getHeaders(BigInteger _selfTd) { chain.getBestBlock().getNumber(), _selfTd, peerStates, + stats, log)); queueFull.set(false); } diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 9f3bbbfd01..fb415df7af 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -47,7 +47,7 @@ public final class SyncStats { private final Lock blockAverageLock = new ReentrantLock(); /** @implNote Access to this resource is managed by the {@link #requestsLock}. */ - private final Map requestsToPeers = new HashMap<>(); + private final Map requestsToPeers = new HashMap<>(); private final Lock requestsLock = new ReentrantLock(); @@ -99,7 +99,6 @@ void update(String _nodeId, int _totalBlocks, long _blockNumber) { (double) (_blockNumber - startBlock) * 1000 / (System.currentTimeMillis() - start); - updateTotalRequestsToPeer(_nodeId); updatePeerTotalBlocks(_nodeId, _totalBlocks); } finally { blockAverageLock.unlock(); @@ -118,14 +117,30 @@ void update(String _nodeId, int _totalBlocks, long _blockNumber) { /** * Updates the total requests made to a pear * - * @param _nodeId peer node display Id + * @param nodeId peer node display Id */ - private void updateTotalRequestsToPeer(String _nodeId) { + public void updateTotalRequestsToPeer(String nodeId, RequestType type) { requestsLock.lock(); try { - if (requestsToPeers.putIfAbsent(_nodeId, 1L) != null) { - requestsToPeers.computeIfPresent(_nodeId, (key, value) -> value + 1L); + RequestCounter current = requestsToPeers.get(nodeId); + + if (current == null) { + current = new RequestCounter(); + requestsToPeers.put(nodeId, current); + } + + switch (type) { + case STATUS: + current.incStatus(); + break; + case HEADERS: + current.incHeaders(); + break; + case BODIES: + current.incBodies(); + break; } + } finally { requestsLock.unlock(); } @@ -143,7 +158,12 @@ Map getPercentageOfRequestsToPeers() { try { Map percentageReq = new HashMap<>(); - Long totalReq = requestsToPeers.values().parallelStream().mapToLong(l -> l).sum(); + Long totalReq = + requestsToPeers + .values() + .parallelStream() + .mapToLong(entry -> entry.getTotal()) + .sum(); requestsToPeers .entrySet() .parallelStream() @@ -151,7 +171,7 @@ Map getPercentageOfRequestsToPeers() { entry -> percentageReq.put( entry.getKey(), - entry.getValue().floatValue() / totalReq.floatValue())); + entry.getValue().getTotal() / totalReq.floatValue())); return percentageReq .entrySet() .parallelStream() diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetBodies.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetBodies.java index bb7156229c..964746fed0 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetBodies.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetBodies.java @@ -54,6 +54,8 @@ final class TaskGetBodies implements Runnable { private final Logger log; + private final SyncStats stats; + /** * @param _p2p IP2pMgr * @param _run AtomicBoolean @@ -66,12 +68,14 @@ final class TaskGetBodies implements Runnable { final BlockingQueue _downloadedHeaders, final ConcurrentHashMap _headersWithBodiesRequested, final Map peerStates, + final SyncStats _stats, final Logger log) { this.p2p = _p2p; this.run = _run; this.downloadedHeaders = _downloadedHeaders; this.headersWithBodiesRequested = _headersWithBodiesRequested; this.peerStates = peerStates; + this.stats = _stats; this.log = log; } @@ -105,6 +109,8 @@ public void run() { displayId, new ReqBlocksBodies( headers.stream().map(k -> k.getHash()).collect(Collectors.toList()))); + stats.updateTotalRequestsToPeer(displayId, RequestType.BODIES); + headersWithBodiesRequested.put(idHash, hw); PeerState peerState = peerStates.get(hw.getNodeIdHash()); diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java index dbb5a204fa..dfd5248ac1 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetHeaders.java @@ -53,6 +53,8 @@ final class TaskGetHeaders implements Runnable { private final Map peerStates; + private final SyncStats stats; + private final Logger log; private final Random random = new Random(System.currentTimeMillis()); @@ -62,11 +64,13 @@ final class TaskGetHeaders implements Runnable { long selfNumber, BigInteger selfTd, Map peerStates, + final SyncStats _stats, Logger log) { this.p2p = p2p; this.selfNumber = selfNumber; this.selfTd = selfTd; this.peerStates = peerStates; + this.stats = _stats; this.log = log; } @@ -190,6 +194,7 @@ public void run() { } ReqBlocksHeaders rbh = new ReqBlocksHeaders(from, size); this.p2p.send(node.getIdHash(), node.getIdShort(), rbh); + stats.updateTotalRequestsToPeer(node.getIdShort(), RequestType.STATUS); // update timestamp state.setLastHeaderRequest(now); diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java index df5e1a0cee..8cff0d7f9e 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java @@ -68,6 +68,7 @@ public void run() { for (INode n : p2p.getActiveNodes().values()) { // System.out.println("requesting-status from-node=" + n.getIdShort()); p2p.send(n.getIdHash(), n.getIdShort(), reqStatus); + stats.updateTotalRequestsToPeer(n.getIdShort(), RequestType.STATUS); stats.addPeerRequestTime(n.getIdShort(), System.currentTimeMillis()); } Thread.sleep(interval); diff --git a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java index 0cec01423c..4d007b390d 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java @@ -89,6 +89,79 @@ public void testAvgBlocksPerSecStat() { @Test public void testTotalRequestsToPeersStat() { + List peers = new ArrayList<>(this.peers); + while (peers.size() < 4) { + peers.add(UUID.randomUUID().toString().substring(0, 6)); + } + + // this tests requires at least 3 peers in the list + assertThat(peers.size()).isAtLeast(4); + + StandaloneBlockchain chain = bundle.bc; + SyncStats stats = new SyncStats(chain.getBestBlock().getNumber()); + + // ensures correct behaviour on empty stats + Map emptyReqToPeers = stats.getPercentageOfRequestsToPeers(); + assertThat(emptyReqToPeers.size()).isEqualTo(0); + + float processedRequests = 0; + + String firstPeer = peers.get(0); + String secondPeer = peers.get(1); + String thirdPeer = peers.get(2); + + for (String peer : peers) { + // status requests + stats.updateTotalRequestsToPeer(peer, RequestType.STATUS); + processedRequests++; + + if (peer == firstPeer || peer == secondPeer) { + // header requests + stats.updateTotalRequestsToPeer(peer, RequestType.HEADERS); + processedRequests++; + } + + // bodies requests + if (peer == firstPeer) { + stats.updateTotalRequestsToPeer(peer, RequestType.BODIES); + processedRequests++; + } + } + + Map reqToPeers = stats.getPercentageOfRequestsToPeers(); + + // makes sure no additional peers were created + assertThat(reqToPeers.size()).isEqualTo(peers.size()); + + // by design (the updates above are not symmetrical) + assertThat(reqToPeers.get(firstPeer)).isGreaterThan(reqToPeers.get(secondPeer)); + assertThat(reqToPeers.get(firstPeer)).isEqualTo(3 / processedRequests); + + assertThat(reqToPeers.get(secondPeer)).isGreaterThan(reqToPeers.get(thirdPeer)); + assertThat(reqToPeers.get(secondPeer)).isEqualTo(2 / processedRequests); + + assertThat(reqToPeers.get(thirdPeer)).isEqualTo(1 / processedRequests); + + for (String otherPeers : peers.subList(3, peers.size())) { + assertThat(reqToPeers.get(otherPeers)).isEqualTo(reqToPeers.get(thirdPeer)); + } + + int blocks = 3; + + float lastPercentage = (float) 1; + float diffThreshold = (float) 0.01; + + for (String nodeId : reqToPeers.keySet()) { + float percentageReq = reqToPeers.get(nodeId); + // ensures desc order + assertThat(lastPercentage).isAtLeast(percentageReq); + lastPercentage = percentageReq; + assertThat(percentageReq - (1. * blocks / processedRequests) < diffThreshold).isTrue(); + } + } + + @Test + public void testTotalBlocksByPeer() { StandaloneBlockchain chain = bundle.bc; generateRandomChain(chain, 1, 1, accounts, 10); @@ -96,9 +169,7 @@ public void testTotalRequestsToPeersStat() { SyncStats stats = new SyncStats(chain.getBestBlock().getNumber()); // ensures correct behaviour on empty stats - Map emptyReqToPeers = stats.getPercentageOfRequestsToPeers(); Map emptyTotalBlockReqByPeer = stats.getTotalBlocksByPeer(); - assertThat(emptyReqToPeers.size() == 0).isTrue(); assertThat(emptyTotalBlockReqByPeer.size() == 0).isTrue(); int peerNo = 0; @@ -116,25 +187,15 @@ public void testTotalRequestsToPeersStat() { peerNo++; } - Map reqToPeers = stats.getPercentageOfRequestsToPeers(); Map totalBlockReqByPeer = stats.getTotalBlocksByPeer(); - assertThat(reqToPeers.size() == peers.size()).isTrue(); assertThat(totalBlockReqByPeer.size() == peers.size()).isTrue(); int blocks = 3; - float lastPercentage = (float) 1; - float diffThreshold = (float) 0.01; - long lastTotalBlocks = processedBlocks; - for (String nodeId : reqToPeers.keySet()) { - float percentageReq = reqToPeers.get(nodeId); - // ensures desc order - assertThat(lastPercentage >= percentageReq).isTrue(); - lastPercentage = percentageReq; - assertThat(percentageReq - (1. * blocks / processedBlocks) < diffThreshold).isTrue(); + for (String nodeId : peers) { // ensures desc order assertThat(lastTotalBlocks >= totalBlockReqByPeer.get(nodeId)).isTrue(); lastTotalBlocks = totalBlockReqByPeer.get(nodeId); From c43a7a65f05df13838c3068351d71beee2a86f59 Mon Sep 17 00:00:00 2001 From: Alexandra Roatis Date: Tue, 20 Nov 2018 11:45:42 -0500 Subject: [PATCH 5/9] correctly tracking received blocks instead of imported blocks --- modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java | 7 ++----- .../src/org/aion/zero/impl/sync/TaskImportBlocks.java | 2 +- .../zero/impl/sync/handler/ResBlocksBodiesHandler.java | 7 ++++++- .../org/aion/zero/impl/sync/handler/ResStatusHandler.java | 2 ++ .../test/org/aion/zero/impl/sync/SyncStatsTest.java | 4 ++-- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index fb415df7af..7c0b48aad3 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -88,18 +88,15 @@ public final class SyncStats { /** * Update statistics based on peer nodeId, total imported blocks, and best block number * - * @param _nodeId peer node display Id - * @param _totalBlocks total imported blocks in batch * @param _blockNumber best block number */ - void update(String _nodeId, int _totalBlocks, long _blockNumber) { + void update(long _blockNumber) { blockAverageLock.lock(); try { avgBlocksPerSec = (double) (_blockNumber - startBlock) * 1000 / (System.currentTimeMillis() - start); - updatePeerTotalBlocks(_nodeId, _totalBlocks); } finally { blockAverageLock.unlock(); } @@ -193,7 +190,7 @@ Map getPercentageOfRequestsToPeers() { * @param _nodeId peer node display Id * @param _totalBlocks total number of blocks received */ - private void updatePeerTotalBlocks(String _nodeId, int _totalBlocks) { + public void updatePeerTotalBlocks(String _nodeId, int _totalBlocks) { seedsLock.lock(); try { long blocks = (long) _totalBlocks; diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java index 92a5282a2a..38bc330423 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskImportBlocks.java @@ -148,7 +148,7 @@ public void run() { peerState.getBase()); } - stats.update(bw.getDisplayId(), bw.getBlocks().size(), getBestBlockNumber()); + stats.update(getBestBlockNumber()); } } if (log.isDebugEnabled()) { diff --git a/modAionImpl/src/org/aion/zero/impl/sync/handler/ResBlocksBodiesHandler.java b/modAionImpl/src/org/aion/zero/impl/sync/handler/ResBlocksBodiesHandler.java index 5fde01c9f1..86a432181b 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/handler/ResBlocksBodiesHandler.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/handler/ResBlocksBodiesHandler.java @@ -46,7 +46,11 @@ import org.aion.zero.impl.sync.msg.ResBlocksBodies; import org.slf4j.Logger; -/** @author chris handler for blocks bodies received from network */ +/** + * Handler for blocks bodies received from network. + * + * @author chris + */ public final class ResBlocksBodiesHandler extends Handler { private final Logger log; @@ -79,6 +83,7 @@ public void receive(int _nodeIdHashcode, String _displayId, final byte[] _msgByt p2pMgr.errCheck(_nodeIdHashcode, _displayId); log.error("", _displayId); } else { + syncMgr.getSyncStats().updatePeerTotalBlocks(_displayId, bodies.size()); syncMgr.validateAndAddBlocks(_nodeIdHashcode, _displayId, bodies); } } diff --git a/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java b/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java index 3e41276c67..fcf5229d26 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java @@ -77,6 +77,8 @@ public void receive(int _nodeIdHashcode, String _displayId, final byte[] _msgByt } this.syncMgr.getSyncStats().addPeerResponseTime(_displayId, System.currentTimeMillis()); + this.syncMgr.getSyncStats().updatePeerTotalBlocks(_displayId, 1); + INode node = this.p2pMgr.getActiveNodes().get(_nodeIdHashcode); if (node != null && rs != null) { if (log.isDebugEnabled()) { diff --git a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java index 4d007b390d..ecd301f50d 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java @@ -76,7 +76,7 @@ public void testAvgBlocksPerSecStat() { assertThat(chain.tryToConnect(current)).isEqualTo(ImportResult.IMPORTED_BEST); count++; } - stats.update(peers.get(0), totalBlocks, chain.getBestBlock().getNumber()); + stats.updatePeerTotalBlocks(peers.get(0), totalBlocks); try { Thread.sleep(1000); } catch (InterruptedException e) { @@ -181,7 +181,7 @@ public void testTotalBlocksByPeer() { while (blocks > 0) { AionBlock current = generateNewBlock(chain, chain.getBestBlock(), accounts, 10); assertThat(chain.tryToConnect(current)).isEqualTo(ImportResult.IMPORTED_BEST); - stats.update(peers.get(peerNo), blocks, chain.getBestBlock().getNumber()); + stats.updatePeerTotalBlocks(peers.get(peerNo), blocks); blocks--; } peerNo++; From 283f3b0ba8f24dd4ad217797dbf65c0ad56c4b66 Mon Sep 17 00:00:00 2001 From: Alexandra Roatis Date: Tue, 20 Nov 2018 13:28:38 -0500 Subject: [PATCH 6/9] using nano time for the average response time computation --- .../src/org/aion/zero/impl/sync/SyncStats.java | 12 +++++++----- .../src/org/aion/zero/impl/sync/TaskGetStatus.java | 2 +- .../src/org/aion/zero/impl/sync/TaskShowStatus.java | 6 ++++-- .../zero/impl/sync/handler/ResStatusHandler.java | 2 +- .../test/org/aion/zero/impl/sync/SyncStatsTest.java | 6 +++--- 5 files changed, 16 insertions(+), 12 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 7c0b48aad3..740bdcc162 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -270,6 +270,7 @@ Map getTotalBlockRequestsByPeer() { * Logs the time of status request to an active peer node * * @param _nodeId peer node display Id + * @param _requestTime time when the request was sent in nanoseconds */ public void addPeerRequestTime(String _nodeId, long _requestTime) { responsesLock.lock(); @@ -289,16 +290,17 @@ public void addPeerRequestTime(String _nodeId, long _requestTime) { * Log the time of status response received from an active peer node * * @param _nodeId peer node display Id + * @param _responseTime time when the response was received in nanoseconds */ - public void addPeerResponseTime(String _nodeId, long _requestTime) { + public void addPeerResponseTime(String _nodeId, long _responseTime) { responsesLock.lock(); try { - LinkedList requestStartTimes = + LinkedList responseEndTimes = statusResponseTimeByPeers.containsKey(_nodeId) ? statusResponseTimeByPeers.get(_nodeId) : new LinkedList<>(); - requestStartTimes.add(_requestTime); - statusResponseTimeByPeers.put(_nodeId, requestStartTimes); + responseEndTimes.add(_responseTime); + statusResponseTimeByPeers.put(_nodeId, responseEndTimes); } finally { responsesLock.unlock(); } @@ -307,7 +309,7 @@ public void addPeerResponseTime(String _nodeId, long _requestTime) { /** * Obtains the average response time by each active peer node * - * @return map of average response time by peer node + * @return map of average response time in nanoseconds by peer node */ Map getAverageResponseTimeByPeers() { responsesLock.lock(); diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java index 8cff0d7f9e..acc52840b9 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskGetStatus.java @@ -69,7 +69,7 @@ public void run() { // System.out.println("requesting-status from-node=" + n.getIdShort()); p2p.send(n.getIdHash(), n.getIdShort(), reqStatus); stats.updateTotalRequestsToPeer(n.getIdShort(), RequestType.STATUS); - stats.addPeerRequestTime(n.getIdShort(), System.currentTimeMillis()); + stats.addPeerRequestTime(n.getIdShort(), System.nanoTime()); } Thread.sleep(interval); } catch (Exception e) { diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java index 358cb19407..318edd6264 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java @@ -284,7 +284,8 @@ private String dumpResponseInfo() { if (!avgResponseTimeByPeers.isEmpty()) { - Long overallAvgResponse = this.stats.getOverallAveragePeerResponseTime(); + // value in milliseconds + Long overallAvgResponse = this.stats.getOverallAveragePeerResponseTime() / 1_000_000; sb.append("\n====== sync-responses-by-peer ======\n"); sb.append(String.format(" %9s %20s\n", "peer", "avg. response")); @@ -297,7 +298,8 @@ private String dumpResponseInfo() { sb.append( String.format( " id:%6s %17s ms\n", - nodeId, String.format("%.0f", avgResponse)))); + nodeId, + String.format("%.0f", avgResponse / 1_000_000)))); } return sb.toString(); diff --git a/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java b/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java index fcf5229d26..d7dbf6062d 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/handler/ResStatusHandler.java @@ -76,7 +76,7 @@ public void receive(int _nodeIdHashcode, String _displayId, final byte[] _msgByt } } - this.syncMgr.getSyncStats().addPeerResponseTime(_displayId, System.currentTimeMillis()); + this.syncMgr.getSyncStats().addPeerResponseTime(_displayId, System.nanoTime()); this.syncMgr.getSyncStats().updatePeerTotalBlocks(_displayId, 1); INode node = this.p2pMgr.getActiveNodes().get(_nodeIdHashcode); diff --git a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java index ecd301f50d..5a15e863c8 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java @@ -248,7 +248,7 @@ public void testAverageResponseTimeByPeersStats() { // ensures correct behaviour on empty stats Map emptyAvgResponseTimeByPeers = stats.getAverageResponseTimeByPeers(); // request time is logged but no response is received - stats.addPeerRequestTime("dummy", System.currentTimeMillis()); + stats.addPeerRequestTime("dummy", System.nanoTime()); Long overallAveragePeerResponseTime = stats.getOverallAveragePeerResponseTime(); assertThat(emptyAvgResponseTimeByPeers.size() == 0).isTrue(); assertThat(overallAveragePeerResponseTime.compareTo(0L) == 0).isTrue(); @@ -259,12 +259,12 @@ public void testAverageResponseTimeByPeersStats() { for (String nodeId : peers) { int count = requests; while (count > 0) { - stats.addPeerRequestTime(nodeId, System.currentTimeMillis()); + stats.addPeerRequestTime(nodeId, System.nanoTime()); try { Thread.sleep(100 * count); } catch (InterruptedException e) { } - stats.addPeerResponseTime(nodeId, System.currentTimeMillis()); + stats.addPeerResponseTime(nodeId, System.nanoTime()); count--; } requests--; From feb645d52c320d07d28e335c4d27be664901dc24 Mon Sep 17 00:00:00 2001 From: Alexandra Date: Tue, 20 Nov 2018 16:44:31 -0500 Subject: [PATCH 7/9] bugfix: using the minimum size of the two lists --- modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 740bdcc162..286517d516 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -374,9 +374,10 @@ Map getAverageResponseTimeByPeers() { private static Double calculateAverage(List requestTimes, List responseTimes) { int entries = 0; double sum = 0; + int size = Math.min(requestTimes.size(), responseTimes.size()); // only consider requests that had responses - for (int i = 0; i < responseTimes.size(); i++) { + for (int i = 0; i < size; i++) { long request = requestTimes.get(i); long response = responseTimes.get(i); From ba44525ef3c738a17e780405ad556ad38d08914a Mon Sep 17 00:00:00 2001 From: Alexandra Date: Tue, 20 Nov 2018 17:08:47 -0500 Subject: [PATCH 8/9] minor updates requested in reviews --- .../org/aion/zero/impl/sync/SyncStats.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 286517d516..8ce9a30413 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -37,9 +37,7 @@ /** @author chris */ public final class SyncStats { - /** @implNote Access to this resource is managed by the {@link #blockAverageLock}. */ private final long start; - /** @implNote Access to this resource is managed by the {@link #blockAverageLock}. */ private final long startBlock; /** @implNote Access to this resource is managed by the {@link #blockAverageLock}. */ private double avgBlocksPerSec; @@ -70,11 +68,11 @@ public final class SyncStats { private final Lock leechesLock = new ReentrantLock(); /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ - private final Map> statusRequestTimeByPeers = new HashMap<>(); + private final Map> statusRequestTimeByPeers = new HashMap<>(); /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ - private final Map> statusResponseTimeByPeers = new HashMap<>(); + private final Map> statusResponseTimeByPeers = new HashMap<>(); /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ - private Long overallAvgPeerResponseTime; + private long overallAvgPeerResponseTime; private final Lock responsesLock = new ReentrantLock(); @@ -112,9 +110,10 @@ void update(long _blockNumber) { } /** - * Updates the total requests made to a pear + * Updates the total requests made to a peer. * - * @param nodeId peer node display Id + * @param nodeId peer node display id + * @param type the type of request added */ public void updateTotalRequestsToPeer(String nodeId, RequestType type) { requestsLock.lock(); @@ -227,9 +226,10 @@ Map getTotalBlocksByPeer() { } /** - * Updates the total block requests made by a pear + * Updates the total block requests made by a peer. * * @param _nodeId peer node display Id + * @param _totalBlocks total number of blocks requested */ public void updateTotalBlockRequestsByPeer(String _nodeId, int _totalBlocks) { leechesLock.lock(); @@ -275,7 +275,7 @@ Map getTotalBlockRequestsByPeer() { public void addPeerRequestTime(String _nodeId, long _requestTime) { responsesLock.lock(); try { - LinkedList requestStartTimes = + List requestStartTimes = statusRequestTimeByPeers.containsKey(_nodeId) ? statusRequestTimeByPeers.get(_nodeId) : new LinkedList<>(); @@ -295,7 +295,7 @@ public void addPeerRequestTime(String _nodeId, long _requestTime) { public void addPeerResponseTime(String _nodeId, long _responseTime) { responsesLock.lock(); try { - LinkedList responseEndTimes = + List responseEndTimes = statusResponseTimeByPeers.containsKey(_nodeId) ? statusResponseTimeByPeers.get(_nodeId) : new LinkedList<>(); @@ -371,7 +371,7 @@ Map getAverageResponseTimeByPeers() { * @param responseTimes list of times for responses received * @return the average response time for the request-response cycle */ - private static Double calculateAverage(List requestTimes, List responseTimes) { + private static double calculateAverage(List requestTimes, List responseTimes) { int entries = 0; double sum = 0; int size = Math.min(requestTimes.size(), responseTimes.size()); @@ -401,7 +401,7 @@ private static Double calculateAverage(List requestTimes, List respo * * @return overall average response time */ - Long getOverallAveragePeerResponseTime() { + long getOverallAveragePeerResponseTime() { responsesLock.lock(); try { return overallAvgPeerResponseTime; From 03007e4c1e36a3d185b7a33776e78b59f92fe0e4 Mon Sep 17 00:00:00 2001 From: Alexandra Date: Tue, 20 Nov 2018 18:26:35 -0500 Subject: [PATCH 9/9] replaced streams used for computations with for loops; updated back to stream for sorting the maps --- .../org/aion/zero/impl/sync/SyncStats.java | 96 +++++++++---------- .../aion/zero/impl/sync/TaskShowStatus.java | 6 +- .../aion/zero/impl/sync/SyncStatsTest.java | 7 +- 3 files changed, 51 insertions(+), 58 deletions(-) diff --git a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java index 8ce9a30413..0eb8706558 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java @@ -29,7 +29,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -72,7 +71,7 @@ public final class SyncStats { /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ private final Map> statusResponseTimeByPeers = new HashMap<>(); /** @implNote Access to this resource is managed by the {@link #responsesLock}. */ - private long overallAvgPeerResponseTime; + private double overallAvgPeerResponseTime; private final Lock responsesLock = new ReentrantLock(); @@ -153,24 +152,21 @@ Map getPercentageOfRequestsToPeers() { requestsLock.lock(); try { - Map percentageReq = new HashMap<>(); - Long totalReq = - requestsToPeers - .values() - .parallelStream() - .mapToLong(entry -> entry.getTotal()) - .sum(); - requestsToPeers - .entrySet() - .parallelStream() - .forEach( - entry -> - percentageReq.put( - entry.getKey(), - entry.getValue().getTotal() / totalReq.floatValue())); + Map percentageReq = new LinkedHashMap<>(); + + float totalReq = 0f; + + for (RequestCounter rc : requestsToPeers.values()) { + totalReq += rc.getTotal(); + } + + for (Map.Entry entry : requestsToPeers.entrySet()) { + percentageReq.put(entry.getKey(), entry.getValue().getTotal() / totalReq); + } + return percentageReq .entrySet() - .parallelStream() + .stream() .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) .collect( Collectors.toMap( @@ -211,7 +207,7 @@ Map getTotalBlocksByPeer() { try { return blocksByPeer .entrySet() - .parallelStream() + .stream() .filter(entry -> entry.getValue() > 0) .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) .collect( @@ -253,7 +249,7 @@ Map getTotalBlockRequestsByPeer() { try { return blockRequestsByPeer .entrySet() - .parallelStream() + .stream() .sorted(Collections.reverseOrder(Map.Entry.comparingByValue())) .collect( Collectors.toMap( @@ -314,43 +310,37 @@ public void addPeerResponseTime(String _nodeId, long _responseTime) { Map getAverageResponseTimeByPeers() { responsesLock.lock(); try { - Map avgResponseTimeByPeers = - statusRequestTimeByPeers - .entrySet() - .parallelStream() - .collect( - Collectors.toMap( - // collect a map of average response times by peer - Map.Entry::getKey, // node display Id - entry -> { // calculate the average response time - String _nodeId = entry.getKey(); - final List requests = entry.getValue(); - final List responses = - statusResponseTimeByPeers.getOrDefault( - _nodeId, new LinkedList<>()); - return calculateAverage(requests, responses); - })); + double average; + String nodeId; + List requests, responses; + + Map avgResponseTimeByPeers = new HashMap<>(); + overallAvgPeerResponseTime = 0d; + + for (Map.Entry> peerData : statusRequestTimeByPeers.entrySet()) { + + nodeId = peerData.getKey(); // node display Id + requests = peerData.getValue(); + responses = statusResponseTimeByPeers.getOrDefault(nodeId, new LinkedList<>()); + + // calculate the average response time + average = calculateAverage(requests, responses); + + if (average >= 0) { + // collect a map of average response times by peer + avgResponseTimeByPeers.put(nodeId, average); + overallAvgPeerResponseTime += average; + } + } overallAvgPeerResponseTime = - statusRequestTimeByPeers.isEmpty() - ? overallAvgPeerResponseTime - : Double.valueOf( - Math.ceil( - avgResponseTimeByPeers - .entrySet() - .parallelStream() - // ignore peers without meaningful data - .filter(entry -> entry.getValue() >= 0) - .mapToDouble(Entry::getValue) - .average() - .getAsDouble())) - .longValue(); + avgResponseTimeByPeers.isEmpty() + ? 0d + : overallAvgPeerResponseTime / avgResponseTimeByPeers.size(); return avgResponseTimeByPeers .entrySet() - .parallelStream() - // ignore peers without meaningful data - .filter(entry -> entry.getValue() >= 0) + .stream() .sorted(Map.Entry.comparingByValue()) .collect( Collectors.toMap( @@ -401,7 +391,7 @@ private static double calculateAverage(List requestTimes, List respo * * @return overall average response time */ - long getOverallAveragePeerResponseTime() { + double getOverallAveragePeerResponseTime() { responsesLock.lock(); try { return overallAvgPeerResponseTime; diff --git a/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java b/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java index 318edd6264..39fcda7bef 100644 --- a/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java +++ b/modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java @@ -285,13 +285,15 @@ private String dumpResponseInfo() { if (!avgResponseTimeByPeers.isEmpty()) { // value in milliseconds - Long overallAvgResponse = this.stats.getOverallAveragePeerResponseTime() / 1_000_000; + double overallAvgResponse = this.stats.getOverallAveragePeerResponseTime() / 1_000_000; sb.append("\n====== sync-responses-by-peer ======\n"); sb.append(String.format(" %9s %20s\n", "peer", "avg. response")); sb.append("------------------------------------\n"); - sb.append(String.format(" «overall» %17s ms\n", overallAvgResponse)); + sb.append( + String.format( + " «overall» %17s ms\n", String.format("%.0f", overallAvgResponse))); avgResponseTimeByPeers.forEach( (nodeId, avgResponse) -> diff --git a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java index 5a15e863c8..a2ad97b539 100644 --- a/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java +++ b/modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java @@ -249,9 +249,9 @@ public void testAverageResponseTimeByPeersStats() { Map emptyAvgResponseTimeByPeers = stats.getAverageResponseTimeByPeers(); // request time is logged but no response is received stats.addPeerRequestTime("dummy", System.nanoTime()); - Long overallAveragePeerResponseTime = stats.getOverallAveragePeerResponseTime(); + double overallAveragePeerResponseTime = stats.getOverallAveragePeerResponseTime(); assertThat(emptyAvgResponseTimeByPeers.size() == 0).isTrue(); - assertThat(overallAveragePeerResponseTime.compareTo(0L) == 0).isTrue(); + assertThat(overallAveragePeerResponseTime).isEqualTo(0d); stats = new SyncStats(chain.getBestBlock().getNumber()); @@ -280,7 +280,8 @@ public void testAverageResponseTimeByPeersStats() { if (i++ == 0) { // First record correspond to the overall average response time by all peers assertThat( - ((Long) avgResponseTimeByPeers.get(nodeId).longValue()) + avgResponseTimeByPeers + .get(nodeId) .compareTo(stats.getOverallAveragePeerResponseTime())); } else { assertThat(avgResponseTimeByPeers.get(nodeId) > lastAvgResponseTime).isTrue();