Skip to content

Commit

Permalink
[NC-2138] Extract out generic parts of Downloader (PegaSysEng#659)
Browse files Browse the repository at this point in the history
Separate the management of sync target and actual import from the rest of the Downloader logic in preparation for introducing a fast sync chain downloader.
  • Loading branch information
ajsutton authored and vinistevam committed Jan 29, 2019
1 parent 56bfdb1 commit 641e5bb
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 154 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 ConsenSys AG.
* Copyright 2019 ConsenSys AG.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
Expand All @@ -17,26 +17,19 @@
import tech.pegasys.pantheon.ethereum.core.Block;
import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.AbstractPeerTask.PeerTaskResult;
import tech.pegasys.pantheon.ethereum.eth.manager.ChainState;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeers;
import tech.pegasys.pantheon.ethereum.eth.manager.EthTask;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncTarget;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.DetermineCommonAncestorTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.GetHeadersFromPeerByHashTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.ImportBlocksTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.PipelinedImportChainSegmentTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeerTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.WaitForPeersTask;
import tech.pegasys.pantheon.ethereum.eth.sync.tasks.exceptions.InvalidBlockException;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.p2p.wire.messages.DisconnectMessage.DisconnectReason;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.util.ExceptionUtils;
import tech.pegasys.pantheon.util.uint.UInt256;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -50,43 +43,47 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.collect.Lists;
import com.google.common.annotations.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class FullSyncDownloader<C> {
public class ChainDownloader<C> {
private static final Logger LOG = LogManager.getLogger();

private final SynchronizerConfiguration config;
private final ProtocolSchedule<C> protocolSchedule;
private final ProtocolContext<C> protocolContext;
private final EthContext ethContext;
private final SyncState syncState;
private final SyncTargetManager<C> syncTargetManager;
private final BlockImportTaskFactory blockImportTaskFactory;
private final ProtocolSchedule<C> protocolSchedule;
private final LabelledMetric<OperationTimer> ethTasksTimer;

private final Deque<BlockHeader> checkpointHeaders = new ConcurrentLinkedDeque<>();
private int checkpointTimeouts = 0;
private int chainSegmentTimeouts = 0;
private volatile boolean syncTargetDisconnected = false;

private final AtomicBoolean started = new AtomicBoolean(false);
private long syncTargetDisconnectListenerId;
protected CompletableFuture<?> currentTask;
private CompletableFuture<?> currentTask;

FullSyncDownloader(
public ChainDownloader(
final SynchronizerConfiguration config,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
final LabelledMetric<OperationTimer> ethTasksTimer,
final SyncTargetManager<C> syncTargetManager,
final BlockImportTaskFactory blockImportTaskFactory) {
this.protocolSchedule = protocolSchedule;
this.ethTasksTimer = ethTasksTimer;
this.config = config;
this.protocolSchedule = protocolSchedule;
this.protocolContext = protocolContext;
this.ethContext = ethContext;

this.syncState = syncState;
this.syncTargetManager = syncTargetManager;
this.blockImportTaskFactory = blockImportTaskFactory;
}

public void start() {
Expand All @@ -98,11 +95,16 @@ public void start() {
}
}

@VisibleForTesting
public CompletableFuture<?> getCurrentTask() {
return currentTask;
}

private CompletableFuture<?> executeDownload() {
// Find target, pull checkpoint headers, import, repeat
currentTask =
waitForPeers()
.thenCompose(r -> findSyncTarget())
.thenCompose(r -> syncTargetManager.findSyncTarget())
.thenCompose(this::pullCheckpointHeaders)
.thenCompose(r -> importBlocks())
.thenCompose(r -> checkSyncTarget())
Expand Down Expand Up @@ -132,75 +134,15 @@ private CompletableFuture<?> waitForPeers() {
return WaitForPeersTask.create(ethContext, 1, ethTasksTimer).run();
}

private CompletableFuture<?> waitForNewPeer() {
return ethContext
.getScheduler()
.timeout(WaitForPeerTask.create(ethContext, ethTasksTimer), Duration.ofSeconds(5));
}

private CompletableFuture<SyncTarget> findSyncTarget() {
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
if (maybeSyncTarget.isPresent()) {
// Nothing to do
return CompletableFuture.completedFuture(maybeSyncTarget.get());
}

final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeer();
if (!maybeBestPeer.isPresent()) {
LOG.info("No sync target, wait for peers.");
return waitForPeerAndThenSetSyncTarget();
} else {
final EthPeer bestPeer = maybeBestPeer.get();
final long peerHeight = bestPeer.chainState().getEstimatedHeight();
final UInt256 peerTd = bestPeer.chainState().getBestBlock().getTotalDifficulty();
if (peerTd.compareTo(syncState.chainHeadTotalDifficulty()) <= 0
&& peerHeight <= syncState.chainHeadNumber()) {
// We're caught up to our best peer, try again when a new peer connects
LOG.debug("Caught up to best peer: " + bestPeer.chainState().getEstimatedHeight());
return waitForPeerAndThenSetSyncTarget();
}
return DetermineCommonAncestorTask.create(
protocolSchedule,
protocolContext,
ethContext,
bestPeer,
config.downloaderHeaderRequestSize(),
ethTasksTimer)
.run()
.handle((r, t) -> r)
.thenCompose(
(target) -> {
if (target == null) {
return waitForPeerAndThenSetSyncTarget();
}
final SyncTarget syncTarget = syncState.setSyncTarget(bestPeer, target);
LOG.info(
"Found common ancestor with peer {} at block {}", bestPeer, target.getNumber());
syncTargetDisconnectListenerId =
bestPeer.subscribeDisconnect(this::onSyncTargetPeerDisconnect);
return CompletableFuture.completedFuture(syncTarget);
});
}
}

private CompletableFuture<SyncTarget> waitForPeerAndThenSetSyncTarget() {
return waitForNewPeer().handle((r, t) -> r).thenCompose((r) -> findSyncTarget());
}

private void onSyncTargetPeerDisconnect(final EthPeer ethPeer) {
LOG.info("Sync target disconnected: {}", ethPeer);
syncTargetDisconnected = true;
}

private CompletableFuture<Void> checkSyncTarget() {
final Optional<SyncTarget> maybeSyncTarget = syncState.syncTarget();
if (!maybeSyncTarget.isPresent()) {
// Nothing to do
// No sync target, so nothing to check.
return CompletableFuture.completedFuture(null);
}

final SyncTarget syncTarget = maybeSyncTarget.get();
if (shouldSwitchSyncTarget(syncTarget)) {
if (syncTargetManager.shouldSwitchSyncTarget(syncTarget)) {
LOG.info("Better sync target found, clear current sync target: {}.", syncTarget);
clearSyncTarget(syncTarget);
return CompletableFuture.completedFuture(null);
Expand All @@ -218,41 +160,10 @@ private CompletableFuture<Void> checkSyncTarget() {
return CompletableFuture.completedFuture(null);
}

private boolean shouldSwitchSyncTarget(final SyncTarget currentTarget) {
final EthPeer currentPeer = currentTarget.peer();
final ChainState currentPeerChainState = currentPeer.chainState();
final Optional<EthPeer> maybeBestPeer = ethContext.getEthPeers().bestPeer();

return maybeBestPeer
.map(
bestPeer -> {
if (EthPeers.BEST_CHAIN.compare(bestPeer, currentPeer) <= 0) {
// Our current target is better or equal to the best peer
return false;
}
// Require some threshold to be exceeded before switching targets to keep some
// stability
// when multiple peers are in range of each other
final ChainState bestPeerChainState = bestPeer.chainState();
final long heightDifference =
bestPeerChainState.getEstimatedHeight()
- currentPeerChainState.getEstimatedHeight();
if (heightDifference == 0 && bestPeerChainState.getEstimatedHeight() == 0) {
// Only check td if we don't have a height metric
final UInt256 tdDifference =
bestPeerChainState
.getBestBlock()
.getTotalDifficulty()
.minus(currentPeerChainState.getBestBlock().getTotalDifficulty());
return tdDifference.compareTo(config.downloaderChangeTargetThresholdByTd()) > 0;
}
return heightDifference > config.downloaderChangeTargetThresholdByHeight();
})
.orElse(false);
}

private boolean finishedSyncingToCurrentTarget() {
return syncTargetDisconnected || checkpointsHaveTimedOut() || chainSegmentsHaveTimedOut();
return syncTargetManager.isSyncTargetDisconnected()
|| checkpointsHaveTimedOut()
|| chainSegmentsHaveTimedOut();
}

private boolean checkpointsHaveTimedOut() {
Expand All @@ -274,13 +185,12 @@ private void clearSyncTarget(final SyncTarget syncTarget) {
chainSegmentTimeouts = 0;
checkpointTimeouts = 0;
checkpointHeaders.clear();
syncTarget.peer().unsubscribeDisconnect(syncTargetDisconnectListenerId);
syncTargetDisconnected = false;
syncTargetManager.clearSyncTarget(syncTarget);
syncState.clearSyncTarget();
}

private boolean shouldDownloadMoreCheckpoints() {
return !syncTargetDisconnected
return !syncTargetManager.isSyncTargetDisconnected()
&& checkpointHeaders.size() < config.downloaderHeaderRequestSize()
&& checkpointTimeouts < config.downloaderCheckpointTimeoutsPermitted();
}
Expand All @@ -290,8 +200,10 @@ private CompletableFuture<?> pullCheckpointHeaders(final SyncTarget syncTarget)
return CompletableFuture.completedFuture(null);
}

final BlockHeader lastHeader =
checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor();
// Try to pull more checkpoint headers
return checkpointHeadersTask(syncTarget)
return checkpointHeadersTask(lastHeader, syncTarget)
.run()
.handle(
(r, t) -> {
Expand Down Expand Up @@ -321,9 +233,7 @@ private CompletableFuture<?> pullCheckpointHeaders(final SyncTarget syncTarget)
}

private EthTask<PeerTaskResult<List<BlockHeader>>> checkpointHeadersTask(
final SyncTarget syncTarget) {
final BlockHeader lastHeader =
checkpointHeaders.size() > 0 ? checkpointHeaders.getLast() : syncTarget.commonAncestor();
final BlockHeader lastHeader, final SyncTarget syncTarget) {
LOG.debug("Requesting checkpoint headers from {}", lastHeader.getNumber());
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
Expand All @@ -342,29 +252,8 @@ private CompletableFuture<List<Block>> importBlocks() {
return CompletableFuture.completedFuture(Collections.emptyList());
}

final CompletableFuture<List<Block>> importedBlocks;
if (checkpointHeaders.size() < 2) {
// Download blocks without constraining the end block
final ImportBlocksTask<C> importTask =
ImportBlocksTask.fromHeader(
protocolSchedule,
protocolContext,
ethContext,
checkpointHeaders.getFirst(),
config.downloaderChainSegmentSize(),
ethTasksTimer);
importedBlocks = importTask.run().thenApply(PeerTaskResult::getResult);
} else {
final PipelinedImportChainSegmentTask<C> importTask =
PipelinedImportChainSegmentTask.forCheckpoints(
protocolSchedule,
protocolContext,
ethContext,
config.downloaderParallelism(),
ethTasksTimer,
Lists.newArrayList(checkpointHeaders));
importedBlocks = importTask.run();
}
final CompletableFuture<List<Block>> importedBlocks =
blockImportTaskFactory.importBlocksForCheckpoints(checkpointHeaders);

return importedBlocks.whenComplete(
(r, t) -> {
Expand Down Expand Up @@ -418,4 +307,9 @@ private boolean clearImportedCheckpointHeaders() {
syncState.setCommonAncestor(lastImportedCheckpointHeader);
return imported.size() > 1;
}

public interface BlockImportTaskFactory {
CompletableFuture<List<Block>> importBlocksForCheckpoints(
final Deque<BlockHeader> checkpointHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncException;
import tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
Expand Down
Loading

0 comments on commit 641e5bb

Please sign in to comment.