Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Support responding to GetNodeData requests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ajsutton committed Jan 17, 2019
1 parent 85508e3 commit d9916ec
Show file tree
Hide file tree
Showing 21 changed files with 208 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package tech.pegasys.pantheon.consensus.ibftlegacy.protocol;

import tech.pegasys.pantheon.ethereum.chain.Blockchain;
import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager;
import tech.pegasys.pantheon.ethereum.p2p.api.Message;
Expand All @@ -27,11 +28,12 @@ public class Istanbul64ProtocolManager extends EthProtocolManager {

public Istanbul64ProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final int networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers) {
super(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers);
super(blockchain, worldStateArchive, networkId, fastSyncEnabled, syncWorkers, txWorkers);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.DefaultMutableWorldState;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;

public class WorldStateArchive {
private final WorldStateStorage storage;
Expand All @@ -42,4 +45,8 @@ public WorldState get() {
public MutableWorldState getMutable() {
return getMutable(EMPTY_ROOT_HASH);
}

public Optional<BytesValue> getNodeData(final Hash hash) {
return storage.getNodeData(hash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ public Optional<BytesValue> getAccountStorageTrieNode(final Bytes32 nodeHash) {
return keyValueStorage.get(nodeHash);
}

@Override
public Optional<BytesValue> getNodeData(final Hash hash) {
return keyValueStorage.get(hash);
}

@Override
public Updater updater() {
return new Updater(keyValueStorage.startTransaction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public interface WorldStateStorage {

Optional<BytesValue> getAccountStorageTrieNode(Bytes32 nodeHash);

Optional<BytesValue> getNodeData(Hash hash);

Updater updater();

interface Updater {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import tech.pegasys.pantheon.ethereum.chain.MinedBlockObserver;
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.eth.EthProtocol;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
import tech.pegasys.pantheon.ethereum.eth.messages.NewBlockMessage;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {

EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final int networkId,
final boolean fastSyncEnabled,
final int requestLimit,
Expand All @@ -83,18 +85,20 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
ethContext = new EthContext(getSupportedProtocol(), ethPeers, ethMessages, scheduler);

// Set up request handlers
new EthServer(blockchain, ethMessages, requestLimit);
new EthServer(blockchain, worldStateArchive, ethMessages, requestLimit);
}

EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final int networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers,
final int requestLimit) {
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
requestLimit,
Expand All @@ -103,11 +107,19 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {

public EthProtocolManager(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final int networkId,
final boolean fastSyncEnabled,
final int syncWorkers,
final int txWorkers) {
this(blockchain, networkId, fastSyncEnabled, syncWorkers, txWorkers, DEFAULT_REQUEST_LIMIT);
this(
blockchain,
worldStateArchive,
networkId,
fastSyncEnabled,
syncWorkers,
txWorkers,
DEFAULT_REQUEST_LIMIT);
}

public EthContext ethContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.core.TransactionReceipt;
import tech.pegasys.pantheon.ethereum.db.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockBodiesMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.BlockHeadersMessage;
import tech.pegasys.pantheon.ethereum.eth.messages.EthPV62;
Expand Down Expand Up @@ -47,11 +48,17 @@ class EthServer {
private static final Logger LOG = LogManager.getLogger();

private final Blockchain blockchain;
private final WorldStateArchive worldStateArchive;
private final EthMessages ethMessages;
private final int requestLimit;

EthServer(final Blockchain blockchain, final EthMessages ethMessages, final int requestLimit) {
EthServer(
final Blockchain blockchain,
final WorldStateArchive worldStateArchive,
final EthMessages ethMessages,
final int requestLimit) {
this.blockchain = blockchain;
this.worldStateArchive = worldStateArchive;
this.ethMessages = ethMessages;
this.requestLimit = requestLimit;
this.setupListeners();
Expand Down Expand Up @@ -106,7 +113,8 @@ private void handleGetReceipts(final EthMessage message) {
private void handleGetNodeData(final EthMessage message) {
LOG.trace("Responding to GET_NODE_DATA request");
try {
final MessageData response = constructGetNodeDataResponse(message.getData(), requestLimit);
final MessageData response =
constructGetNodeDataResponse(worldStateArchive, message.getData(), requestLimit);
message.getPeer().send(response);
} catch (final RLPException e) {
message.getPeer().disconnect(DisconnectReason.BREACH_OF_PROTOCOL);
Expand Down Expand Up @@ -195,7 +203,9 @@ static MessageData constructGetReceiptsResponse(
}

static MessageData constructGetNodeDataResponse(
final MessageData message, final int requestLimit) {
final WorldStateArchive worldStateArchive,
final MessageData message,
final int requestLimit) {
final GetNodeDataMessage getNodeDataMessage = GetNodeDataMessage.readFrom(message);
final Iterable<Hash> hashes = getNodeDataMessage.hashes();

Expand All @@ -206,7 +216,8 @@ static MessageData constructGetNodeDataResponse(
break;
}
count++;
// TODO: Lookup node data and add it to the list

worldStateArchive.getNodeData(hash).ifPresent(nodeData::add);
}
return NodeDataMessage.create(nodeData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public static void setup() {

@Test
public void disconnectOnUnsolicitedMessage() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
Expand All @@ -108,7 +109,8 @@ public void disconnectOnUnsolicitedMessage() {

@Test
public void disconnectOnFailureToSendStatusMessage() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer =
Expand All @@ -120,7 +122,8 @@ public void disconnectOnFailureToSendStatusMessage() {

@Test
public void disconnectOnWrongChainId() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer =
Expand All @@ -143,7 +146,8 @@ public void disconnectOnWrongChainId() {

@Test
public void disconnectOnWrongGenesisHash() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final MessageData messageData =
BlockHeadersMessage.create(Collections.singletonList(blockchain.getBlockHeader(1).get()));
final MockPeerConnection peer =
Expand All @@ -166,7 +170,8 @@ public void disconnectOnWrongGenesisHash() {

@Test(expected = ConditionTimeoutException.class)
public void doNotDisconnectOnValidMessage() {
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final MessageData messageData =
GetBlockBodiesMessage.create(Collections.singletonList(gen.hash()));
final MockPeerConnection peer = setupPeer(ethManager, (cap, msg, conn) -> {});
Expand All @@ -181,7 +186,8 @@ public void doNotDisconnectOnValidMessage() {
@Test
public void respondToGetHeaders() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final long startBlock = 5L;
final int blockCount = 5;
final MessageData messageData =
Expand Down Expand Up @@ -213,7 +219,8 @@ public void respondToGetHeadersWithinLimits() throws ExecutionException, Interru
final CompletableFuture<Void> done = new CompletableFuture<>();
final int limit = 5;
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) {
new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, limit)) {
final long startBlock = 5L;
final int blockCount = 10;
final MessageData messageData =
Expand Down Expand Up @@ -243,7 +250,8 @@ public void respondToGetHeadersWithinLimits() throws ExecutionException, Interru
@Test
public void respondToGetHeadersReversed() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final long endBlock = 10L;
final int blockCount = 5;
final MessageData messageData = GetBlockHeadersMessage.create(endBlock, blockCount, 0, true);
Expand Down Expand Up @@ -272,7 +280,8 @@ public void respondToGetHeadersReversed() throws ExecutionException, Interrupted
@Test
public void respondToGetHeadersWithSkip() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final long startBlock = 5L;
final int blockCount = 5;
final int skip = 1;
Expand Down Expand Up @@ -304,7 +313,8 @@ public void respondToGetHeadersWithSkip() throws ExecutionException, Interrupted
public void respondToGetHeadersReversedWithSkip()
throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final long endBlock = 10L;
final int blockCount = 5;
final int skip = 1;
Expand Down Expand Up @@ -357,7 +367,8 @@ private MockPeerConnection setupPeerWithoutStatusExchange(
@Test
public void respondToGetHeadersPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final long startBlock = blockchain.getChainHeadBlockNumber() - 1L;
final int blockCount = 5;
final MessageData messageData =
Expand Down Expand Up @@ -387,7 +398,8 @@ public void respondToGetHeadersPartial() throws ExecutionException, InterruptedE
@Test
public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final long startBlock = blockchain.getChainHeadBlockNumber() + 1;
final int blockCount = 5;
final MessageData messageData =
Expand All @@ -414,7 +426,8 @@ public void respondToGetHeadersEmpty() throws ExecutionException, InterruptedExc
@Test
public void respondToGetBodies() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
// Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2;
Expand Down Expand Up @@ -458,7 +471,8 @@ public void respondToGetBodiesWithinLimits() throws ExecutionException, Interrup
final CompletableFuture<Void> done = new CompletableFuture<>();
final int limit = 5;
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) {
new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, limit)) {
// Setup blocks query
final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
Expand Down Expand Up @@ -500,7 +514,8 @@ public void respondToGetBodiesWithinLimits() throws ExecutionException, Interrup
@Test
public void respondToGetBodiesPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
// Setup blocks query
final long expectedBlockNumber = blockchain.getChainHeadBlockNumber() - 1;
final BlockHeader header = blockchain.getBlockHeader(expectedBlockNumber).get();
Expand Down Expand Up @@ -536,7 +551,8 @@ public void respondToGetBodiesPartial() throws ExecutionException, InterruptedEx
@Test
public void respondToGetReceipts() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
// Setup blocks query
final long startBlock = blockchain.getChainHeadBlockNumber() - 5;
final int blockCount = 2;
Expand Down Expand Up @@ -579,7 +595,8 @@ public void respondToGetReceiptsWithinLimits() throws ExecutionException, Interr
final CompletableFuture<Void> done = new CompletableFuture<>();
final int limit = 5;
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, 1, limit)) {
new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1, limit)) {
// Setup blocks query
final int blockCount = 10;
final long startBlock = blockchain.getChainHeadBlockNumber() - blockCount;
Expand Down Expand Up @@ -620,7 +637,8 @@ public void respondToGetReceiptsWithinLimits() throws ExecutionException, Interr
@Test
public void respondToGetReceiptsPartial() throws ExecutionException, InterruptedException {
final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
// Setup blocks query
final long blockNumber = blockchain.getChainHeadBlockNumber() - 5;
final BlockHeader header = blockchain.getBlockHeader(blockNumber).get();
Expand Down Expand Up @@ -655,7 +673,8 @@ public void respondToGetReceiptsPartial() throws ExecutionException, Interrupted

@Test
public void newBlockMinedSendsNewBlockMessageToAllPeers() {
final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1);
final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1);

// Define handler to validate response
final PeerSendHandler onSend = mock(PeerSendHandler.class);
Expand Down Expand Up @@ -718,7 +737,8 @@ public void shouldSuccessfullyRespondToGetHeadersRequestLessThanZero()
blockchain.appendBlock(block, receipts);

final CompletableFuture<Void> done = new CompletableFuture<>();
try (final EthProtocolManager ethManager = new EthProtocolManager(blockchain, 1, true, 1, 1)) {
try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, 1)) {
final long startBlock = 1L;
final int requestedBlockCount = 13;
final int receivedBlockCount = 2;
Expand Down Expand Up @@ -774,7 +794,8 @@ public void transactionMessagesGoToTheCorrectExecutor() {
final TransactionsMessage transactionMessage = TransactionsMessage.readFrom(raw);

try (final EthProtocolManager ethManager =
new EthProtocolManager(blockchain, 1, true, 1, ethScheduler)) {
new EthProtocolManager(
blockchain, protocolContext.getWorldStateArchive(), 1, true, 1, ethScheduler)) {

// Create a transaction pool. This has a side effect of registring a listener for the
// transactions message.
Expand Down
Loading

0 comments on commit d9916ec

Please sign in to comment.