From ba62015a47ca957484e4c92664f7a45edcf1771e Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Thu, 7 Feb 2019 14:14:48 -0500 Subject: [PATCH 1/2] Commit world state continuously --- .../KeyValueStorageWorldStateStorageTest.java | 24 +++++++++++++++++++ .../sync/worldstate/WorldStateDownloader.java | 8 +++---- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java index ca95b5ce44..1e2fd139c3 100644 --- a/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java +++ b/ethereum/core/src/test/java/tech/pegasys/pantheon/ethereum/storage/keyvalue/KeyValueStorageWorldStateStorageTest.java @@ -15,6 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat; import tech.pegasys.pantheon.ethereum.core.Hash; +import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage.Updater; import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.services.kvstore.InMemoryKeyValueStorage; import tech.pegasys.pantheon.util.bytes.BytesValue; @@ -151,6 +152,29 @@ public void getNodeData_saveAndGetRegularValue() { assertThat(storage.getNodeData(Hash.hash(bytes))).contains(bytes); } + @Test + public void reconcilesNonConflictingUpdaters() { + BytesValue bytesA = BytesValue.fromHexString("0x12"); + BytesValue bytesB = BytesValue.fromHexString("0x1234"); + BytesValue bytesC = BytesValue.fromHexString("0x123456"); + + KeyValueStorageWorldStateStorage storage = emptyStorage(); + Updater updaterA = storage.updater(); + Updater updaterB = storage.updater(); + + updaterA.putCode(bytesA); + updaterB.putCode(bytesA); + updaterB.putCode(bytesB); + updaterA.putCode(bytesC); + + updaterA.commit(); + updaterB.commit(); + + assertThat(storage.getCode(Hash.hash(bytesA))).contains(bytesA); + assertThat(storage.getCode(Hash.hash(bytesB))).contains(bytesB); + assertThat(storage.getCode(Hash.hash(bytesC))).contains(bytesC); + } + private KeyValueStorageWorldStateStorage emptyStorage() { return new KeyValueStorageWorldStateStorage(new InMemoryKeyValueStorage()); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java index f31af93575..af1425e653 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloader.java @@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask; import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage; +import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage.Updater; import tech.pegasys.pantheon.metrics.LabelledMetric; import tech.pegasys.pantheon.metrics.OperationTimer; import tech.pegasys.pantheon.services.queue.BigQueue; @@ -51,7 +52,6 @@ private enum Status { private final EthContext ethContext; private final BigQueue pendingRequests; - private final WorldStateStorage.Updater worldStateStorageUpdater; private final int hashCountPerRequest; private final int maxOutstandingRequests; private final AtomicInteger outstandingRequests = new AtomicInteger(0); @@ -74,7 +74,6 @@ public WorldStateDownloader( this.hashCountPerRequest = hashCountPerRequest; this.maxOutstandingRequests = maxOutstandingRequests; this.ethTasksTimer = ethTasksTimer; - this.worldStateStorageUpdater = worldStateStorage.updater(); } public CompletableFuture run(final BlockHeader header) { @@ -135,7 +134,6 @@ private void requestNodeData(final BlockHeader header) { (res, error) -> { if (outstandingRequests.decrementAndGet() == 0 && pendingRequests.isEmpty()) { // We're done - worldStateStorageUpdater.commit(); markDone(); } else { // Send out additional requests @@ -182,6 +180,7 @@ private CompletableFuture sendAndProcessRequests( .whenComplete( (data, err) -> { boolean requestFailed = err != null; + Updater storageUpdater = worldStateStorage.updater(); for (NodeDataRequest request : requests) { BytesValue matchingData = requestFailed ? null : data.get(request.getHash()); if (matchingData == null) { @@ -189,7 +188,7 @@ private CompletableFuture sendAndProcessRequests( } else { // Persist request data request.setData(matchingData); - request.persist(worldStateStorageUpdater); + request.persist(storageUpdater); // Queue child requests request @@ -198,6 +197,7 @@ private CompletableFuture sendAndProcessRequests( .forEach(pendingRequests::enqueue); } } + storageUpdater.commit(); }); } From d90953db351e5dceaadcf5df608b063230354205 Mon Sep 17 00:00:00 2001 From: Meredith Baxter Date: Thu, 7 Feb 2019 15:23:06 -0500 Subject: [PATCH 2/2] Add more test coverage --- .../worldstate/WorldStateDownloaderTest.java | 88 +++++++++++++++++-- 1 file changed, 83 insertions(+), 5 deletions(-) diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java index 1478178f74..87c0b1ff3f 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/worldstate/WorldStateDownloaderTest.java @@ -28,6 +28,7 @@ import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.RespondingEthPeer.Responder; +import tech.pegasys.pantheon.ethereum.mainnet.MainnetProtocolSchedule; import tech.pegasys.pantheon.ethereum.storage.keyvalue.KeyValueStorageWorldStateStorage; import tech.pegasys.pantheon.ethereum.trie.MerklePatriciaTrie; import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive; @@ -174,11 +175,26 @@ public void canRecoverFromTimeouts() { assertAccountsMatch(localWorldState, accounts); } + @Test + public void handlesPartialResponsesFromNetwork() { + downloadAvailableWorldStateFromPeers(5, 100, 10, 10, this::respondPartially); + } + private void downloadAvailableWorldStateFromPeers( final int peerCount, final int accountCount, final int hashesPerRequest, final int maxOutstandingRequests) { + downloadAvailableWorldStateFromPeers( + peerCount, accountCount, hashesPerRequest, maxOutstandingRequests, this::respondFully); + } + + private void downloadAvailableWorldStateFromPeers( + final int peerCount, + final int accountCount, + final int hashesPerRequest, + final int maxOutstandingRequests, + final NetworkResponder networkResponder) { final EthProtocolManager ethProtocolManager = EthProtocolManagerTestUtil.create(); final int trailingPeerCount = 5; BlockDataGenerator dataGen = new BlockDataGenerator(1); @@ -237,12 +253,15 @@ private void downloadAvailableWorldStateFromPeers( CompletableFuture result = downloader.run(header); // Respond to node data requests - Responder responder = + // Send one round of full responses, so that we can get multiple requests queued up + Responder fullResponder = RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); - while (!result.isDone()) { - for (RespondingEthPeer peer : usefulPeers) { - peer.respond(responder); - } + for (RespondingEthPeer peer : usefulPeers) { + peer.respond(fullResponder); + } + // Respond to remaining queued requests in custom way + if (!result.isDone()) { + networkResponder.respond(usefulPeers, remoteWorldStateArchive, result); } // Check that trailing peers were not queried for data @@ -262,6 +281,57 @@ private void downloadAvailableWorldStateFromPeers( } } + private void respondFully( + final List peers, + final WorldStateArchive remoteWorldStateArchive, + final CompletableFuture downloaderFuture) { + Responder responder = + RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); + while (!downloaderFuture.isDone()) { + for (RespondingEthPeer peer : peers) { + peer.respond(responder); + } + } + } + + private void respondPartially( + final List peers, + final WorldStateArchive remoteWorldStateArchive, + final CompletableFuture downloaderFuture) { + Responder fullResponder = + RespondingEthPeer.blockchainResponder(mock(Blockchain.class), remoteWorldStateArchive); + Responder partialResponder = + RespondingEthPeer.partialResponder( + mock(Blockchain.class), remoteWorldStateArchive, MainnetProtocolSchedule.create(), .5f); + Responder emptyResponder = RespondingEthPeer.emptyResponder(); + + // Send a few partial responses + for (int i = 0; i < 5; i++) { + for (RespondingEthPeer peer : peers) { + peer.respond(partialResponder); + } + } + + // Downloader should not complete with partial responses + assertThat(downloaderFuture).isNotDone(); + + // Send a few empty responses + for (int i = 0; i < 3; i++) { + for (RespondingEthPeer peer : peers) { + peer.respond(emptyResponder); + } + } + + // Downloader should not complete with empty responses + assertThat(downloaderFuture).isNotDone(); + + while (!downloaderFuture.isDone()) { + for (RespondingEthPeer peer : peers) { + peer.respond(fullResponder); + } + } + } + private void assertAccountsMatch( final WorldState worldState, final List expectedAccounts) { for (Account expectedAccount : expectedAccounts) { @@ -277,4 +347,12 @@ private void assertAccountsMatch( assertThat(actualStorage).isEqualTo(expectedStorage); } } + + @FunctionalInterface + private interface NetworkResponder { + void respond( + final List peers, + final WorldStateArchive remoteWorldStateArchive, + final CompletableFuture downloaderFuture); + } }