Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Fix potential stall in world state download #922

Merged
merged 3 commits into from
Feb 20, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,67 +160,75 @@ public void cancel() {
}

private void requestNodeData(final BlockHeader header) {
if (sendingRequests.compareAndSet(false, true)) {
while (shouldRequestNodeData()) {
while (shouldRequestNodeData()) {
if (sendingRequests.compareAndSet(false, true)) {
final Optional<EthPeer> maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber());

if (!maybePeer.isPresent()) {
// If no peer is available, wait and try again
sendingRequests.set(false);
waitForNewPeer().whenComplete((r, t) -> requestNodeData(header));
break;
} else {
final EthPeer peer = maybePeer.get();
requestDataFromPeer(header, maybePeer.get());
}
sendingRequests.set(false);
} else {
break;
}
}
}

// Collect data to be requested
final List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
final Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
final NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
completedRequestsCounter.inc();
pendingRequestTask.markCompleted();
continue;
}
toRequest.add(pendingRequestTask);
}
private void requestDataFromPeer(final BlockHeader header, final EthPeer peer) {
// Collect data to be requested
final List<Task<NodeDataRequest>> toRequest = getTasksForNextRequest();

// Request and process node data
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(task, error) -> {
final boolean done;
synchronized (this) {
outstandingRequests.remove(task);
done =
status == Status.RUNNING
&& outstandingRequests.size() == 0
&& pendingRequests.allTasksCompleted();
}
if (done) {
// We're done
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNode);
updater.commit();
markDone();
} else {
// Send out additional requests
requestNodeData(header);
}
});
}
// Request and process node data
sendAndProcessRequests(peer, toRequest, header)
.whenComplete(
(task, error) -> {
final boolean done;
synchronized (this) {
outstandingRequests.remove(task);
done =
status == Status.RUNNING
&& outstandingRequests.size() == 0
&& pendingRequests.allTasksCompleted();
}
if (done) {
// We're done
final Updater updater = worldStateStorage.updater();
updater.putAccountStateTrieNode(header.getStateRoot(), rootNode);
updater.commit();
markDone();
} else {
// Send out additional requests
requestNodeData(header);
}
});
}

private List<Task<NodeDataRequest>> getTasksForNextRequest() {
final List<Task<NodeDataRequest>> toRequest = new ArrayList<>();
while (toRequest.size() < hashCountPerRequest) {
final Task<NodeDataRequest> pendingRequestTask = pendingRequests.dequeue();
if (pendingRequestTask == null) {
break;
}
final NodeDataRequest pendingRequest = pendingRequestTask.getData();
final Optional<BytesValue> existingData = pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
completedRequestsCounter.inc();
pendingRequestTask.markCompleted();
continue;
}
sendingRequests.set(false);
toRequest.add(pendingRequestTask);
}
return toRequest;
}

private boolean shouldRequestNodeData() {
private synchronized boolean shouldRequestNodeData() {
return !future.isDone()
&& outstandingRequests.size() < maxOutstandingRequests
&& !pendingRequests.isEmpty();
Expand Down