Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

[PAN-2198] Update GetNodeDataFromPeerTask to return a map #931

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager.task;

import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
Expand All @@ -27,15 +27,17 @@
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask<List<BytesValue>> {
public class GetNodeDataFromPeerTask extends AbstractPeerRequestTask<Map<Hash, BytesValue>> {

private static final Logger LOG = LogManager.getLogger();

Expand Down Expand Up @@ -63,26 +65,31 @@ protected ResponseStream sendRequest(final EthPeer peer) throws PeerNotConnected
}

@Override
protected Optional<List<BytesValue>> processResponse(
protected Optional<Map<Hash, BytesValue>> processResponse(
final boolean streamClosed, final MessageData message, final EthPeer peer) {
if (streamClosed) {
// We don't record this as a useless response because it's impossible to know if a peer has
// the data we're requesting.
return Optional.of(emptyList());
return Optional.of(emptyMap());
}
final NodeDataMessage nodeDataMessage = NodeDataMessage.readFrom(message);
final List<BytesValue> nodeData = nodeDataMessage.nodeData();
if (nodeData.isEmpty()) {
return Optional.empty();
} else if (nodeData.size() > hashes.size()) {
if (nodeData.size() > hashes.size()) {
// Can't be the response to our request
return Optional.empty();
}
return mapNodeDataByHash(nodeData);
}

if (nodeData.stream().anyMatch(data -> !hashes.contains(Hash.hash(data)))) {
// Message contains unrequested data, must not be the response to our request.
return Optional.empty();
private Optional<Map<Hash, BytesValue>> mapNodeDataByHash(final List<BytesValue> nodeData) {
final Map<Hash, BytesValue> nodeDataByHash = new HashMap<>();
for (BytesValue data : nodeData) {
final Hash hash = Hash.hash(data);
if (!hashes.contains(hash)) {
return Optional.empty();
}
nodeDataByHash.put(hash, data);
}
return Optional.of(nodeData);
return Optional.of(nodeDataByHash);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -240,7 +239,7 @@ private CompletableFuture<?> waitForNewPeer() {
.timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5));
}

private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequests(
private CompletableFuture<AbstractPeerTask<Map<Hash, BytesValue>>> sendAndProcessRequests(
final EthPeer peer,
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader) {
Expand All @@ -250,13 +249,12 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
.map(NodeDataRequest::getHash)
.distinct()
.collect(Collectors.toList());
final AbstractPeerTask<List<BytesValue>> ethTask =
final AbstractPeerTask<Map<Hash, BytesValue>> ethTask =
GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer).assignPeer(peer);
outstandingRequests.add(ethTask);
return ethTask
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(this::mapNodeDataByHash)
.exceptionally(
error -> {
final Throwable rootCause = ExceptionUtils.rootCause(error);
Expand All @@ -276,10 +274,10 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
() -> storeData(requestTasks, blockHeader, ethTask, data)));
}

private CompletableFuture<AbstractPeerTask<List<BytesValue>>> storeData(
private CompletableFuture<AbstractPeerTask<Map<Hash, BytesValue>>> storeData(
final List<Task<NodeDataRequest>> requestTasks,
final BlockHeader blockHeader,
final AbstractPeerTask<List<BytesValue>> ethTask,
final AbstractPeerTask<Map<Hash, BytesValue>> ethTask,
final Map<Hash, BytesValue> data) {
final Updater storageUpdater = worldStateStorage.updater();
for (final Task<NodeDataRequest> task : requestTasks) {
Expand Down Expand Up @@ -378,11 +376,4 @@ private synchronized void markDone() {
private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}

private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
// Map data by hash
final Map<Hash, BytesValue> dataByHash = new HashMap<>();
data.forEach(d -> dataByHash.put(Hash.hash(d), d));
return dataByHash;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.eth.manager.task;

import static java.util.stream.Collectors.toList;
import static org.assertj.core.api.Assertions.assertThat;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
Expand All @@ -22,43 +21,52 @@
import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest<List<BytesValue>> {
import com.google.common.collect.Lists;

public class GetNodeDataFromPeerTaskTest extends PeerMessageTaskTest<Map<Hash, BytesValue>> {

@Override
protected List<BytesValue> generateDataToBeRequested() {
final List<BytesValue> requestedData = new ArrayList<>();
protected Map<Hash, BytesValue> generateDataToBeRequested() {
final Map<Hash, BytesValue> requestedData = new HashMap<>();
for (int i = 0; i < 3; i++) {
final BlockHeader blockHeader = blockchain.getBlockHeader(10 + i).get();
requestedData.add(
requestedData.put(
Hash.hash(
protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get()),
protocolContext.getWorldStateArchive().getNodeData(blockHeader.getStateRoot()).get());
}
return requestedData;
}

@Override
protected EthTask<PeerTaskResult<List<BytesValue>>> createTask(
final List<BytesValue> requestedData) {
final List<Hash> hashes = requestedData.stream().map(Hash::hash).collect(toList());
protected EthTask<PeerTaskResult<Map<Hash, BytesValue>>> createTask(
final Map<Hash, BytesValue> requestedData) {
final List<Hash> hashes = Lists.newArrayList(requestedData.keySet());
return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer);
}

@Override
protected void assertPartialResultMatchesExpectation(
final List<BytesValue> requestedData, final List<BytesValue> partialResponse) {
final Map<Hash, BytesValue> requestedData, final Map<Hash, BytesValue> partialResponse) {
assertThat(partialResponse.size()).isLessThanOrEqualTo(requestedData.size());
assertThat(partialResponse.size()).isGreaterThan(0);
assertThat(requestedData).containsAll(partialResponse);
for (Map.Entry<Hash, BytesValue> data : partialResponse.entrySet()) {
assertThat(requestedData.get(data.getKey())).isEqualTo(data.getValue());
}
}

@Override
protected void assertResultMatchesExpectation(
final List<BytesValue> requestedData,
final PeerTaskResult<List<BytesValue>> response,
final Map<Hash, BytesValue> requestedData,
final PeerTaskResult<Map<Hash, BytesValue>> response,
final EthPeer respondingPeer) {
assertThat(response.getResult()).containsExactlyInAnyOrderElementsOf(requestedData);
assertThat(response.getPeer()).isEqualTo(respondingPeer);
assertThat(response.getResult().size()).isEqualTo(requestedData.size());
for (Map.Entry<Hash, BytesValue> data : response.getResult().entrySet()) {
assertThat(requestedData.get(data.getKey())).isEqualTo(data.getValue());
}
}
}