From 66ce6ee2f8aeaa04e478b72bc925d5c1c0882199 Mon Sep 17 00:00:00 2001 From: mbaxter Date: Fri, 25 Jan 2019 20:48:54 -0500 Subject: [PATCH] [NC-1344] Create a simple WorldStateDownloader (#657) --- .../KeyValueStorageWorldStateStorage.java | 54 +++- .../worldstate/DefaultMutableWorldState.java | 70 ++--- .../worldstate/StateTrieAccountValue.java | 95 ++++++ .../worldstate/WorldStateArchive.java | 6 +- .../worldstate/WorldStateStorage.java | 20 +- .../ethereum/core/BlockDataGenerator.java | 29 +- .../KeyValueStorageWorldStateStorageTest.java | 157 ++++++++++ .../DefaultMutableWorldStateTest.java | 22 +- ethereum/eth/build.gradle | 2 + .../AccountTrieNodeDataRequest.java | 61 ++++ .../sync/worldstate/CodeNodeDataRequest.java | 39 +++ .../eth/sync/worldstate/NodeDataRequest.java | 69 +++++ .../StorageTrieNodeDataRequest.java | 46 +++ .../sync/worldstate/TrieNodeDataRequest.java | 71 +++++ .../sync/worldstate/WorldStateDownloader.java | 201 +++++++++++++ .../manager/DeterministicEthScheduler.java | 16 +- .../manager/EthProtocolManagerTestUtil.java | 10 +- .../ethtaskutils/BlockchainSetupUtil.java | 2 +- .../worldstate/WorldStateDownloaderTest.java | 283 ++++++++++++++++++ .../pantheon/ethereum/trie/BranchNode.java | 14 +- .../pantheon/ethereum/trie/ExtensionNode.java | 16 +- .../pantheon/ethereum/trie/LeafNode.java | 13 +- .../ethereum/trie/MerklePatriciaTrie.java | 4 +- ...xception.java => MerkleTrieException.java} | 6 +- .../pegasys/pantheon/ethereum/trie/Node.java | 16 +- .../pantheon/ethereum/trie/NullNode.java | 17 +- .../trie/StoredMerklePatriciaTrie.java | 6 +- .../pantheon/ethereum/trie/StoredNode.java | 28 +- .../ethereum/trie/StoredNodeFactory.java | 27 +- .../ethereum/trie/TrieNodeDecoder.java | 36 +++ services/queue/build.gradle | 38 +++ .../pantheon/services/queue/BigQueue.java | 33 ++ .../services/queue/InMemoryBigQueue.java | 40 +++ services/queue/src/main/resources/log4j2.xml | 16 + settings.gradle | 1 + 35 files changed, 1436 insertions(+), 128 deletions(-) create mode 100644 ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/StateTrieAccountValue.java create mode 100644 ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java create mode 100644 ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java create mode 100644 ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java rename ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/{MerkleStorageException.java => MerkleTrieException.java} (80%) create mode 100644 ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java create mode 100644 services/queue/build.gradle create mode 100644 services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java create mode 100644 services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java create mode 100644 services/queue/src/main/resources/log4j2.xml diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorage.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorage.java index b300ece3b0..51f7c2eb6a 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorage.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorage.java @@ -13,6 +13,7 @@ package tech.pegasys.pantheon.ethereum.storage.keyvalue; import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; import tech.pegasys.pantheon.services.kvstore.KeyValueStorage; import tech.pegasys.pantheon.util.bytes.Bytes32; @@ -29,23 +30,41 @@ public KeyValueStorageWorldStateStorage(final KeyValueStorage keyValueStorage) { } @Override - public Optional getCode(final Hash codeHash) { - return keyValueStorage.get(codeHash); + public Optional getCode(final Bytes32 codeHash) { + if (codeHash.equals(Hash.EMPTY)) { + return Optional.of(BytesValue.EMPTY); + } else { + return keyValueStorage.get(codeHash); + } } @Override public Optional getAccountStateTrieNode(final Bytes32 nodeHash) { - return keyValueStorage.get(nodeHash); + return getTrieNode(nodeHash); } @Override public Optional getAccountStorageTrieNode(final Bytes32 nodeHash) { - return keyValueStorage.get(nodeHash); + return getTrieNode(nodeHash); + } + + private Optional getTrieNode(final Bytes32 nodeHash) { + if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) { + return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE); + } else { + return keyValueStorage.get(nodeHash); + } } @Override - public Optional getNodeData(final Hash hash) { - return keyValueStorage.get(hash); + public Optional getNodeData(final Bytes32 hash) { + if (hash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) { + return Optional.of(MerklePatriciaTrie.EMPTY_TRIE_NODE); + } else if (hash.equals(Hash.EMPTY)) { + return Optional.of(BytesValue.EMPTY); + } else { + return keyValueStorage.get(hash); + } } @Override @@ -62,18 +81,33 @@ public Updater(final KeyValueStorage.Transaction transaction) { } @Override - public void putCode(final BytesValue code) { - transaction.put(Hash.hash(code), code); + public Updater putCode(final Bytes32 codeHash, final BytesValue code) { + if (code.size() == 0) { + // Don't save empty values + return this; + } + transaction.put(codeHash, code); + return this; } @Override - public void putAccountStateTrieNode(final Bytes32 nodeHash, final BytesValue node) { + public Updater putAccountStateTrieNode(final Bytes32 nodeHash, final BytesValue node) { + if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) { + // Don't save empty nodes + return this; + } transaction.put(nodeHash, node); + return this; } @Override - public void putAccountStorageTrieNode(final Bytes32 nodeHash, final BytesValue node) { + public Updater putAccountStorageTrieNode(final Bytes32 nodeHash, final BytesValue node) { + if (nodeHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) { + // Don't save empty nodes + return this; + } transaction.put(nodeHash, node); + return this; } @Override diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldState.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldState.java index b3c87fbd42..f83d0f4e3d 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldState.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldState.java @@ -49,7 +49,7 @@ public class DefaultMutableWorldState implements MutableWorldState { private final WorldStateStorage worldStateStorage; public DefaultMutableWorldState(final WorldStateStorage storage) { - this(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, storage); + this(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, storage); } public DefaultMutableWorldState( @@ -103,31 +103,15 @@ public Account get(final Address address) { private AccountState deserializeAccount( final Address address, final Hash addressHash, final BytesValue encoded) throws RLPException { final RLPInput in = RLP.input(encoded); - in.enterList(); - - final long nonce = in.readLongScalar(); - final Wei balance = in.readUInt256Scalar(Wei::wrap); - final Hash storageRoot = Hash.wrap(in.readBytes32()); - final Hash codeHash = Hash.wrap(in.readBytes32()); - - in.leaveList(); - - return new AccountState(address, addressHash, nonce, balance, storageRoot, codeHash); + StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(in); + return new AccountState(address, addressHash, accountValue); } private static BytesValue serializeAccount( - final long nonce, final Wei balance, final Hash codeHash, final Hash storageRoot) { - return RLP.encode( - out -> { - out.startList(); - - out.writeLongScalar(nonce); - out.writeUInt256Scalar(balance); - out.writeBytesValue(storageRoot); - out.writeBytesValue(codeHash); - - out.endList(); - }); + final long nonce, final Wei balance, final Hash storageRoot, final Hash codeHash) { + StateTrieAccountValue accountValue = + new StateTrieAccountValue(nonce, balance, storageRoot, codeHash); + return RLP.encode(accountValue::writeTo); } @Override @@ -187,28 +171,17 @@ protected class AccountState implements Account { private final Address address; private final Hash addressHash; - private final long nonce; - private final Wei balance; - private final Hash storageRoot; - private final Hash codeHash; + final StateTrieAccountValue accountValue; // Lazily initialized since we don't always access storage. private volatile MerklePatriciaTrie storageTrie; private AccountState( - final Address address, - final Hash addressHash, - final long nonce, - final Wei balance, - final Hash storageRoot, - final Hash codeHash) { + final Address address, final Hash addressHash, final StateTrieAccountValue accountValue) { this.address = address; this.addressHash = addressHash; - this.nonce = nonce; - this.balance = balance; - this.storageRoot = storageRoot; - this.codeHash = codeHash; + this.accountValue = accountValue; } private MerklePatriciaTrie storageTrie() { @@ -217,7 +190,7 @@ private MerklePatriciaTrie storageTrie() { storageTrie = updatedTrie; } if (storageTrie == null) { - storageTrie = newAccountStorageTrie(storageRoot); + storageTrie = newAccountStorageTrie(getStorageRoot()); } return storageTrie; } @@ -234,12 +207,16 @@ public Hash getAddressHash() { @Override public long getNonce() { - return nonce; + return accountValue.getNonce(); } @Override public Wei getBalance() { - return balance; + return accountValue.getBalance(); + } + + Hash getStorageRoot() { + return accountValue.getStorageRoot(); } @Override @@ -249,6 +226,7 @@ public BytesValue getCode() { return updatedCode; } // No code is common, save the KV-store lookup. + Hash codeHash = getCodeHash(); if (codeHash.equals(Hash.EMPTY)) { return BytesValue.EMPTY; } @@ -262,7 +240,7 @@ public boolean hasCode() { @Override public Hash getCodeHash() { - return codeHash; + return accountValue.getCodeHash(); } @Override @@ -303,8 +281,8 @@ public String toString() { builder.append("address=").append(getAddress()).append(", "); builder.append("nonce=").append(getNonce()).append(", "); builder.append("balance=").append(getBalance()).append(", "); - builder.append("storageRoot=").append(storageRoot).append(", "); - builder.append("codeHash=").append(codeHash); + builder.append("storageRoot=").append(getStorageRoot()).append(", "); + builder.append("codeHash=").append(getCodeHash()); return builder.append("}").toString(); } } @@ -353,14 +331,14 @@ public void commit() { final AccountState origin = updated.getWrappedAccount(); // Save the code in key-value storage ... - Hash codeHash = origin == null ? Hash.EMPTY : origin.codeHash; + Hash codeHash = origin == null ? Hash.EMPTY : origin.getCodeHash(); if (updated.codeWasUpdated()) { codeHash = Hash.hash(updated.getCode()); wrapped.updatedAccountCode.put(updated.getAddress(), updated.getCode()); } // ...and storage in the account trie first. final boolean freshState = origin == null || updated.getStorageWasCleared(); - Hash storageRoot = freshState ? Hash.EMPTY_TRIE_HASH : origin.storageRoot; + Hash storageRoot = freshState ? Hash.EMPTY_TRIE_HASH : origin.getStorageRoot(); if (freshState) { wrapped.updatedStorageTries.remove(updated.getAddress()); } @@ -386,7 +364,7 @@ public void commit() { // Lastly, save the new account. final BytesValue account = - serializeAccount(updated.getNonce(), updated.getBalance(), codeHash, storageRoot); + serializeAccount(updated.getNonce(), updated.getBalance(), storageRoot, codeHash); wrapped.accountStateTrie.put(updated.getAddressHash(), account); } diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/StateTrieAccountValue.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/StateTrieAccountValue.java new file mode 100644 index 0000000000..57e5c11c4b --- /dev/null +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/StateTrieAccountValue.java @@ -0,0 +1,95 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.worldstate; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.Wei; +import tech.pegasys.pantheon.ethereum.rlp.RLPInput; +import tech.pegasys.pantheon.ethereum.rlp.RLPOutput; + +/** Represents the raw values associated with an account in the world state trie. */ +public class StateTrieAccountValue { + + private final long nonce; + private final Wei balance; + private final Hash storageRoot; + private final Hash codeHash; + + public StateTrieAccountValue( + final long nonce, final Wei balance, final Hash storageRoot, final Hash codeHash) { + this.nonce = nonce; + this.balance = balance; + this.storageRoot = storageRoot; + this.codeHash = codeHash; + } + + /** + * The account nonce, that is the number of transactions sent from that account. + * + * @return the account nonce. + */ + public long getNonce() { + return nonce; + } + + /** + * The available balance of that account. + * + * @return the balance, in Wei, of the account. + */ + public Wei getBalance() { + return balance; + } + + /** + * The hash of the root of the storage trie associated with this account. + * + * @return the hash of the root node of the storage trie. + */ + public Hash getStorageRoot() { + return storageRoot; + } + + /** + * The hash of the EVM bytecode associated with this account. + * + * @return the hash of the account code (which may be {@link Hash#EMPTY}. + */ + public Hash getCodeHash() { + return codeHash; + } + + public void writeTo(final RLPOutput out) { + out.startList(); + + out.writeLongScalar(nonce); + out.writeUInt256Scalar(balance); + out.writeBytesValue(storageRoot); + out.writeBytesValue(codeHash); + + out.endList(); + } + + public static StateTrieAccountValue readFrom(final RLPInput in) { + in.enterList(); + + final long nonce = in.readLongScalar(); + final Wei balance = in.readUInt256Scalar(Wei::wrap); + final Hash storageRoot = Hash.wrap(in.readBytes32()); + final Hash codeHash = Hash.wrap(in.readBytes32()); + + in.leaveList(); + + return new StateTrieAccountValue(nonce, balance, storageRoot, codeHash); + } +} diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateArchive.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateArchive.java index 215fc61385..f29742f2db 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateArchive.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateArchive.java @@ -22,7 +22,7 @@ public class WorldStateArchive { private final WorldStateStorage storage; - private static final Hash EMPTY_ROOT_HASH = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH); + private static final Hash EMPTY_ROOT_HASH = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH); public WorldStateArchive(final WorldStateStorage storage) { this.storage = storage; @@ -47,4 +47,8 @@ public MutableWorldState getMutable() { public Optional getNodeData(final Hash hash) { return storage.getNodeData(hash); } + + public WorldStateStorage getStorage() { + return storage; + } } diff --git a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateStorage.java b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateStorage.java index ac2a30af90..2b147cf2ce 100644 --- a/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateStorage.java +++ b/ethereum/core/src/main/java/tech/pegasys/pantheon/ethereum/worldstate/WorldStateStorage.java @@ -20,23 +20,33 @@ public interface WorldStateStorage { - Optional getCode(Hash codeHash); + Optional getCode(Bytes32 codeHash); Optional getAccountStateTrieNode(Bytes32 nodeHash); Optional getAccountStorageTrieNode(Bytes32 nodeHash); - Optional getNodeData(Hash hash); + Optional getNodeData(Bytes32 hash); + + default boolean contains(final Bytes32 hash) { + return getNodeData(hash).isPresent(); + } Updater updater(); interface Updater { - void putCode(BytesValue code); + Updater putCode(Bytes32 nodeHash, BytesValue code); + + default Updater putCode(final BytesValue code) { + // Skip the hash calculation for empty code + Hash codeHash = code.size() == 0 ? Hash.EMPTY : Hash.hash(code); + return putCode(codeHash, code); + } - void putAccountStateTrieNode(Bytes32 nodeHash, BytesValue node); + Updater putAccountStateTrieNode(Bytes32 nodeHash, BytesValue node); - void putAccountStorageTrieNode(Bytes32 nodeHash, BytesValue node); + Updater putAccountStorageTrieNode(Bytes32 nodeHash, BytesValue node); void commit(); diff --git a/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/BlockDataGenerator.java b/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/BlockDataGenerator.java index a20175cd8f..efdee7741d 100644 --- a/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/BlockDataGenerator.java +++ b/ethereum/core/src/test-support/java/tech/pegasys/pantheon/ethereum/core/BlockDataGenerator.java @@ -58,7 +58,7 @@ private List blockSequence( final List seq = new ArrayList<>(count); final MutableWorldState worldState = - worldStateArchive.getMutable(Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH)); + worldStateArchive.getMutable(Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)); long nextBlockNumber = nextBlock; Hash parentHash = parent; @@ -95,6 +95,33 @@ private List blockSequence( return seq; } + public List createRandomAccounts(final MutableWorldState worldState, final int count) { + WorldUpdater updater = worldState.updater(); + List accounts = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + MutableAccount account = updater.getOrCreate(address()); + // Make some accounts contract accounts + if (random.nextFloat() < .5) { + // Subset of random accounts are contract accounts + account.setCode(bytesValue(5, 50)); + if (random.nextFloat() < .75) { + // Add some storage for most contract accounts + int storageValues = random.nextInt(20) + 10; + for (int j = 0; j < storageValues; j++) { + account.setStorageValue(uint256(), uint256()); + } + } + } + account.setNonce(random.nextInt(10)); + account.setBalance(Wei.of(positiveLong())); + + accounts.add(account); + } + updater.commit(); + worldState.persist(); + return accounts; + } + public List blockSequence(final int count) { final WorldStateArchive worldState = createInMemoryWorldStateArchive(); return blockSequence(count, worldState, Collections.emptyList(), Collections.emptyList()); diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java new file mode 100644 index 0000000000..ca95b5ce44 --- /dev/null +++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java @@ -0,0 +1,157 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.storage.keyvalue; + +import static org.assertj.core.api.Assertions.assertThat; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; +import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import org.junit.Test; + +public class KeyValueStorageWorldStateStorageTest { + + @Test + public void getCode_returnsEmpty() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + assertThat(storage.getCode(Hash.EMPTY)).contains(BytesValue.EMPTY); + } + + @Test + public void getAccountStateTrieNode_returnsEmptyNode() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + assertThat(storage.getAccountStateTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) + .contains(MerklePatriciaTrie.EMPTY_TRIE_NODE); + } + + @Test + public void getAccountStorageTrieNode_returnsEmptyNode() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + assertThat(storage.getAccountStorageTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) + .contains(MerklePatriciaTrie.EMPTY_TRIE_NODE); + } + + @Test + public void getNodeData_returnsEmptyValue() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + assertThat(storage.getNodeData(Hash.EMPTY)).contains(BytesValue.EMPTY); + } + + @Test + public void getNodeData_returnsEmptyNode() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + assertThat(storage.getNodeData(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) + .contains(MerklePatriciaTrie.EMPTY_TRIE_NODE); + } + + @Test + public void getCode_saveAndGetSpecialValues() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage + .updater() + .putCode(MerklePatriciaTrie.EMPTY_TRIE_NODE) + .putCode(BytesValue.EMPTY) + .commit(); + + assertThat(storage.getCode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) + .contains(MerklePatriciaTrie.EMPTY_TRIE_NODE); + assertThat(storage.getCode(Hash.EMPTY)).contains(BytesValue.EMPTY); + } + + @Test + public void getCode_saveAndGetRegularValue() { + BytesValue bytes = BytesValue.fromHexString("0x123456"); + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage.updater().putCode(bytes).commit(); + + assertThat(storage.getCode(Hash.hash(bytes))).contains(bytes); + } + + @Test + public void getAccountStateTrieNode_saveAndGetSpecialValues() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage + .updater() + .putAccountStateTrieNode( + Hash.hash(MerklePatriciaTrie.EMPTY_TRIE_NODE), MerklePatriciaTrie.EMPTY_TRIE_NODE) + .putAccountStateTrieNode(Hash.hash(BytesValue.EMPTY), BytesValue.EMPTY) + .commit(); + + assertThat(storage.getAccountStateTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) + .contains(MerklePatriciaTrie.EMPTY_TRIE_NODE); + assertThat(storage.getAccountStateTrieNode(Hash.EMPTY)).contains(BytesValue.EMPTY); + } + + @Test + public void getAccountStateTrieNode_saveAndGetRegularValue() { + BytesValue bytes = BytesValue.fromHexString("0x123456"); + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage.updater().putAccountStateTrieNode(Hash.hash(bytes), bytes).commit(); + + assertThat(storage.getAccountStateTrieNode(Hash.hash(bytes))).contains(bytes); + } + + @Test + public void getAccountStorageTrieNode_saveAndGetSpecialValues() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage + .updater() + .putAccountStorageTrieNode( + Hash.hash(MerklePatriciaTrie.EMPTY_TRIE_NODE), MerklePatriciaTrie.EMPTY_TRIE_NODE) + .putAccountStorageTrieNode(Hash.hash(BytesValue.EMPTY), BytesValue.EMPTY) + .commit(); + + assertThat(storage.getAccountStorageTrieNode(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) + .contains(MerklePatriciaTrie.EMPTY_TRIE_NODE); + assertThat(storage.getAccountStorageTrieNode(Hash.EMPTY)).contains(BytesValue.EMPTY); + } + + @Test + public void getAccountStorageTrieNode_saveAndGetRegularValue() { + BytesValue bytes = BytesValue.fromHexString("0x123456"); + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage.updater().putAccountStorageTrieNode(Hash.hash(bytes), bytes).commit(); + + assertThat(storage.getAccountStateTrieNode(Hash.hash(bytes))).contains(bytes); + } + + @Test + public void getNodeData_saveAndGetSpecialValues() { + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage + .updater() + .putAccountStorageTrieNode( + Hash.hash(MerklePatriciaTrie.EMPTY_TRIE_NODE), MerklePatriciaTrie.EMPTY_TRIE_NODE) + .putAccountStorageTrieNode(Hash.hash(BytesValue.EMPTY), BytesValue.EMPTY) + .commit(); + + assertThat(storage.getNodeData(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) + .contains(MerklePatriciaTrie.EMPTY_TRIE_NODE); + assertThat(storage.getNodeData(Hash.EMPTY)).contains(BytesValue.EMPTY); + } + + @Test + public void getNodeData_saveAndGetRegularValue() { + BytesValue bytes = BytesValue.fromHexString("0x123456"); + KeyValueStorageWorldStateStorage storage = emptyStorage(); + storage.updater().putAccountStorageTrieNode(Hash.hash(bytes), bytes).commit(); + + assertThat(storage.getNodeData(Hash.hash(bytes))).contains(bytes); + } + + private KeyValueStorageWorldStateStorage emptyStorage() { + return new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + } +} diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldStateTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldStateTest.java index 4fcfa153b3..7f4beb028d 100644 --- a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldStateTest.java +++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/worldstate/DefaultMutableWorldStateTest.java @@ -58,10 +58,10 @@ private static MutableWorldState createEmpty() { @Test public void rootHash_Empty() { final MutableWorldState worldState = createEmpty(); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); worldState.persist(); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); } @Test @@ -88,10 +88,10 @@ public void removeAccount_AccountDoesNotExist() { final WorldUpdater updater = worldState.updater(); updater.deleteAccount(ADDRESS); updater.commit(); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); worldState.persist(); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); } @Test @@ -101,10 +101,10 @@ public void removeAccount_UpdatedAccount() { updater.createAccount(ADDRESS).setBalance(Wei.of(100000)); updater.deleteAccount(ADDRESS); updater.commit(); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); worldState.persist(); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); } @Test @@ -115,7 +115,7 @@ public void removeAccount_AccountExists() { updater.createAccount(ADDRESS).setBalance(Wei.of(100000)); updater.commit(); assertNotNull(worldState.get(ADDRESS)); - assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); // Delete account updater = worldState.updater(); @@ -125,7 +125,7 @@ public void removeAccount_AccountExists() { updater.commit(); assertNull(updater.get(ADDRESS)); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); } @Test @@ -137,7 +137,7 @@ public void removeAccount_AccountExistsAndIsPersisted() { updater.commit(); worldState.persist(); assertNotNull(worldState.get(ADDRESS)); - assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); // Delete account updater = worldState.updater(); @@ -151,7 +151,7 @@ public void removeAccount_AccountExistsAndIsPersisted() { worldState.persist(); assertNull(updater.get(ADDRESS)); - assertEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); } @Test @@ -377,7 +377,7 @@ public void clearStorage_AfterPersisting() { updater.commit(); worldState.persist(); assertNotNull(worldState.get(ADDRESS)); - assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, worldState.rootHash()); + assertNotEquals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, worldState.rootHash()); // Clear storage account = updater.getMutable(ADDRESS); diff --git a/ethereum/eth/build.gradle b/ethereum/eth/build.gradle index 931817b3b7..68e5f7cffc 100644 --- a/ethereum/eth/build.gradle +++ b/ethereum/eth/build.gradle @@ -29,9 +29,11 @@ dependencies { implementation project(':ethereum:core') implementation project(':ethereum:p2p') implementation project(':ethereum:rlp') + implementation project(':ethereum:trie') implementation project(':ethereum:permissioning') implementation project(':metrics') implementation project(':services:kvstore') + implementation project(':services:queue') implementation 'io.vertx:vertx-core' implementation 'com.google.guava:guava' diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java new file mode 100644 index 0000000000..fcdb9df8e1 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/AccountTrieNodeDataRequest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +import static com.google.common.base.Preconditions.checkNotNull; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.rlp.RLP; +import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; +import tech.pegasys.pantheon.ethereum.worldstate.StateTrieAccountValue; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.ArrayList; +import java.util.List; + +class AccountTrieNodeDataRequest extends TrieNodeDataRequest { + + AccountTrieNodeDataRequest(final Hash hash) { + super(Kind.ACCOUNT_TRIE_NODE, hash); + } + + @Override + public void persist(final Updater updater) { + checkNotNull(getData(), "Must set data before node can be persisted."); + updater.putAccountStateTrieNode(getHash(), getData()); + } + + @Override + protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) { + return NodeDataRequest.createAccountDataRequest(childHash); + } + + @Override + protected List getRequestsFromTrieNodeValue(final BytesValue value) { + List nodeData = new ArrayList<>(2); + StateTrieAccountValue accountValue = StateTrieAccountValue.readFrom(RLP.input(value)); + // Add code, if appropriate + if (!accountValue.getCodeHash().equals(Hash.EMPTY)) { + nodeData.add(NodeDataRequest.createCodeRequest(accountValue.getCodeHash())); + } + // Add storage, if appropriate + if (!accountValue.getStorageRoot().equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) { + // If storage is non-empty queue download + NodeDataRequest storageNode = + NodeDataRequest.createStorageDataRequest(accountValue.getStorageRoot()); + nodeData.add(storageNode); + } + return nodeData; + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java new file mode 100644 index 0000000000..0466252281 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/CodeNodeDataRequest.java @@ -0,0 +1,39 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +import static com.google.common.base.Preconditions.checkNotNull; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; + +import java.util.stream.Stream; + +class CodeNodeDataRequest extends NodeDataRequest { + + CodeNodeDataRequest(final Hash hash) { + super(Kind.CODE, hash); + } + + @Override + public void persist(final Updater updater) { + checkNotNull(getData(), "Must set data before node can be persisted."); + updater.putCode(getHash(), getData()); + } + + @Override + public Stream getChildRequests() { + // Code nodes have nothing further to download + return Stream.empty(); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java new file mode 100644 index 0000000000..67b7a63ae5 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/NodeDataRequest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.stream.Stream; + +abstract class NodeDataRequest { + public enum Kind { + ACCOUNT_TRIE_NODE, + STORAGE_TRIE_NODE, + CODE + } + + private final Kind kind; + private final Hash hash; + private BytesValue data; + + protected NodeDataRequest(final Kind kind, final Hash hash) { + this.kind = kind; + this.hash = hash; + } + + public static AccountTrieNodeDataRequest createAccountDataRequest(final Hash hash) { + return new AccountTrieNodeDataRequest(hash); + } + + public static StorageTrieNodeDataRequest createStorageDataRequest(final Hash hash) { + return new StorageTrieNodeDataRequest(hash); + } + + public static CodeNodeDataRequest createCodeRequest(final Hash hash) { + return new CodeNodeDataRequest(hash); + } + + public Kind getKind() { + return kind; + } + + public Hash getHash() { + return hash; + } + + public BytesValue getData() { + return data; + } + + public NodeDataRequest setData(final BytesValue data) { + this.data = data; + return this; + } + + public abstract void persist(final WorldStateStorage.Updater updater); + + public abstract Stream getChildRequests(); +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java new file mode 100644 index 0000000000..d61292a066 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/StorageTrieNodeDataRequest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +import static com.google.common.base.Preconditions.checkNotNull; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Collections; +import java.util.List; + +class StorageTrieNodeDataRequest extends TrieNodeDataRequest { + + StorageTrieNodeDataRequest(final Hash hash) { + super(Kind.STORAGE_TRIE_NODE, hash); + } + + @Override + public void persist(final Updater updater) { + checkNotNull(getData(), "Must set data before node can be persisted."); + updater.putAccountStorageTrieNode(getHash(), getData()); + } + + @Override + protected NodeDataRequest createChildNodeDataRequest(final Hash childHash) { + return NodeDataRequest.createStorageDataRequest(childHash); + } + + @Override + protected List getRequestsFromTrieNodeValue(final BytesValue value) { + // Nothing to do for terminal storage node + return Collections.emptyList(); + } +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java new file mode 100644 index 0000000000..abf9bbd512 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/TrieNodeDataRequest.java @@ -0,0 +1,71 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.trie.Node; +import tech.pegasys.pantheon.ethereum.trie.TrieNodeDecoder; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.List; +import java.util.stream.Stream; + +abstract class TrieNodeDataRequest extends NodeDataRequest { + + private static final TrieNodeDecoder nodeDecoder = TrieNodeDecoder.create(); + + TrieNodeDataRequest(final Kind kind, final Hash hash) { + super(kind, hash); + } + + @Override + public Stream getChildRequests() { + if (getData() == null) { + // If this node hasn't been downloaded yet, we can't return any child data + return Stream.empty(); + } + + final Node node = nodeDecoder.decode(getData()); + return getRequestsFromLoadedTrieNode(node); + } + + private Stream getRequestsFromLoadedTrieNode(final Node trieNode) { + // Process this node's children + final Stream childRequests = + trieNode + .getChildren() + .map(List::stream) + .map(s -> s.flatMap(this::getRequestsFromChildTrieNode)) + .orElse(Stream.of()); + + // Process value at this node, if present + return trieNode + .getValue() + .map(v -> Stream.concat(childRequests, (getRequestsFromTrieNodeValue(v).stream()))) + .orElse(childRequests); + } + + private Stream getRequestsFromChildTrieNode(final Node trieNode) { + if (trieNode.isReferencedByHash()) { + // If child nodes are reference by hash, we need to download them + NodeDataRequest req = createChildNodeDataRequest(Hash.wrap(trieNode.getHash())); + return Stream.of(req); + } + // Otherwise if the child's value has been inlined we can go ahead and process it + return getRequestsFromLoadedTrieNode(trieNode); + } + + protected abstract NodeDataRequest createChildNodeDataRequest(final Hash childHash); + + protected abstract List getRequestsFromTrieNodeValue(final BytesValue value); +} diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java new file mode 100644 index 0000000000..06afeed700 --- /dev/null +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -0,0 +1,201 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult; +import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; +import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetNodeDataFromPeerTask; +import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; +import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.metrics.LabelledMetric; +import tech.pegasys.pantheon.metrics.OperationTimer; +import tech.pegasys.pantheon.services.queue.BigQueue; +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +public class WorldStateDownloader { + + private enum Status { + IDLE, + RUNNING, + DONE + } + + private final EthContext ethContext; + // The target header for which we want to retrieve world state + private final BlockHeader header; + private final BigQueue pendingRequests; + private final WorldStateStorage.Updater worldStateStorageUpdater; + private final int hashCountPerRequest; + private final int maxOutstandingRequests; + private final AtomicInteger outstandingRequests = new AtomicInteger(0); + private final LabelledMetric ethTasksTimer; + private final WorldStateStorage worldStateStorage; + private final AtomicBoolean sendingRequests = new AtomicBoolean(false); + private volatile CompletableFuture future; + private volatile Status status = Status.IDLE; + + public WorldStateDownloader( + final EthContext ethContext, + final WorldStateStorage worldStateStorage, + final BlockHeader header, + final BigQueue pendingRequests, + final int hashCountPerRequest, + final int maxOutstandingRequests, + final LabelledMetric ethTasksTimer) { + this.ethContext = ethContext; + this.worldStateStorage = worldStateStorage; + this.header = header; + this.pendingRequests = pendingRequests; + this.hashCountPerRequest = hashCountPerRequest; + this.maxOutstandingRequests = maxOutstandingRequests; + this.ethTasksTimer = ethTasksTimer; + this.worldStateStorageUpdater = worldStateStorage.updater(); + + Hash stateRoot = header.getStateRoot(); + if (stateRoot.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH)) { + // If we're requesting data for an empty world state, we're already done + markDone(); + } else { + pendingRequests.enqueue(NodeDataRequest.createAccountDataRequest(header.getStateRoot())); + } + } + + public CompletableFuture run() { + synchronized (this) { + if (status == Status.DONE || status == Status.RUNNING) { + return future; + } + status = Status.RUNNING; + future = new CompletableFuture<>(); + } + + requestNodeData(); + return future; + } + + private void requestNodeData() { + if (sendingRequests.compareAndSet(false, true)) { + while (shouldRequestNodeData()) { + Optional maybePeer = ethContext.getEthPeers().idlePeer(header.getNumber()); + + if (!maybePeer.isPresent()) { + // If no peer is available, wait and try again + waitForNewPeer().whenComplete((r, t) -> requestNodeData()); + break; + } else { + EthPeer peer = maybePeer.get(); + + // Collect data to be requested + List toRequest = new ArrayList<>(); + for (int i = 0; i < hashCountPerRequest; i++) { + NodeDataRequest pendingRequest = pendingRequests.dequeue(); + if (pendingRequest == null) { + break; + } + toRequest.add(pendingRequest); + } + + // Request and process node data + outstandingRequests.incrementAndGet(); + sendAndProcessRequests(peer, toRequest) + .whenComplete( + (res, error) -> { + if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) { + // We're done + worldStateStorageUpdater.commit(); + markDone(); + } else { + // Send out additional requests + requestNodeData(); + } + }); + } + } + sendingRequests.set(false); + } + } + + private synchronized void markDone() { + if (future == null) { + future = CompletableFuture.completedFuture(null); + } else { + future.complete(null); + } + status = Status.DONE; + } + + private boolean shouldRequestNodeData() { + return !future.isDone() + && outstandingRequests.get() < maxOutstandingRequests + && !pendingRequests.isEmpty(); + } + + private CompletableFuture waitForNewPeer() { + return ethContext + .getScheduler() + .timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5)); + } + + private CompletableFuture sendAndProcessRequests( + final EthPeer peer, final List requests) { + List hashes = + requests.stream().map(NodeDataRequest::getHash).distinct().collect(Collectors.toList()); + return GetNodeDataFromPeerTask.forHashes(ethContext, hashes, ethTasksTimer) + .assignPeer(peer) + .run() + .thenApply(PeerTaskResult::getResult) + .thenApply(this::mapNodeDataByHash) + .whenComplete( + (data, err) -> { + boolean requestFailed = err != null; + for (NodeDataRequest request : requests) { + BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); + if (matchingData == null) { + pendingRequests.enqueue(request); + } else { + // Persist request data + request.setData(matchingData); + request.persist(worldStateStorageUpdater); + + // Queue child requests + request + .getChildRequests() + .filter(n -> !worldStateStorage.contains(n.getHash())) + .forEach(pendingRequests::enqueue); + } + } + }); + } + + private Map mapNodeDataByHash(final List data) { + // Map data by hash + Map dataByHash = new HashMap<>(); + data.stream().forEach(d -> dataByHash.put(Hash.hash(d), d)); + return dataByHash; + } +} diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java index dea698c3d8..49c7629c2e 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/DeterministicEthScheduler.java @@ -16,6 +16,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; /** Schedules tasks that run immediately and synchronously for testing. */ public class DeterministicEthScheduler extends EthScheduler { @@ -23,7 +24,7 @@ public class DeterministicEthScheduler extends EthScheduler { private final TimeoutPolicy timeoutPolicy; DeterministicEthScheduler() { - this(() -> false); + this(TimeoutPolicy.NEVER); } DeterministicEthScheduler(final TimeoutPolicy timeoutPolicy) { @@ -55,6 +56,19 @@ public void failAfterTimeout(final CompletableFuture promise, final Durat @FunctionalInterface public interface TimeoutPolicy { + TimeoutPolicy NEVER = () -> false; + boolean shouldTimeout(); + + static TimeoutPolicy timeoutXTimes(final int times) { + final AtomicInteger timeouts = new AtomicInteger(times); + return () -> { + if (timeouts.get() <= 0) { + return false; + } + timeouts.decrementAndGet(); + return true; + }; + } } } diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java index b6fb16e705..ff523b04f8 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/EthProtocolManagerTestUtil.java @@ -47,7 +47,11 @@ public static EthProtocolManager create( public static EthProtocolManager create( final Blockchain blockchain, final WorldStateArchive worldStateArchive) { - return create(blockchain, worldStateArchive, () -> false); + return create(blockchain, worldStateArchive, TimeoutPolicy.NEVER); + } + + public static EthProtocolManager create() { + return create(TimeoutPolicy.NEVER); } public static EthProtocolManager create(final TimeoutPolicy timeoutPolicy) { @@ -59,10 +63,6 @@ public static EthProtocolManager create(final TimeoutPolicy timeoutPolicy) { return create(blockchain, worldStateArchive, timeoutPolicy); } - public static EthProtocolManager create() { - return create(() -> false); - } - public static void broadcastMessage( final EthProtocolManager ethProtocolManager, final RespondingEthPeer peer, diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java index 61abd6d906..f6db36633a 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/manager/ethtaskutils/BlockchainSetupUtil.java @@ -54,7 +54,7 @@ public class BlockchainSetupUtil { private final List blocks; private long maxBlockNumber; - public BlockchainSetupUtil( + private BlockchainSetupUtil( final GenesisState genesisState, final MutableBlockchain blockchain, final ProtocolContext protocolContext, diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java new file mode 100644 index 0000000000..a66e559598 --- /dev/null +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -0,0 +1,283 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.eth.sync.worldstate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +import tech.pegasys.pantheon.ethereum.chain.Blockchain; +import tech.pegasys.pantheon.ethereum.core.Account; +import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator; +import tech.pegasys.pantheon.ethereum.core.BlockDataGenerator.BlockOptions; +import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.core.MutableWorldState; +import tech.pegasys.pantheon.ethereum.core.WorldState; +import tech.pegasys.pantheon.ethereum.eth.manager.DeterministicEthScheduler.TimeoutPolicy; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManager; +import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; +import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; +import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage; +import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem; +import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; +import tech.pegasys.pantheon.services.queue.BigQueue; +import tech.pegasys.pantheon.services.queue.InMemoryBigQueue; +import tech.pegasys.pantheon.util.bytes.Bytes32; +import tech.pegasys.pantheon.util.uint.UInt256; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.junit.Test; + +public class WorldStateDownloaderTest { + + private static final Hash EMPTY_TRIE_ROOT = Hash.wrap(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH); + + @Test + public void downloadWorldStateFromPeers_onePeerOneWithManyRequestsOneAtATime() { + downloadAvailableWorldStateFromPeers(1, 50, 1, 1); + } + + @Test + public void downloadWorldStateFromPeers_onePeerOneWithManyRequests() { + downloadAvailableWorldStateFromPeers(1, 50, 1, 10); + } + + @Test + public void downloadWorldStateFromPeers_onePeerWithSingleRequest() { + downloadAvailableWorldStateFromPeers(1, 1, 100, 10); + } + + @Test + public void downloadWorldStateFromPeers_largeStateFromMultiplePeers() { + downloadAvailableWorldStateFromPeers(5, 100, 10, 10); + } + + @Test + public void downloadWorldStateFromPeers_smallStateFromMultiplePeers() { + downloadAvailableWorldStateFromPeers(5, 5, 1, 10); + } + + @Test + public void downloadWorldStateFromPeers_singleRequestWithMultiplePeers() { + downloadAvailableWorldStateFromPeers(5, 1, 50, 50); + } + + @Test + public void downloadEmptyWorldState() { + BlockDataGenerator dataGen = new BlockDataGenerator(1); + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + BlockHeader header = + dataGen + .block(BlockOptions.create().setStateRoot(EMPTY_TRIE_ROOT).setBlockNumber(10)) + .getHeader(); + + // Create some peers + List peers = + Stream.generate( + () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) + .limit(5) + .collect(Collectors.toList()); + + BigQueue queue = new InMemoryBigQueue<>(); + WorldStateStorage localStorage = + new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + WorldStateDownloader downloader = + new WorldStateDownloader( + ethProtocolManager.ethContext(), + localStorage, + header, + queue, + 10, + 10, + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); + + CompletableFuture future = downloader.run(); + assertThat(future).isDone(); + + // Peers should not have been queried + for (RespondingEthPeer peer : peers) { + assertThat(peer.hasOutstandingRequests()).isFalse(); + } + } + + @Test + public void canRecoverFromTimeouts() { + BlockDataGenerator dataGen = new BlockDataGenerator(1); + TimeoutPolicy timeoutPolicy = TimeoutPolicy.timeoutXTimes(2); + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(timeoutPolicy); + + // Setup "remote" state + final WorldStateStorage remoteStorage = + new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage); + final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable(); + + // Generate accounts and save corresponding state root + final List accounts = dataGen.createRandomAccounts(remoteWorldState, 20); + final Hash stateRoot = remoteWorldState.rootHash(); + assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check + BlockHeader header = + dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); + + // Create some peers + List peers = + Stream.generate( + () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) + .limit(5) + .collect(Collectors.toList()); + + BigQueue queue = new InMemoryBigQueue<>(); + WorldStateStorage localStorage = + new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + WorldStateDownloader downloader = + new WorldStateDownloader( + ethProtocolManager.ethContext(), + localStorage, + header, + queue, + 10, + 10, + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); + + CompletableFuture result = downloader.run(); + + // Respond to node data requests + Responder responder = + RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); + while (!result.isDone()) { + for (RespondingEthPeer peer : peers) { + peer.respond(responder); + } + } + + // Check that all expected account data was downloaded + WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + final WorldState localWorldState = localWorldStateArchive.get(stateRoot); + assertThat(result).isDone(); + assertAccountsMatch(localWorldState, accounts); + } + + private void downloadAvailableWorldStateFromPeers( + final int peerCount, + final int accountCount, + final int hashesPerRequest, + final int maxOutstandingRequests) { + final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); + final int trailingPeerCount = 5; + BlockDataGenerator dataGen = new BlockDataGenerator(1); + + // Setup "remote" state + final WorldStateStorage remoteStorage = + new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + final WorldStateArchive remoteWorldStateArchive = new WorldStateArchive(remoteStorage); + final MutableWorldState remoteWorldState = remoteWorldStateArchive.getMutable(); + + // Generate accounts and save corresponding state root + final List accounts = dataGen.createRandomAccounts(remoteWorldState, accountCount); + final Hash stateRoot = remoteWorldState.rootHash(); + assertThat(stateRoot).isNotEqualTo(EMPTY_TRIE_ROOT); // Sanity check + BlockHeader header = + dataGen.block(BlockOptions.create().setStateRoot(stateRoot).setBlockNumber(10)).getHeader(); + + // Generate more data that should not be downloaded + final List otherAccounts = dataGen.createRandomAccounts(remoteWorldState, 5); + Hash otherStateRoot = remoteWorldState.rootHash(); + BlockHeader otherHeader = + dataGen + .block(BlockOptions.create().setStateRoot(otherStateRoot).setBlockNumber(11)) + .getHeader(); + assertThat(otherStateRoot).isNotEqualTo(stateRoot); // Sanity check + + BigQueue queue = new InMemoryBigQueue<>(); + WorldStateStorage localStorage = + new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); + WorldStateArchive localWorldStateArchive = new WorldStateArchive(localStorage); + WorldStateDownloader downloader = + new WorldStateDownloader( + ethProtocolManager.ethContext(), + localStorage, + header, + queue, + hashesPerRequest, + maxOutstandingRequests, + NoOpMetricsSystem.NO_OP_LABELLED_TIMER); + + // Create some peers that can respond + List usefulPeers = + Stream.generate( + () -> EthProtocolManagerTestUtil.createPeer(ethProtocolManager, header.getNumber())) + .limit(peerCount) + .collect(Collectors.toList()); + // And some irrelevant peers + List trailingPeers = + Stream.generate( + () -> + EthProtocolManagerTestUtil.createPeer( + ethProtocolManager, header.getNumber() - 1L)) + .limit(trailingPeerCount) + .collect(Collectors.toList()); + + // Start downloader + CompletableFuture result = downloader.run(); + + // Respond to node data requests + Responder responder = + RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); + while (!result.isDone()) { + for (RespondingEthPeer peer : usefulPeers) { + peer.respond(responder); + } + } + + // Check that trailing peers were not queried for data + for (RespondingEthPeer trailingPeer : trailingPeers) { + assertThat(trailingPeer.hasOutstandingRequests()).isFalse(); + } + + // Check that all expected account data was downloaded + final WorldState localWorldState = localWorldStateArchive.get(stateRoot); + assertThat(result).isDone(); + assertAccountsMatch(localWorldState, accounts); + + // We shouldn't have any extra data locally + assertThat(localStorage.contains(otherHeader.getStateRoot())).isFalse(); + for (Account otherAccount : otherAccounts) { + assertThat(localWorldState.get(otherAccount.getAddress())).isNull(); + } + } + + private void assertAccountsMatch( + final WorldState worldState, final List expectedAccounts) { + for (Account expectedAccount : expectedAccounts) { + Account actualAccount = worldState.get(expectedAccount.getAddress()); + assertThat(actualAccount).isNotNull(); + // Check each field + assertThat(actualAccount.getNonce()).isEqualTo(expectedAccount.getNonce()); + assertThat(actualAccount.getCode()).isEqualTo(expectedAccount.getCode()); + assertThat(actualAccount.getBalance()).isEqualTo(expectedAccount.getBalance()); + + Map actualStorage = actualAccount.storageEntriesFrom(Bytes32.ZERO, 500); + Map expectedStorage = expectedAccount.storageEntriesFrom(Bytes32.ZERO, 500); + assertThat(actualStorage).isEqualTo(expectedStorage); + } + } +} diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/BranchNode.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/BranchNode.java index f3b02ebf1b..2899d4a14d 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/BranchNode.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/BranchNode.java @@ -23,6 +23,8 @@ import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.function.Function; @@ -73,6 +75,11 @@ public Optional getValue() { return value; } + @Override + public Optional>> getChildren() { + return Optional.of(Collections.unmodifiableList(children)); + } + public Node child(final byte index) { return children.get(index); } @@ -103,11 +110,10 @@ public BytesValue getRlp() { @Override public BytesValue getRlpRef() { - final BytesValue rlp = getRlp(); - if (rlp.size() < 32) { - return rlp; - } else { + if (isReferencedByHash()) { return RLP.encodeOne(getHash()); + } else { + return getRlp(); } } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/ExtensionNode.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/ExtensionNode.java index 7a8be5d591..41c0909581 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/ExtensionNode.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/ExtensionNode.java @@ -22,6 +22,8 @@ import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.List; import java.util.Optional; class ExtensionNode implements Node { @@ -58,7 +60,12 @@ public BytesValue getPath() { @Override public Optional getValue() { - throw new UnsupportedOperationException(); + return Optional.empty(); + } + + @Override + public Optional>> getChildren() { + return Optional.of(Collections.singletonList(child)); } public Node getChild() { @@ -85,11 +92,10 @@ public BytesValue getRlp() { @Override public BytesValue getRlpRef() { - final BytesValue rlp = getRlp(); - if (rlp.size() < 32) { - return rlp; - } else { + if (isReferencedByHash()) { return RLP.encodeOne(getHash()); + } else { + return getRlp(); } } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/LeafNode.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/LeafNode.java index 07ed18fc5d..45e72e6923 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/LeafNode.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/LeafNode.java @@ -21,6 +21,7 @@ import java.lang.ref.SoftReference; import java.lang.ref.WeakReference; +import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -64,6 +65,11 @@ public Optional getValue() { return Optional.of(value); } + @Override + public Optional>> getChildren() { + return Optional.empty(); + } + @Override public BytesValue getRlp() { if (rlp != null) { @@ -85,11 +91,10 @@ public BytesValue getRlp() { @Override public BytesValue getRlpRef() { - final BytesValue rlp = getRlp(); - if (rlp.size() < 32) { - return rlp; - } else { + if (isReferencedByHash()) { return RLP.encodeOne(getHash()); + } else { + return getRlp(); } } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java index 0cb31da680..1b869ada5e 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerklePatriciaTrie.java @@ -16,6 +16,7 @@ import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.Bytes32; +import tech.pegasys.pantheon.util.bytes.BytesValue; import java.util.Map; import java.util.Optional; @@ -23,7 +24,8 @@ /** An Merkle Patricial Trie. */ public interface MerklePatriciaTrie { - Bytes32 EMPTY_TRIE_ROOT_HASH = keccak256(RLP.NULL); + BytesValue EMPTY_TRIE_NODE = RLP.NULL; + Bytes32 EMPTY_TRIE_NODE_HASH = keccak256(EMPTY_TRIE_NODE); /** * Returns an {@code Optional} of value mapped to the hash if it exists; otherwise empty. diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerkleStorageException.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerkleTrieException.java similarity index 80% rename from ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerkleStorageException.java rename to ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerkleTrieException.java index 3ed75052f4..b00a1f60b2 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerkleStorageException.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/MerkleTrieException.java @@ -16,13 +16,13 @@ * This exception is thrown when there is an issue retrieving or decoding values from {@link * MerkleStorage}. */ -public class MerkleStorageException extends RuntimeException { +public class MerkleTrieException extends RuntimeException { - public MerkleStorageException(final String message) { + public MerkleTrieException(final String message) { super(message); } - public MerkleStorageException(final String message, final Exception cause) { + public MerkleTrieException(final String message, final Exception cause) { super(message, cause); } } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java index 4d8b62e1ce..8f35692d20 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/Node.java @@ -15,9 +15,10 @@ import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.util.List; import java.util.Optional; -interface Node { +public interface Node { Node accept(PathNodeVisitor visitor, BytesValue path); @@ -27,10 +28,23 @@ interface Node { Optional getValue(); + Optional>> getChildren(); + BytesValue getRlp(); BytesValue getRlpRef(); + /** + * Whether a reference to this node should be represented as a hash of the rlp, or the node rlp + * itself should be inlined (the rlp stored directly in the parent node). If true, the node is + * referenced by hash. If false, the node is referenced by its rlp-encoded value. + * + * @return true if this node should be referenced by hash + */ + default boolean isReferencedByHash() { + return getRlp().size() >= 32; + } + Bytes32 getHash(); Node replacePath(BytesValue path); diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/NullNode.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/NullNode.java index 322e6240e6..e6996facab 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/NullNode.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/NullNode.java @@ -12,17 +12,13 @@ */ package tech.pegasys.pantheon.ethereum.trie; -import static tech.pegasys.pantheon.crypto.Hash.keccak256; - -import tech.pegasys.pantheon.ethereum.rlp.RLP; import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.util.List; import java.util.Optional; class NullNode implements Node { - private static final Bytes32 HASH = keccak256(RLP.NULL); - @SuppressWarnings("rawtypes") private static final NullNode instance = new NullNode(); @@ -53,19 +49,24 @@ public Optional getValue() { return Optional.empty(); } + @Override + public Optional>> getChildren() { + return Optional.empty(); + } + @Override public BytesValue getRlp() { - return RLP.NULL; + return MerklePatriciaTrie.EMPTY_TRIE_NODE; } @Override public BytesValue getRlpRef() { - return RLP.NULL; + return MerklePatriciaTrie.EMPTY_TRIE_NODE; } @Override public Bytes32 getHash() { - return HASH; + return MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH; } @Override diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java index b7f820c692..63f6f3648c 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredMerklePatriciaTrie.java @@ -45,7 +45,7 @@ public StoredMerklePatriciaTrie( final NodeLoader nodeLoader, final Function valueSerializer, final Function valueDeserializer) { - this(nodeLoader, MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH, valueSerializer, valueDeserializer); + this(nodeLoader, MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH, valueSerializer, valueDeserializer); } /** @@ -64,7 +64,7 @@ public StoredMerklePatriciaTrie( final Function valueDeserializer) { this.nodeFactory = new StoredNodeFactory<>(nodeLoader, valueSerializer, valueDeserializer); this.root = - rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH) + rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH) ? NullNode.instance() : new StoredNode<>(nodeFactory, rootHash); } @@ -99,7 +99,7 @@ public void commit(final NodeUpdater nodeUpdater) { // Reset root so dirty nodes can be garbage collected final Bytes32 rootHash = root.getHash(); this.root = - rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_ROOT_HASH) + rootHash.equals(MerklePatriciaTrie.EMPTY_TRIE_NODE_HASH) ? NullNode.instance() : new StoredNode<>(nodeFactory, rootHash); } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java index bd80594f90..a9b0759829 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNode.java @@ -16,6 +16,7 @@ import tech.pegasys.pantheon.util.bytes.Bytes32; import tech.pegasys.pantheon.util.bytes.BytesValue; +import java.util.List; import java.util.Optional; class StoredNode implements Node { @@ -63,10 +64,14 @@ public Optional getValue() { return load().getValue(); } + @Override + public Optional>> getChildren() { + return load().getChildren(); + } + @Override public BytesValue getRlp() { - // Getting the rlp representation is only needed when persisting a concrete node - throw new UnsupportedOperationException(); + return load().getRlp(); } @Override @@ -75,6 +80,12 @@ public BytesValue getRlpRef() { return RLP.encodeOne(hash); } + @Override + public boolean isReferencedByHash() { + // Stored nodes represent only nodes that are referenced by hash + return true; + } + @Override public Bytes32 getHash() { return hash; @@ -87,7 +98,11 @@ public Node replacePath(final BytesValue path) { private Node load() { if (loaded == null) { - loaded = nodeFactory.retrieve(hash); + loaded = + nodeFactory + .retrieve(hash) + .orElseThrow( + () -> new MerkleTrieException("Unable to load trie node value for hash " + hash)); } return loaded; @@ -95,7 +110,10 @@ private Node load() { @Override public String print() { - final String value = load().print(); - return value; + if (loaded == null) { + return "StoredNode:" + "\n\tRef: " + getRlpRef(); + } else { + return load().print(); + } } } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNodeFactory.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNodeFactory.java index 5268022fe2..41dca7855e 100644 --- a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNodeFactory.java +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/StoredNodeFactory.java @@ -87,7 +87,7 @@ private Node handleNewNode(final Node node) { return node; } - public Node retrieve(final Bytes32 hash) throws MerkleStorageException { + public Optional> retrieve(final Bytes32 hash) throws MerkleTrieException { return nodeLoader .getNode(hash) .map( @@ -97,16 +97,19 @@ public Node retrieve(final Bytes32 hash) throws MerkleStorageException { assert (hash.equals(node.getHash())) : "Node hash " + node.getHash() + " not equal to expected " + hash; return node; - }) - .orElseThrow(() -> new MerkleStorageException("Missing value for hash " + hash)); + }); + } + + public Node decode(final BytesValue rlp) { + return decode(rlp, () -> String.format("Failed to decode value %s", rlp.toString())); } private Node decode(final BytesValue rlp, final Supplier errMessage) - throws MerkleStorageException { + throws MerkleTrieException { try { return decode(RLP.input(rlp), errMessage); } catch (final RLPException ex) { - throw new MerkleStorageException(errMessage.get(), ex); + throw new MerkleTrieException(errMessage.get(), ex); } } @@ -123,8 +126,7 @@ private Node decode(final RLPInput nodeRLPs, final Supplier errMessag try { path = CompactEncoding.decode(encodedPath); } catch (final IllegalArgumentException ex) { - throw new MerkleStorageException( - errMessage.get() + ": invalid path " + encodedPath, ex); + throw new MerkleTrieException(errMessage.get() + ": invalid path " + encodedPath, ex); } final int size = path.size(); @@ -138,7 +140,7 @@ private Node decode(final RLPInput nodeRLPs, final Supplier errMessag return decodeBranch(nodeRLPs, errMessage); default: - throw new MerkleStorageException( + throw new MerkleTrieException( errMessage.get() + format(": invalid list size %s", nodesCount)); } } finally { @@ -189,7 +191,7 @@ private BranchNode decodeBranch(final RLPInput nodeRLPs, final Supplier decodeLeaf( final BytesValue path, final RLPInput valueRlp, final Supplier errMessage) { if (valueRlp.nextIsNull()) { - throw new MerkleStorageException(errMessage.get() + ": leaf has null value"); + throw new MerkleTrieException(errMessage.get() + ": leaf has null value"); } final V value = decodeValue(valueRlp, errMessage); return new LeafNode<>(path, value, this, valueSerializer); @@ -198,7 +200,7 @@ private LeafNode decodeLeaf( @SuppressWarnings("unchecked") private NullNode decodeNull(final RLPInput nodeRLPs, final Supplier errMessage) { if (!nodeRLPs.nextIsNull()) { - throw new MerkleStorageException(errMessage.get() + ": list size 1 but not null"); + throw new MerkleTrieException(errMessage.get() + ": list size 1 but not null"); } nodeRLPs.skipNext(); return NULL_NODE; @@ -209,7 +211,7 @@ private V decodeValue(final RLPInput valueRlp, final Supplier errMessage try { bytes = valueRlp.readBytesValue(); } catch (final RLPException ex) { - throw new MerkleStorageException( + throw new MerkleTrieException( errMessage.get() + ": failed decoding value rlp " + valueRlp, ex); } return deserializeValue(errMessage, bytes); @@ -220,8 +222,7 @@ private V deserializeValue(final Supplier errMessage, final BytesValue b try { value = valueDeserializer.apply(bytes); } catch (final IllegalArgumentException ex) { - throw new MerkleStorageException( - errMessage.get() + ": failed deserializing value " + bytes, ex); + throw new MerkleTrieException(errMessage.get() + ": failed deserializing value " + bytes, ex); } return value; } diff --git a/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java new file mode 100644 index 0000000000..18ca328e50 --- /dev/null +++ b/ethereum/trie/src/main/java/tech/pegasys/pantheon/ethereum/trie/TrieNodeDecoder.java @@ -0,0 +1,36 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.ethereum.trie; + +import tech.pegasys.pantheon.util.bytes.BytesValue; + +import java.util.Optional; +import java.util.function.Function; + +public class TrieNodeDecoder { + + private final StoredNodeFactory nodeFactory; + + private TrieNodeDecoder() { + nodeFactory = + new StoredNodeFactory<>((h) -> Optional.empty(), Function.identity(), Function.identity()); + } + + public static TrieNodeDecoder create() { + return new TrieNodeDecoder(); + } + + public Node decode(final BytesValue rlp) { + return nodeFactory.decode(rlp); + } +} diff --git a/services/queue/build.gradle b/services/queue/build.gradle new file mode 100644 index 0000000000..616f648e2f --- /dev/null +++ b/services/queue/build.gradle @@ -0,0 +1,38 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +apply plugin: 'java-library' + +jar { + baseName 'pantheon-queue' + manifest { + attributes( + 'Specification-Title': baseName, + 'Specification-Version': project.version, + 'Implementation-Title': baseName, + 'Implementation-Version': calculateVersion() + ) + } +} + +dependencies { + api project(':util') + implementation project(':metrics') + + implementation 'org.apache.logging.log4j:log4j-api' + implementation 'com.google.guava:guava' + + runtime 'org.apache.logging.log4j:log4j-core' + + testImplementation 'junit:junit' +} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java new file mode 100644 index 0000000000..793669a8fb --- /dev/null +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/BigQueue.java @@ -0,0 +1,33 @@ +/* + * Copyright 2018 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import java.io.Closeable; + +/** + * Represents a very large thread-safe queue that may exceed memory limits. + * + * @param the type of data held in the queue + */ +public interface BigQueue extends Closeable { + + void enqueue(T value); + + T dequeue(); + + long size(); + + default boolean isEmpty() { + return size() == 0; + } +} diff --git a/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java new file mode 100644 index 0000000000..b40b4ea186 --- /dev/null +++ b/services/queue/src/main/java/tech/pegasys/pantheon/services/queue/InMemoryBigQueue.java @@ -0,0 +1,40 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package tech.pegasys.pantheon.services.queue; + +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class InMemoryBigQueue implements BigQueue { + private final Queue internalQueue = new ConcurrentLinkedQueue<>(); + + @Override + public void enqueue(final T value) { + internalQueue.add(value); + } + + @Override + public T dequeue() { + return internalQueue.poll(); + } + + @Override + public long size() { + return internalQueue.size(); + } + + @Override + public void close() { + internalQueue.clear(); + } +} diff --git a/services/queue/src/main/resources/log4j2.xml b/services/queue/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..1d05234b62 --- /dev/null +++ b/services/queue/src/main/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + INFO + + + + + + + + + + + + \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index bbfac97692..42fc2c1ec5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -33,6 +33,7 @@ include 'ethereum:trie' include 'metrics' include 'pantheon' include 'services:kvstore' +include 'services:queue' include 'testutil' include 'util' include 'errorprone-checks'