Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Replace filtering headers after the fact with calculating number to r…
Browse files Browse the repository at this point in the history
…equest up-front. (#1216)
  • Loading branch information
ajsutton authored Apr 4, 2019
1 parent cf7a13a commit b458a94
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
*/
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;

import tech.pegasys.pantheon.ethereum.core.BlockHeader;
import tech.pegasys.pantheon.ethereum.eth.manager.EthContext;
import tech.pegasys.pantheon.ethereum.eth.manager.EthPeer;
Expand All @@ -21,48 +25,72 @@
import tech.pegasys.pantheon.metrics.MetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

public class CheckpointHeaderFetcher {

private final SynchronizerConfiguration syncConfig;
private final ProtocolSchedule<?> protocolSchedule;
private final EthContext ethContext;
private final UnaryOperator<List<BlockHeader>> checkpointFilter;
private final Optional<BlockHeader> lastCheckpointHeader;
private final MetricsSystem metricsSystem;

public CheckpointHeaderFetcher(
final SynchronizerConfiguration syncConfig,
final ProtocolSchedule<?> protocolSchedule,
final EthContext ethContext,
final UnaryOperator<List<BlockHeader>> checkpointFilter,
final Optional<BlockHeader> lastCheckpointHeader,
final MetricsSystem metricsSystem) {
this.syncConfig = syncConfig;
this.protocolSchedule = protocolSchedule;
this.ethContext = ethContext;
this.checkpointFilter = checkpointFilter;
this.lastCheckpointHeader = lastCheckpointHeader;
this.metricsSystem = metricsSystem;
}

public CompletableFuture<List<BlockHeader>> getNextCheckpointHeaders(
final EthPeer peer, final BlockHeader lastHeader) {
final int skip = syncConfig.downloaderChainSegmentSize() - 1;
final int additionalHeaderCount = syncConfig.downloaderHeaderRequestSize();
final int maximumHeaderRequestSize = syncConfig.downloaderHeaderRequestSize();

final int additionalHeaderCount;
if (lastCheckpointHeader.isPresent()) {
final BlockHeader targetHeader = lastCheckpointHeader.get();
final long blocksUntilTarget = targetHeader.getNumber() - lastHeader.getNumber();
if (blocksUntilTarget <= 0) {
return completedFuture(emptyList());
}
final long maxHeadersToRequest = blocksUntilTarget / (skip + 1);
additionalHeaderCount = (int) Math.min(maxHeadersToRequest, maximumHeaderRequestSize);
if (additionalHeaderCount == 0) {
return completedFuture(singletonList(targetHeader));
}
} else {
additionalHeaderCount = maximumHeaderRequestSize;
}

return requestHeaders(peer, lastHeader, additionalHeaderCount, skip);
}

private CompletableFuture<List<BlockHeader>> requestHeaders(
final EthPeer peer,
final BlockHeader referenceHeader,
final int headerCount,
final int skip) {
return GetHeadersFromPeerByHashTask.startingAtHash(
protocolSchedule,
ethContext,
lastHeader.getHash(),
lastHeader.getNumber(),
referenceHeader.getHash(),
referenceHeader.getNumber(),
// + 1 because lastHeader will be returned as well.
additionalHeaderCount + 1,
headerCount + 1,
skip,
metricsSystem)
.assignPeer(peer)
.run()
.thenApply(PeerTaskResult::getResult)
.thenApply(
headers -> checkpointFilter.apply(stripExistingCheckpointHeader(lastHeader, headers)));
.thenApply(headers -> stripExistingCheckpointHeader(referenceHeader, headers));
}

private List<BlockHeader> stripExistingCheckpointHeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tech.pegasys.pantheon.services.pipeline.PipelineBuilder;

import java.time.Duration;
import java.util.Optional;

public class FastSyncDownloadPipelineFactory implements DownloadPipelineFactory {
private final SynchronizerConfiguration syncConfig;
Expand Down Expand Up @@ -60,7 +61,7 @@ public Pipeline<?> createDownloadPipelineForSyncTarget(final SyncTarget target)
syncConfig,
protocolSchedule,
ethContext,
new FastSyncCheckpointFilter(pivotBlockHeader),
Optional.of(pivotBlockHeader),
metricsSystem),
this::shouldContinueDownloadingFromPeer,
ethContext.getScheduler(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
package tech.pegasys.pantheon.ethereum.eth.sync;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import tech.pegasys.pantheon.ethereum.ProtocolContext;
import tech.pegasys.pantheon.ethereum.chain.Blockchain;
Expand All @@ -31,14 +31,13 @@
import tech.pegasys.pantheon.metrics.noop.NoOpMetricsSystem;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.UnaryOperator;

import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
Expand All @@ -47,9 +46,9 @@ public class CheckpointHeaderFetcherTest {
private static ProtocolSchedule<Void> protocolSchedule;
private static ProtocolContext<Void> protocolContext;
private static final MetricsSystem metricsSystem = new NoOpMetricsSystem();
@Mock private UnaryOperator<List<BlockHeader>> filter;
private EthProtocolManager ethProtocolManager;
private CheckpointHeaderFetcher checkpointHeaderFetcher;
private Responder responder;
private RespondingEthPeer respondingPeer;

@BeforeClass
public static void setUpClass() {
Expand All @@ -65,26 +64,15 @@ public void setUpTest() {
ethProtocolManager =
EthProtocolManagerTestUtil.create(
blockchain, protocolContext.getWorldStateArchive(), () -> false);
final EthContext ethContext = ethProtocolManager.ethContext();
checkpointHeaderFetcher =
new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
filter,
metricsSystem);
responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
respondingPeer = EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
}

@Test
public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
when(filter.apply(any())).thenAnswer(invocation -> invocation.getArgument(0));
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.empty());

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));
Expand All @@ -97,23 +85,74 @@ public void shouldRequestHeadersFromPeerAndExcludeExistingHeader() {
}

@Test
public void shouldApplyFilterToDownloadedCheckpoints() {
final List<BlockHeader> filteredResult = asList(header(7), header(9));
final List<BlockHeader> unfilteredResult = asList(header(6), header(11), header(16));
when(filter.apply(unfilteredResult)).thenReturn(filteredResult);
final Responder responder =
RespondingEthPeer.blockchainResponder(blockchain, protocolContext.getWorldStateArchive());
final RespondingEthPeer respondingPeer =
EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 1000);
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(11)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));

assertThat(result).isNotDone();
respondingPeer.respond(responder);

assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}

@Test
public void shouldNotRequestHeadersBeyondTargetWhenTargetIsNotAMultipleOfSegmentSize() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(1));

respondingPeer.respond(responder);

assertThat(result).isCompletedWithValue(filteredResult);
assertThat(result).isCompletedWithValue(asList(header(6), header(11)));
}

@Test
public void shouldReturnOnlyTargetHeaderWhenLastHeaderIsTheCheckpointBeforeTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(11));

assertThat(result).isCompletedWithValue(singletonList(header(15)));
}

@Test
public void shouldReturnEmptyListWhenLastHeaderIsTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(15));
assertThat(result).isCompletedWithValue(emptyList());
}

@Test
public void shouldReturnEmptyListWhenLastHeaderIsAfterTarget() {
final CheckpointHeaderFetcher checkpointHeaderFetcher =
createCheckpointHeaderFetcher(Optional.of(header(15)));

final CompletableFuture<List<BlockHeader>> result =
checkpointHeaderFetcher.getNextCheckpointHeaders(respondingPeer.getEthPeer(), header(16));
assertThat(result).isCompletedWithValue(emptyList());
}

private CheckpointHeaderFetcher createCheckpointHeaderFetcher(
final Optional<BlockHeader> targetHeader) {
final EthContext ethContext = ethProtocolManager.ethContext();
return new CheckpointHeaderFetcher(
SynchronizerConfiguration.builder()
.downloaderChainSegmentSize(5)
.downloaderHeadersRequestSize(3)
.build(),
protocolSchedule,
ethContext,
targetHeader,
metricsSystem);
}

private BlockHeader header(final long blockNumber) {
Expand Down

This file was deleted.

0 comments on commit b458a94

Please sign in to comment.