Skip to content

Commit

Permalink
replaced streams used for computations with for loops; updated back t…
Browse files Browse the repository at this point in the history
…o stream for sorting the maps
  • Loading branch information
AlexandraRoatis committed Nov 21, 2018
1 parent ddda901 commit 967c298
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 58 deletions.
96 changes: 43 additions & 53 deletions modAionImpl/src/org/aion/zero/impl/sync/SyncStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -72,7 +71,7 @@ public final class SyncStats {
/** @implNote Access to this resource is managed by the {@link #responsesLock}. */
private final Map<String, List<Long>> statusResponseTimeByPeers = new HashMap<>();
/** @implNote Access to this resource is managed by the {@link #responsesLock}. */
private long overallAvgPeerResponseTime;
private double overallAvgPeerResponseTime;

private final Lock responsesLock = new ReentrantLock();

Expand Down Expand Up @@ -153,24 +152,21 @@ Map<String, Float> getPercentageOfRequestsToPeers() {
requestsLock.lock();

try {
Map<String, Float> percentageReq = new HashMap<>();
Long totalReq =
requestsToPeers
.values()
.parallelStream()
.mapToLong(entry -> entry.getTotal())
.sum();
requestsToPeers
.entrySet()
.parallelStream()
.forEach(
entry ->
percentageReq.put(
entry.getKey(),
entry.getValue().getTotal() / totalReq.floatValue()));
Map<String, Float> percentageReq = new LinkedHashMap<>();

float totalReq = 0f;

for (RequestCounter rc : requestsToPeers.values()) {
totalReq += rc.getTotal();
}

for (Map.Entry<String, RequestCounter> entry : requestsToPeers.entrySet()) {
percentageReq.put(entry.getKey(), entry.getValue().getTotal() / totalReq);
}

return percentageReq
.entrySet()
.parallelStream()
.stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.collect(
Collectors.toMap(
Expand Down Expand Up @@ -211,7 +207,7 @@ Map<String, Long> getTotalBlocksByPeer() {
try {
return blocksByPeer
.entrySet()
.parallelStream()
.stream()
.filter(entry -> entry.getValue() > 0)
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.collect(
Expand Down Expand Up @@ -253,7 +249,7 @@ Map<String, Long> getTotalBlockRequestsByPeer() {
try {
return blockRequestsByPeer
.entrySet()
.parallelStream()
.stream()
.sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
.collect(
Collectors.toMap(
Expand Down Expand Up @@ -314,43 +310,37 @@ public void addPeerResponseTime(String _nodeId, long _responseTime) {
Map<String, Double> getAverageResponseTimeByPeers() {
responsesLock.lock();
try {
Map<String, Double> avgResponseTimeByPeers =
statusRequestTimeByPeers
.entrySet()
.parallelStream()
.collect(
Collectors.toMap(
// collect a map of average response times by peer
Map.Entry::getKey, // node display Id
entry -> { // calculate the average response time
String _nodeId = entry.getKey();
final List<Long> requests = entry.getValue();
final List<Long> responses =
statusResponseTimeByPeers.getOrDefault(
_nodeId, new LinkedList<>());
return calculateAverage(requests, responses);
}));
double average;
String nodeId;
List<Long> requests, responses;

Map<String, Double> avgResponseTimeByPeers = new HashMap<>();
overallAvgPeerResponseTime = 0d;

for (Map.Entry<String, List<Long>> peerData : statusRequestTimeByPeers.entrySet()) {

nodeId = peerData.getKey(); // node display Id
requests = peerData.getValue();
responses = statusResponseTimeByPeers.getOrDefault(nodeId, new LinkedList<>());

// calculate the average response time
average = calculateAverage(requests, responses);

if (average >= 0) {
// collect a map of average response times by peer
avgResponseTimeByPeers.put(nodeId, average);
overallAvgPeerResponseTime += average;
}
}

overallAvgPeerResponseTime =
statusRequestTimeByPeers.isEmpty()
? overallAvgPeerResponseTime
: Double.valueOf(
Math.ceil(
avgResponseTimeByPeers
.entrySet()
.parallelStream()
// ignore peers without meaningful data
.filter(entry -> entry.getValue() >= 0)
.mapToDouble(Entry::getValue)
.average()
.getAsDouble()))
.longValue();
avgResponseTimeByPeers.isEmpty()
? 0d
: overallAvgPeerResponseTime / avgResponseTimeByPeers.size();

return avgResponseTimeByPeers
.entrySet()
.parallelStream()
// ignore peers without meaningful data
.filter(entry -> entry.getValue() >= 0)
.stream()
.sorted(Map.Entry.comparingByValue())
.collect(
Collectors.toMap(
Expand Down Expand Up @@ -401,7 +391,7 @@ private static double calculateAverage(List<Long> requestTimes, List<Long> respo
*
* @return overall average response time
*/
long getOverallAveragePeerResponseTime() {
double getOverallAveragePeerResponseTime() {
responsesLock.lock();
try {
return overallAvgPeerResponseTime;
Expand Down
6 changes: 4 additions & 2 deletions modAionImpl/src/org/aion/zero/impl/sync/TaskShowStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,15 @@ private String dumpResponseInfo() {
if (!avgResponseTimeByPeers.isEmpty()) {

// value in milliseconds
Long overallAvgResponse = this.stats.getOverallAveragePeerResponseTime() / 1_000_000;
double overallAvgResponse = this.stats.getOverallAveragePeerResponseTime() / 1_000_000;

sb.append("\n====== sync-responses-by-peer ======\n");
sb.append(String.format(" %9s %20s\n", "peer", "avg. response"));
sb.append("------------------------------------\n");

sb.append(String.format(" «overall» %17s ms\n", overallAvgResponse));
sb.append(
String.format(
" «overall» %17s ms\n", String.format("%.0f", overallAvgResponse)));

avgResponseTimeByPeers.forEach(
(nodeId, avgResponse) ->
Expand Down
7 changes: 4 additions & 3 deletions modAionImpl/test/org/aion/zero/impl/sync/SyncStatsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,9 @@ public void testAverageResponseTimeByPeersStats() {
Map<String, Double> emptyAvgResponseTimeByPeers = stats.getAverageResponseTimeByPeers();
// request time is logged but no response is received
stats.addPeerRequestTime("dummy", System.nanoTime());
Long overallAveragePeerResponseTime = stats.getOverallAveragePeerResponseTime();
double overallAveragePeerResponseTime = stats.getOverallAveragePeerResponseTime();
assertThat(emptyAvgResponseTimeByPeers.size() == 0).isTrue();
assertThat(overallAveragePeerResponseTime.compareTo(0L) == 0).isTrue();
assertThat(overallAveragePeerResponseTime).isEqualTo(0d);

stats = new SyncStats(chain.getBestBlock().getNumber());

Expand Down Expand Up @@ -280,7 +280,8 @@ public void testAverageResponseTimeByPeersStats() {
if (i++ == 0) {
// First record correspond to the overall average response time by all peers
assertThat(
((Long) avgResponseTimeByPeers.get(nodeId).longValue())
avgResponseTimeByPeers
.get(nodeId)
.compareTo(stats.getOverallAveragePeerResponseTime()));
} else {
assertThat(avgResponseTimeByPeers.get(nodeId) > lastAvgResponseTime).isTrue();
Expand Down

0 comments on commit 967c298

Please sign in to comment.