From cd21c64aa6555f8ab19e9fa0e2eb8a38642d6369 Mon Sep 17 00:00:00 2001 From: shemnon Date: Mon, 8 Apr 2019 11:42:05 -0600 Subject: [PATCH] Reduce memory usage in import There is no need to keep entire blocks during import after they have been imported. Keep just the hashes instead. --- .../ethereum/eth/sync/BlockHandler.java | 9 ++-- .../eth/sync/EthTaskChainDownloader.java | 8 ++-- .../sync/fastsync/FastSyncBlockHandler.java | 6 +++ .../FastSyncBlockImportTaskFactory.java | 11 ++--- .../sync/fullsync/FullSyncBlockHandler.java | 6 +++ .../FullSyncBlockImportTaskFactory.java | 11 ++--- .../eth/sync/tasks/ImportBlocksTask.java | 10 ++++- .../tasks/ParallelImportChainSegmentTask.java | 7 +-- .../ParallelValidateAndImportBodiesTask.java | 13 ++++-- .../eth/sync/tasks/ImportBlocksTaskTest.java | 45 +++++++++---------- 10 files changed, 74 insertions(+), 52 deletions(-) diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java index 0b36dd1dcf..52433b5e07 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/BlockHandler.java @@ -13,16 +13,19 @@ package tech.pegasys.pantheon.ethereum.eth.sync; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import java.util.List; import java.util.concurrent.CompletableFuture; public interface BlockHandler { - CompletableFuture> downloadBlocks(final List headers); + CompletableFuture> downloadBlocks(List headers); - CompletableFuture> validateAndImportBlocks(final List blocks); + CompletableFuture> validateAndImportBlocks(List blocks); - long extractBlockNumber(final B block); + long extractBlockNumber(B block); + + Hash extractBlockHash(B block); CompletableFuture executeParallelCalculations(List blocks); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/EthTaskChainDownloader.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/EthTaskChainDownloader.java index 62eda51c84..2b0c5638ed 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/EthTaskChainDownloader.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/EthTaskChainDownloader.java @@ -14,8 +14,8 @@ import static java.util.Collections.emptyList; -import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.exceptions.EthTaskException; @@ -205,13 +205,13 @@ private void clearSyncTarget(final SyncTarget syncTarget) { syncState.clearSyncTarget(); } - private CompletableFuture> importBlocks(final List checkpointHeaders) { + private CompletableFuture> importBlocks(final List checkpointHeaders) { if (checkpointHeaders.isEmpty()) { // No checkpoints to download return CompletableFuture.completedFuture(emptyList()); } - final CompletableFuture> importedBlocks = + final CompletableFuture> importedBlocks = blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders); return importedBlocks.whenComplete( @@ -261,7 +261,7 @@ private CompletableFuture> importBlocks(final List chec } public interface BlockImportTaskFactory { - CompletableFuture> importBlocksForCheckpoints( + CompletableFuture> importBlocksForCheckpoints( final List checkpointHeaders); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java index b40c397422..f2ddfcdce3 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockHandler.java @@ -19,6 +19,7 @@ import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; import tech.pegasys.pantheon.ethereum.core.BlockImporter; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; @@ -129,6 +130,11 @@ public long extractBlockNumber(final BlockWithReceipts blockWithReceipt) { return blockWithReceipt.getHeader().getNumber(); } + @Override + public Hash extractBlockHash(final BlockWithReceipts block) { + return block.getHash(); + } + @Override public CompletableFuture executeParallelCalculations(final List blocks) { return CompletableFuture.completedFuture(null); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockImportTaskFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockImportTaskFactory.java index def15fb28f..db11943743 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockImportTaskFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fastsync/FastSyncBlockImportTaskFactory.java @@ -15,8 +15,8 @@ import static java.util.Collections.emptyList; import tech.pegasys.pantheon.ethereum.ProtocolContext; -import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.sync.EthTaskChainDownloader.BlockImportTaskFactory; import tech.pegasys.pantheon.ethereum.eth.sync.SynchronizerConfiguration; @@ -30,7 +30,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; class FastSyncBlockImportTaskFactory implements BlockImportTaskFactory { @@ -61,7 +60,7 @@ class FastSyncBlockImportTaskFactory implements BlockImportTaskFactory { } @Override - public CompletableFuture> importBlocksForCheckpoints( + public CompletableFuture> importBlocksForCheckpoints( final List checkpointHeaders) { if (checkpointHeaders.size() < 2) { return CompletableFuture.completedFuture(emptyList()); @@ -94,10 +93,6 @@ public CompletableFuture> importBlocksForCheckpoints( detatchedValidationPolicy, checkpointHeaders, metricsSystem); - return importTask - .run() - .thenApply( - results -> - results.stream().map(BlockWithReceipts::getBlock).collect(Collectors.toList())); + return importTask.run(); } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java index dd9282f577..10d86ef4d6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockHandler.java @@ -15,6 +15,7 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.core.Transaction; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; @@ -77,6 +78,11 @@ public long extractBlockNumber(final Block block) { return block.getHeader().getNumber(); } + @Override + public Hash extractBlockHash(final Block block) { + return block.getHash(); + } + @Override public CompletableFuture executeParallelCalculations(final List blocks) { final EthScheduler ethScheduler = ethContext.getScheduler(); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockImportTaskFactory.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockImportTaskFactory.java index 5c42bab241..35fba43b8e 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockImportTaskFactory.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/fullsync/FullSyncBlockImportTaskFactory.java @@ -15,6 +15,7 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import tech.pegasys.pantheon.ethereum.eth.sync.EthTaskChainDownloader.BlockImportTaskFactory; @@ -50,9 +51,9 @@ class FullSyncBlockImportTaskFactory implements BlockImportTaskFactory { } @Override - public CompletableFuture> importBlocksForCheckpoints( + public CompletableFuture> importBlocksForCheckpoints( final List checkpointHeaders) { - final CompletableFuture> importedBlocks; + final CompletableFuture> importedHashes; if (checkpointHeaders.size() < 2) { // Download blocks without constraining the end block final ImportBlocksTask importTask = @@ -63,7 +64,7 @@ public CompletableFuture> importBlocksForCheckpoints( checkpointHeaders.get(0), config.downloaderChainSegmentSize(), metricsSystem); - importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult); + importedHashes = importTask.run().thenApply(PeerTaskResult::getResult); } else { final ParallelImportChainSegmentTask importTask = ParallelImportChainSegmentTask.forCheckpoints( @@ -76,8 +77,8 @@ public CompletableFuture> importBlocksForCheckpoints( () -> HeaderValidationMode.DETACHED_ONLY, checkpointHeaders, metricsSystem); - importedBlocks = importTask.run(); + importedHashes = importTask.run(); } - return importedBlocks; + return importedHashes; } } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java index 4ed5a21e06..01dbb73de9 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTask.java @@ -15,6 +15,7 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPeerTask; @@ -29,6 +30,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -38,7 +40,7 @@ * * @param the consensus algorithm context */ -public class ImportBlocksTask extends AbstractPeerTask> { +public class ImportBlocksTask extends AbstractPeerTask> { private static final Logger LOG = LogManager.getLogger(); private final ProtocolContext protocolContext; @@ -92,7 +94,11 @@ protected void executeTaskWithPeer(final EthPeer peer) throws PeerNotConnected { result.get().completeExceptionally(t); } else { LOG.debug("Import from block {} succeeded.", startNumber); - result.get().complete(new PeerTaskResult<>(peer, r)); + result + .get() + .complete( + new PeerTaskResult<>( + peer, r.stream().map(Block::getHash).collect(Collectors.toList()))); } }); } diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java index b54356f9fa..eb7bd2ece6 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelImportChainSegmentTask.java @@ -14,6 +14,7 @@ import tech.pegasys.pantheon.ethereum.ProtocolContext; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.EthContext; import tech.pegasys.pantheon.ethereum.eth.manager.EthScheduler; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractEthTask; @@ -34,7 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class ParallelImportChainSegmentTask extends AbstractEthTask> { +public class ParallelImportChainSegmentTask extends AbstractEthTask> { private static final Logger LOG = LogManager.getLogger(); private final EthContext ethContext; @@ -149,7 +150,7 @@ protected void executeTask() { final CompletableFuture extractTxSignaturesFuture = scheduler.scheduleServiceTask(extractTxSignaturesTask); registerSubTask(extractTxSignaturesFuture); - final CompletableFuture>> validateBodiesFuture = + final CompletableFuture>> validateBodiesFuture = scheduler.scheduleServiceTask(validateAndImportBodiesTask); registerSubTask(validateBodiesFuture); @@ -182,7 +183,7 @@ protected void executeTask() { cancelOnException.accept(null, e); } else if (r != null) { try { - final List importedBlocks = + final List importedBlocks = validateBodiesFuture.get().stream() .flatMap(Collection::stream) .collect(Collectors.toList()); diff --git a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java index 5dd357c07a..0f1876beea 100644 --- a/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java +++ b/ethereum/eth/src/main/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ParallelValidateAndImportBodiesTask.java @@ -12,6 +12,7 @@ */ package tech.pegasys.pantheon.ethereum.eth.sync.tasks; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.eth.manager.task.AbstractPipelinedTask; import tech.pegasys.pantheon.ethereum.eth.sync.BlockHandler; import tech.pegasys.pantheon.metrics.MetricsSystem; @@ -21,12 +22,13 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ParallelValidateAndImportBodiesTask - extends AbstractPipelinedTask, List> { + extends AbstractPipelinedTask, List> { private static final Logger LOG = LogManager.getLogger(); private final BlockHandler blockHandler; @@ -42,7 +44,7 @@ public class ParallelValidateAndImportBodiesTask } @Override - protected Optional> processStep( + protected Optional> processStep( final List blocks, final Optional> previousBlocks) { final long firstBlock = blockHandler.extractBlockNumber(blocks.get(0)); final long lastBlock = blockHandler.extractBlockNumber(blocks.get(blocks.size() - 1)); @@ -50,9 +52,12 @@ protected Optional> processStep( final CompletableFuture> importedBlocksFuture = blockHandler.validateAndImportBlocks(blocks); try { - final List downloadedBlocks = importedBlocksFuture.get(); + final List downloadedHashes = + importedBlocksFuture.get().stream() + .map(blockHandler::extractBlockHash) + .collect(Collectors.toList()); LOG.info("Completed importing chain segment {} to {}", firstBlock, lastBlock); - return Optional.of(downloadedBlocks); + return Optional.of(downloadedHashes); } catch (final InterruptedException | ExecutionException e) { failExceptionally(e); return Optional.empty(); diff --git a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java index 4aa77cd1ff..b7579c3873 100644 --- a/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java +++ b/ethereum/eth/src/test/java/tech/pegasys/pantheon/ethereum/eth/sync/tasks/ImportBlocksTaskTest.java @@ -21,6 +21,7 @@ import tech.pegasys.pantheon.ethereum.core.Block; import tech.pegasys.pantheon.ethereum.core.BlockBody; import tech.pegasys.pantheon.ethereum.core.BlockHeader; +import tech.pegasys.pantheon.ethereum.core.Hash; import tech.pegasys.pantheon.ethereum.core.TransactionReceipt; import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer; import tech.pegasys.pantheon.ethereum.eth.manager.EthProtocolManagerTestUtil; @@ -39,15 +40,15 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import com.google.common.collect.Lists; import org.junit.Test; public class ImportBlocksTaskTest - extends AbstractMessageTaskTest, PeerTaskResult>> { + extends AbstractMessageTaskTest, PeerTaskResult>> { @Override protected List generateDataToBeRequested() { @@ -64,7 +65,7 @@ protected List generateDataToBeRequested() { } @Override - protected EthTask>> createTask(final List requestedData) { + protected EthTask>> createTask(final List requestedData) { final Block firstBlock = requestedData.get(0); final MutableBlockchain shortBlockchain = createShortChain(firstBlock.getHeader().getNumber() - 1); @@ -85,15 +86,15 @@ protected EthTask>> createTask(final List requ @Override protected void assertResultMatchesExpectation( final List requestedData, - final PeerTaskResult> response, + final PeerTaskResult> response, final EthPeer respondingPeer) { - assertThat(response.getResult()).isEqualTo(requestedData); + assertThat(response.getResult()) + .isEqualTo(requestedData.stream().map(Block::getHash).collect(Collectors.toList())); assertThat(response.getPeer()).isEqualTo(respondingPeer); } @Test - public void completesWhenPeerReturnsPartialResult() - throws ExecutionException, InterruptedException { + public void completesWhenPeerReturnsPartialResult() { // Respond with some headers and all corresponding bodies final Responder fullResponder = RespondingEthPeer.blockchainResponder(blockchain); @@ -116,12 +117,14 @@ public void completesWhenPeerReturnsPartialResult() final RespondingEthPeer peer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager); // Execute task - final AtomicReference> actualResult = new AtomicReference<>(); + final AtomicReference> actualResult = new AtomicReference<>(); final AtomicReference actualPeer = new AtomicReference<>(); final AtomicBoolean done = new AtomicBoolean(false); final List requestedData = generateDataToBeRequested(); - final EthTask>> task = createTask(requestedData); - final CompletableFuture>> future = task.run(); + final List requestedHashes = + requestedData.stream().map(Block::getHash).collect(Collectors.toList()); + final EthTask>> task = createTask(requestedData); + final CompletableFuture>> future = task.run(); future.whenComplete( (response, error) -> { actualResult.set(response.getResult()); @@ -135,15 +138,14 @@ public void completesWhenPeerReturnsPartialResult() assertThat(done).isTrue(); assertThat(actualPeer.get()).isEqualTo(peer.getEthPeer()); assertThat(actualResult.get().size()).isLessThan(requestedData.size()); - for (final Block block : actualResult.get()) { - assertThat(requestedData).contains(block); - assertThat(blockchain.contains(block.getHash())).isTrue(); + for (final Hash hash : actualResult.get()) { + assertThat(requestedHashes).contains(hash); + assertThat(blockchain.contains(hash)).isTrue(); } } @Test - public void completesWhenPeersSendEmptyResponses() - throws ExecutionException, InterruptedException { + public void completesWhenPeersSendEmptyResponses() { // Setup a unresponsive peer final Responder responder = RespondingEthPeer.emptyResponder(); final RespondingEthPeer respondingEthPeer = @@ -152,13 +154,10 @@ public void completesWhenPeersSendEmptyResponses() // Execute task and wait for response final AtomicBoolean done = new AtomicBoolean(false); final List requestedData = generateDataToBeRequested(); - final EthTask>> task = createTask(requestedData); - final CompletableFuture>> future = task.run(); + final EthTask>> task = createTask(requestedData); + final CompletableFuture>> future = task.run(); respondingEthPeer.respondWhile(responder, () -> !future.isDone()); - future.whenComplete( - (response, error) -> { - done.compareAndSet(false, true); - }); + future.whenComplete((response, error) -> done.compareAndSet(false, true)); assertThat(future.isDone()).isTrue(); assertThat(future.isCompletedExceptionally()).isFalse(); } @@ -172,8 +171,8 @@ public void shouldNotRequestDataFromPeerBelowExpectedHeight() { // Execute task and wait for response final List requestedData = generateDataToBeRequested(); - final EthTask>> task = createTask(requestedData); - final CompletableFuture>> future = task.run(); + final EthTask>> task = createTask(requestedData); + final CompletableFuture>> future = task.run(); respondingEthPeer.respondWhile(responder, () -> !future.isDone()); assertThat(future.isDone()).isTrue(); assertThat(future.isCompletedExceptionally()).isTrue();