Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Add getNodeData to EthPeer to enable requesting node data as part of …
Browse files Browse the repository at this point in the history
…fast sync. (#589)
  • Loading branch information
ajsutton authored Jan 17, 2019
1 parent 42719a7 commit 2cb9126
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV63;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetBlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetNodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.GetReceiptsMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class EthPeer {
private final RequestManager headersRequestManager = new RequestManager(this);
private final RequestManager bodiesRequestManager = new RequestManager(this);
private final RequestManager receiptsRequestManager = new RequestManager(this);
private final RequestManager nodeDataRequestManager = new RequestManager(this);

private final AtomicReference<Consumer<EthPeer>> onStatusesExchanged = new AtomicReference<>();
private final PeerReputation reputation = new PeerReputation();
Expand Down Expand Up @@ -120,6 +122,8 @@ public ResponseStream send(final MessageData messageData) throws PeerNotConnecte
return sendBodiesRequest(messageData);
case EthPV63.GET_RECEIPTS:
return sendReceiptsRequest(messageData);
case EthPV63.GET_NODE_DATA:
return sendNodeDataRequest(messageData);
default:
connection.sendForProtocol(protocolName, messageData);
return null;
Expand Down Expand Up @@ -168,6 +172,17 @@ private ResponseStream sendReceiptsRequest(final MessageData messageData)
() -> connection.sendForProtocol(protocolName, messageData));
}

public ResponseStream getNodeData(final Iterable<Hash> nodeHashes) throws PeerNotConnected {
final GetNodeDataMessage message = GetNodeDataMessage.create(nodeHashes);
return sendNodeDataRequest(message);
}

private ResponseStream sendNodeDataRequest(final MessageData messageData)
throws PeerNotConnected {
return nodeDataRequestManager.dispatchRequest(
() -> connection.sendForProtocol(protocolName, messageData));
}

boolean validateReceivedMessage(final EthMessage message) {
checkArgument(message.getPeer().equals(this), "Mismatched message sent to peer for dispatch");
switch (message.getData().getCode()) {
Expand All @@ -189,6 +204,12 @@ boolean validateReceivedMessage(final EthMessage message) {
return false;
}
break;
case EthPV63.NODE_DATA:
if (nodeDataRequestManager.outstandingRequests() == 0) {
LOG.warn("Unsolicited node data received.");
return false;
}
break;
default:
// Nothing to do
}
Expand All @@ -215,6 +236,10 @@ void dispatch(final EthMessage message) {
reputation.resetTimeoutCount(EthPV63.GET_RECEIPTS);
receiptsRequestManager.dispatchResponse(message);
break;
case EthPV63.NODE_DATA:
reputation.resetTimeoutCount(EthPV63.GET_NODE_DATA);
nodeDataRequestManager.dispatchResponse(message);
break;
default:
// Nothing to do
}
Expand All @@ -228,6 +253,7 @@ void handleDisconnect() {
headersRequestManager.close();
bodiesRequestManager.close();
receiptsRequestManager.close();
nodeDataRequestManager.close();
disconnectCallbacks.forEach(callback -> callback.onDisconnect(this));
}

Expand Down Expand Up @@ -294,7 +320,8 @@ public void registerHeight(final Hash blockHash, final long height) {
public int outstandingRequests() {
return headersRequestManager.outstandingRequests()
+ bodiesRequestManager.outstandingRequests()
+ receiptsRequestManager.outstandingRequests();
+ receiptsRequestManager.outstandingRequests()
+ nodeDataRequestManager.outstandingRequests();
}

public BytesValue nodeId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator;
Expand All @@ -20,14 +22,13 @@
import tech.pegasys.pantheon.ethereum.eth.manager.RequestManager.ResponseStream;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.NodeDataMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.ReceiptsMessage;
import tech.pegasys.pantheon.ethereum.p2p.api.MessageData;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection;
import tech.pegasys.pantheon.ethereum.p2p.api.PeerConnection.PeerNotConnected;
import tech.pegasys.pantheon.ethereum.p2p.wire.Capability;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -43,33 +44,39 @@ public void getHeadersStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getHeadersByHash(gen.hash(), 5, 0, false);
final MessageData targetMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
final MessageData otherMessage =
BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body()));
BlockHeadersMessage.create(asList(gen.header(), gen.header()));
final MessageData otherMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body()));

messageStream(getStream, targetMessage, otherMessage);
}

@Test
public void getBodiesStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getBodies(Arrays.asList(gen.hash(), gen.hash()));
final MessageData targetMessage =
BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body()));
final MessageData otherMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
(peer) -> peer.getBodies(asList(gen.hash(), gen.hash()));
final MessageData targetMessage = BlockBodiesMessage.create(asList(gen.body(), gen.body()));
final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));

messageStream(getStream, targetMessage, otherMessage);
}

@Test
public void getReceiptsStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getReceipts(Arrays.asList(gen.hash(), gen.hash()));
(peer) -> peer.getReceipts(asList(gen.hash(), gen.hash()));
final MessageData targetMessage =
ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block())));
final MessageData otherMessage =
BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header()));
ReceiptsMessage.create(singletonList(gen.receipts(gen.block())));
final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));

messageStream(getStream, targetMessage, otherMessage);
}

@Test
public void getNodeDataStream() throws PeerNotConnected {
final ResponseStreamSupplier getStream =
(peer) -> peer.getNodeData(asList(gen.hash(), gen.hash()));
final MessageData targetMessage = NodeDataMessage.create(singletonList(gen.bytesValue()));
final MessageData otherMessage = BlockHeadersMessage.create(asList(gen.header(), gen.header()));

messageStream(getStream, targetMessage, otherMessage);
}
Expand All @@ -88,7 +95,7 @@ public void closeStreamsOnPeerDisconnect() throws PeerNotConnected {
});
// Bodies stream
final AtomicInteger bodiesClosedCount = new AtomicInteger(0);
peer.getBodies(Arrays.asList(gen.hash(), gen.hash()))
peer.getBodies(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
Expand All @@ -97,37 +104,47 @@ public void closeStreamsOnPeerDisconnect() throws PeerNotConnected {
});
// Receipts stream
final AtomicInteger receiptsClosedCount = new AtomicInteger(0);
peer.getReceipts(Arrays.asList(gen.hash(), gen.hash()))
peer.getReceipts(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
receiptsClosedCount.incrementAndGet();
}
});
// NodeData stream
final AtomicInteger nodeDataClosedCount = new AtomicInteger(0);
peer.getNodeData(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
nodeDataClosedCount.incrementAndGet();
}
});

// Sanity check
assertThat(headersClosedCount.get()).isEqualTo(0);
assertThat(bodiesClosedCount.get()).isEqualTo(0);
assertThat(receiptsClosedCount.get()).isEqualTo(0);
assertThat(nodeDataClosedCount.get()).isEqualTo(0);

// Disconnect and check
peer.handleDisconnect();
assertThat(headersClosedCount.get()).isEqualTo(1);
assertThat(bodiesClosedCount.get()).isEqualTo(1);
assertThat(receiptsClosedCount.get()).isEqualTo(1);
assertThat(nodeDataClosedCount.get()).isEqualTo(1);
}

@Test
public void listenForMultipleStreams() throws PeerNotConnected {
// Setup peer and messages
final EthPeer peer = createPeer();
final EthMessage headersMessage =
new EthMessage(peer, BlockHeadersMessage.create(Arrays.asList(gen.header(), gen.header())));
new EthMessage(peer, BlockHeadersMessage.create(asList(gen.header(), gen.header())));
final EthMessage bodiesMessage =
new EthMessage(peer, BlockBodiesMessage.create(Arrays.asList(gen.body(), gen.body())));
new EthMessage(peer, BlockBodiesMessage.create(asList(gen.body(), gen.body())));
final EthMessage otherMessage =
new EthMessage(
peer, ReceiptsMessage.create(Collections.singletonList(gen.receipts(gen.block()))));
new EthMessage(peer, ReceiptsMessage.create(singletonList(gen.receipts(gen.block()))));

// Set up stream for headers
final AtomicInteger headersMessageCount = new AtomicInteger(0);
Expand All @@ -147,7 +164,7 @@ public void listenForMultipleStreams() throws PeerNotConnected {
final AtomicInteger bodiesMessageCount = new AtomicInteger(0);
final AtomicInteger bodiesClosedCount = new AtomicInteger(0);
final ResponseStream bodiesStream =
peer.getBodies(Arrays.asList(gen.hash(), gen.hash()))
peer.getBodies(asList(gen.hash(), gen.hash()))
.then(
(closed, msg, p) -> {
if (closed) {
Expand Down Expand Up @@ -265,7 +282,7 @@ private void messageStream(
}

private EthPeer createPeer() {
final Set<Capability> caps = new HashSet<>(Collections.singletonList(EthProtocol.ETH63));
final Set<Capability> caps = new HashSet<>(singletonList(EthProtocol.ETH63));
final PeerConnection peerConnection = new MockPeerConnection(caps);
final Consumer<EthPeer> onPeerReady = (peer) -> {};
return new EthPeer(peerConnection, EthProtocol.NAME, onPeerReady);
Expand Down

0 comments on commit 2cb9126

Please sign in to comment.