Process world state download data on a worker thread (#898)
ajsutton authored Feb 19, 2019
1 parent 248b3ea commit f2f83b7
Showing 2 changed files with 57 additions and 35 deletions.
@@ -226,7 +226,7 @@ public static class Builder {
     private int downloaderChainSegmentSize = 200;
     private long trailingPeerBlocksBehindThreshold;
     private int maxTrailingPeers = Integer.MAX_VALUE;
-    private int downloaderParallelism = 2;
+    private int downloaderParallelism = 4;
     private int transactionsParallelism = 2;
     private int computationParallelism = Runtime.getRuntime().availableProcessors();
     private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
@@ -16,6 +16,7 @@
 import tech.pegasys.pantheon.ethereum.core.Hash;
 import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
 import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
+import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException;
 import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask;
 import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
 import tech.pegasys.pantheon.ethereum.eth.manager.task.EthTask;
@@ -41,8 +42,10 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -148,8 +151,7 @@ public CompletableFuture<Void> run(final BlockHeader header) {
         pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(stateRoot));
       }
     }
-
-    requestNodeData(header);
+    ethContext.getScheduler().scheduleSyncWorkerTask(() -> requestNodeData(header));
     return future;
   }

@@ -246,38 +248,58 @@ private CompletableFuture<AbstractPeerTask<List<BytesValue>>> sendAndProcessRequ
         .run()
         .thenApply(PeerTaskResult::getResult)
         .thenApply(this::mapNodeDataByHash)
-        .handle(
-            (data, err) -> {
-              final boolean requestFailed = err != null;
-              final Updater storageUpdater = worldStateStorage.updater();
-              for (final Task<NodeDataRequest> task : requestTasks) {
-                final NodeDataRequest request = task.getData();
-                final BytesValue matchingData = requestFailed ? null : data.get(request.getHash());
-                if (matchingData == null) {
-                  retriedRequestsTotal.inc();
-                  final int requestFailures = request.trackFailure();
-                  updateHighestRetryCount(requestFailures);
-                  if (requestFailures > maxNodeRequestRetries) {
-                    handleStalledDownload();
-                  }
-                  task.markFailed();
-                } else {
-                  completedRequestsCounter.inc();
-                  // Persist request data
-                  request.setData(matchingData);
-                  if (isRootState(blockHeader, request)) {
-                    rootNode = request.getData();
-                  } else {
-                    request.persist(storageUpdater);
-                  }
-
-                  queueChildRequests(request);
-                  task.markCompleted();
-                }
-              }
-              storageUpdater.commit();
-              return ethTask;
-            });
+        .exceptionally(
+            error -> {
+              final Throwable rootCause = ExceptionUtils.rootCause(error);
+              if (!(rootCause instanceof TimeoutException
+                  || rootCause instanceof InterruptedException
+                  || rootCause instanceof CancellationException
+                  || rootCause instanceof EthTaskException)) {
+                LOG.debug("GetNodeDataRequest failed", error);
+              }
+              return Collections.emptyMap();
+            })
+        .thenCompose(
+            data ->
+                ethContext
+                    .getScheduler()
+                    .scheduleSyncWorkerTask(
+                        () -> storeData(requestTasks, blockHeader, ethTask, data)));
   }
 
+  private CompletableFuture<AbstractPeerTask<List<BytesValue>>> storeData(
+      final List<Task<NodeDataRequest>> requestTasks,
+      final BlockHeader blockHeader,
+      final AbstractPeerTask<List<BytesValue>> ethTask,
+      final Map<Hash, BytesValue> data) {
+    final Updater storageUpdater = worldStateStorage.updater();
+    for (final Task<NodeDataRequest> task : requestTasks) {
+      final NodeDataRequest request = task.getData();
+      final BytesValue matchingData = data.get(request.getHash());
+      if (matchingData == null) {
+        retriedRequestsTotal.inc();
+        final int requestFailures = request.trackFailure();
+        updateHighestRetryCount(requestFailures);
+        if (requestFailures > maxNodeRequestRetries) {
+          handleStalledDownload();
+        }
+        task.markFailed();
+      } else {
+        completedRequestsCounter.inc();
+        // Persist request data
+        request.setData(matchingData);
+        if (isRootState(blockHeader, request)) {
+          rootNode = request.getData();
+        } else {
+          request.persist(storageUpdater);
+        }
+
+        queueChildRequests(request);
+        task.markCompleted();
+      }
+    }
+    storageUpdater.commit();
+    return CompletableFuture.completedFuture(ethTask);
+  }
+
   private void updateHighestRetryCount(final int requestFailures) {
@@ -351,7 +373,7 @@ private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest
   private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
     // Map data by hash
     final Map<Hash, BytesValue> dataByHash = new HashMap<>();
-    data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d));
+    data.forEach(d -> dataByHash.put(Hash.hash(d), d));
     return dataByHash;
   }
 }
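
Note on the pattern: the change splits the network-facing future chain from the storage work. Expected failures (timeouts, interruptions, cancellations, EthTask errors) collapse to an empty result map instead of propagating, and whatever data survives is handed off to a sync worker thread for persistence rather than being committed on the network response thread. The standalone sketch below illustrates that shape with plain java.util.concurrent primitives; the class name, the WORKER_POOL executor, and the persist/rootCause helpers are illustrative stand-ins rather than Pantheon API (the real code uses ethContext.getScheduler().scheduleSyncWorkerTask and ExceptionUtils.rootCause, as shown in the diff above).

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

public class WorkerThreadHandoffSketch {

  // Stand-in for the scheduler's worker pool; Pantheon's EthScheduler is not reproduced here.
  private static final ExecutorService WORKER_POOL = Executors.newFixedThreadPool(4);

  public static void main(final String[] args) throws Exception {
    final CompletableFuture<List<String>> networkResponse =
        CompletableFuture.supplyAsync(() -> List.of("node-a", "node-b"));

    final CompletableFuture<Map<String, String>> stored =
        networkResponse
            // Cheap mapping of the raw response stays on the response thread.
            .thenApply(data -> Map.of("root", String.join(",", data)))
            // Expected failures become an empty result instead of propagating;
            // anything unexpected is logged for debugging.
            .exceptionally(
                error -> {
                  final Throwable rootCause = rootCause(error);
                  if (!(rootCause instanceof TimeoutException
                      || rootCause instanceof InterruptedException
                      || rootCause instanceof CancellationException)) {
                    System.err.println("Request failed: " + error);
                  }
                  return Collections.emptyMap();
                })
            // Hand the (possibly empty) data to a worker thread so persistence
            // never runs on the network response thread.
            .thenCompose(
                data -> CompletableFuture.supplyAsync(() -> persist(data), WORKER_POOL));

    System.out.println(stored.get());
    WORKER_POOL.shutdown();
  }

  private static Map<String, String> persist(final Map<String, String> data) {
    // Placeholder for the storage update + commit that storeData performs in the real change.
    return data;
  }

  private static Throwable rootCause(final Throwable error) {
    Throwable cause = error;
    while (cause.getCause() != null) {
      cause = cause.getCause();
    }
    return cause;
  }
}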
