Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
use setPeriodic
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish committed Nov 30, 2018
1 parent 99b045c commit cc6bd63
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,32 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.vertx.core.Vertx;

class RecursivePeerRefreshState {
private final int CONCURRENT_REQUEST_LIMIT = 3;
private final int TIMEOUT_TASK_DELAY = 30000; // 30 Seconds
private final BytesValue target;
private final PeerBlacklist peerBlacklist;
private final BondingAgent bondingAgent;
private final NeighborFinder neighborFinder;
private final List<PeerDistance> anteList;
private final List<OutstandingRequest> outstandingRequestList;
private final List<BytesValue> contactedInCurrentExecution;
private final Vertx vertx;

RecursivePeerRefreshState(
final BytesValue target,
final PeerBlacklist peerBlacklist,
final BondingAgent bondingAgent,
final NeighborFinder neighborFinder) {
final NeighborFinder neighborFinder,
final Vertx vertx) {
this.target = target;
this.peerBlacklist = peerBlacklist;
this.bondingAgent = bondingAgent;
this.neighborFinder = neighborFinder;
this.vertx = vertx;
this.anteList = new ArrayList<>();
this.outstandingRequestList = new ArrayList<>();
this.contactedInCurrentExecution = new ArrayList<>();
Expand All @@ -56,29 +58,25 @@ class RecursivePeerRefreshState {
}

private void commenceTimeoutTask() {
TimerTask timeoutTask =
new TimerTask() {
@Override
public void run() {
List<OutstandingRequest> outstandingRequestListCopy =
new ArrayList<>(outstandingRequestList);

for (OutstandingRequest outstandingRequest : outstandingRequestListCopy) {
if (outstandingRequest.isExpired()) {
List<Peer> queryCandidates = determineFindNodeCandidates(anteList.size());
for (Peer candidate : queryCandidates) {
if (!contactedInCurrentExecution.contains(candidate.getId())
&& !outstandingRequestList.contains(new OutstandingRequest(candidate))) {
executeFindNodeRequest(candidate);
}
vertx.setPeriodic(
TIMEOUT_TASK_DELAY,
v -> {
List<OutstandingRequest> outstandingRequestListCopy =
new ArrayList<>(outstandingRequestList);

for (OutstandingRequest outstandingRequest : outstandingRequestListCopy) {
if (outstandingRequest.isExpired()) {
List<Peer> queryCandidates = determineFindNodeCandidates(anteList.size());
for (Peer candidate : queryCandidates) {
if (!contactedInCurrentExecution.contains(candidate.getId())
&& !outstandingRequestList.contains(new OutstandingRequest(candidate))) {
executeFindNodeRequest(candidate);
}
outstandingRequestList.remove(outstandingRequest);
}
outstandingRequestList.remove(outstandingRequest);
}
}
};
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(timeoutTask, 0, 2, TimeUnit.SECONDS);
});
}

private void executeFindNodeRequest(final Peer peer) {
Expand Down Expand Up @@ -181,7 +179,7 @@ Peer getPeer() {

boolean isExpired() {
Duration duration = Duration.between(creation, Instant.now());
Duration limit = Duration.ofSeconds(5);
Duration limit = Duration.ofSeconds(30);
return duration.compareTo(limit) >= 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.vertx.core.Vertx;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RecursivePeerRefreshStateTest {
private static final ObjectMapper MAPPER = new ObjectMapper();

private RecursivePeerRefreshState recursivePeerRefreshState;
private Vertx vertx;

private final RecursivePeerRefreshState.BondingAgent bondingAgent =
mock(RecursivePeerRefreshState.BondingAgent.class);
Expand Down Expand Up @@ -82,8 +85,10 @@ public void setup() throws Exception {
BytesValue target =
BytesValue.fromHexString(
"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000");
vertx = Vertx.vertx();
recursivePeerRefreshState =
new RecursivePeerRefreshState(target, new PeerBlacklist(), bondingAgent, neighborFinder);
new RecursivePeerRefreshState(
target, new PeerBlacklist(), bondingAgent, neighborFinder, vertx);

peer_000 = generatePeer(peers);

Expand Down Expand Up @@ -227,7 +232,7 @@ public void shouldIssueRequestToPeerWithGreaterDistanceOnExpirationOfLowerDistan
verify(neighborFinder).issueFindNodeRequest(peer_012);
verify(neighborFinder).issueFindNodeRequest(peer_013);

TimeUnit.SECONDS.sleep(10);
TimeUnit.SECONDS.sleep(60);

verify(neighborFinder).issueFindNodeRequest(peer_010);
}
Expand Down Expand Up @@ -320,4 +325,9 @@ public String toString() {
return parent + "." + tier + "." + identifier;
}
}

@After
public void cleaUp() {
vertx.close();
}
}

0 comments on commit cc6bd63

Please sign in to comment.