diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java index 653ff0f938..a7b936fc2c 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/core/Synchronizer.java @@ -19,8 +19,6 @@ public interface Synchronizer { void start(); - void stop(); - /** * @return the status, based on SyncingResult When actively synchronizing blocks, alternatively * empty diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java index 92db28f962..a177da7725 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/DefaultSynchronizer.java @@ -107,13 +107,7 @@ public void start() { } } - @Override - public void stop() { - fastSynchronizer.ifPresent(FastSynchronizer::deleteFastSyncState); - } - private void handleFastSyncResult(final FastSyncState result, final Throwable error) { - final Throwable rootCause = ExceptionUtils.rootCause(error); if (rootCause instanceof FastSyncException) { LOG.error( diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java index c261b0e026..39017347a3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java @@ -18,11 +18,13 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.ethereum.worldstate.StateTrieAccountValue; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.ArrayList; import java.util.List; +import java.util.Optional; class AccountTrieNodeDataRequest extends TrieNodeDataRequest { @@ -36,6 +38,11 @@ public void persist(final Updater updater) { updater.putAccountStateTrieNode(getHash(), getData()); } + @Override + public Optional getExistingData(final WorldStateStorage worldStateStorage) { + return worldStateStorage.getAccountStateTrieNode(getHash()); + } + @Override protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) { return NodeDataRequest.createAccountDataRequest(childHash); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java index c4825873f4..beff1cb601 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java @@ -15,8 +15,11 @@ import static com.google.common.base.Preconditions.checkNotNull; import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; +import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.util.Optional; import java.util.stream.Stream; class CodeNodeDataRequest extends NodeDataRequest { @@ -36,4 +39,9 @@ public Stream getChildRequests() { // Code nodes have nothing further to download return Stream.empty(); } + + @Override + public Optional getExistingData(final WorldStateStorage worldStateStorage) { + return worldStateStorage.getCode(getHash()); + } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java index 6092661046..29f4b68d57 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java @@ -19,6 +19,7 @@ import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.util.Optional; import java.util.stream.Stream; public abstract class NodeDataRequest { @@ -96,4 +97,6 @@ public NodeDataRequest setData(final BytesValue data) { public abstract void persist(final WorldStateStorage.Updater updater); public abstract Stream getChildRequests(); + + public abstract Optional getExistingData(final WorldStateStorage worldStateStorage); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java index a725df43e2..165c30e117 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java @@ -15,11 +15,13 @@ import static com.google.common.base.Preconditions.checkNotNull; import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Collections; import java.util.List; +import java.util.Optional; class StorageTrieNodeDataRequest extends TrieNodeDataRequest { @@ -33,6 +35,11 @@ public void persist(final Updater updater) { updater.putAccountStorageTrieNode(getHash(), getData()); } + @Override + public Optional getExistingData(final WorldStateStorage worldStateStorage) { + return worldStateStorage.getAccountStorageTrieNode(getHash()); + } + @Override protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) { return NodeDataRequest.createStorageDataRequest(childHash); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index 3dd0de360e..77316c1bdb 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -141,11 +141,18 @@ private void requestNodeData(final BlockHeader header) { // Collect data to be requested List toRequest = new ArrayList<>(); - for (int i = 0; i < hashCountPerRequest; i++) { + while (toRequest.size() < hashCountPerRequest) { NodeDataRequest pendingRequest = pendingRequests.dequeue(); if (pendingRequest == null) { break; } + final Optional existingData = + pendingRequest.getExistingData(worldStateStorage); + if (existingData.isPresent()) { + pendingRequest.setData(existingData.get()); + queueChildRequests(pendingRequest); + continue; + } toRequest.add(pendingRequest); } @@ -221,25 +228,19 @@ private CompletableFuture sendAndProcessRequests( request.persist(storageUpdater); } - // Queue child requests - request - .getChildRequests() - .filter(this::filterChildRequests) - .forEach(pendingRequests::enqueue); + queueChildRequests(request); } } storageUpdater.commit(); }); } - private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) { - return request.getHash().equals(blockHeader.getStateRoot()); + private void queueChildRequests(final NodeDataRequest request) { + request.getChildRequests().forEach(pendingRequests::enqueue); } - private boolean filterChildRequests(final NodeDataRequest request) { - // For now, just filter out requests for code that we already know about - return !(request.getRequestType() == RequestType.CODE - && worldStateStorage.contains(request.getHash())); + private boolean isRootState(final BlockHeader blockHeader, final NodeDataRequest request) { + return request.getHash().equals(blockHeader.getStateRoot()); } private Map mapNodeDataByHash(final List data) { diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 82bdbc5f64..d85d76e1a6 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -53,9 +53,13 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -350,11 +354,23 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); // Seed local storage with some trie node values - Map knownTrieNodes = + Map allNodes = collectTrieNodesToBeRequested(remoteStorage, remoteWorldState.rootHash(), 5); - assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check + final Set knownNodes = new HashSet<>(); + final Set unknownNodes = new HashSet<>(); + assertThat(allNodes.size()).isGreaterThan(0); // Sanity check Updater localStorageUpdater = localStorage.updater(); - knownTrieNodes.forEach(localStorageUpdater::putAccountStateTrieNode); + final AtomicBoolean storeNode = new AtomicBoolean(true); + allNodes.forEach( + (nodeHash, node) -> { + if (storeNode.get()) { + localStorageUpdater.putAccountStateTrieNode(nodeHash, node); + knownNodes.add(nodeHash); + } else { + unknownNodes.add(nodeHash); + } + storeNode.set(!storeNode.get()); + }); localStorageUpdater.commit(); WorldStateDownloader downloader = @@ -390,7 +406,8 @@ public void doesRequestKnownAccountTrieNodesFromNetwork() { .flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true)) .collect(Collectors.toList()); assertThat(requestedHashes.size()).isGreaterThan(0); - assertThat(requestedHashes).containsAll(knownTrieNodes.keySet()); + assertThat(requestedHashes).containsAll(unknownNodes); + assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes); // Check that all expected account data was downloaded WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); @@ -440,13 +457,26 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { .map(StateTrieAccountValue::readFrom) .map(StateTrieAccountValue::getStorageRoot) .collect(Collectors.toList()); - Map knownTrieNodes = new HashMap<>(); + Map allTrieNodes = new HashMap<>(); + final Set knownNodes = new HashSet<>(); + final Set unknownNodes = new HashSet<>(); for (Bytes32 storageRootHash : storageRootHashes) { - knownTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5)); + allTrieNodes.putAll(collectTrieNodesToBeRequested(remoteStorage, storageRootHash, 5)); } - assertThat(knownTrieNodes.size()).isGreaterThan(0); // Sanity check + assertThat(allTrieNodes.size()).isGreaterThan(0); // Sanity check Updater localStorageUpdater = localStorage.updater(); - knownTrieNodes.forEach(localStorageUpdater::putAccountStorageTrieNode); + boolean storeNode = true; + for (Entry entry : allTrieNodes.entrySet()) { + Bytes32 hash = entry.getKey(); + BytesValue data = entry.getValue(); + if (storeNode) { + localStorageUpdater.putAccountStorageTrieNode(hash, data); + knownNodes.add(hash); + } else { + unknownNodes.add(hash); + } + storeNode = !storeNode; + } localStorageUpdater.commit(); WorldStateDownloader downloader = @@ -486,7 +516,8 @@ public void doesRequestKnownStorageTrieNodesFromNetwork() { .flatMap(m -> StreamSupport.stream(m.hashes().spliterator(), true)) .collect(Collectors.toList()); assertThat(requestedHashes.size()).isGreaterThan(0); - assertThat(requestedHashes).containsAll(knownTrieNodes.keySet()); + assertThat(requestedHashes).containsAll(unknownNodes); + assertThat(requestedHashes).doesNotContainAnyElementsOf(knownNodes); // Check that all expected account data was downloaded WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); diff --git a/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java b/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java index 1e980d7343..6fde46423a 100644 --- a/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java +++ b/pantheon/src/main/java/tech/pegasys/pantheon/Runner.java @@ -88,9 +88,6 @@ public void execute() { @Override public void close() throws Exception { - if (networkRunner.getNetwork().isP2pEnabled()) { - pantheonController.getSynchronizer().stop(); - } networkRunner.stop(); networkRunner.awaitStop(); diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java index e5ec03f820..36643a46b1 100644 --- a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/RocksDbQueue.java @@ -23,16 +23,12 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.primitives.Longs; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; public class RocksDbQueue implements BytesQueue { - private static final Logger LOG = LogManager.getLogger(); - private final Options options; private final RocksDB db; @@ -46,11 +42,7 @@ public class RocksDbQueue implements BytesQueue { private RocksDbQueue(final Path storageDirectory, final MetricsSystem metricsSystem) { try { RocksDbUtil.loadNativeLibrary(); - options = - new Options() - .setCreateIfMissing(true) - // TODO: Support restoration from a previously persisted queue - .setErrorIfExists(true); + options = new Options().setCreateIfMissing(true); db = RocksDB.open(options, storageDirectory.toString()); enqueueLatency =