Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Support resuming fast-sync downloads #848

Merged
merged 5 commits into from
Feb 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public interface Synchronizer {

void start();

void stop();

/**
* @return the status, based on SyncingResult When actively synchronizing blocks, alternatively
* empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,7 @@ public void start() {
}
}

@Override
public void stop() {
fastSynchronizer.ifPresent(FastSynchronizer::deleteFastSyncState);
}

private void handleFastSyncResult(final FastSyncState result, final Throwable error) {

final Throwable rootCause = ExceptionUtils.rootCause(error);
if (rootCause instanceof FastSyncException) {
LOG.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import tech.pegasys.pantheon.ethereum.rlp.RLP;
import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie;
import tech.pegasys.pantheon.ethereum.worldstate.StateTrieAccountValue;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class AccountTrieNodeDataRequest extends TrieNodeDataRequest {

Expand All @@ -36,6 +38,11 @@ public void persist(final Updater updater) {
updater.putAccountStateTrieNode(getHash(), getData());
}

@Override
public Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage.getAccountStateTrieNode(getHash());
}

@Override
protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) {
return NodeDataRequest.createAccountDataRequest(childHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;
import java.util.stream.Stream;

class CodeNodeDataRequest extends NodeDataRequest {
Expand All @@ -36,4 +39,9 @@ public Stream<NodeDataRequest> getChildRequests() {
// Code nodes have nothing further to download
return Stream.empty();
}

@Override
public Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage.getCode(getHash());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Optional;
import java.util.stream.Stream;

public abstract class NodeDataRequest {
Expand Down Expand Up @@ -96,4 +97,6 @@ public NodeDataRequest setData(final BytesValue data) {
public abstract void persist(final WorldStateStorage.Updater updater);

public abstract Stream<NodeDataRequest> getChildRequests();

public abstract Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import static com.google.common.base.Preconditions.checkNotNull;

import tech.pegasys.pantheon.ethereum.core.Hash;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater;
import tech.pegasys.pantheon.util.bytes.BytesValue;

import java.util.Collections;
import java.util.List;
import java.util.Optional;

class StorageTrieNodeDataRequest extends TrieNodeDataRequest {

Expand All @@ -33,6 +35,11 @@ public void persist(final Updater updater) {
updater.putAccountStorageTrieNode(getHash(), getData());
}

@Override
public Optional<BytesValue> getExistingData(final WorldStateStorage worldStateStorage) {
return worldStateStorage.getAccountStorageTrieNode(getHash());
}

@Override
protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) {
return NodeDataRequest.createStorageDataRequest(childHash);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,18 @@ private void requestNodeData(final BlockHeader header) {

// Collect data to be requested
List<NodeDataRequest> toRequest = new ArrayList<>();
for (int i = 0; i < hashCountPerRequest; i++) {
while (toRequest.size() < hashCountPerRequest) {
NodeDataRequest pendingRequest = pendingRequests.dequeue();
if (pendingRequest == null) {
break;
}
final Optional<BytesValue> existingData =
pendingRequest.getExistingData(worldStateStorage);
if (existingData.isPresent()) {
pendingRequest.setData(existingData.get());
queueChildRequests(pendingRequest);
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you may need to change the containing for loop to a while loop so that we don't send out smaller than intended requests when we run into known nodes.

}
toRequest.add(pendingRequest);
}

Expand Down Expand Up @@ -221,25 +228,19 @@ private CompletableFuture<?> sendAndProcessRequests(
request.persist(storageUpdater);
}

// Queue child requests
request
.getChildRequests()
.filter(this::filterChildRequests)
.forEach(pendingRequests::enqueue);
queueChildRequests(request);
}
}
storageUpdater.commit();
});
}

private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
private void queueChildRequests(final NodeDataRequest request) {
request.getChildRequests().forEach(pendingRequests::enqueue);
}

private boolean filterChildRequests(final NodeDataRequest request) {
// For now, just filter out requests for code that we already know about
return !(request.getRequestType() == RequestType.CODE
&& worldStateStorage.contains(request.getHash()));
private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) {
return request.getHash().equals(blockHeader.getStateRoot());
}

private Map<Hash, BytesValue> mapNodeDataByHash(final List<BytesValue> data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -350,11 +354,23 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage());

// Seed local storage with some trie node values
Map<Bytes32, BytesValue> knownTrieNodes =
Map<Bytes32, BytesValue> allNodes =
collectTrieNodesToBeRequested(remoteStorage, remoteWorldState.rootHash(), 5);
assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check
final Set<Bytes32> knownNodes = new HashSet<>();
final Set<Bytes32> unknownNodes = new HashSet<>();
assertThat(allNodes.size()).isGreaterThan(0); // Sanity check
Updater localStorageUpdater = localStorage.updater();
knownTrieNodes.forEach(localStorageUpdater::putAccountStateTrieNode);
final AtomicBoolean storeNode = new AtomicBoolean(true);
allNodes.forEach(
(nodeHash, node) -> {
if (storeNode.get()) {
localStorageUpdater.putAccountStateTrieNode(nodeHash, node);
knownNodes.add(nodeHash);
} else {
unknownNodes.add(nodeHash);
}
storeNode.set(!storeNode.get());
});
localStorageUpdater.commit();

WorldStateDownloader downloader =
Expand Down Expand Up @@ -390,7 +406,8 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() {
.flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
.collect(Collectors.toList());
assertThat(requestedHashes.size()).isGreaterThan(0);
assertThat(requestedHashes).containsAll(knownTrieNodes.keySet());
assertThat(requestedHashes).containsAll(unknownNodes);
assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes);

// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
Expand Down Expand Up @@ -440,13 +457,26 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
.map(StateTrieAccountValue::readFrom)
.map(StateTrieAccountValue::getStorageRoot)
.collect(Collectors.toList());
Map<Bytes32, BytesValue> knownTrieNodes = new HashMap<>();
Map<Bytes32, BytesValue> allTrieNodes = new HashMap<>();
final Set<Bytes32> knownNodes = new HashSet<>();
final Set<Bytes32> unknownNodes = new HashSet<>();
for (Bytes32 storageRootHash : storageRootHashes) {
knownTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5));
allTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5));
}
assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check
assertThat(allTrieNodes.size()).isGreaterThan(0); // Sanity check
Updater localStorageUpdater = localStorage.updater();
knownTrieNodes.forEach(localStorageUpdater::putAccountStorageTrieNode);
boolean storeNode = true;
for (Entry<Bytes32, BytesValue> entry : allTrieNodes.entrySet()) {
Bytes32 hash = entry.getKey();
BytesValue data = entry.getValue();
if (storeNode) {
localStorageUpdater.putAccountStorageTrieNode(hash, data);
knownNodes.add(hash);
} else {
unknownNodes.add(hash);
}
storeNode = !storeNode;
}
localStorageUpdater.commit();

WorldStateDownloader downloader =
Expand Down Expand Up @@ -486,7 +516,8 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() {
.flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true))
.collect(Collectors.toList());
assertThat(requestedHashes.size()).isGreaterThan(0);
assertThat(requestedHashes).containsAll(knownTrieNodes.keySet());
assertThat(requestedHashes).containsAll(unknownNodes);
assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes);

// Check that all expected account data was downloaded
WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage);
Expand Down
3 changes: 0 additions & 3 deletions pantheon/src/main/java/tech/pegasys/pantheon/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ public void execute() {

@Override
public void close() throws Exception {
if (networkRunner.getNetwork().isP2pEnabled()) {
pantheonController.getSynchronizer().stop();
}
networkRunner.stop();
networkRunner.awaitStop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.primitives.Longs;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class RocksDbQueue implements BytesQueue {

private static final Logger LOG = LogManager.getLogger();

private final Options options;
private final RocksDB db;

Expand All @@ -46,11 +42,7 @@ public class RocksDbQueue implements BytesQueue {
private RocksDbQueue(final Path storageDirectory, final MetricsSystem metricsSystem) {
try {
RocksDbUtil.loadNativeLibrary();
options =
new Options()
.setCreateIfMissing(true)
// TODO: Support restoration from a previously persisted queue
.setErrorIfExists(true);
options = new Options().setCreateIfMissing(true);
db = RocksDB.open(options, storageDirectory.toString());

enqueueLatency =
Expand Down