-
Notifications
You must be signed in to change notification settings - Fork 130
[NC-875] Implement iterative peer search #268
[NC-875] Implement iterative peer search #268
Conversation
34baf46
to
db6ac93
Compare
5cecfbc
to
d891a4d
Compare
38a1f53
to
39323ac
Compare
private final BondingAgent bondingAgent; | ||
private final NeighborFinder neighborFinder; | ||
private final HashMap<Peer, Integer> anteMap; | ||
private final SortedSet<Map.Entry<Peer, Integer>> distanceSortedPeers; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't be using Map.Entry
here. It needs to be copied to a custom type. Partly because Map.Entry
is just too generic a type so becomes unclear very quickly and partly because Map
implementations are allowed to reuse the Map.Entry
instance when iterating so it may actually be changed underneath us.
...in/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java
Show resolved
Hide resolved
BytesValue peerId; | ||
|
||
OutstandingRequest(final BytesValue peerId) { | ||
this.creation = Instant.now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We shouldn't use Instant.now()
directly because it makes it too difficult to test the time handling. Instead we should inject a java.time.Clock
instance - then in production we can use Clock.systemUTC()
and in tests we can use Clock.fixed
, a mock or any other implementation we want so we can control time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added some mechanism for testing the expiration of the outstanding requests, but maybe it's not rigorous enough. I'm happy to add this in, I just wanted to get some feedback on how it might work with the way that the expiration is being tested atm, since this comment was prior to that being implemented.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We definitely need to abstract the clock - we don't want to have to sleep for 60 seconds in the test as it slows down the build too much. Having the scheduling of the periodic job be external to this class and an abstracted clock would let us just advance the clock and trigger another check without any sleeping.
} | ||
|
||
private List<Peer> determineFindNodeCandidates() { | ||
distanceSortedPeers.addAll(anteMap.entrySet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to clear distanceSortedPeers
first? If we just want to have each peer in anteMap
in a sorted list, it would be better to just iterate through anteMap
to create an ArrayList
then use Collections.sort
to sort it in place rather than going via a sorted set.
Or, third option since we want a fixed number of items, iterate through anteMap
adding each item to a TreeMap
(need to create a custom class as we shouldn't be reusing Map.Entry
instances). After adding each item check if the size of our set is >
CONCURRENT_REQUEST_LIMITif so use
pollLastto remove the last item.
pollLastis from
NavigableMap, a sub-type of
SortedSetwhich
TreeSet` implements.
} | ||
|
||
boolean isExpired() { | ||
Duration duration = Duration.between(creation, Instant.now()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid Instant.now
here too.
cc6bd63
to
3e0c60c
Compare
if (peerBlacklist.contains(neighbor) || peerTable.get(neighbor).isPresent()) { | ||
continue; | ||
} | ||
bond(neighbor, false); | ||
bond((DiscoveryPeer) neighbor, false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're going to have to cast things back to DiscoveryPeer
we should just be honest and go back to using the DiscoveryPeer
type all the way through. It should only be declared Peer
if it really can be any implementation of Peer
.
TIMEOUT_TASK_DELAY, | ||
v -> { | ||
List<OutstandingRequest> outstandingRequestListCopy = | ||
new ArrayList<>(outstandingRequestList); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we making a copy here? It doesn't provide any thread safety because making the copy requires iterating the list.
For thread safety through this class, I suspect we'll need to use synchronized
blocks so that only one thread can be anywhere in this class at a time.
this.outstandingRequestList = new ArrayList<>(); | ||
this.contactedInCurrentExecution = new ArrayList<>(); | ||
|
||
commenceTimeoutTask(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably don't want to kick off the timeout task in the constructor - otherwise it will kick off in every test and be out of the test's control.
Also would a new instance of this class be created each time we run discovery? If so, something needs to clean up the timeout task.
I would probably separate responsibility for running the timer and move it out of this class. So just expose the expiry checking logic as a public method of this class that PeerDiscoveryController
would call periodically. That makes testing the actual logic much easier and avoids a lot of the lifecycle work.
If the plan is to only ever have one instance of this class, then a static init
method could be used to create the class and setup the periodic polling at once but tests would still be able to create the class via the constructor and controlling the timeout check directly.
outstandingRequestList.remove(outstandingRequest); | ||
} | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Most of the variables in here should be final. I suggest enabling IntelliJ's inspections "Local variable or parameter can be final" and "Field may be final".
} | ||
} | ||
|
||
void digestNeighboursPacket(final NeighborsPacketData neighboursPacket, final Peer peer) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest moving this and kickstratBootstrapPeers
up to right under the constructor since they're the key entry point to the class. It's a minor thing but makes it easier to see how the class is used.
I'd probably also call this onNeighboursPacketReceived
to be a bit more idiomatic.
return anteList.subList(0, threshold).stream().map(PeerDistance::getPeer).collect(toList()); | ||
} | ||
|
||
private void performIteration() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this really queryNearestNodes
?
|
||
private void performIteration() { | ||
if (outstandingRequestList.isEmpty()) { | ||
List<Peer> queryCandidates = determineFindNodeCandidates(CONCURRENT_REQUEST_LIMIT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we query 3 other peers for each response we receive, the number of concurrent requests will be 3 in the first round, 9 in the second and increase exponentially. Should we be querying the 3 closest from each response or the 3 closest from all the responses from that round?
The current behaviour may be ok - having a lot of outstanding requests isn't that big a deal since we don't have to do anything while we wait for them but if do increase exponentially we need a fairly low limit on the number of rounds we do so we don't flood the network.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As it's currently configured, I'm pretty sure the algorithm will query the 3 closest peers on a per round basis, so there wouldn't be more than 3 OutstandingRequests at any given time.
If we take (A) as a bootstrap node, and (A) sends us back a packet that tells us about (B), (C), (D) & (E), we'll consider these peers and deposit them all into the anteList.
As we had one OutstandingRequest, to (A), which we've received a response from, our OutstandingRequestList will be empty, so we'll initiate another "round", i.e. set of FindNode requests.
Let's say that our closest peers are (B), (C) and (D), we'll issue requests and (assuming none of them expire) we'll received responses from each one of them, each of which with n peers, all of whom will be deposited into the anteList.
When we've gotten responses from all of them, that is when our OutstandingRequestList will again be empty, which is what will precipitate a fresh round of 3 requests.
That was my intention, and that's how I think it works, but maybe there's something I've overlooked?
BytesValue peerId; | ||
|
||
OutstandingRequest(final BytesValue peerId) { | ||
this.creation = Instant.now(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We definitely need to abstract the clock - we don't want to have to sleep for 60 seconds in the test as it slows down the build too much. Having the scheduling of the periodic job be external to this class and an abstracted clock would let us just advance the clock and trigger another check without any sleeping.
verify(neighborFinder).issueFindNodeRequest(peer_012); | ||
verify(neighborFinder).issueFindNodeRequest(peer_013); | ||
|
||
TimeUnit.SECONDS.sleep(60); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We definitely don't want this sleep. :)
@After | ||
public void cleaUp() { | ||
vertx.close(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd move this up to under init so that you can see more clearly that we start a Vertx instance and close it rather than having the two so separated. Although with the timer becoming external you'll get rid of the need for Vertx in this test entirely which is another nice benefit.
b4f04fe
to
a5f4dcf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix up the missing finals and then LGTM.
} | ||
|
||
void kickstartBootstrapPeers(final List<Peer> bootstrapPeers) { | ||
for (Peer bootstrapPeer : bootstrapPeers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: final - right through this class. :) I suggest enabling the IntelliJ warning for non-final fields and then you can tell it to fix all.
...in/java/tech/pegasys/pantheon/ethereum/p2p/discovery/internal/RecursivePeerRefreshState.java
Show resolved
Hide resolved
a5f4dcf
to
3acaa50
Compare
Recursive Lookup
A 'lookup' locates the k closest nodes to a node ID.
The lookup initiator starts by picking α closest nodes to the target it knows of. The initiator then sends concurrent FindNode packets to those nodes. α is a system-wide concurrency parameter, such as 3. In the recursive step, the initiator resends FindNode to nodes it has learned about from previous queries. Of the k nodes the initiator has heard of closest to the target, it picks α that it has not yet queried and resends FindNode to them. Nodes that fail to respond quickly are removed from consideration until and unless they do respond.
If a round of FindNode queries fails to return a node any closer than the closest already seen, the initiator resends the find node to all of the k closest nodes it has not already queried. The lookup terminates when the initiator has queried and gotten responses from the k closest nodes it has seen. [1]
[1] https://github.com/ethereum/devp2p/blob/master/discv4.md