From cc6bd632f45df9485f6323d67803e2f3556b5eb4 Mon Sep 17 00:00:00 2001 From: "S. Matthew English" Date: Thu, 29 Nov 2018 17:36:02 -0500 Subject: [PATCH] use setPeriodic --- .../internal/RecursivePeerRefreshState.java | 48 +++++++++---------- .../RecursivePeerRefreshStateTest.java | 14 +++++- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java index b5910ee787..46e0d37004 100644 --- a/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java +++ b/ethereum/p2p/src/main/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java @@ -24,13 +24,12 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import java.util.TimerTask; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; + +import io.vertx.core.Vertx; class RecursivePeerRefreshState { private final int CONCURRENT_REQUEST_LIMIT = 3; + private final int TIMEOUT_TASK_DELAY = 30000; // 30 Seconds private final BytesValue target; private final PeerBlacklist peerBlacklist; private final BondingAgent bondingAgent; @@ -38,16 +37,19 @@ class RecursivePeerRefreshState { private final List anteList; private final List outstandingRequestList; private final List contactedInCurrentExecution; + private final Vertx vertx; RecursivePeerRefreshState( final BytesValue target, final PeerBlacklist peerBlacklist, final BondingAgent bondingAgent, - final NeighborFinder neighborFinder) { + final NeighborFinder neighborFinder, + final Vertx vertx) { this.target = target; this.peerBlacklist = peerBlacklist; this.bondingAgent = bondingAgent; this.neighborFinder = neighborFinder; + this.vertx = vertx; this.anteList = new ArrayList<>(); this.outstandingRequestList = new ArrayList<>(); this.contactedInCurrentExecution = new ArrayList<>(); @@ -56,29 +58,25 @@ class RecursivePeerRefreshState { } private void commenceTimeoutTask() { - TimerTask timeoutTask = - new TimerTask() { - @Override - public void run() { - List outstandingRequestListCopy = - new ArrayList<>(outstandingRequestList); - - for (OutstandingRequest outstandingRequest : outstandingRequestListCopy) { - if (outstandingRequest.isExpired()) { - List queryCandidates = determineFindNodeCandidates(anteList.size()); - for (Peer candidate : queryCandidates) { - if (!contactedInCurrentExecution.contains(candidate.getId()) - && !outstandingRequestList.contains(new OutstandingRequest(candidate))) { - executeFindNodeRequest(candidate); - } + vertx.setPeriodic( + TIMEOUT_TASK_DELAY, + v -> { + List outstandingRequestListCopy = + new ArrayList<>(outstandingRequestList); + + for (OutstandingRequest outstandingRequest : outstandingRequestListCopy) { + if (outstandingRequest.isExpired()) { + List queryCandidates = determineFindNodeCandidates(anteList.size()); + for (Peer candidate : queryCandidates) { + if (!contactedInCurrentExecution.contains(candidate.getId()) + && !outstandingRequestList.contains(new OutstandingRequest(candidate))) { + executeFindNodeRequest(candidate); } - outstandingRequestList.remove(outstandingRequest); } + outstandingRequestList.remove(outstandingRequest); } } - }; - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - executor.scheduleAtFixedRate(timeoutTask, 0, 2, TimeUnit.SECONDS); + }); } private void executeFindNodeRequest(final Peer peer) { @@ -181,7 +179,7 @@ Peer getPeer() { boolean isExpired() { Duration duration = Duration.between(creation, Instant.now()); - Duration limit = Duration.ofSeconds(5); + Duration limit = Duration.ofSeconds(30); return duration.compareTo(limit) >= 0; } diff --git a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java index 2a4b683fb7..8ff5fb802f 100644 --- a/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java +++ b/ethereum/p2p/src/test/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshStateTest.java @@ -32,6 +32,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.vertx.core.Vertx; +import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -39,6 +41,7 @@ public class RecursivePeerRefreshStateTest { private static final ObjectMapper MAPPER = new ObjectMapper(); private RecursivePeerRefreshState recursivePeerRefreshState; + private Vertx vertx; private final RecursivePeerRefreshState.BondingAgent bondingAgent = mock(RecursivePeerRefreshState.BondingAgent.class); @@ -82,8 +85,10 @@ public void setup() throws Exception { BytesValue target = BytesValue.fromHexString( "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"); + vertx = Vertx.vertx(); recursivePeerRefreshState = - new RecursivePeerRefreshState(target, new PeerBlacklist(), bondingAgent, neighborFinder); + new RecursivePeerRefreshState( + target, new PeerBlacklist(), bondingAgent, neighborFinder, vertx); peer_000 = generatePeer(peers); @@ -227,7 +232,7 @@ public void shouldIssueRequestToPeerWithGreaterDistanceOnExpirationOfLowerDistan verify(neighborFinder).issueFindNodeRequest(peer_012); verify(neighborFinder).issueFindNodeRequest(peer_013); - TimeUnit.SECONDS.sleep(10); + TimeUnit.SECONDS.sleep(60); verify(neighborFinder).issueFindNodeRequest(peer_010); } @@ -320,4 +325,9 @@ public String toString() { return parent + "." + tier + "." + identifier; } } + + @After + public void cleaUp() { + vertx.close(); + } }