diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d1c453735a..47d304928f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Additions and Improvements - Upgrade besu-native to 0.6.0 and use Blake2bf native implementation if available by default [#4264](https://github.com/hyperledger/besu/pull/4264) - Better management of jemalloc presence/absence in startup script [#4237](https://github.com/hyperledger/besu/pull/4237) +- Retry mechanism when getting a broadcasted block fails on all peers [#4271](https://github.com/hyperledger/besu/pull/4271) - Filter out disconnected peers when fetching available peers [#4269](https://github.com/hyperledger/besu/pull/4269) - Updated the default value of fast-sync-min-peers post merge [#4298](https://github.com/hyperledger/besu/pull/4298) - Log imported block info post merge [#4310](https://github.com/hyperledger/besu/pull/4310) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java index 30033200355..cf48d69847d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java @@ -114,9 +114,9 @@ protected void handleTaskError(final Throwable error) { if (cause instanceof NoAvailablePeersException) { LOG.debug( - "No useful peer found, checking remaining current peers for usefulness: {}", + "No useful peer found, wait max 5 seconds for new peer to connect: current peers {}", ethContext.getEthPeers().peerCount()); - // Wait for new peer to connect + final WaitForPeerTask waitTask = WaitForPeerTask.create(ethContext, metricsSystem); executeSubTask( () -> diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java 
b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java index aa72844f6d4..20355f474d4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java @@ -14,6 +14,8 @@ */ package org.hyperledger.besu.ethereum.eth.sync; +import static org.hyperledger.besu.util.FutureUtils.exceptionallyCompose; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; import org.hyperledger.besu.consensus.merge.ForkchoiceMessageListener; @@ -23,6 +25,7 @@ import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; import org.hyperledger.besu.ethereum.chain.BlockAddedEvent.EventType; import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Difficulty; @@ -46,6 +49,7 @@ import org.hyperledger.besu.ethereum.rlp.RLPException; import org.hyperledger.besu.plugin.services.MetricsSystem; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -53,8 +57,10 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -76,12 +82,9 @@ public class BlockPropagationManager implements ForkchoiceMessageListener { private final BlockBroadcaster blockBroadcaster; private final AtomicBoolean started = new AtomicBoolean(false); - - private final Set importingBlocks = 
Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Set requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Set requestedNonAnnouncedBlocks = - Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final ProcessingBlocksManager processingBlocksManager; private final PendingBlocksManager pendingBlocksManager; + private final Duration getBlockTimeoutMillis; private Optional onBlockAddedSId = Optional.empty(); private Optional newBlockSId; private Optional newBlockHashesSId; @@ -95,6 +98,28 @@ public class BlockPropagationManager implements ForkchoiceMessageListener { final PendingBlocksManager pendingBlocksManager, final MetricsSystem metricsSystem, final BlockBroadcaster blockBroadcaster) { + this( + config, + protocolSchedule, + protocolContext, + ethContext, + syncState, + pendingBlocksManager, + metricsSystem, + blockBroadcaster, + new ProcessingBlocksManager()); + } + + BlockPropagationManager( + final SynchronizerConfiguration config, + final ProtocolSchedule protocolSchedule, + final ProtocolContext protocolContext, + final EthContext ethContext, + final SyncState syncState, + final PendingBlocksManager pendingBlocksManager, + final MetricsSystem metricsSystem, + final BlockBroadcaster blockBroadcaster, + final ProcessingBlocksManager processingBlocksManager) { this.config = config; this.protocolSchedule = protocolSchedule; this.protocolContext = protocolContext; @@ -104,6 +129,9 @@ public class BlockPropagationManager implements ForkchoiceMessageListener { this.syncState = syncState; this.pendingBlocksManager = pendingBlocksManager; this.syncState.subscribeTTDReached(this::reactToTTDReachedEvent); + this.getBlockTimeoutMillis = + Duration.ofMillis(config.getPropagationManagerGetBlockTimeoutMillis()); + this.processingBlocksManager = processingBlocksManager; } public void start() { @@ -156,6 +184,7 @@ private void clearListeners() { private void onBlockAdded(final BlockAddedEvent blockAddedEvent) 
{ // Check to see if any of our pending blocks are now ready for import final Block newBlock = blockAddedEvent.getBlock(); + traceLambda( LOG, "Block added event type {} for block {}. Current status {}", @@ -247,7 +276,7 @@ private void maybeProcessNonAnnouncedBlocks(final Block newBlock) { if (distance < config.getBlockPropagationRange().upperEndpoint() && minAnnouncedBlockNumber > firstNonAnnouncedBlockNumber) { - if (requestedNonAnnouncedBlocks.add(firstNonAnnouncedBlockNumber)) { + if (processingBlocksManager.addNonAnnouncedBlocks(firstNonAnnouncedBlockNumber)) { retrieveNonAnnouncedBlock(firstNonAnnouncedBlockNumber); } } @@ -338,7 +367,7 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) { LOG.trace("New block hash from network {} is already pending", announcedBlock); continue; } - if (importingBlocks.contains(announcedBlock.hash())) { + if (processingBlocksManager.alreadyImporting(announcedBlock.hash())) { LOG.trace("New block hash from network {} is already importing", announcedBlock); continue; } @@ -346,7 +375,7 @@ private void handleNewBlockHashesFromNetwork(final EthMessage message) { LOG.trace("New block hash from network {} was already imported", announcedBlock); continue; } - if (requestedBlocks.add(announcedBlock.hash())) { + if (processingBlocksManager.addRequestedBlock(announcedBlock.hash())) { newBlocks.add(announcedBlock); } else { LOG.trace("New block hash from network {} was already requested", announcedBlock); @@ -379,7 +408,7 @@ private CompletableFuture processAnnouncedBlock( private void requestParentBlock(final Block block) { final BlockHeader blockHeader = block.getHeader(); - if (requestedBlocks.add(blockHeader.getParentHash())) { + if (processingBlocksManager.addRequestedBlock(blockHeader.getParentHash())) { retrieveParentBlock(blockHeader); } else { LOG.debug("Parent block with hash {} is already requested", blockHeader.getParentHash()); @@ -397,34 +426,115 @@ private CompletableFuture retrieveParentBlock(final 
BlockHeader blockHead private CompletableFuture getBlockFromPeers( final Optional preferredPeer, final long blockNumber, - final Optional blockHash) { + final Optional maybeBlockHash) { + return repeatableGetBlockFromPeer(preferredPeer, blockNumber, maybeBlockHash) + .whenComplete( + (block, throwable) -> { + if (block != null) { + debugLambda(LOG, "Successfully retrieved block {}", block::toLogString); + processingBlocksManager.registerReceivedBlock(block); + } else { + if (throwable != null) { + LOG.warn( + "Failed to retrieve block " + + logBlockNumberMaybeHash(blockNumber, maybeBlockHash), + throwable); + } else { + // this could happen if we give up at some point since we find that it makes no + // sense to retry + debugLambda( + LOG, + "Block {} not retrieved", + () -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash)); + } + processingBlocksManager.registerFailedGetBlock(blockNumber, maybeBlockHash); + } + }); + } + + private CompletableFuture repeatableGetBlockFromPeer( + final Optional preferredPeer, + final long blockNumber, + final Optional maybeBlockHash) { + return exceptionallyCompose( + scheduleGetBlockFromPeers(preferredPeer, blockNumber, maybeBlockHash), + handleGetBlockErrors(blockNumber, maybeBlockHash)) + .thenCompose(r -> maybeRepeatGetBlock(blockNumber, maybeBlockHash)); + } + + private Function> handleGetBlockErrors( + final long blockNumber, final Optional maybeBlockHash) { + return throwable -> { + debugLambda( + LOG, + "Temporary failure retrieving block {} from peers with error {}", + () -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash), + throwable::toString); + return CompletableFuture.completedFuture(null); + }; + } + + private CompletableFuture maybeRepeatGetBlock( + final long blockNumber, final Optional maybeBlockHash) { + final MutableBlockchain blockchain = protocolContext.getBlockchain(); + final Optional maybeBlock = + maybeBlockHash + .map(hash -> blockchain.getBlockByHash(hash)) + .orElseGet(() -> 
blockchain.getBlockByNumber(blockNumber)); + + // check if we got this block by other means + if (maybeBlock.isPresent()) { + final Block block = maybeBlock.get(); + debugLambda( + LOG, "No need to retry to get block {} since it is already present", block::toLogString); + return CompletableFuture.completedFuture(block); + } + + final long localChainHeight = blockchain.getChainHeadBlockNumber(); + final long bestChainHeight = syncState.bestChainHeight(localChainHeight); + if (!shouldImportBlockAtHeight(blockNumber, localChainHeight, bestChainHeight)) { + debugLambda( + LOG, + "Not retrying to get block {} since we are too far from local chain head {}", + () -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash), + blockchain.getChainHead()::toLogString); + return CompletableFuture.completedFuture(null); + } + + debugLambda( + LOG, + "Retrying to get block {}", + () -> logBlockNumberMaybeHash(blockNumber, maybeBlockHash)); + + return ethContext + .getScheduler() + .scheduleSyncWorkerTask( + () -> repeatableGetBlockFromPeer(Optional.empty(), blockNumber, maybeBlockHash)); + } + + private CompletableFuture scheduleGetBlockFromPeers( + final Optional maybePreferredPeer, + final long blockNumber, + final Optional maybeBlockHash) { final RetryingGetBlockFromPeersTask getBlockTask = RetryingGetBlockFromPeersTask.create( protocolSchedule, ethContext, metricsSystem, - ethContext.getEthPeers().getMaxPeers(), - blockHash, + Math.max(1, ethContext.getEthPeers().peerCount()), + maybeBlockHash, blockNumber); - preferredPeer.ifPresent(getBlockTask::assignPeer); + maybePreferredPeer.ifPresent(getBlockTask::assignPeer); - return ethContext - .getScheduler() - .scheduleSyncWorkerTask(getBlockTask::run) - .thenCompose(r -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId())) - .whenComplete( - (r, t) -> { - requestedNonAnnouncedBlocks.remove(blockNumber); - blockHash.ifPresentOrElse( - requestedBlocks::remove, - () -> { - if (r != null) { - // in case we successfully 
retrieved only by block number, when can remove - // the request by hash too - requestedBlocks.remove(r.getHash()); - } - }); - }); + var future = + ethContext + .getScheduler() + .scheduleSyncWorkerTask(getBlockTask::run) + .thenCompose(r -> importOrSavePendingBlock(r.getResult(), r.getPeer().nodeId())); + + ethContext.getScheduler().failAfterTimeout(future, getBlockTimeoutMillis); + + return future; } private void broadcastBlock(final Block block, final BlockHeader parent) { @@ -453,14 +563,14 @@ CompletableFuture importOrSavePendingBlock(final Block block, final Bytes return CompletableFuture.completedFuture(block); } - if (!importingBlocks.add(block.getHash())) { + if (!processingBlocksManager.addImportingBlock(block.getHash())) { traceLambda(LOG, "We're already importing this block {}", block::toLogString); return CompletableFuture.completedFuture(block); } if (protocolContext.getBlockchain().contains(block.getHash())) { traceLambda(LOG, "We've already imported this block {}", block::toLogString); - importingBlocks.remove(block.getHash()); + processingBlocksManager.registerBlockImportDone(block.getHash()); return CompletableFuture.completedFuture(block); } @@ -537,7 +647,7 @@ private CompletableFuture validateAndProcessPendingBlock( ethContext.getScheduler().scheduleSyncWorkerTask(() -> broadcastBlock(block, parent)); return runImportTask(block); } else { - importingBlocks.remove(block.getHash()); + processingBlocksManager.registerBlockImportDone(block.getHash()); badBlockManager.addBadBlock(block); LOG.warn("Failed to import announced block {}", block.toLogString()); return CompletableFuture.completedFuture(block); @@ -557,7 +667,7 @@ private CompletableFuture runImportTask(final Block block) { .run() .whenComplete( (result, throwable) -> { - importingBlocks.remove(block.getHash()); + processingBlocksManager.registerBlockImportDone(block.getHash()); if (throwable != null) { LOG.warn("Failed to import announced block {}", block.toLogString()); } @@ -591,17 
+701,17 @@ private void reactToTTDReachedEvent(final boolean ttdReached) { @Override public String toString() { return "BlockPropagationManager{" - + "requestedBlocks=" - + requestedBlocks - + ", requestedNonAnnounceBlocks=" - + requestedNonAnnouncedBlocks - + ", importingBlocks=" - + importingBlocks + + processingBlocksManager + ", pendingBlocksManager=" + pendingBlocksManager + '}'; } + private String logBlockNumberMaybeHash( + final long blockNumber, final Optional maybeBlockHash) { + return blockNumber + maybeBlockHash.map(h -> " (" + h + ")").orElse(""); + } + @Override public void onNewForkchoiceMessage( final Hash headBlockHash, @@ -611,4 +721,54 @@ public void onNewForkchoiceMessage( stop(); } } + + static class ProcessingBlocksManager { + private final Set importingBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set requestedBlocks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set requestedNonAnnouncedBlocks = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + + boolean addRequestedBlock(final Hash hash) { + return requestedBlocks.add(hash); + } + + public boolean addNonAnnouncedBlocks(final long blockNumber) { + return requestedNonAnnouncedBlocks.add(blockNumber); + } + + public boolean alreadyImporting(final Hash hash) { + return importingBlocks.contains(hash); + } + + public synchronized void registerReceivedBlock(final Block block) { + requestedBlocks.remove(block.getHash()); + requestedNonAnnouncedBlocks.remove(block.getHeader().getNumber()); + } + + public synchronized void registerFailedGetBlock( + final long blockNumber, final Optional maybeBlockHash) { + requestedNonAnnouncedBlocks.remove(blockNumber); + maybeBlockHash.ifPresent(requestedBlocks::remove); + } + + public boolean addImportingBlock(final Hash hash) { + return importingBlocks.add(hash); + } + + public void registerBlockImportDone(final Hash hash) { + importingBlocks.remove(hash); + } + + @Override + public synchronized 
String toString() { + return "ProcessingBlocksManager{" + + "importingBlocks=" + + importingBlocks + + ", requestedBlocks=" + + requestedBlocks + + ", requestedNonAnnouncedBlocks=" + + requestedNonAnnouncedBlocks + + '}'; + } + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SynchronizerConfiguration.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SynchronizerConfiguration.java index e18590a774c..4077ec5f6f9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SynchronizerConfiguration.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/SynchronizerConfiguration.java @@ -47,6 +47,8 @@ public class SynchronizerConfiguration { public static final int DEFAULT_COMPUTATION_PARALLELISM = 2; public static final int DEFAULT_WORLD_STATE_TASK_CACHE_SIZE = CachingTaskCollection.DEFAULT_CACHE_SIZE; + public static final long DEFAULT_PROPAGATION_MANAGER_GET_BLOCK_TIMEOUT_MILLIS = + TimeUnit.SECONDS.toMillis(60); // Fast sync config private final int fastSyncPivotDistance; @@ -77,6 +79,7 @@ public class SynchronizerConfiguration { private final int computationParallelism; private final int maxTrailingPeers; private final long worldStateMinMillisBeforeStalling; + private final long propagationManagerGetBlockTimeoutMillis; private SynchronizerConfiguration( final int fastSyncPivotDistance, @@ -98,7 +101,8 @@ private SynchronizerConfiguration( final int downloaderParallelism, final int transactionsParallelism, final int computationParallelism, - final int maxTrailingPeers) { + final int maxTrailingPeers, + final long propagationManagerGetBlockTimeoutMillis) { this.fastSyncPivotDistance = fastSyncPivotDistance; this.fastSyncFullValidationRate = fastSyncFullValidationRate; this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount; @@ -119,6 +123,7 @@ private SynchronizerConfiguration( this.transactionsParallelism = transactionsParallelism; this.computationParallelism = 
computationParallelism; this.maxTrailingPeers = maxTrailingPeers; + this.propagationManagerGetBlockTimeoutMillis = propagationManagerGetBlockTimeoutMillis; } public static Builder builder() { @@ -234,6 +239,10 @@ public int getMaxTrailingPeers() { return maxTrailingPeers; } + public long getPropagationManagerGetBlockTimeoutMillis() { + return propagationManagerGetBlockTimeoutMillis; + } + public static class Builder { private SyncMode syncMode = SyncMode.FULL; private int fastSyncMinimumPeerCount = DEFAULT_FAST_SYNC_MINIMUM_PEERS; @@ -260,6 +269,9 @@ public static class Builder { private long worldStateMinMillisBeforeStalling = DEFAULT_WORLD_STATE_MIN_MILLIS_BEFORE_STALLING; private int worldStateTaskCacheSize = DEFAULT_WORLD_STATE_TASK_CACHE_SIZE; + private long propagationManagerGetBlockTimeoutMillis = + DEFAULT_PROPAGATION_MANAGER_GET_BLOCK_TIMEOUT_MILLIS; + public Builder fastSyncPivotDistance(final int distance) { fastSyncPivotDistance = distance; return this; @@ -371,6 +383,12 @@ public Builder maxTrailingPeers(final int maxTailingPeers) { return this; } + public Builder propagationManagerGetBlockTimeoutMillis( + final long propagationManagerGetBlockTimeoutMillis) { + this.propagationManagerGetBlockTimeoutMillis = propagationManagerGetBlockTimeoutMillis; + return this; + } + public SynchronizerConfiguration build() { return new SynchronizerConfiguration( fastSyncPivotDistance, @@ -392,7 +410,8 @@ public SynchronizerConfiguration build() { downloaderParallelism, transactionsParallelism, computationParallelism, - maxTrailingPeers); + maxTrailingPeers, + propagationManagerGetBlockTimeoutMillis); } } } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index 3f15a68cd99..39666aebb54 100644 --- 
a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -50,6 +50,7 @@ import org.hyperledger.besu.ethereum.eth.messages.EthPV62; import org.hyperledger.besu.ethereum.eth.messages.NewBlockHashesMessage; import org.hyperledger.besu.ethereum.eth.messages.NewBlockMessage; +import org.hyperledger.besu.ethereum.eth.sync.BlockPropagationManager.ProcessingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.PendingBlocksManager; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; @@ -87,6 +88,8 @@ public abstract class AbstractBlockPropagationManagerTest { spy( new PendingBlocksManager( SynchronizerConfiguration.builder().blockPropagationRange(-10, 30).build())); + protected final ProcessingBlocksManager processingBlocksManager = + spy(new ProcessingBlocksManager()); protected SyncState syncState; protected final MetricsSystem metricsSystem = new NoOpMetricsSystem(); private final Hash finalizedHash = Hash.fromHexStringLenient("0x1337"); @@ -119,7 +122,8 @@ protected void setup(final DataStorageFormat dataStorageFormat) { syncState, pendingBlocksManager, metricsSystem, - blockBroadcaster); + blockBroadcaster, + processingBlocksManager); } @Test @@ -933,7 +937,7 @@ public void shouldNotListenToBlockAddedEventsWhenTTDReachedAndFinal() { } @Test - public void shouldRequestBlockAgainIfFirstGetBlockFails() { + public void shouldRequestBlockFromOtherPeersIfFirstPeerFails() { blockchainUtil.importFirstBlocks(2); final Block nextBlock = blockchainUtil.getBlock(2); @@ -956,15 +960,18 @@ public void shouldRequestBlockAgainIfFirstGetBlockFails() { assertThat(blockchain.contains(nextBlock.getHash())).isFalse(); - // Re-broadcast the previous message and peer responds + // second peer responds final RespondingEthPeer 
secondPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); - EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, secondPeer, nextAnnouncement); final Responder goodResponder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); secondPeer.respondWhile(goodResponder, secondPeer::hasOutstandingRequests); assertThat(blockchain.contains(nextBlock.getHash())).isTrue(); + verify(processingBlocksManager).addRequestedBlock(nextBlock.getHash()); + verify(processingBlocksManager).addImportingBlock(nextBlock.getHash()); + verify(processingBlocksManager).registerReceivedBlock(nextBlock); + verify(processingBlocksManager).registerBlockImportDone(nextBlock.getHash()); } public abstract Blockchain getFullBlockchain();