Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
implementation of timeoutTask and corresponding test
Browse files Browse the repository at this point in the history
  • Loading branch information
smatthewenglish committed Nov 30, 2018
1 parent d5019ba commit 99b045c
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package tech.pegasys.pantheon.ethereum.p2p.discovery.internal;

import static java.util.stream.Collectors.toList;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDistanceCalculator.distance;

import tech.pegasys.pantheon.ethereum.p2p.peers.Peer;
import tech.pegasys.pantheon.ethereum.p2p.peers.PeerBlacklist;
import tech.pegasys.pantheon.util.bytes.BytesValue;
Expand All @@ -21,196 +24,201 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static java.util.stream.Collectors.toList;
import static tech.pegasys.pantheon.ethereum.p2p.discovery.internal.PeerDistanceCalculator.distance;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RecursivePeerRefreshState {
private final int CONCURRENT_REQUEST_LIMIT = 3;
private final BytesValue target;
private final PeerBlacklist peerBlacklist;
private final BondingAgent bondingAgent;
private final NeighborFinder neighborFinder;
private final List<PeerDistance> anteList;
private final List<OutstandingRequest> outstandingRequestList;
private final List<BytesValue> contactedInCurrentExecution;

RecursivePeerRefreshState(
final BytesValue target,
final PeerBlacklist peerBlacklist,
final BondingAgent bondingAgent,
final NeighborFinder neighborFinder) {
this.target = target;
this.peerBlacklist = peerBlacklist;
this.bondingAgent = bondingAgent;
this.neighborFinder = neighborFinder;
this.anteList = new ArrayList<>();
this.outstandingRequestList = new ArrayList<>();
this.contactedInCurrentExecution = new ArrayList<>();

//commenceTimeoutTask();
}

/**
* The lookup initiator starts by picking CONCURRENT_REQUEST_LIMIT closest nodes to the target it
* knows of. The initiator then issues concurrent FindNode packets to those nodes.
*/
private void initiatePeerRefreshCycle(final List<Peer> peers) {
for (Peer peer : peers) {
if (!contactedInCurrentExecution.contains(peer.getId())) {
BytesValue peerId = peer.getId();
outstandingRequestList.add(new OutstandingRequest(peer.getId()));
contactedInCurrentExecution.add(peerId);
neighborFinder.issueFindNodeRequest(peer);
}
// The lookup terminates when the initiator has queried
// and gotten responses from the k closest nodes it has seen.
}
}

void digestNeighboursPacket(
final NeighborsPacketData neighboursPacket, final BytesValue peerIdentifier) {
if (outstandingRequestList.contains(new OutstandingRequest(peerIdentifier))) {
List<Peer> receivedPeerList = neighboursPacket.getNodes();
for (Peer receivedPeer : receivedPeerList) {
if (!peerBlacklist.contains(receivedPeer)) {
bondingAgent.performBonding(receivedPeer);
anteList.add(new PeerDistance(receivedPeer, distance(target, receivedPeer.getId())));
private final int CONCURRENT_REQUEST_LIMIT = 3;
private final BytesValue target;
private final PeerBlacklist peerBlacklist;
private final BondingAgent bondingAgent;
private final NeighborFinder neighborFinder;
private final List<PeerDistance> anteList;
private final List<OutstandingRequest> outstandingRequestList;
private final List<BytesValue> contactedInCurrentExecution;

RecursivePeerRefreshState(
final BytesValue target,
final PeerBlacklist peerBlacklist,
final BondingAgent bondingAgent,
final NeighborFinder neighborFinder) {
this.target = target;
this.peerBlacklist = peerBlacklist;
this.bondingAgent = bondingAgent;
this.neighborFinder = neighborFinder;
this.anteList = new ArrayList<>();
this.outstandingRequestList = new ArrayList<>();
this.contactedInCurrentExecution = new ArrayList<>();

commenceTimeoutTask();
}

private void commenceTimeoutTask() {
TimerTask timeoutTask =
new TimerTask() {
@Override
public void run() {
List<OutstandingRequest> outstandingRequestListCopy =
new ArrayList<>(outstandingRequestList);

for (OutstandingRequest outstandingRequest : outstandingRequestListCopy) {
if (outstandingRequest.isExpired()) {
List<Peer> queryCandidates = determineFindNodeCandidates(anteList.size());
for (Peer candidate : queryCandidates) {
if (!contactedInCurrentExecution.contains(candidate.getId())
&& !outstandingRequestList.contains(new OutstandingRequest(candidate))) {
executeFindNodeRequest(candidate);
}
}
outstandingRequestList.remove(outstandingRequest);
}
}
outstandingRequestList.remove(new OutstandingRequest(peerIdentifier));
performIteration();
}
}

private List<Peer> determineFindNodeCandidates(final int from, final int threshold) {
anteList.sort(
(peer1, peer2) -> {
if (peer1.getDistance() > peer2.getDistance()) return 1;
if (peer1.getDistance() < peer2.getDistance()) return -1;
return 0;
});
return anteList
.subList(from, threshold)
.stream()
.map(PeerDistance::getPeer)
.collect(toList());
}
};
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.scheduleAtFixedRate(timeoutTask, 0, 2, TimeUnit.SECONDS);
}

private void executeFindNodeRequest(final Peer peer) {
BytesValue peerId = peer.getId();
outstandingRequestList.add(new OutstandingRequest(peer));
contactedInCurrentExecution.add(peerId);
neighborFinder.issueFindNodeRequest(peer);
}

/**
* The lookup initiator starts by picking CONCURRENT_REQUEST_LIMIT closest nodes to the target it
* knows of. The initiator then issues concurrent FindNode packets to those nodes.
*/
private void initiatePeerRefreshCycle(final List<Peer> peers) {
for (Peer peer : peers) {
if (!contactedInCurrentExecution.contains(peer.getId())) {
executeFindNodeRequest(peer);
}
// The lookup terminates when the initiator has queried
// and gotten responses from the k closest nodes it has seen.
}

private void performIteration() {
if (outstandingRequestList.isEmpty()) {
List<Peer> queryCandidates = determineFindNodeCandidates(0, CONCURRENT_REQUEST_LIMIT);
initiatePeerRefreshCycle(queryCandidates);
}

void digestNeighboursPacket(final NeighborsPacketData neighboursPacket, final Peer peer) {
if (outstandingRequestList.contains(new OutstandingRequest(peer))) {
List<Peer> receivedPeerList = neighboursPacket.getNodes();
for (Peer receivedPeer : receivedPeerList) {
if (!peerBlacklist.contains(receivedPeer)) {
bondingAgent.performBonding(receivedPeer);
anteList.add(new PeerDistance(receivedPeer, distance(target, receivedPeer.getId())));
}
}
outstandingRequestList.remove(new OutstandingRequest(peer));
performIteration();
}

void kickstartBootstrapPeers(final List<Peer> bootstrapPeers) {
for (Peer bootstrapPeer : bootstrapPeers) {
BytesValue peerId = bootstrapPeer.getId();
outstandingRequestList.add(new OutstandingRequest(peerId));
contactedInCurrentExecution.add(peerId);
bondingAgent.performBonding(bootstrapPeer);
neighborFinder.issueFindNodeRequest(bootstrapPeer);
}
}

private List<Peer> determineFindNodeCandidates(final int threshold) {
anteList.sort(
(peer1, peer2) -> {
if (peer1.getDistance() > peer2.getDistance()) return 1;
if (peer1.getDistance() < peer2.getDistance()) return -1;
return 0;
});
return anteList.subList(0, threshold).stream().map(PeerDistance::getPeer).collect(toList());
}

private void performIteration() {
if (outstandingRequestList.isEmpty()) {
List<Peer> queryCandidates = determineFindNodeCandidates(CONCURRENT_REQUEST_LIMIT);
initiatePeerRefreshCycle(queryCandidates);
}
}

void kickstartBootstrapPeers(final List<Peer> bootstrapPeers) {
for (Peer bootstrapPeer : bootstrapPeers) {
BytesValue peerId = bootstrapPeer.getId();
outstandingRequestList.add(new OutstandingRequest(bootstrapPeer));
contactedInCurrentExecution.add(peerId);
bondingAgent.performBonding(bootstrapPeer);
neighborFinder.issueFindNodeRequest(bootstrapPeer);
}
}

// private void commenceTimeoutTask() {
// TimerTask timeoutTask = new TimerTask() {
// public void run() {
// for (OutstandingRequest outstandingRequest : outstandingRequestList) {
// if (outstandingRequest.isExpired()) {
// // Replace with next highest...
// List<Peer> queryCandidates = determineFindNodeCandidates(0, anteList.size());
// for (Peer peer : queryCandidates) {
// if (!outstandingRequestList.contains(peer) && !contactedInCurrentExecution.contains(peer.getId())) {
// contactedInCurrentExecution.add(peer.getId());
// bondingAgent.performBonding(peer);
// neighborFinder.issueFindNodeRequest(peer);
// // Remove the expired request from our consideration...
// outstandingRequestList.remove(outstandingRequest);
// contactedInCurrentExecution.add(outstandingRequest.peerId);
// }
// }
// }
// }
// }
// };
// ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
// executor.scheduleAtFixedRate(timeoutTask, 0, 30, TimeUnit.SECONDS);
// }

static class PeerDistance {
Peer peer;
Integer distance;

PeerDistance(final Peer peer, final Integer distance) {
this.peer = peer;
this.distance = distance;
}
static class PeerDistance {
Peer peer;
Integer distance;

Peer getPeer() {
return peer;
}
PeerDistance(final Peer peer, final Integer distance) {
this.peer = peer;
this.distance = distance;
}

Integer getDistance() {
return distance;
}
Peer getPeer() {
return peer;
}

@Override
public String toString() {
return peer + ": " + distance;
}
Integer getDistance() {
return distance;
}

static class OutstandingRequest {
Instant creation;
BytesValue peerId;
@Override
public String toString() {
return peer + ": " + distance;
}
}

OutstandingRequest(final BytesValue peerId) {
this.creation = Instant.now();
this.peerId = peerId;
}
static class OutstandingRequest {
Instant creation;
Peer peer;

boolean isExpired() {
Duration duration = Duration.between(creation, Instant.now());
Duration limit = Duration.ofSeconds(30);
return duration.compareTo(limit) >= 0;
}
OutstandingRequest(final Peer peer) {
this.creation = Instant.now();
this.peer = peer;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OutstandingRequest that = (OutstandingRequest) o;
return Objects.equals(peerId, that.peerId);
}
Peer getPeer() {
return peer;
}

@Override
public int hashCode() {
return Objects.hash(peerId);
}
boolean isExpired() {
Duration duration = Duration.between(creation, Instant.now());
Duration limit = Duration.ofSeconds(5);
return duration.compareTo(limit) >= 0;
}

@Override
public String toString() {
return peerId.toString();
}
@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OutstandingRequest that = (OutstandingRequest) o;
return Objects.equals(peer.getId(), that.peer.getId());
}

public interface NeighborFinder {
/**
* Wait for the peer to complete bonding before issuance of FindNode request.
*
* @param peer
*/
void issueFindNodeRequest(final Peer peer);
@Override
public int hashCode() {
return Objects.hash(peer.getId());
}

public interface BondingAgent {
/**
* If peer is not previously known, initiate bonding process.
*
* @param peer
*/
void performBonding(final Peer peer);
@Override
public String toString() {
return peer.toString();
}
}

public interface NeighborFinder {
/**
* Wait for the peer to complete bonding before issuance of FindNode request.
*
* @param peer
*/
void issueFindNodeRequest(final Peer peer);
}

public interface BondingAgent {
/**
* If peer is not previously known, initiate bonding process.
*
* @param peer
*/
void performBonding(final Peer peer);
}
}
Loading

0 comments on commit 99b045c

Please sign in to comment.