Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Integrate actual WorldStateDownloader with the fast sync work flow #682

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static tech.pegasys.pantheon.ethereum.eth.sync.fastsync.FastSyncError.FAST_SYNC_UNAVAILABLE;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.core.SyncStatus;
import tech.pegasys.pantheon.ethereum.core.Synchronizer;
Expand All @@ -25,9 +23,12 @@
import tech.pegasys.pantheon.ethereum.eth.sync.fullsync.FullSyncDownloader;
import tech.pegasys.pantheon.ethereum.eth.sync.state.PendingBlocks;
import tech.pegasys.pantheon.ethereum.eth.sync.state.SyncState;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;
import tech.pegasys.pantheon.ethereum.mainnet.ProtocolSchedule;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.LabelledMetric;
import tech.pegasys.pantheon.metrics.OperationTimer;
import tech.pegasys.pantheon.services.queue.InMemoryBigQueue;
import tech.pegasys.pantheon.util.ExceptionUtils;

import java.util.Optional;
Expand All @@ -52,6 +53,7 @@ public DefaultSynchronizer(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<C> protocolSchedule,
final ProtocolContext<C> protocolContext,
final WorldStateStorage worldStateStorage,
final EthContext ethContext,
final SyncState syncState,
final LabelledMetric<OperationTimer> ethTasksTimer) {
Expand All @@ -75,14 +77,20 @@ public DefaultSynchronizer(
ethContext, protocolSchedule, protocolContext.getBlockchain(), syncConfig, ethTasksTimer);
if (syncConfig.syncMode() == SyncMode.FAST) {
LOG.info("Fast sync enabled.");
final WorldStateDownloader worldStateDownloader =
new WorldStateDownloader(
ethContext,
worldStateStorage,
new InMemoryBigQueue<>(),
syncConfig.getWorldStateHashCountPerRequest(),
syncConfig.getWorldStateRequestParallelism(),
ethTasksTimer);
this.fastSyncDownloader =
Optional.of(
new FastSyncDownloader<>(
new FastSyncActions<>(
syncConfig, protocolSchedule, protocolContext, ethContext, ethTasksTimer),
pivotBlockHeader -> {
throw new FastSyncException(FAST_SYNC_UNAVAILABLE);
}));
worldStateDownloader));
} else {
this.fastSyncDownloader = Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ public class SynchronizerConfiguration {
public static float DEFAULT_FULL_VALIDATION_RATE = .1f;
public static int DEFAULT_FAST_SYNC_MINIMUM_PEERS = 5;
private static final Duration DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME = Duration.ofMinutes(3);
private static final int DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST = 200;
private static final int DEFAULT_WORLD_STATE_REQUEST_PARALLELISM = 10;

// Fast sync config
private final int fastSyncPivotDistance;
private final float fastSyncFullValidationRate;
private final int fastSyncMinimumPeerCount;
private final Duration fastSyncMaximumPeerWaitTime;
private final int worldStateHashCountPerRequest;
private final int worldStateRequestParallelism;

// Block propagation config
private final Range<Long> blockPropagationRange;
Expand All @@ -65,6 +69,8 @@ private SynchronizerConfiguration(
final float fastSyncFullValidationRate,
final int fastSyncMinimumPeerCount,
final Duration fastSyncMaximumPeerWaitTime,
final int worldStateHashCountPerRequest,
final int worldStateRequestParallelism,
final Range<Long> blockPropagationRange,
final Optional<SyncMode> syncMode,
final long downloaderChangeTargetThresholdByHeight,
Expand All @@ -82,6 +88,8 @@ private SynchronizerConfiguration(
this.fastSyncFullValidationRate = fastSyncFullValidationRate;
this.fastSyncMinimumPeerCount = fastSyncMinimumPeerCount;
this.fastSyncMaximumPeerWaitTime = fastSyncMaximumPeerWaitTime;
this.worldStateHashCountPerRequest = worldStateHashCountPerRequest;
this.worldStateRequestParallelism = worldStateRequestParallelism;
this.blockPropagationRange = blockPropagationRange;
this.syncMode = syncMode;
this.downloaderChangeTargetThresholdByHeight = downloaderChangeTargetThresholdByHeight;
Expand Down Expand Up @@ -126,6 +134,8 @@ public SynchronizerConfiguration validated(final Blockchain blockchain) {
fastSyncFullValidationRate,
fastSyncMinimumPeerCount,
fastSyncMaximumPeerWaitTime,
worldStateHashCountPerRequest,
worldStateRequestParallelism,
blockPropagationRange,
Optional.of(actualSyncMode),
downloaderChangeTargetThresholdByHeight,
Expand Down Expand Up @@ -241,6 +251,14 @@ public Duration getFastSyncMaximumPeerWaitTime() {
return fastSyncMaximumPeerWaitTime;
}

public int getWorldStateHashCountPerRequest() {
return worldStateHashCountPerRequest;
}

public int getWorldStateRequestParallelism() {
return worldStateRequestParallelism;
}

public static class Builder {
private int fastSyncPivotDistance = DEFAULT_PIVOT_DISTANCE_FROM_HEAD;
private float fastSyncFullValidationRate = DEFAULT_FULL_VALIDATION_RATE;
Expand Down Expand Up @@ -339,6 +357,8 @@ public SynchronizerConfiguration build() {
fastSyncFullValidationRate,
DEFAULT_FAST_SYNC_MINIMUM_PEERS,
DEFAULT_FAST_SYNC_MAXIMUM_PEER_WAIT_TIME,
DEFAULT_WORLD_STATE_HASH_COUNT_PER_REQUEST,
DEFAULT_WORLD_STATE_REQUEST_PARALLELISM,
blockPropagationRange,
Optional.empty(),
downloaderChangeTargetThresholdByHeight,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync.fastsync;

import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;

import java.util.concurrent.CompletableFuture;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -40,7 +42,7 @@ public CompletableFuture<FastSyncState> start() {
private CompletableFuture<FastSyncState> downloadChainAndWorldState(
final FastSyncState currentState) {
final CompletableFuture<Void> worldStateFuture =
worldStateDownloader.downloadWorldState(currentState.getPivotBlockHeader().get());
worldStateDownloader.run(currentState.getPivotBlockHeader().get());
final CompletableFuture<Void> chainFuture = fastSyncActions.downloadChain(currentState);

// If either download fails, cancel the other one.
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.core.BlockHeaderTestFixture;
import tech.pegasys.pantheon.ethereum.eth.sync.worldstate.WorldStateDownloader;

import java.util.Optional;
import java.util.OptionalLong;
Expand Down Expand Up @@ -54,15 +55,15 @@ public void shouldCompleteFastSyncSuccessfully() {
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(COMPLETE);
when(worldStateDownloader.downloadWorldState(pivotBlockHeader)).thenReturn(COMPLETE);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(COMPLETE);

final CompletableFuture<FastSyncState> result = downloader.start();

verify(fastSyncActions).waitForSuitablePeers();
verify(fastSyncActions).selectPivotBlock();
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).downloadChain(downloadPivotBlockHeaderState);
verify(worldStateDownloader).downloadWorldState(pivotBlockHeader);
verify(worldStateDownloader).run(pivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions);
verifyNoMoreInteractions(worldStateDownloader);
assertThat(result).isCompletedWithValue(downloadPivotBlockHeaderState);
Expand Down Expand Up @@ -108,15 +109,15 @@ public void shouldAbortIfWorldStateDownloadFails() {
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);
when(worldStateDownloader.downloadWorldState(pivotBlockHeader)).thenReturn(worldStateFuture);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture);

final CompletableFuture<FastSyncState> result = downloader.start();

verify(fastSyncActions).waitForSuitablePeers();
verify(fastSyncActions).selectPivotBlock();
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).downloadChain(downloadPivotBlockHeaderState);
verify(worldStateDownloader).downloadWorldState(pivotBlockHeader);
verify(worldStateDownloader).run(pivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions);
verifyNoMoreInteractions(worldStateDownloader);

Expand All @@ -140,15 +141,15 @@ public void shouldAbortIfChainDownloadFails() {
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);
when(worldStateDownloader.downloadWorldState(pivotBlockHeader)).thenReturn(worldStateFuture);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture);

final CompletableFuture<FastSyncState> result = downloader.start();

verify(fastSyncActions).waitForSuitablePeers();
verify(fastSyncActions).selectPivotBlock();
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).downloadChain(downloadPivotBlockHeaderState);
verify(worldStateDownloader).downloadWorldState(pivotBlockHeader);
verify(worldStateDownloader).run(pivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions);
verifyNoMoreInteractions(worldStateDownloader);

Expand All @@ -172,15 +173,15 @@ public void shouldNotConsiderFastSyncCompleteIfOnlyWorldStateDownloadIsComplete(
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);
when(worldStateDownloader.downloadWorldState(pivotBlockHeader)).thenReturn(worldStateFuture);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture);

final CompletableFuture<FastSyncState> result = downloader.start();

verify(fastSyncActions).waitForSuitablePeers();
verify(fastSyncActions).selectPivotBlock();
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).downloadChain(downloadPivotBlockHeaderState);
verify(worldStateDownloader).downloadWorldState(pivotBlockHeader);
verify(worldStateDownloader).run(pivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions);
verifyNoMoreInteractions(worldStateDownloader);

Expand All @@ -203,15 +204,15 @@ public void shouldNotConsiderFastSyncCompleteIfOnlyChainDownloadIsComplete() {
when(fastSyncActions.downloadPivotBlockHeader(selectPivotBlockState))
.thenReturn(completedFuture(downloadPivotBlockHeaderState));
when(fastSyncActions.downloadChain(downloadPivotBlockHeaderState)).thenReturn(chainFuture);
when(worldStateDownloader.downloadWorldState(pivotBlockHeader)).thenReturn(worldStateFuture);
when(worldStateDownloader.run(pivotBlockHeader)).thenReturn(worldStateFuture);

final CompletableFuture<FastSyncState> result = downloader.start();

verify(fastSyncActions).waitForSuitablePeers();
verify(fastSyncActions).selectPivotBlock();
verify(fastSyncActions).downloadPivotBlockHeader(selectPivotBlockState);
verify(fastSyncActions).downloadChain(downloadPivotBlockHeaderState);
verify(worldStateDownloader).downloadWorldState(pivotBlockHeader);
verify(worldStateDownloader).run(pivotBlockHeader);
verifyNoMoreInteractions(fastSyncActions);
verifyNoMoreInteractions(worldStateDownloader);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ public static PantheonController<CliqueContext> init(
syncConfig,
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public static PantheonController<IbftContext> init(
syncConfig,
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public static PantheonController<IbftContext> init(
syncConfig,
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import tech.pegasys.pantheon.ethereum.p2p.config.SubProtocolConfiguration;
import tech.pegasys.pantheon.ethereum.storage.StorageProvider;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateArchive;
import tech.pegasys.pantheon.ethereum.worldstate.WorldStateStorage;
import tech.pegasys.pantheon.metrics.MetricCategory;
import tech.pegasys.pantheon.metrics.MetricsSystem;

Expand Down Expand Up @@ -101,8 +102,8 @@ public static PantheonController<Void> init(
final MutableBlockchain blockchain =
new DefaultMutableBlockchain(genesisState.getBlock(), blockchainStorage, metricsSystem);

final WorldStateArchive worldStateArchive =
new WorldStateArchive(storageProvider.createWorldStateStorage());
final WorldStateStorage worldStateStorage = storageProvider.createWorldStateStorage();
final WorldStateArchive worldStateArchive = new WorldStateArchive(worldStateStorage);
genesisState.writeStateTo(worldStateArchive.getMutable(Hash.EMPTY_TRIE_HASH));

final ProtocolContext<Void> protocolContext =
Expand All @@ -129,6 +130,7 @@ public static PantheonController<Void> init(
syncConfig,
protocolSchedule,
protocolContext,
worldStateStorage,
ethProtocolManager.ethContext(),
syncState,
metricsSystem.createLabelledTimer(
Expand Down