diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java index c9582af02e..25f507afb7 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTask.java @@ -12,7 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager.task; -import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; @@ -27,15 +27,17 @@ import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask> { +public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask> { private static final Logger LOG = LogManager.getLogger(); @@ -63,26 +65,31 @@ protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected } @Override - protected Optional> processResponse( + protected Optional> processResponse( final boolean streamClosed, final MessageData message, final EthPeer peer) { if (streamClosed) { // We don't record this as a useless response because it's impossible to know if a peer has // the data we're requesting. - return Optional.of(emptyList()); + return Optional.of(emptyMap()); } final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(message); final List nodeData = nodeDataMessage.nodeData(); - if (nodeData.isEmpty()) { - return Optional.empty(); - } else if (nodeData.size() > hashes.size()) { + if (nodeData.size() > hashes.size()) { // Can't be the response to our request return Optional.empty(); } + return mapNodeDataByHash(nodeData); + } - if (nodeData.stream().anyMatch(data -> !hashes.contains(Hash.hash(data)))) { - // Message contains unrequested data, must not be the response to our request. - return Optional.empty(); + private Optional> mapNodeDataByHash(final List nodeData) { + final Map nodeDataByHash = new HashMap<>(); + for (BytesValue data : nodeData) { + final Hash hash = Hash.hash(data); + if (!hashes.contains(hash)) { + return Optional.empty(); + } + nodeDataByHash.put(hash, data); } - return Optional.of(nodeData); + return Optional.of(nodeDataByHash); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index e7786c3140..a60778c4a4 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -37,7 +37,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -240,7 +239,7 @@ private CompletableFuture waitForNewPeer() { .timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5)); } - private CompletableFuture>> sendAndProcessRequests( + private CompletableFuture>> sendAndProcessRequests( final EthPeer peer, final List> requestTasks, final BlockHeader blockHeader) { @@ -250,13 +249,12 @@ private CompletableFuture>> sendAndProcessRequ .map(NodeDataRequest::getHash) .distinct() .collect(Collectors.toList()); - final AbstractPeerTask> ethTask = + final AbstractPeerTask> ethTask = GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer).assignPeer(peer); outstandingRequests.add(ethTask); return ethTask .run() .thenApply(PeerTaskResult::getResult) - .thenApply(this::mapNodeDataByHash) .exceptionally( error -> { final Throwable rootCause = ExceptionUtils.rootCause(error); @@ -276,10 +274,10 @@ private CompletableFuture>> sendAndProcessRequ () -> storeData(requestTasks, blockHeader, ethTask, data))); } - private CompletableFuture>> storeData( + private CompletableFuture>> storeData( final List> requestTasks, final BlockHeader blockHeader, - final AbstractPeerTask> ethTask, + final AbstractPeerTask> ethTask, final Map data) { final Updater storageUpdater = worldStateStorage.updater(); for (final Task task : requestTasks) { @@ -378,11 +376,4 @@ private synchronized void markDone() { private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) { return request.getHash().equals(blockHeader.getStateRoot()); } - - private Map mapNodeDataByHash(final List data) { - // Map data by hash - final Map dataByHash = new HashMap<>(); - data.forEach(d -> dataByHash.put(Hash.hash(d), d)); - return dataByHash; - } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java index c22988d8a6..f0b0e23128 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/task/GetNodeDataFromPeerTaskTest.java @@ -12,7 +12,6 @@ */ package tech.pegasys.pantheon.ethereum.eth.manager.task; -import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import tech.pegasys.pantheon.ethereum.core.BlockHeader; @@ -22,43 +21,52 @@ import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.util.bytes.BytesValue; -import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; -public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest> { +import com.google.common.collect.Lists; + +public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest> { @Override - protected List generateDataToBeRequested() { - final List requestedData = new ArrayList<>(); + protected Map generateDataToBeRequested() { + final Map requestedData = new HashMap<>(); for (int i = 0; i < 3; i++) { final BlockHeader blockHeader = blockchain.getBlockHeader(10 + i).get(); - requestedData.add( + requestedData.put( + Hash.hash( + protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get()), protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get()); } return requestedData; } @Override - protected EthTask>> createTask( - final List requestedData) { - final List hashes = requestedData.stream().map(Hash::hash).collect(toList()); + protected EthTask>> createTask( + final Map requestedData) { + final List hashes = Lists.newArrayList(requestedData.keySet()); return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer); } @Override protected void assertPartialResultMatchesExpectation( - final List requestedData, final List partialResponse) { + final Map requestedData, final Map partialResponse) { assertThat(partialResponse.size()).isLessThanOrEqualTo(requestedData.size()); assertThat(partialResponse.size()).isGreaterThan(0); - assertThat(requestedData).containsAll(partialResponse); + for (Map.Entry data : partialResponse.entrySet()) { + assertThat(requestedData.get(data.getKey())).isEqualTo(data.getValue()); + } } @Override protected void assertResultMatchesExpectation( - final List requestedData, - final PeerTaskResult> response, + final Map requestedData, + final PeerTaskResult> response, final EthPeer respondingPeer) { - assertThat(response.getResult()).containsExactlyInAnyOrderElementsOf(requestedData); - assertThat(response.getPeer()).isEqualTo(respondingPeer); + assertThat(response.getResult().size()).isEqualTo(requestedData.size()); + for (Map.Entry data : response.getResult().entrySet()) { + assertThat(requestedData.get(data.getKey())).isEqualTo(data.getValue()); + } } }